Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify announce tick event test #175

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/torrent/scheduler/dispatch/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ var (
// Events defines Dispatcher events.
type Events interface {
DispatcherComplete(*Dispatcher)
PeerRemoved(core.PeerID, core.InfoHash)
}

// Messages defines a subset of conn.Conn methods which Dispatcher requires to
Expand Down Expand Up @@ -444,7 +443,6 @@ func (d *Dispatcher) feed(p *peer) {
}
}
d.removePeer(p)
d.events.PeerRemoved(p.id, d.torrent.InfoHash())
}

func (d *Dispatcher) dispatch(p *peer, msg *conn.Message) error {
Expand Down
2 changes: 0 additions & 2 deletions lib/torrent/scheduler/dispatch/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ type noopEvents struct{}

func (e noopEvents) DispatcherComplete(*Dispatcher) {}

func (e noopEvents) PeerRemoved(core.PeerID, core.InfoHash) {}

func testDispatcher(config Config, clk clock.Clock, t storage.Torrent) *Dispatcher {
d, err := newDispatcher(
config,
Expand Down
13 changes: 0 additions & 13 deletions lib/torrent/scheduler/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ func (l *liftedEventLoop) DispatcherComplete(d *dispatch.Dispatcher) {
l.send(dispatcherCompleteEvent{d})
}

func (l *liftedEventLoop) PeerRemoved(peerID core.PeerID, h core.InfoHash) {
l.send(peerRemovedEvent{peerID, h})
}

func (l *liftedEventLoop) AnnounceTick() {
l.send(announceTickEvent{})
}
Expand Down Expand Up @@ -377,15 +373,6 @@ func (e dispatcherCompleteEvent) apply(s *state) {
go s.sched.announce(ctrl.dispatcher.Digest(), ctrl.dispatcher.InfoHash(), true)
}

// peerRemovedEvent occurs when a dispatcher removes a peer with a closed
// connection. Currently is a no-op.
type peerRemovedEvent struct {
peerID core.PeerID
infoHash core.InfoHash
}

func (e peerRemovedEvent) apply(s *state) {}

// preemptionTickEvent occurs periodically to preempt unneeded conns and remove
// idle torrentControls.
type preemptionTickEvent struct{}
Expand Down
90 changes: 59 additions & 31 deletions lib/torrent/scheduler/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package scheduler

import (
"fmt"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -41,15 +43,31 @@ type mockEventLoop struct {
c chan event
}

func (l *mockEventLoop) expect(e event) {
func (l *mockEventLoop) next() (event, error) {
select {
case result := <-l.c:
require.Equal(l.t, e, result)
case e := <-l.c:
return e, nil
case <-time.After(5 * time.Second):
l.t.Fatalf("timed out waiting for %T to occur", e)
return nil, fmt.Errorf("timed out waiting for event")
}
}

// expect checks the next event is e and returns it.
func (l *mockEventLoop) expect(e event) event {
n, err := l.next()
require.NoError(l.t, err)
require.Equal(l.t, n, e)
return n
}

// expectType checks the next event is the same type as e and returns it.
func (l *mockEventLoop) expectType(e event) event {
n, err := l.next()
require.NoError(l.t, err)
require.Equal(l.t, reflect.TypeOf(e).Name(), reflect.TypeOf(n).Name())
return n
}

func (l *mockEventLoop) send(e event) bool {
l.c <- e
return true
Expand Down Expand Up @@ -164,68 +182,78 @@ func TestAnnounceTickEventSkipsFullTorrents(t *testing.T) {
},
})

full, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true)
// Spin up some fake peers we can handshake against.
var peers []*core.PeerInfo
for i := 0; i < 5; i++ {
p, err := conn.NewFakePeer()
require.NoError(err)
defer p.Close()

peers = append(peers, p.PeerInfo())
}

// Announce a torrent and fully saturate its connections.
t1, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true)
require.NoError(err)

info := full.dispatcher.Stat()
mocks.announceClient.EXPECT().
Announce(
t1.dispatcher.Digest(),
t1.dispatcher.InfoHash(),
false,
announceclient.V1).
Return(peers, time.Second, nil)

for i := 0; i < 5; i++ {
_, c, cleanup := conn.PipeFixture(conn.Config{}, info)
defer cleanup()
announceTickEvent{}.apply(state)

require.NoError(state.conns.AddPending(c.PeerID(), c.InfoHash(), nil))
require.NoError(state.addOutgoingConn(c, info.Bitfield(), info))
mocks.eventLoop.expectType(announceResultEvent{}).apply(state)
for range peers {
mocks.eventLoop.expectType(outgoingConnEvent{}).apply(state)
}

empty, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true)
// Add a second torrent (behind t1) and announce it. The first torrent is
// full and should be skipped, instead directly announcing the second empty
// torrent.
t2, err := state.addTorrent(_testNamespace, mocks.newTorrent(), true)
require.NoError(err)

// The first torrent is full and should be skipped, announcing the empty
// torrent.
mocks.announceClient.EXPECT().
Announce(
empty.dispatcher.Digest(),
empty.dispatcher.InfoHash(),
t2.dispatcher.Digest(),
t2.dispatcher.InfoHash(),
false,
announceclient.V1).
Return(nil, time.Second, nil)

announceTickEvent{}.apply(state)

// Empty torrent announced.
mocks.eventLoop.expect(announceResultEvent{
infoHash: empty.dispatcher.InfoHash(),
infoHash: t2.dispatcher.InfoHash(),
})

// The empty torrent is pending, so keep skipping full torrent.
// t1 is still full and t2 is pending, so nothing should happen.
announceTickEvent{}.apply(state)
announceTickEvent{}.apply(state)
announceTickEvent{}.apply(state)

// Remove a connection -- torrent is no longer full.
// Close a random connection -- t1 is no longer full.
c := state.conns.ActiveConns()[0]
c.Close()
// TODO(codyg): This is ugly. Conn fixtures aren't connected to our event
// loop, so we have to manually trigger the event.
connClosedEvent{c}.apply(state)

mocks.eventLoop.expect(peerRemovedEvent{
peerID: c.PeerID(),
infoHash: c.InfoHash(),
})
mocks.eventLoop.expect(connClosedEvent{c}).apply(state)

mocks.announceClient.EXPECT().
Announce(
full.dispatcher.Digest(),
full.dispatcher.InfoHash(),
t1.dispatcher.Digest(),
t1.dispatcher.InfoHash(),
false,
announceclient.V1).
Return(nil, time.Second, nil)

announceTickEvent{}.apply(state)

// Previously full torrent announced.
// Previously full torrent (which now has open conn slot) announced.
mocks.eventLoop.expect(announceResultEvent{
infoHash: full.dispatcher.InfoHash(),
infoHash: t1.dispatcher.InfoHash(),
})
}