diff --git a/core/core.go b/core/core.go index 79cc39782e90185ab52fec8e8b1460f2442fc7c5..86f51b348aabad7798a5debf8e6def9066c0140b 100644 --- a/core/core.go +++ b/core/core.go @@ -296,7 +296,7 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore) if follow { - n.Namecache = namecache.NewNameCache(ctx, n.Namesys, n.Pinning, n.DAG, n.Blockstore) + n.Namecache = namecache.NewNameCache(ctx, n.Namesys, n.DAG) n.Namecache, err = namecache.NewPersistentCache(n.Namecache, n.Repo.Datastore()) if err != nil { return err diff --git a/dagutils/diff.go b/dagutils/diff.go index 94ab974729d127c49141d412ef6a6b73c49c7408..6a9f2d65d35c6642b2fd929c5da856d6b44fb9d5 100644 --- a/dagutils/diff.go +++ b/dagutils/diff.go @@ -99,7 +99,7 @@ func ApplyChange(ctx context.Context, ds ipld.DAGService, nd *dag.ProtoNode, cs // 1. two node's links number are greater than 0. // 2. both of two nodes are ProtoNode. // Otherwise, it compares the cid and emits a Mod change object. -func Diff(ctx context.Context, ds ipld.DAGService, a, b ipld.Node) ([]*Change, error) { +func Diff(ctx context.Context, ds ipld.NodeGetter, a, b ipld.Node) ([]*Change, error) { // Base case where both nodes are leaves, just compare // their CIDs. if len(a.Links()) == 0 && len(b.Links()) == 0 { diff --git a/namecache/namecache.go b/namecache/namecache.go index 2ecf7b7fff9f4bbdc91e4089b429a823dc76f160..458db2583437fd5c0189ba5096057dcd73ee730c 100644 --- a/namecache/namecache.go +++ b/namecache/namecache.go @@ -8,14 +8,14 @@ import ( "sync" "time" - namesys "github.com/ipfs/go-ipfs/namesys" + "github.com/ipfs/go-ipfs/core/coreapi/interface" + "github.com/ipfs/go-ipfs/dagutils" + "github.com/ipfs/go-ipfs/namesys" - uio "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/io" - cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" ipld "gx/ipfs/QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s/go-ipld-format" - bstore "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore" - path "gx/ipfs/QmWqh9oob7ZHQRwU5CdTqpnC8ip8BEkFNrwXRxeNo5Y7vA/go-path" - resolver "gx/ipfs/QmWqh9oob7ZHQRwU5CdTqpnC8ip8BEkFNrwXRxeNo5Y7vA/go-path/resolver" + "gx/ipfs/QmWqh9oob7ZHQRwU5CdTqpnC8ip8BEkFNrwXRxeNo5Y7vA/go-path" + dag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag" logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log" ) @@ -39,19 +39,17 @@ type NameCache interface { type nameCache struct { nsys namesys.NameSystem dag ipld.NodeGetter - bstore bstore.GCBlockstore ctx context.Context follows map[string]func() mx sync.Mutex } -func NewNameCache(ctx context.Context, nsys namesys.NameSystem, dag ipld.NodeGetter, bstore bstore.GCBlockstore) NameCache { +func NewNameCache(ctx context.Context, nsys namesys.NameSystem, dag ipld.NodeGetter) NameCache { return &nameCache{ ctx: ctx, nsys: nsys, dag: dag, - bstore: bstore, follows: make(map[string]func()), } } @@ -67,7 +65,7 @@ func (nc *nameCache) Follow(name string, prefetch bool, followInterval time.Dura } if _, ok := nc.follows[name]; ok { - return fmt.Errorf("Already following %s", name) + return fmt.Errorf("already following %s", name) } ctx, cancel := context.WithCancel(nc.ctx) @@ -88,7 +86,7 @@ func (nc *nameCache) Unfollow(name string) error { cancel, ok := nc.follows[name] if !ok { - return fmt.Errorf("Unknown name %s", name) + return fmt.Errorf("unknown name %s", name) } cancel() @@ -110,8 +108,9 @@ func (nc *nameCache) ListFollows() []string { } func (nc *nameCache) followName(ctx context.Context, name string, prefetch bool, followInterval time.Duration) { - // if cid != nil, we have prefetched data under the node - c, err := nc.resolveAndFetch(ctx, name, prefetch) + emptynode := new(dag.ProtoNode) + + c, err := nc.resolveAndUpdate(ctx, name, prefetch, emptynode.Cid()) if err != nil { log.Errorf("Error following %s: %s", name, err.Error()) } @@ -122,11 +121,7 @@ func (nc *nameCache) followName(ctx context.Context, name string, prefetch bool, for { select { case <-ticker.C: - if c != cid.Undef { - c, err = nc.resolveAndUpdate(ctx, name, c) - } else { - c, err = nc.resolveAndFetch(ctx, name, prefetch) - } + c, err = nc.resolveAndUpdate(ctx, name, prefetch, c) if err != nil { log.Errorf("Error following %s: %s", name, err.Error()) @@ -138,47 +133,55 @@ func (nc *nameCache) followName(ctx context.Context, name string, prefetch bool, } } -func (nc *nameCache) resolveAndFetch(ctx context.Context, name string, prefetch bool) (cid.Cid, error) { +func (nc *nameCache) resolveAndUpdate(ctx context.Context, name string, prefetch bool, oldcid cid.Cid) (cid.Cid, error) { ptr, err := nc.resolve(ctx, name) if err != nil { return cid.Undef, err } - if !prefetch { - return cid.Undef, nil - } - - c, err := pathToCid(ptr) + newcid, err := pathToCid(ptr) if err != nil { return cid.Undef, err } - defer nc.bstore.PinLock().Unlock() + if newcid.Equals(oldcid) || !prefetch { + return newcid, nil + } - n, err := nc.pathToNode(ctx, ptr) + oldnd, err := nc.dag.Get(ctx, oldcid) if err != nil { return cid.Undef, err } - return c, err -} - -func (nc *nameCache) resolveAndUpdate(ctx context.Context, name string, oldcid cid.Cid) (cid.Cid, error) { - ptr, err := nc.resolve(ctx, name) + newnd, err := nc.dag.Get(ctx, newcid) if err != nil { return cid.Undef, err } - newcid, err := pathToCid(ptr) + changes, err := dagutils.Diff(ctx, nc.dag, oldnd, newnd) if err != nil { return cid.Undef, err } - if newcid.Equals(oldcid) { - return oldcid, nil - } + log.Debugf("fetching changes in %s (%s -> %s)", name, oldcid, newcid) + for _, change := range changes { + if change.Type == iface.DiffRemove { + continue + } + + toFetch, err := nc.dag.Get(ctx, change.After) + if err != nil { + return cid.Undef, err + } - // TODO: handle prefetching + // just iterate over all nodes + walker := ipld.NewWalker(ctx, ipld.NewNavigableIPLDNode(toFetch, nc.dag)) + if err := walker.Iterate(func(node ipld.NavigableNode) error { + return nil + }); err != ipld.EndOfDag { + return cid.Undef, fmt.Errorf("unexpected error when prefetching followed name: %s", err) + } + } return newcid, err } @@ -196,20 +199,9 @@ func (nc *nameCache) resolve(ctx context.Context, name string) (path.Path, error log.Debugf("resolved %s to %s", name, p) - // TODO: handle prefetching - return p, nil } func pathToCid(p path.Path) (cid.Cid, error) { return cid.Decode(p.Segments()[1]) } - -func (nc *nameCache) pathToNode(ctx context.Context, p path.Path) (ipld.Node, error) { - r := &resolver.Resolver{ - DAG: nc.dag, - ResolveOnce: uio.ResolveUnixfsOnce, - } - - return r.ResolvePath(ctx, p) -}