Skip to content

Commit

Permalink
Collapse pending/active/capacity maps in connstate.State (#173)
Browse files Browse the repository at this point in the history
Instead of managing three maps for pending/active/capacity (which was complex in it's own
respect), just have a single map with a status field, and then group conns by torrent. Allows us
to ask questions like "how many active conns does this torrent have?" without having to loop
through a global active conns map.

This is shown by the new Saturated function.
  • Loading branch information
codygibb authored Jun 6, 2019
1 parent c721f6c commit dd763e3
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 119 deletions.
198 changes: 120 additions & 78 deletions lib/torrent/scheduler/connstate/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"errors"
"time"

"github.com/andres-erbsen/clock"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"github.com/andres-erbsen/clock"
"go.uber.org/zap"
)

Expand All @@ -32,11 +32,29 @@ var (
ErrConnClosed = errors.New("conn is closed")
ErrInvalidActiveTransition = errors.New("conn must be pending to transition to active")
ErrTooManyMutualConns = errors.New("conn has too many mutual connections")

// This should NEVER happen.
errUnknownStatus = errors.New("invariant violation: unknown status")
)

type status int

const (
// _uninit indicates the connection is uninitialized. This is the default
// status for empty entries.
_uninit status = iota
_pending
_active
)

type entry struct {
status status
conn *conn.Conn
}

type connKey struct {
peerID core.PeerID
infoHash core.InfoHash
hash core.InfoHash
peerID core.PeerID
}

type blacklistEntry struct {
Expand All @@ -52,7 +70,7 @@ func (e *blacklistEntry) Remaining(now time.Time) time.Duration {
}

// State provides connection lifecycle management and enforces connection
// limits. A connection to a peer is identified by peer id and torrent info hash.
// limits. A connection to a peer is identified by torrent info hash and peer id.
// Each connection may exist in the following states: pending, active, or
// blacklisted. Pending connections are unestablished connections which "reserve"
// connection capacity until they are done handshaking. Active connections are
Expand All @@ -65,11 +83,13 @@ type State struct {
clk clock.Clock
netevents networkevent.Producer
localPeerID core.PeerID
capacity map[core.InfoHash]int
active map[connKey]*conn.Conn
pending map[connKey]bool
blacklist map[connKey]*blacklistEntry
logger *zap.SugaredLogger

// All pending or active conns. These count towards conn capacity.
conns map[core.InfoHash]map[core.PeerID]entry

// All blacklisted conns. These do not count towards conn capacity.
blacklist map[connKey]*blacklistEntry
}

// New creates a new State.
Expand All @@ -87,34 +107,38 @@ func New(
clk: clk,
netevents: netevents,
localPeerID: localPeerID,
capacity: make(map[core.InfoHash]int),
active: make(map[connKey]*conn.Conn),
pending: make(map[connKey]bool),
blacklist: make(map[connKey]*blacklistEntry),
logger: logger,
conns: make(map[core.InfoHash]map[core.PeerID]entry),
blacklist: make(map[connKey]*blacklistEntry),
}
}

// MaxConnsPerTorrent returns the max number of connections a torrent is
// permitted to have.
func (s *State) MaxConnsPerTorrent() int {
return s.config.MaxOpenConnectionsPerTorrent
}

// ActiveConns returns a list of all active connections.
func (s *State) ActiveConns() []*conn.Conn {
conns := make([]*conn.Conn, len(s.active))
var i int
for _, c := range s.active {
conns[i] = c
i++
var active []*conn.Conn
for _, peers := range s.conns {
for _, e := range peers {
if e.status == _active {
active = append(active, e.conn)
}
}
}
return conns
return active
}

// NumActiveConns returns the total number of active connections.
func (s *State) NumActiveConns() int {
return len(s.active)
// Saturated returns true if h is at capacity and all the conns are active.
func (s *State) Saturated(h core.InfoHash) bool {
peers, ok := s.conns[h]
if !ok {
return false
}
var active int
for _, e := range peers {
if e.status == _active {
active++
}
}
return active == s.config.MaxOpenConnectionsPerTorrent
}

// Blacklist blacklists peerID/h for the configured BlacklistDuration.
Expand All @@ -124,7 +148,7 @@ func (s *State) Blacklist(peerID core.PeerID, h core.InfoHash) error {
return nil
}

k := connKey{peerID, h}
k := connKey{h, peerID}
if e, ok := s.blacklist[k]; ok && e.Blacklisted(s.clk.Now()) {
return errors.New("conn is already blacklisted")
}
Expand All @@ -140,14 +164,14 @@ func (s *State) Blacklist(peerID core.PeerID, h core.InfoHash) error {

// Blacklisted returns true if peerID/h is blacklisted.
func (s *State) Blacklisted(peerID core.PeerID, h core.InfoHash) bool {
e, ok := s.blacklist[connKey{peerID, h}]
e, ok := s.blacklist[connKey{h, peerID}]
return ok && e.Blacklisted(s.clk.Now())
}

// ClearBlacklist un-blacklists all connections for h.
func (s *State) ClearBlacklist(h core.InfoHash) {
for k := range s.blacklist {
if k.infoHash == h {
if k.hash == h {
delete(s.blacklist, k)
}
}
Expand All @@ -156,91 +180,77 @@ func (s *State) ClearBlacklist(h core.InfoHash) {
// AddPending sets the connection for peerID/h as pending and reserves capacity
// for it.
func (s *State) AddPending(peerID core.PeerID, h core.InfoHash, neighbors []core.PeerID) error {
k := connKey{peerID, h}
cap, ok := s.capacity[h]
if !ok {
cap = s.config.MaxOpenConnectionsPerTorrent
s.capacity[h] = cap
}
if cap == 0 {
if len(s.conns[h]) == s.config.MaxOpenConnectionsPerTorrent {
return ErrTorrentAtCapacity
}
if s.pending[k] {
switch s.get(h, peerID).status {
case _uninit:
if s.numMutualConns(h, neighbors) > s.config.MaxMutualConnections {
return ErrTooManyMutualConns
}
s.put(h, peerID, entry{status: _pending})
s.log("hash", h, "peer", peerID).Infof(
"Added pending conn, capacity now at %d", s.capacity(h))
return nil
case _pending:
return ErrConnAlreadyPending
}
if _, ok := s.active[k]; ok {
case _active:
return ErrConnAlreadyActive
default:
return errUnknownStatus
}
if s.numMutualConns(h, neighbors) > s.config.MaxMutualConnections {
return ErrTooManyMutualConns
}
s.pending[k] = true
s.capacity[k.infoHash]--

s.log("peer", peerID, "hash", h).Infof(
"Added pending conn, capacity now at %d", s.capacity[k.infoHash])

return nil
}

// DeletePending deletes the pending connection for peerID/h and frees capacity.
func (s *State) DeletePending(peerID core.PeerID, h core.InfoHash) {
k := connKey{peerID, h}
if !s.pending[k] {
if s.get(h, peerID).status != _pending {
return
}
delete(s.pending, k)
s.capacity[k.infoHash]++

s.log("peer", peerID, "hash", h).Infof(
"Deleted pending conn, capacity now at %d", s.capacity[k.infoHash])
s.delete(h, peerID)
s.log("hash", h, "peer", peerID).Infof(
"Deleted pending conn, capacity now at %d", s.capacity(h))
}

// MovePendingToActive sets a previously pending connection as active.
func (s *State) MovePendingToActive(c *conn.Conn) error {
if c.IsClosed() {
return ErrConnClosed
}
k := connKey{c.PeerID(), c.InfoHash()}
if !s.pending[k] {
if s.get(c.InfoHash(), c.PeerID()).status != _pending {
return ErrInvalidActiveTransition
}
delete(s.pending, k)
s.active[k] = c
s.put(c.InfoHash(), c.PeerID(), entry{status: _active, conn: c})

s.log("peer", k.peerID, "hash", k.infoHash).Info("Moved conn from pending to active")
s.netevents.Produce(networkevent.AddActiveConnEvent(
c.InfoHash(), s.localPeerID, c.PeerID()))
s.log("hash", c.InfoHash(), "peer", c.PeerID()).Info("Moved conn from pending to active")
s.netevents.Produce(networkevent.AddActiveConnEvent(c.InfoHash(), s.localPeerID, c.PeerID()))

return nil
}

// DeleteActive deletes c. No-ops if c is not an active conn.
func (s *State) DeleteActive(c *conn.Conn) {
k := connKey{c.PeerID(), c.InfoHash()}
cur, ok := s.active[k]
if !ok || cur != c {
// It is possible that some new conn shares the same connKey as the old conn,
e := s.get(c.InfoHash(), c.PeerID())
if e.status != _active {
return
}
if e.conn != c {
// It is possible that some new conn shares the same hash/peer as the old conn,
// so we need to make sure we're deleting the right one.
return
}
delete(s.active, k)
s.capacity[k.infoHash]++
s.delete(c.InfoHash(), c.PeerID())

s.log("peer", k.peerID, "hash", k.infoHash).Infof(
"Deleted active conn, capacity now at %d", s.capacity[k.infoHash])
s.log("hash", c.InfoHash(), "peer", c.PeerID()).Infof(
"Deleted active conn, capacity now at %d", s.capacity(c.InfoHash()))
s.netevents.Produce(networkevent.DropActiveConnEvent(
c.InfoHash(), s.localPeerID, c.PeerID()))

return
}

func (s *State) numMutualConns(h core.InfoHash, neighbors []core.PeerID) int {
var n int
for _, id := range neighbors {
if _, ok := s.active[connKey{id, h}]; ok {
n++
} else if _, ok := s.pending[connKey{id, h}]; ok {
e := s.get(h, id)
if e.status == _pending || e.status == _active {
n++
}
}
Expand All @@ -260,14 +270,46 @@ func (s *State) BlacklistSnapshot() []BlacklistedConn {
for k, e := range s.blacklist {
c := BlacklistedConn{
PeerID: k.peerID,
InfoHash: k.infoHash,
InfoHash: k.hash,
Remaining: e.Remaining(s.clk.Now()),
}
conns = append(conns, c)
}
return conns
}

func (s *State) get(h core.InfoHash, peerID core.PeerID) entry {
peers, ok := s.conns[h]
if !ok {
return entry{}
}
return peers[peerID]
}

func (s *State) put(h core.InfoHash, peerID core.PeerID, e entry) {
peers, ok := s.conns[h]
if !ok {
peers = make(map[core.PeerID]entry)
s.conns[h] = peers
}
peers[peerID] = e
}

func (s *State) delete(h core.InfoHash, peerID core.PeerID) {
peers, ok := s.conns[h]
if !ok {
return
}
delete(peers, peerID)
if len(peers) == 0 {
delete(s.conns, h)
}
}

func (s *State) capacity(h core.InfoHash) int {
return s.config.MaxOpenConnectionsPerTorrent - len(s.conns[h])
}

func (s *State) log(args ...interface{}) *zap.SugaredLogger {
return s.logger.With(args...)
}
34 changes: 34 additions & 0 deletions lib/torrent/scheduler/connstate/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"github.com/uber/kraken/lib/torrent/storage"
)

func testState(config Config, clk clock.Clock) *State {
Expand Down Expand Up @@ -250,6 +251,39 @@ func TestStateActiveConns(t *testing.T) {
require.Empty(s.ActiveConns())
}

func TestStateSaturated(t *testing.T) {
require := require.New(t)

s := testState(Config{MaxOpenConnectionsPerTorrent: 10}, clock.New())

info := storage.TorrentInfoFixture(1, 1)

var conns []*conn.Conn
for i := 0; i < 10; i++ {
c, _, cleanup := conn.PipeFixture(conn.Config{}, info)
defer cleanup()

require.NoError(s.AddPending(c.PeerID(), info.InfoHash(), nil))
conns = append(conns, c)
}

// Pending conns do not count towards saturated.
require.False(s.Saturated(info.InfoHash()))

for i := 0; i < 9; i++ {
require.NoError(s.MovePendingToActive(conns[i]))
require.False(s.Saturated(info.InfoHash()))
}

// Adding 10th conn should mean we're saturated.
require.NoError(s.MovePendingToActive(conns[9]))
require.True(s.Saturated(info.InfoHash()))

// Removing one should mean we're no longer saturated.
s.DeleteActive(conns[5])
require.False(s.Saturated(info.InfoHash()))
}

func TestMaxMutualConns(t *testing.T) {
require := require.New(t)

Expand Down
10 changes: 0 additions & 10 deletions lib/torrent/scheduler/dispatch/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,6 @@ func (d *Dispatcher) LastWriteTime() time.Time {
return d.torrent.getLastWriteTime()
}

// NumPeers returns the number of peers connected to the dispatcher.
func (d *Dispatcher) NumPeers() int {
var n int
d.peers.Range(func(k, v interface{}) bool {
n++
return true
})
return n
}

// Empty returns true if the Dispatcher has no peers.
func (d *Dispatcher) Empty() bool {
empty := true
Expand Down
Loading

0 comments on commit dd763e3

Please sign in to comment.