Skip to content

Commit

Permalink
fix: close short-lived connections first when pruning by tag value (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Dec 13, 2022
1 parent 1b30f81 commit e0dc269
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 29 deletions.
42 changes: 27 additions & 15 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import { logger } from '@libp2p/logger'
import errCode from 'err-code'
import mergeOptions from 'merge-options'
import { LatencyMonitor, SummaryObject } from './latency-monitor.js'
import type { AbortOptions } from '@libp2p/interfaces'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import type { Startable } from '@libp2p/interfaces/startable'
import { codes } from '../errors.js'
import { isPeerId, PeerId } from '@libp2p/interface-peer-id'
import { setMaxListeners } from 'events'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { ConnectionManager, Dialer } from '@libp2p/interface-connection-manager'
import * as STATUS from '@libp2p/interface-connection/status'
import type { Metrics } from '@libp2p/interface-metrics'
import { isPeerId, PeerId } from '@libp2p/interface-peer-id'
import type { AddressSorter, PeerStore } from '@libp2p/interface-peer-store'
import { isMultiaddr, multiaddr, Multiaddr, Resolver } from '@multiformats/multiaddr'
import { PeerMap } from '@libp2p/peer-collections'
import { TimeoutController } from 'timeout-abort-controller'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'
import { RateLimiterMemory } from 'rate-limiter-flexible'
import type { Metrics } from '@libp2p/interface-metrics'
import type { Upgrader } from '@libp2p/interface-transport'
import type { AbortOptions } from '@libp2p/interfaces'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import type { Startable } from '@libp2p/interfaces/startable'
import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { isMultiaddr, multiaddr, Multiaddr, Resolver } from '@multiformats/multiaddr'
import errCode from 'err-code'
import { setMaxListeners } from 'events'
import mergeOptions from 'merge-options'
import { RateLimiterMemory } from 'rate-limiter-flexible'
import { TimeoutController } from 'timeout-abort-controller'
import { codes } from '../errors.js'
import { getPeer } from '../get-peer.js'
import { LatencyMonitor, SummaryObject } from './latency-monitor.js'

const log = logger('libp2p:connection-manager')

Expand Down Expand Up @@ -650,6 +650,18 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
return -1
}

// if the peers have an equal tag value then we want to close short-lived connections first
const connectionALifespan = a.stat.timeline.open
const connectionBLifespan = b.stat.timeline.open

if (connectionALifespan < connectionBLifespan) {
return 1
}

if (connectionALifespan > connectionBLifespan) {
return -1
}

return 0
})

Expand Down
83 changes: 69 additions & 14 deletions test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { createNode } from '../utils/creators/peer.js'
import { createBaseOptions } from '../utils/base-options.browser.js'
import type { Libp2pNode } from '../../src/libp2p.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { mockConnection, mockDuplex, mockMultiaddrConnection, mockMetrics } from '@libp2p/interface-mocks'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { CustomEvent } from '@libp2p/interfaces/events'
import type { Connection } from '@libp2p/interface-connection'
import type { Dialer } from '@libp2p/interface-connection-manager'
import { mockConnection, mockDuplex, mockMetrics, mockMultiaddrConnection } from '@libp2p/interface-mocks'
import type { PeerStore } from '@libp2p/interface-peer-store'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'
import pWaitFor from 'p-wait-for'
import type { Upgrader } from '@libp2p/interface-transport'
import { CustomEvent } from '@libp2p/interfaces/events'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import pWaitFor from 'p-wait-for'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import type { Dialer } from '@libp2p/interface-connection-manager'
import type { Connection } from '@libp2p/interface-connection'
import type { Upgrader } from '@libp2p/interface-transport'
import type { PeerStore } from '@libp2p/interface-peer-store'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import type { Libp2pNode } from '../../src/libp2p.js'
import { createBaseOptions } from '../utils/base-options.browser.js'
import { createNode } from '../utils/creators/peer.js'

const defaultOptions = {
maxConnections: 10,
Expand Down Expand Up @@ -114,6 +114,61 @@ describe('Connection Manager', () => {
expect(lowestSpy).to.have.property('callCount', 1)
})

it('should close shortest-lived connection if the tag values are equal', async () => {
const max = 5
libp2p = await createNode({
config: createBaseOptions({
connectionManager: {
maxConnections: max,
minConnections: 2
}
}),
started: false
})

await libp2p.start()

const connectionManager = libp2p.connectionManager as DefaultConnectionManager
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_pruneConnections')
const spies = new Map<string, sinon.SinonSpy<[], Promise<void>>>()

const createConnection = async (value: number, open: number = Date.now(), peerTag: string = 'test-tag') => {
// #TODO: Mock the connection timeline to simulate an older connection
const connection = mockConnection(mockMultiaddrConnection({ ...mockDuplex(), timeline: { open } }, await createEd25519PeerId()))
const spy = sinon.spy(connection, 'close')

// The lowest tag value will have the longest connection
spies.set(peerTag, spy)
await libp2p.peerStore.tagPeer(connection.remotePeer, peerTag, {
value
})

await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
}

// Create one short of enough connections to iniate pruning
for (let i = 1; i < max; i++) {
const value = i * 10
await createConnection(value)
}

const value = 0 * 10
// Add a connection with the lowest tag value BUT the longest lived connection
await createConnection(value, 18000, 'longest')
// Add one more connection with the lowest tag value BUT the shortest-lived connection
await createConnection(value, Date.now(), 'shortest')

// get the lowest tagged value, but this would be also the longest lived connection
const longestLivedWithLowestTagSpy = spies.get('longest')

// Get lowest tagged connection but with a shorter-lived connection
const shortestLivedWithLowestTagSpy = spies.get('shortest')

expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(1)
expect(longestLivedWithLowestTagSpy).to.have.property('callCount', 0)
expect(shortestLivedWithLowestTagSpy).to.have.property('callCount', 1)
})

it('should close connection when the maximum has been reached even without tags', async () => {
const max = 5
libp2p = await createNode({
Expand Down

0 comments on commit e0dc269

Please sign in to comment.