Collapse pending/active/capacity maps in connstate.State #173

Merged · 4 commits · Jun 6, 2019

Changes from 2 commits
190 changes: 118 additions & 72 deletions lib/torrent/scheduler/connstate/state.go
@@ -17,10 +17,10 @@ import (
"errors"
"time"

"github.com/andres-erbsen/clock"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"github.com/andres-erbsen/clock"
"go.uber.org/zap"
)

@@ -32,11 +32,27 @@ var (
ErrConnClosed = errors.New("conn is closed")
ErrInvalidActiveTransition = errors.New("conn must be pending to transition to active")
ErrTooManyMutualConns = errors.New("conn has too many mutual connections")

+ // This should NEVER happen.
+ errUnknownStatus = errors.New("invariant violation: unknown status")
)

+ type status int
+
+ const (
+ _uninit status = iota
Collaborator:
can you add some comments about what each means

Contributor (author):
I think pending / active are fairly obvious in the context of this file... There's a big comment above State which describes the semantics of pending / active.

+ _pending
+ _active
+ )
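
Side note on the review thread above: if these constants did grow doc comments, they might read like the sketch below. This is only a suggested wording and is not part of the merged diff; the semantics are taken from the State doc comment later in this file.

// status is the lifecycle stage of the conn for a given hash/peer pair.
type status int

const (
	// _uninit is the zero value: no entry exists for the hash/peer pair.
	_uninit status = iota

	// _pending marks a conn that is still handshaking; it already counts
	// against the torrent's conn capacity.
	_pending

	// _active marks a conn that has finished handshaking and is usable.
	_active
)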

+ type entry struct {
+ status status
+ conn *conn.Conn
+ }

type connKey struct {
- peerID core.PeerID
- infoHash core.InfoHash
+ hash core.InfoHash
+ peerID core.PeerID
}

type blacklistEntry struct {
@@ -52,7 +68,7 @@ func (e *blacklistEntry) Remaining(now time.Time) time.Duration {
}

// State provides connection lifecycle management and enforces connection
- // limits. A connection to a peer is identified by peer id and torrent info hash.
+ // limits. A connection to a peer is identified by torrent info hash and peer id.
// Each connection may exist in the following states: pending, active, or
// blacklisted. Pending connections are unestablished connections which "reserve"
// connection capacity until they are done handshaking. Active connections are
@@ -65,11 +81,13 @@ type State struct {
clk clock.Clock
netevents networkevent.Producer
localPeerID core.PeerID
- capacity map[core.InfoHash]int
- active map[connKey]*conn.Conn
- pending map[connKey]bool
- blacklist map[connKey]*blacklistEntry
logger *zap.SugaredLogger

+ // All pending or active conns. These count towards conn capacity.
+ conns map[core.InfoHash]map[core.PeerID]entry
+
+ // All blacklisted conns. These do not count towards conn capacity.
+ blacklist map[connKey]*blacklistEntry
}
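
For readers skimming the diff, the heart of the change is that the old capacity/active/pending maps collapse into the single nested conns map, with capacity derived on demand. A minimal, self-contained sketch of that idea, using stand-in string keys instead of core.InfoHash/core.PeerID (illustration only, not the PR's code):

package main

import "fmt"

type status int

const (
	_uninit status = iota
	_pending
	_active
)

type entry struct{ status status }

const maxOpenConnsPerTorrent = 3

// conns[hash][peer] plays the role of the old capacity, pending, and active maps.
var conns = map[string]map[string]entry{}

// Capacity is derived: every entry, whether pending or active, holds a slot.
func capacity(hash string) int {
	return maxOpenConnsPerTorrent - len(conns[hash])
}

func main() {
	conns["h1"] = map[string]entry{
		"peerA": {status: _pending},
		"peerB": {status: _active},
	}
	fmt.Println(capacity("h1")) // 1
}

With one source of truth per hash/peer pair, the three structures can no longer drift out of sync.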

// New creates a new State.
@@ -87,11 +105,9 @@ func New(
clk: clk,
netevents: netevents,
localPeerID: localPeerID,
- capacity: make(map[core.InfoHash]int),
- active: make(map[connKey]*conn.Conn),
- pending: make(map[connKey]bool),
- blacklist: make(map[connKey]*blacklistEntry),
logger: logger,
+ conns: make(map[core.InfoHash]map[core.PeerID]entry),
+ blacklist: make(map[connKey]*blacklistEntry),
}
}

@@ -103,18 +119,30 @@ func (s *State) MaxConnsPerTorrent() int {

// ActiveConns returns a list of all active connections.
func (s *State) ActiveConns() []*conn.Conn {
- conns := make([]*conn.Conn, len(s.active))
- var i int
- for _, c := range s.active {
- conns[i] = c
- i++
+ var active []*conn.Conn
+ for _, peers := range s.conns {
+ for _, e := range peers {
+ if e.status == _active {
+ active = append(active, e.conn)
+ }
+ }
}
- return conns
+ return active
}

- // NumActiveConns returns the total number of active connections.
- func (s *State) NumActiveConns() int {
- return len(s.active)
+ // Saturated returns true if h is at capacity and all the conns are active.
+ func (s *State) Saturated(h core.InfoHash) bool {
+ peers, ok := s.conns[h]
+ if !ok {
+ return false
+ }
+ var active int
+ for _, e := range peers {
+ if e.status == _active {
+ active++
+ }
+ }
+ return active == s.config.MaxOpenConnectionsPerTorrent
}

// Blacklist blacklists peerID/h for the configured BlacklistDuration.
@@ -124,7 +152,7 @@ func (s *State) Blacklist(peerID core.PeerID, h core.InfoHash) error {
return nil
}

- k := connKey{peerID, h}
+ k := connKey{h, peerID}
if e, ok := s.blacklist[k]; ok && e.Blacklisted(s.clk.Now()) {
return errors.New("conn is already blacklisted")
}
@@ -140,14 +168,14 @@ func (s *State) Blacklist(peerID core.PeerID, h core.InfoHash) error {

// Blacklisted returns true if peerID/h is blacklisted.
func (s *State) Blacklisted(peerID core.PeerID, h core.InfoHash) bool {
- e, ok := s.blacklist[connKey{peerID, h}]
+ e, ok := s.blacklist[connKey{h, peerID}]
return ok && e.Blacklisted(s.clk.Now())
}

// ClearBlacklist un-blacklists all connections for h.
func (s *State) ClearBlacklist(h core.InfoHash) {
for k := range s.blacklist {
- if k.infoHash == h {
+ if k.hash == h {
delete(s.blacklist, k)
}
}
@@ -156,91 +184,77 @@ func (s *State) ClearBlacklist(h core.InfoHash) {
// AddPending sets the connection for peerID/h as pending and reserves capacity
// for it.
func (s *State) AddPending(peerID core.PeerID, h core.InfoHash, neighbors []core.PeerID) error {
- k := connKey{peerID, h}
- cap, ok := s.capacity[h]
- if !ok {
- cap = s.config.MaxOpenConnectionsPerTorrent
- s.capacity[h] = cap
- }
- if cap == 0 {
+ if len(s.conns[h]) == s.config.MaxOpenConnectionsPerTorrent {
return ErrTorrentAtCapacity
}
- if s.pending[k] {
+ switch s.get(h, peerID).status {
+ case _uninit:
+ if s.numMutualConns(h, neighbors) > s.config.MaxMutualConnections {
+ return ErrTooManyMutualConns
+ }
+ s.put(h, peerID, entry{status: _pending})
+ s.log("hash", h, "peer", peerID).Infof(
+ "Added pending conn, capacity now at %d", s.capacity(h))
+ return nil
+ case _pending:
return ErrConnAlreadyPending
- }
- if _, ok := s.active[k]; ok {
+ case _active:
return ErrConnAlreadyActive
+ default:
+ return errUnknownStatus
}
- if s.numMutualConns(h, neighbors) > s.config.MaxMutualConnections {
- return ErrTooManyMutualConns
- }
- s.pending[k] = true
- s.capacity[k.infoHash]--
-
- s.log("peer", peerID, "hash", h).Infof(
- "Added pending conn, capacity now at %d", s.capacity[k.infoHash])
-
- return nil
}
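
An illustration of how the switch above behaves from a caller's point of view, assuming an initialized *State s and hypothetical fixtures p and h (this is not code from the PR):

// Given a hash/peer pair that is not yet tracked:
_ = s.AddPending(p, h, nil) // _uninit: reserves a slot and returns nil
_ = s.AddPending(p, h, nil) // _pending: returns ErrConnAlreadyPending
// ...after a successful MovePendingToActive for this pair...
_ = s.AddPending(p, h, nil) // _active: returns ErrConnAlreadyActive
// Any pair on a torrent that already has MaxOpenConnectionsPerTorrent
// entries fails up front with ErrTorrentAtCapacity.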

// DeletePending deletes the pending connection for peerID/h and frees capacity.
func (s *State) DeletePending(peerID core.PeerID, h core.InfoHash) {
- k := connKey{peerID, h}
- if !s.pending[k] {
+ if s.get(h, peerID).status != _pending {
return
}
- delete(s.pending, k)
- s.capacity[k.infoHash]++
-
- s.log("peer", peerID, "hash", h).Infof(
- "Deleted pending conn, capacity now at %d", s.capacity[k.infoHash])
+ s.delete(h, peerID)
+ s.log("hash", h, "peer", peerID).Infof(
+ "Deleted pending conn, capacity now at %d", s.capacity(h))
}

// MovePendingToActive sets a previously pending connection as active.
func (s *State) MovePendingToActive(c *conn.Conn) error {
if c.IsClosed() {
return ErrConnClosed
}
- k := connKey{c.PeerID(), c.InfoHash()}
- if !s.pending[k] {
+ if s.get(c.InfoHash(), c.PeerID()).status != _pending {
return ErrInvalidActiveTransition
}
- delete(s.pending, k)
- s.active[k] = c
+ s.put(c.InfoHash(), c.PeerID(), entry{status: _active, conn: c})

s.log("peer", k.peerID, "hash", k.infoHash).Info("Moved conn from pending to active")
s.netevents.Produce(networkevent.AddActiveConnEvent(
c.InfoHash(), s.localPeerID, c.PeerID()))
s.log("hash", c.InfoHash(), "peer", c.PeerID()).Info("Moved conn from pending to active")
s.netevents.Produce(networkevent.AddActiveConnEvent(c.InfoHash(), s.localPeerID, c.PeerID()))

return nil
}

// DeleteActive deletes c. No-ops if c is not an active conn.
func (s *State) DeleteActive(c *conn.Conn) {
- k := connKey{c.PeerID(), c.InfoHash()}
- cur, ok := s.active[k]
- if !ok || cur != c {
- // It is possible that some new conn shares the same connKey as the old conn,
+ e := s.get(c.InfoHash(), c.PeerID())
+ if e.status != _active {
+ return
+ }
+ if e.conn != c {
+ // It is possible that some new conn shares the same hash/peer as the old conn,
// so we need to make sure we're deleting the right one.
return
}
- delete(s.active, k)
- s.capacity[k.infoHash]++
+ s.delete(c.InfoHash(), c.PeerID())

s.log("peer", k.peerID, "hash", k.infoHash).Infof(
"Deleted active conn, capacity now at %d", s.capacity[k.infoHash])
s.log("hash", c.InfoHash(), "peer", c.PeerID()).Infof(
"Deleted active conn, capacity now at %d", s.capacity(c.InfoHash()))
s.netevents.Produce(networkevent.DropActiveConnEvent(
c.InfoHash(), s.localPeerID, c.PeerID()))

return
}

func (s *State) numMutualConns(h core.InfoHash, neighbors []core.PeerID) int {
var n int
for _, id := range neighbors {
- if _, ok := s.active[connKey{id, h}]; ok {
- n++
- } else if _, ok := s.pending[connKey{id, h}]; ok {
+ e := s.get(h, id)
+ if e.status == _pending || e.status == _active {
n++
}
}
@@ -260,14 +274,46 @@ func (s *State) BlacklistSnapshot() []BlacklistedConn {
for k, e := range s.blacklist {
c := BlacklistedConn{
PeerID: k.peerID,
- InfoHash: k.infoHash,
+ InfoHash: k.hash,
Remaining: e.Remaining(s.clk.Now()),
}
conns = append(conns, c)
}
return conns
}

+ func (s *State) get(h core.InfoHash, peerID core.PeerID) entry {
+ peers, ok := s.conns[h]
+ if !ok {
+ return entry{}
+ }
+ return peers[peerID]
+ }
+
+ func (s *State) put(h core.InfoHash, peerID core.PeerID, e entry) {
+ peers, ok := s.conns[h]
+ if !ok {
+ peers = make(map[core.PeerID]entry)
+ s.conns[h] = peers
+ }
+ peers[peerID] = e
+ }
+
+ func (s *State) delete(h core.InfoHash, peerID core.PeerID) {
+ peers, ok := s.conns[h]
+ if !ok {
+ return
+ }
+ delete(peers, peerID)
+ if len(peers) == 0 {
+ delete(s.conns, h)
+ }
+ }
+
+ func (s *State) capacity(h core.InfoHash) int {
+ return s.config.MaxOpenConnectionsPerTorrent - len(s.conns[h])
+ }
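
One detail worth noting in the new delete helper: pruning the empty inner map keeps s.conns from accumulating entries for torrents that no longer have any conns. capacity(h) is unaffected either way, since len of a missing or empty inner map is zero; the prune is about not leaking outer-map entries. A tiny generic-Go illustration (not Kraken code):

m := map[string]map[string]struct{}{"h1": {"p1": {}}}
delete(m["h1"], "p1")
fmt.Println(len(m)) // 1: the empty inner map for "h1" is still retained
if len(m["h1"]) == 0 {
	delete(m, "h1")
}
fmt.Println(len(m)) // 0: pruned, mirroring the last three lines of delete above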

func (s *State) log(args ...interface{}) *zap.SugaredLogger {
return s.logger.With(args...)
}
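
Putting the new API together, the pending-to-active lifecycle described in the State doc comment maps onto calls roughly as in the sketch below. The establish helper is hypothetical, imports and fixture setup are elided, and TestStateSaturated in the next file is a runnable version of the same flow.

// Hypothetical wrapper showing the intended call order (not part of the PR).
func establish(s *connstate.State, c *conn.Conn, peerID core.PeerID, h core.InfoHash, neighbors []core.PeerID) error {
	// Reserve a capacity slot while the handshake is in flight.
	if err := s.AddPending(peerID, h, neighbors); err != nil {
		return err // e.g. ErrTorrentAtCapacity or ErrTooManyMutualConns
	}
	// Handshake done: promote the reservation to an active conn.
	if err := s.MovePendingToActive(c); err != nil {
		s.DeletePending(peerID, h) // release the reserved slot
		return err
	}
	return nil
}

Teardown is symmetric: DeleteActive(c) frees the slot, while Blacklist, Blacklisted, and ClearBlacklist manage peers that should not be redialed for a while.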
34 changes: 34 additions & 0 deletions lib/torrent/scheduler/connstate/state_test.go
@@ -24,6 +24,7 @@ import (
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"github.com/uber/kraken/lib/torrent/storage"
)

func testState(config Config, clk clock.Clock) *State {
@@ -250,6 +251,39 @@ func TestStateActiveConns(t *testing.T) {
require.Empty(s.ActiveConns())
}

+ func TestStateSaturated(t *testing.T) {
+ require := require.New(t)
+
+ s := testState(Config{MaxOpenConnectionsPerTorrent: 10}, clock.New())
+
+ info := storage.TorrentInfoFixture(1, 1)
+
+ var conns []*conn.Conn
+ for i := 0; i < 10; i++ {
+ c, _, cleanup := conn.PipeFixture(conn.Config{}, info)
+ defer cleanup()
+
+ require.NoError(s.AddPending(c.PeerID(), info.InfoHash(), nil))
+ conns = append(conns, c)
+ }
+
+ // Pending conns do not count towards saturated.
+ require.False(s.Saturated(info.InfoHash()))
+
+ for i := 0; i < 9; i++ {
+ require.NoError(s.MovePendingToActive(conns[i]))
+ require.False(s.Saturated(info.InfoHash()))
+ }
+
+ // Adding 10th conn should mean we're saturated.
+ require.NoError(s.MovePendingToActive(conns[9]))
+ require.True(s.Saturated(info.InfoHash()))
+
+ // Removing one should mean we're no longer saturated.
+ s.DeleteActive(conns[5])
+ require.False(s.Saturated(info.InfoHash()))
+ }

func TestMaxMutualConns(t *testing.T) {
require := require.New(t)

10 changes: 0 additions & 10 deletions lib/torrent/scheduler/dispatch/dispatcher.go
@@ -209,16 +209,6 @@ func (d *Dispatcher) LastWriteTime() time.Time {
return d.torrent.getLastWriteTime()
}

- // NumPeers returns the number of peers connected to the dispatcher.
- func (d *Dispatcher) NumPeers() int {
- var n int
- d.peers.Range(func(k, v interface{}) bool {
- n++
- return true
- })
- return n
- }

// Empty returns true if the Dispatcher has no peers.
func (d *Dispatcher) Empty() bool {
empty := true