diff --git a/config/pubsub.go b/config/pubsub.go index ba80843005a1cf9afb6586f5bb2db0a5e3fb937b..36f9a9881d5a19e815c47b5a292eaa0433eda467 100644 --- a/config/pubsub.go +++ b/config/pubsub.go @@ -1,5 +1,24 @@ package config +const ( + // LastSeenMessagesStrategy is a strategy that calculates the TTL countdown + // based on the last time a Pubsub message is seen. This means that if a message + // is received and then seen again within the specified TTL window, it + // won't be emitted until the TTL countdown expires from the last time the + // message was seen. + LastSeenMessagesStrategy = "last-seen" + + // FirstSeenMessagesStrategy is a strategy that calculates the TTL + // countdown based on the first time a Pubsub message is seen. This means that if + // a message is received and then seen again within the specified TTL + // window, it won't be emitted. + FirstSeenMessagesStrategy = "first-seen" + + // DefaultSeenMessagesStrategy is the strategy that is used by default if + // no Pubsub.SeenMessagesStrategy is specified. + DefaultSeenMessagesStrategy = LastSeenMessagesStrategy +) + type PubsubConfig struct { // Router can be either floodsub (legacy) or gossipsub (new and // backwards compatible). @@ -12,7 +31,11 @@ type PubsubConfig struct { // Enable pubsub (--enable-pubsub-experiment) Enabled Flag `json:",omitempty"` - // SeenMessagesTTL configures the duration after which a previously seen - // message ID can be forgotten about. + // SeenMessagesTTL is a value that controls the time window within which + // duplicate messages will be identified and won't be emitted. SeenMessagesTTL *OptionalDuration `json:",omitempty"` + + // SeenMessagesStrategy is a setting that determines how the time-to-live + // (TTL) countdown for deduplicating messages is calculated. + SeenMessagesStrategy *OptionalString `json:",omitempty"` } diff --git a/core/node/groups.go b/core/node/groups.go index aa650ddf5d03d0b7bc0fcae3c1c210265980fcfa..e640feff1ab4282c9a57ae51ed4fe9132541c233 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -11,6 +11,7 @@ import ( "github.com/ipfs/go-log" "github.com/ipfs/kubo/config" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/peer" "github.com/ipfs/kubo/core/node/libp2p" @@ -66,6 +67,18 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)), ) + var seenMessagesStrategy timecache.Strategy + configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy) + switch configSeenMessagesStrategy { + case config.LastSeenMessagesStrategy: + seenMessagesStrategy = timecache.Strategy_LastSeen + case config.FirstSeenMessagesStrategy: + seenMessagesStrategy = timecache.Strategy_FirstSeen + default: + return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy)) + } + pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy)) + switch cfg.Pubsub.Router { case "": fallthrough diff --git a/core/node/libp2p/rcmgr_defaults.go b/core/node/libp2p/rcmgr_defaults.go index 4d578a9b650415224d5435e3430523cf04a6927a..9a3825108f392ca6471c6780ed19c50a2deff528 100644 --- a/core/node/libp2p/rcmgr_defaults.go +++ b/core/node/libp2p/rcmgr_defaults.go @@ -44,17 +44,18 @@ var noLimitIncrease = rcmgr.BaseLimitIncrease{ // This file defines implicit limit defaults used when Swarm.ResourceMgr.Enabled // createDefaultLimitConfig creates LimitConfig to pass to libp2p's resource manager. -// The defaults follow the documentation in docs/config.md. +// The defaults follow the documentation in docs/libp2p-resource-management.md. // Any changes in the logic here should be reflected there. func createDefaultLimitConfig(cfg config.SwarmConfig) (rcmgr.LimitConfig, error) { - maxMemoryDefaultString := humanize.Bytes(uint64(memory.TotalMemory()) / 4) + maxMemoryDefaultString := humanize.Bytes(uint64(memory.TotalMemory()) / 2) maxMemoryString := cfg.ResourceMgr.MaxMemory.WithDefault(maxMemoryDefaultString) maxMemory, err := humanize.ParseBytes(maxMemoryString) if err != nil { return rcmgr.LimitConfig{}, err } - numFD := cfg.ResourceMgr.MaxFileDescriptors.WithDefault(int64(fd.GetNumFDs()) / 2) + maxMemoryMB := maxMemory / (1024 * 1024) + maxFD := int(cfg.ResourceMgr.MaxFileDescriptors.WithDefault(int64(fd.GetNumFDs()) / 2)) // We want to see this message on startup, that's why we are using fmt instead of log. fmt.Printf(` @@ -65,65 +66,53 @@ Computing default go-libp2p Resource Manager limits based on: Applying any user-supplied overrides on top. Run 'ipfs swarm limit all' to see the resulting limits. -`, maxMemoryString, numFD) +`, maxMemoryString, maxFD) + + // At least as of 2023-01-25, it's possible to open a connection that + // doesn't ask for any memory usage with the libp2p Resource Manager/Accountant + // (see https://github.com/libp2p/go-libp2p/issues/2010#issuecomment-1404280736). + // As a result, we can't curretly rely on Memory limits to full protect us. + // Until https://github.com/libp2p/go-libp2p/issues/2010 is addressed, + // we take a proxy now of restricting to 1 inbound connection per MB. + // Note: this is more generous than go-libp2p's default autoscaled limits which do + // 64 connections per 1GB + // (see https://github.com/libp2p/go-libp2p/blob/master/p2p/host/resource-manager/limit_defaults.go#L357 ). + systemConnsInbound := int(1 * maxMemoryMB) scalingLimitConfig := rcmgr.ScalingLimitConfig{ SystemBaseLimit: rcmgr.BaseLimit{ Memory: int64(maxMemory), - FD: int(numFD), + FD: maxFD, // By default, we just limit connections on the inbound side. Conns: bigEnough, - ConnsInbound: rcmgr.DefaultLimits.SystemBaseLimit.ConnsInbound, // same as libp2p default + ConnsInbound: systemConnsInbound, ConnsOutbound: bigEnough, - // We limit streams since they not only take up memory and CPU. - // The Memory limit protects us on the memory side, - // but a StreamsInbound limit helps protect against unbound CPU consumption from stream processing. Streams: bigEnough, - StreamsInbound: rcmgr.DefaultLimits.SystemBaseLimit.StreamsInbound, + StreamsInbound: bigEnough, StreamsOutbound: bigEnough, }, - // Most limits don't see an increase because they're already infinite/bigEnough or at their max value. - // The values that should scale based on the amount of memory allocated to libp2p need to increase accordingly. - SystemLimitIncrease: rcmgr.BaseLimitIncrease{ - Memory: 0, - FDFraction: 0, - - Conns: 0, - ConnsInbound: rcmgr.DefaultLimits.SystemLimitIncrease.ConnsInbound, - ConnsOutbound: 0, - - Streams: 0, - StreamsInbound: rcmgr.DefaultLimits.SystemLimitIncrease.StreamsInbound, - StreamsOutbound: 0, - }, + SystemLimitIncrease: noLimitIncrease, + // Transient connections won't cause any memory to accounted for by the resource manager. + // Only established connections do. + // As a result, we can't rely on System.Memory to protect us from a bunch of transient connection being opened. + // We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope. TransientBaseLimit: rcmgr.BaseLimit{ - Memory: rcmgr.DefaultLimits.TransientBaseLimit.Memory, - FD: rcmgr.DefaultLimits.TransientBaseLimit.FD, + Memory: int64(maxMemory / 4), + FD: maxFD / 4, Conns: bigEnough, - ConnsInbound: rcmgr.DefaultLimits.TransientBaseLimit.ConnsInbound, + ConnsInbound: systemConnsInbound / 4, ConnsOutbound: bigEnough, Streams: bigEnough, - StreamsInbound: rcmgr.DefaultLimits.TransientBaseLimit.StreamsInbound, + StreamsInbound: bigEnough, StreamsOutbound: bigEnough, }, - TransientLimitIncrease: rcmgr.BaseLimitIncrease{ - Memory: rcmgr.DefaultLimits.TransientLimitIncrease.Memory, - FDFraction: rcmgr.DefaultLimits.TransientLimitIncrease.FDFraction, - - Conns: 0, - ConnsInbound: rcmgr.DefaultLimits.TransientLimitIncrease.ConnsInbound, - ConnsOutbound: 0, - - Streams: 0, - StreamsInbound: rcmgr.DefaultLimits.TransientLimitIncrease.StreamsInbound, - StreamsOutbound: 0, - }, + TransientLimitIncrease: noLimitIncrease, // Lets get out of the way of the allow list functionality. // If someone specified "Swarm.ResourceMgr.Allowlist" we should let it go through. @@ -184,7 +173,7 @@ Run 'ipfs swarm limit all' to see the resulting limits. // Whatever limits libp2p has specifically tuned for its protocols/services we'll apply. libp2p.SetDefaultServiceLimits(&scalingLimitConfig) - defaultLimitConfig := scalingLimitConfig.Scale(int64(maxMemory), int(numFD)) + defaultLimitConfig := scalingLimitConfig.Scale(int64(maxMemory), maxFD) // Simple checks to overide autoscaling ensuring limits make sense versus the connmgr values. // There are ways to break this, but this should catch most problems already. diff --git a/docs/changelogs/v0.18.md b/docs/changelogs/v0.18.md index e51c6d3b57035bfe1627133ea3fbd92be75836ee..352077e6508b7309422a9c6c6636922348b74c3c 100644 --- a/docs/changelogs/v0.18.md +++ b/docs/changelogs/v0.18.md @@ -1,5 +1,74 @@ # Kubo changelog v0.18 +## v0.18.1 + +This release includes improvements around Pubsub message deduplication, libp2p resource management, and more. + +<!-- TOC depthfrom:3 --> + +- [Overview](#overview) +- [๐ฆ Highlights](#-highlights) + - [New default Pubsub.SeenMessagesStrategy](#new-default-pubsubseenmessagesstrategy) + - [Improving libp2p resource management integration](#improving-libp2p-resource-management-integration) +- [๐ Changelog](#-changelog) +- [๐จโ๐ฉโ๐งโ๐ฆ Contributors](#-contributors) + +<!-- /TOC --> + +### ๐ฆ Highlights + +#### New default `Pubsub.SeenMessagesStrategy` + +A new optional [`Pubsub.SeenMessagesStrategy`](../config.md#pubsubseenmessagesstrategy) configuration option has been added. + +This option allows you to choose between two different strategies for +deduplicating messages: `first-seen` and `last-seen`. + +When unset, the default strategy is `last-seen`, which calculates the +time-to-live (TTL) countdown based on the last time a message is seen. This +means that if a message is received and then seen again within the specified +TTL window based on the last time it was seen, it won't be emitted. + +If you prefer the old behavior, which calculates the TTL countdown based on the +first time a message is seen, you can set `Pubsub.SeenMessagesStrategy` to +`first-seen`. + +#### Improving libp2p resource management integration + +This builds on the default protection nodes get against DoS (resource exhaustion) and eclipse attacks +with the [go-libp2p Network Resource Manager/Accountant](https://github.com/ipfs/kubo/blob/master/docs/libp2p-resource-management.md) +that was fine-tuned in [Kubo 0.18](https://github.com/ipfs/kubo/blob/biglep/resource-manager-example-of-what-want/docs/changelogs/v0.18.md#improving-libp2p-resource-management-integration). + +Adding default hard-limits from the Resource Manager/Accountant after the fact is tricky, +and some additional improvements have been made to improve the [computed defaults](https://github.com/ipfs/kubo/blob/master/docs/libp2p-resource-management.md#computed-default-limits). +As much as possible, the aim is for a user to only think about how much memory they want to bound libp2p to, +and not need to think about translating that to hard numbers for connections, streams, etc. +More updates are likely in future Kubo releases, but with this release: +1. ``System.StreamsInbound`` is no longer bounded directly +2. ``System.ConnsInbound``, ``Transient.Memory``, ``Transiet.ConnsInbound`` have higher default computed values. + +### Changelog + +<details><summary>Full Changelog</summary> + +- github.com/ipfs/kubo: + - Add overview section + - Adjust inbound connection limits depending on memory. + - feat: Pubsub.SeenMessagesStrategy (#9543) ([ipfs/kubo#9543](https://github.com/ipfs/kubo/pull/9543)) + - chore: update version +- github.com/libp2p/go-libp2p-pubsub (v0.8.2 -> v0.8.3): + - feat: expire messages from the cache based on last seen time (#513) ([libp2p/go-libp2p-pubsub#513](https://github.com/libp2p/go-libp2p-pubsub/pull/513)) + +</details> + +### Contributors + +| Contributor | Commits | Lines ยฑ | Files Changed | +|-------------|---------|---------|---------------| +| Mohsin Zaidi | 2 | +511/-55 | 12 | +| Antonio Navarro Perez | 2 | +57/-57 | 5 | +| galargh | 1 | +1/-1 | 1 | + ## v0.18.0 ### Overview @@ -8,22 +77,22 @@ Below is an outline of all that is in this release, so you get a sense of all th <!-- TOC depthfrom:3 --> -- [Overview](#overview) -- [๐ฆ Highlights](#-highlights) - - [Content routing](#content-routing) + - [Overview](#overview) + - [๐ฆ Highlights](#-highlights) + - [Content routing](#content-routing) - [Default InterPlanetary Network Indexer](#default-interplanetary-network-indexer) - [Increase provider record republish interval and expiration](#increase-provider-record-republish-interval-and-expiration) - - [Gateways](#gateways) - - [DAG-JSON and DAG-CBOR response formats](#dag-json-and-dag-cbor-response-formats) + - [Gateways](#gateways) + - [(DAG-)JSON and (DAG-)CBOR response formats](#dag-json-and-dag-cbor-response-formats) - [๐ Fast directory listings with DAG sizes](#-fast-directory-listings-with-dag-sizes) - - [QUIC and WebTransport](#quic-and-webtransport) + - [QUIC and WebTransport](#quic-and-webtransport) - [WebTransport enabled by default](#webtransport-enabled-by-default) - [QUIC and WebTransport share a single port](#quic-and-webtransport-share-a-single-port) - [Differentiating QUIC versions](#differentiating-quic-versions) - [QUICv1 and WebTransport config migration](#quicv1-and-webtransport-config-migration) - - [Improving libp2p resource management integration](#improving-libp2p-resource-management-integration) -- [๐ Changelog](#-changelog) -- [๐จโ๐ฉโ๐งโ๐ฆ Contributors](#-contributors) + - [Improving libp2p resource management integration](#improving-libp2p-resource-management-integration) + - [๐ Changelog](#-changelog) + - [๐จโ๐ฉโ๐งโ๐ฆ Contributors](#-contributors) <!-- /TOC --> diff --git a/docs/config.md b/docs/config.md index da919d84450854927fa4be6c427052d675d3e476..995872c4f37f6344f3d9f16069400d431f0b4d8c 100644 --- a/docs/config.md +++ b/docs/config.md @@ -100,6 +100,7 @@ config file at runtime. - [`Pubsub.Router`](#pubsubrouter) - [`Pubsub.DisableSigning`](#pubsubdisablesigning) - [`Pubsub.SeenMessagesTTL`](#pubsubseenmessagesttl) + - [`Pubsub.SeenMessagesStrategy`](#pubsubseenmessagesstrategy) - [`Peering`](#peering) - [`Peering.Peers`](#peeringpeers) - [`Reprovider`](#reprovider) @@ -1206,8 +1207,8 @@ Type: `bool` ### `Pubsub.SeenMessagesTTL` -Configures the duration after which a previously seen Pubsub Message ID can be -forgotten about. +Controls the time window within which duplicate messages, identified by Message +ID, will be identified and won't be emitted again. A smaller value for this parameter means that Pubsub messages in the cache will be garbage collected sooner, which can result in a smaller cache. At the same @@ -1223,6 +1224,29 @@ Default: see `TimeCacheDuration` from [go-libp2p-pubsub](https://github.com/libp Type: `optionalDuration` +### `Pubsub.SeenMessagesStrategy` + +Determines how the time-to-live (TTL) countdown for deduplicating Pubsub +messages is calculated. + +The Pubsub seen messages cache is a LRU cache that keeps messages for up to a +specified time duration. After this duration has elapsed, expired messages will +be purged from the cache. + +The `last-seen` cache is a sliding-window cache. Every time a message is seen +again with the SeenMessagesTTL duration, its timestamp slides forward. This +keeps frequently occurring messages cached and prevents them from being +continually propagated, especially because of issues that might increase the +number of duplicate messages in the network. + +The `first-seen` cache will store new messages and purge them after the +SeenMessagesTTL duration, even if they are seen multiple times within this +duration. + +Default: `last-seen` (see [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub)) + +Type: `optionalString` + ## `Peering` Configures the peering subsystem. The peering subsystem configures Kubo to @@ -1819,7 +1843,7 @@ This value is also used to scale the limit on various resources at various scope when the default limits (discussed in [libp2p resource management](./libp2p-resource-management.md)) are used. For example, increasing this value will increase the default limit for incoming connections. -Default: `[TOTAL_SYSTEM_MEMORY]/4` +Default: `[TOTAL_SYSTEM_MEMORY]/2` Type: `optionalBytes` #### `Swarm.ResourceMgr.MaxFileDescriptors` diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index edc6848315fee5ed822ee5be6ab365f70fe1878a..0a4a436d9fe9f54fda27f57d4765dc7374b37f80 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -38,6 +38,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -122,7 +123,7 @@ require ( github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-kad-dht v0.20.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect - github.com/libp2p/go-libp2p-pubsub v0.8.2 // indirect + github.com/libp2p/go-libp2p-pubsub v0.8.3 // indirect github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-libp2p-routing-helpers v0.6.0 // indirect @@ -180,7 +181,6 @@ require ( github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect - github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.7.0 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index fa6c022f3349cafe541267cfab596f0f3342b078..eb52ab5e864d6b491c0cb3112d98731925f1e866 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -198,6 +198,8 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -773,8 +775,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= -github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= +github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww= +github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s= github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= @@ -1284,8 +1286,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84 github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= diff --git a/docs/libp2p-resource-management.md b/docs/libp2p-resource-management.md index d6b782da1f9258a01f0c8ba2167f386aa6f9ffa4..7924849d8913b793d35d8671a22ff727d2732645 100644 --- a/docs/libp2p-resource-management.md +++ b/docs/libp2p-resource-management.md @@ -70,8 +70,7 @@ The reason these scopes are chosen is because: Within these scopes, limits are just set on [memory](https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager#memory), -[file descriptors (FD)](https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager#file-descriptors), [*inbound* connections](https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager#connections), -and [*inbound* streams](https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager#streams). +[file descriptors (FD)](https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager#file-descriptors), and [*inbound* connections](https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager#connections). Limits are set based on the `Swarm.ResourceMgr.MaxMemory` and `Swarm.ResourceMgr.MaxFileDescriptors` inputs above. There are also some special cases where minimum values are enforced. @@ -89,7 +88,6 @@ These become the [active limits](#how-does-one-see-the-active-limits). While `Swarm.ResourceMgr.Limits` can be edited directly, it is also possible to use `ipfs swarm limit` command to inspect and tweak specific limits at runtime. - To see all resources that are close to hitting their respective limit: ```console diff --git a/go.mod b/go.mod index b837fa5041c8bae99d7f77a77982e4242647bea4..5120159fdfeffeed164aa27c151ef45c8a12740d 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/libp2p/go-libp2p-http v0.4.0 github.com/libp2p/go-libp2p-kad-dht v0.20.0 github.com/libp2p/go-libp2p-kbucket v0.5.0 - github.com/libp2p/go-libp2p-pubsub v0.8.2 + github.com/libp2p/go-libp2p-pubsub v0.8.3 github.com/libp2p/go-libp2p-pubsub-router v0.6.0 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.6.0 @@ -133,6 +133,7 @@ require ( github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/felixge/httpsnoop v1.0.2 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -223,7 +224,6 @@ require ( github.com/whyrusleeping/cbor-gen v0.0.0-20221220214510-0333c149dec0 // indirect github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect - github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect diff --git a/go.sum b/go.sum index 6274ef85bd61c23550f39f7df48b1c33eb38acfa..e044b9a00464da7f731dbb3f61ad8ca6a32404a1 100644 --- a/go.sum +++ b/go.sum @@ -206,6 +206,8 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/ github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302 h1:QV0ZrfBLpFc2KDk+a4LJefDczXnonRwrYrQJY/9L4dA= github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWeVx9BjvqBsnC/8RUlAYpIFmPvgROcw0n1scE= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -807,8 +809,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= -github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= +github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww= +github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s= github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= @@ -1343,8 +1345,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84 github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index 394cda5b120705b6a8836a484206cd728859d58e..7495080893dddb8210a51ab7972a1e81fa37584b 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/peer" mock "github.com/ipfs/kubo/core/mock" @@ -76,7 +77,6 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error var bootstrapNode, consumerNode, producerNode *core.IpfsNode var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID - sendDupMsg := false mn := mocknet.New() bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration @@ -98,6 +98,12 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error t.Fatal(err) } + // Used for logging the timeline + startTime := time.Time{} + + // Used for overriding the message ID + sendMsgID := "" + // Set up the pubsub message ID generation override for the producer core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) { var pubsubOptions []pubsub.Option @@ -105,19 +111,23 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error pubsubOptions, pubsub.WithSeenMessagesTTL(ttl), pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string { - now := time.Now().Format(time.StampMilli) + now := time.Now() + if startTime.Second() == 0 { + startTime = now + } + timeElapsed := now.Sub(startTime).Seconds() msg := string(pmsg.Data) - var msgID string from, _ := peer.IDFromBytes(pmsg.From) - if (from == producerPeerID) && sendDupMsg { - msgID = "DupMsg" - t.Logf("sending [%s] with duplicate message ID at [%s]", msg, now) + var msgID string + if from == producerPeerID { + msgID = sendMsgID + t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgID, timeElapsed) } else { msgID = pubsub.DefaultMsgIdFn(pmsg) - t.Logf("sending [%s] with unique message ID at [%s]", msg, now) } return msgID }), + pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen), ) return append( info.FXOptions, @@ -165,8 +175,8 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error t.Fatal(err) } // Utility functions defined inline to include context in closure - now := func() string { - return time.Now().Format(time.StampMilli) + now := func() float64 { + return time.Since(startTime).Seconds() } ctr := 0 msgGen := func() string { @@ -188,57 +198,87 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error msg, err := consumerSubscription.Next(rxCtx) if shouldFind { if err != nil { - t.Logf("did not receive [%s] by [%s]", msgTxt, now()) + t.Logf("expected but did not receive [%s] at T%fs", msgTxt, now()) t.Fatal(err) } - t.Logf("received [%s] at [%s]", string(msg.Data()), now()) + t.Logf("received [%s] at T%fs", string(msg.Data()), now()) if !bytes.Equal(msg.Data(), []byte(msgTxt)) { t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt) } } else { if err == nil { - t.Logf("received [%s] at [%s]", string(msg.Data()), now()) + t.Logf("not expected but received [%s] at T%fs", string(msg.Data()), now()) t.Fail() } - t.Logf("did not receive [%s] by [%s]", msgTxt, now()) + t.Logf("did not receive [%s] at T%fs", msgTxt, now()) } } - // Send message 1 with the message ID we're going to duplicate later - sendDupMsg = true + const MsgID1 = "MsgID1" + const MsgID2 = "MsgID2" + const MsgID3 = "MsgID3" + + // Send message 1 with the message ID we're going to duplicate + sentMsg1 := time.Now() + sendMsgID = MsgID1 msgTxt := produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) - // Send message 2 with the same message ID as before - sendDupMsg = true + // Send message 2 with a duplicate message ID + sendMsgID = MsgID1 msgTxt = produceMessage() - consumeMessage(msgTxt, false) // should NOT find message, because it got deduplicated (sent twice within the SeenMessagesTTL window) - - // Wait for seen cache TTL time to let seen cache entries time out - time.Sleep(ttl) + // Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window). + consumeMessage(msgTxt, false) // Send message 3 with a new message ID - // - // This extra step is necessary for testing the cache TTL because the PubSub code only garbage collects when a - // message ID was not already present in the cache. This means that message 2's cache entry, even though it has - // technically timed out, will still cause the message to be considered duplicate. When a message with a different - // ID passes through, it will be added to the cache and garbage collection will clean up message 2's entry. This is - // another bug in the pubsub/cache implementation that will be fixed once the code is refactored for this issue: - // https://github.com/libp2p/go-libp2p-pubsub/issues/502 - sendDupMsg = false + sendMsgID = MsgID2 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) + + // Wait till just before the SeenMessagesTTL window has passed since message 1 was sent + time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond))) + + // Send message 4 with a duplicate message ID + sendMsgID = MsgID1 + msgTxt = produceMessage() + // Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This + // time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since + // the default time cache now implements a sliding window algorithm. + consumeMessage(msgTxt, false) + + // Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding + // a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window + // starting at message 1 has expired. + sentMsg5 := time.Now() + sendMsgID = MsgID1 + msgTxt = produceMessage() + // Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window + // started). This time again, the expiration should get pushed out for another SeenMessagesTTL window. + consumeMessage(msgTxt, false) + + // Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window + sendMsgID = MsgID2 + msgTxt = produceMessage() + // Should find the message since last read > SeenMessagesTTL, so it looks like a new message. + consumeMessage(msgTxt, true) + + // Sleep for a full SeenMessagesTTL window to let cache entries time out + time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond))) - // Send message 4 with the same message ID as before - sendDupMsg = true + // Send message 7 with a duplicate message ID + sendMsgID = MsgID1 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message again (time since the last read > SeenMessagesTTL, so it looks like a new message). + // Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message. + consumeMessage(msgTxt, true) - // Send message 5 with a new message ID + // Send message 8 with a brand new message ID // // This step is not strictly necessary, but has been added for good measure. - sendDupMsg = false + sendMsgID = MsgID3 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) return nil } diff --git a/version.go b/version.go index fd74163038f74bc171606844d6b07e2364ee0938..b03eff4a97773de5f5ae6abbf026917836613d58 100644 --- a/version.go +++ b/version.go @@ -11,7 +11,7 @@ import ( var CurrentCommit string // CurrentVersionNumber is the current application's version literal -const CurrentVersionNumber = "0.18.0" +const CurrentVersionNumber = "0.18.1" const ApiVersion = "/kubo/" + CurrentVersionNumber + "/" //nolint