diff --git a/lib/torrent/scheduler/dispatch/dispatcher.go b/lib/torrent/scheduler/dispatch/dispatcher.go index a9456d86c..76c4cb525 100644 --- a/lib/torrent/scheduler/dispatch/dispatcher.go +++ b/lib/torrent/scheduler/dispatch/dispatcher.go @@ -45,7 +45,6 @@ var ( // Events defines Dispatcher events. type Events interface { DispatcherComplete(*Dispatcher) - PeerRemoved(core.PeerID, core.InfoHash) } // Messages defines a subset of conn.Conn methods which Dispatcher requires to @@ -444,7 +443,6 @@ func (d *Dispatcher) feed(p *peer) { } } d.removePeer(p) - d.events.PeerRemoved(p.id, d.torrent.InfoHash()) } func (d *Dispatcher) dispatch(p *peer, msg *conn.Message) error { diff --git a/lib/torrent/scheduler/dispatch/dispatcher_test.go b/lib/torrent/scheduler/dispatch/dispatcher_test.go index b3bdfc5c5..1a78ce202 100644 --- a/lib/torrent/scheduler/dispatch/dispatcher_test.go +++ b/lib/torrent/scheduler/dispatch/dispatcher_test.go @@ -102,8 +102,6 @@ type noopEvents struct{} func (e noopEvents) DispatcherComplete(*Dispatcher) {} -func (e noopEvents) PeerRemoved(core.PeerID, core.InfoHash) {} - func testDispatcher(config Config, clk clock.Clock, t storage.Torrent) *Dispatcher { d, err := newDispatcher( config, diff --git a/lib/torrent/scheduler/events.go b/lib/torrent/scheduler/events.go index 637829e0e..8f5ca2db7 100644 --- a/lib/torrent/scheduler/events.go +++ b/lib/torrent/scheduler/events.go @@ -112,10 +112,6 @@ func (l *liftedEventLoop) DispatcherComplete(d *dispatch.Dispatcher) { l.send(dispatcherCompleteEvent{d}) } -func (l *liftedEventLoop) PeerRemoved(peerID core.PeerID, h core.InfoHash) { - l.send(peerRemovedEvent{peerID, h}) -} - func (l *liftedEventLoop) AnnounceTick() { l.send(announceTickEvent{}) } @@ -377,15 +373,6 @@ func (e dispatcherCompleteEvent) apply(s *state) { go s.sched.announce(ctrl.dispatcher.Digest(), ctrl.dispatcher.InfoHash(), true) } -// peerRemovedEvent occurs when a dispatcher removes a peer with a closed -// connection. Currently is a no-op. -type peerRemovedEvent struct { - peerID core.PeerID - infoHash core.InfoHash -} - -func (e peerRemovedEvent) apply(s *state) {} - // preemptionTickEvent occurs periodically to preempt unneeded conns and remove // idle torrentControls. type preemptionTickEvent struct{} diff --git a/lib/torrent/scheduler/events_test.go b/lib/torrent/scheduler/events_test.go index 9d9045ac6..5115e3def 100644 --- a/lib/torrent/scheduler/events_test.go +++ b/lib/torrent/scheduler/events_test.go @@ -14,6 +14,8 @@ package scheduler import ( + "fmt" + "reflect" "testing" "time" @@ -41,15 +43,31 @@ type mockEventLoop struct { c chan event } -func (l *mockEventLoop) expect(e event) { +func (l *mockEventLoop) next() (event, error) { select { - case result := <-l.c: - require.Equal(l.t, e, result) + case e := <-l.c: + return e, nil case <-time.After(5 * time.Second): - l.t.Fatalf("timed out waiting for %T to occur", e) + return nil, fmt.Errorf("timed out waiting for event") } } +// expect checks the next event is e and returns it. +func (l *mockEventLoop) expect(e event) event { + n, err := l.next() + require.NoError(l.t, err) + require.Equal(l.t, n, e) + return n +} + +// expectType checks the next event is the same type as e and returns it. +func (l *mockEventLoop) expectType(e event) event { + n, err := l.next() + require.NoError(l.t, err) + require.Equal(l.t, reflect.TypeOf(e).Name(), reflect.TypeOf(n).Name()) + return n +} + func (l *mockEventLoop) send(e event) bool { l.c <- e return true @@ -164,68 +182,78 @@ func TestAnnounceTickEventSkipsFullTorrents(t *testing.T) { }, }) - full, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true) + // Spin up some fake peers we can handshake against. + var peers []*core.PeerInfo + for i := 0; i < 5; i++ { + p, err := conn.NewFakePeer() + require.NoError(err) + defer p.Close() + + peers = append(peers, p.PeerInfo()) + } + + // Announce a torrent and fully saturate its connections. + t1, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true) require.NoError(err) - info := full.dispatcher.Stat() + mocks.announceClient.EXPECT(). + Announce( + t1.dispatcher.Digest(), + t1.dispatcher.InfoHash(), + false, + announceclient.V1). + Return(peers, time.Second, nil) - for i := 0; i < 5; i++ { - _, c, cleanup := conn.PipeFixture(conn.Config{}, info) - defer cleanup() + announceTickEvent{}.apply(state) - require.NoError(state.conns.AddPending(c.PeerID(), c.InfoHash(), nil)) - require.NoError(state.addOutgoingConn(c, info.Bitfield(), info)) + mocks.eventLoop.expectType(announceResultEvent{}).apply(state) + for range peers { + mocks.eventLoop.expectType(outgoingConnEvent{}).apply(state) } - empty, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true) + // Add a second torrent (behind t1) and announce it. The first torrent is + // full and should be skipped, instead directly announcing the second empty + // torrent. + t2, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true) require.NoError(err) - // The first torrent is full and should be skipped, announcing the empty - // torrent. mocks.announceClient.EXPECT(). Announce( - empty.dispatcher.Digest(), - empty.dispatcher.InfoHash(), + t2.dispatcher.Digest(), + t2.dispatcher.InfoHash(), false, announceclient.V1). Return(nil, time.Second, nil) announceTickEvent{}.apply(state) - // Empty torrent announced. mocks.eventLoop.expect(announceResultEvent{ - infoHash: empty.dispatcher.InfoHash(), + infoHash: t2.dispatcher.InfoHash(), }) - // The empty torrent is pending, so keep skipping full torrent. + // t1 is still full and t2 is pending, so nothing should happen. announceTickEvent{}.apply(state) announceTickEvent{}.apply(state) announceTickEvent{}.apply(state) - // Remove a connection -- torrent is no longer full. + // Close a random connection -- t1 is no longer full. c := state.conns.ActiveConns()[0] c.Close() - // TODO(codyg): This is ugly. Conn fixtures aren't connected to our event - // loop, so we have to manually trigger the event. - connClosedEvent{c}.apply(state) - mocks.eventLoop.expect(peerRemovedEvent{ - peerID: c.PeerID(), - infoHash: c.InfoHash(), - }) + mocks.eventLoop.expect(connClosedEvent{c}).apply(state) mocks.announceClient.EXPECT(). Announce( - full.dispatcher.Digest(), - full.dispatcher.InfoHash(), + t1.dispatcher.Digest(), + t1.dispatcher.InfoHash(), false, announceclient.V1). Return(nil, time.Second, nil) announceTickEvent{}.apply(state) - // Previously full torrent announced. + // Previously full torrent (which now has open conn slot) announced. mocks.eventLoop.expect(announceResultEvent{ - infoHash: full.dispatcher.InfoHash(), + infoHash: t1.dispatcher.InfoHash(), }) }