Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

authorithy-discovery: Make changing of peer-id while active a bit more robust #3786

Merged
merged 58 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
0103c7f
authorithy-discovery: Make changing of peer-id while active a bit mor…
alexggh Mar 21, 2024
4172677
Make clippy happy
alexggh Mar 22, 2024
64f38d2
Fix warnings
alexggh Mar 22, 2024
ce87688
Refactor gossip support
alexggh Mar 25, 2024
bd69a55
Merge remote-tracking branch 'origin/master' into fix_change_node_id_…
alexggh Mar 25, 2024
a69ba99
Make clippy happy
alexggh Mar 25, 2024
311aade
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 1, 2024
1305763
Some other hacks
alexggh Apr 3, 2024
7e16e58
Add more changes
alexggh Apr 3, 2024
a82ebed
More refactoring
alexggh Apr 3, 2024
d569ab3
A bit more refactoring
alexggh Apr 3, 2024
1c9a40d
A few more improvements
alexggh Apr 3, 2024
91e647c
Fixup even more
alexggh Apr 3, 2024
0a53ec2
Another something
alexggh Apr 3, 2024
ef6ddb6
Fixup everything
alexggh Apr 3, 2024
b3eb615
Post refactoring
alexggh Apr 3, 2024
f6d4b29
Fixup Cargo's
alexggh Apr 3, 2024
b41cdf7
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 3, 2024
1ae1759
Make clippy happy
alexggh Apr 3, 2024
7b56bc8
Some minor tweaks
alexggh Apr 3, 2024
d610df6
Minor updates
alexggh Apr 3, 2024
4263dc4
Minor review feedback
alexggh Apr 3, 2024
4d7b164
Add authorithy-discovery-unittests
alexggh Apr 5, 2024
7cc3333
Add unittest for gossip-support changes
alexggh Apr 5, 2024
fabdb4b
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 17, 2024
42afffc
Simplify indentation levels
alexggh Apr 17, 2024
28ac0a2
Minor cleanups
alexggh Apr 17, 2024
95950a7
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 24, 2024
bd21fda
Update litep2p
alexggh Apr 24, 2024
57f1b38
Use put_valut_to from litep2p
alexggh Apr 24, 2024
9f56b02
Minor feedback
alexggh Apr 24, 2024
6d196c4
Minor changes
alexggh Apr 25, 2024
9979f8f
Minor updates
alexggh Apr 25, 2024
fd5dc46
Integrate with https://github.com/paritytech/litep2p/pull/96
alexggh Apr 30, 2024
20e351e
Fix warning on quorum failed
alexggh May 1, 2024
98398e3
Reconnect only if new peer ids pop-up
alexggh May 1, 2024
8066044
Revert kademlia removal
alexggh May 2, 2024
8eef5c3
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh May 2, 2024
b4fe357
Update cargo.lock
alexggh May 2, 2024
97f09a3
Merge branch 'master' into alexaggh/fix_change_node_id_at_restart
alexggh May 2, 2024
586a0f1
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh May 9, 2024
09b6306
Update substrate/client/network/src/litep2p/mod.rs
alexggh May 12, 2024
fb534b5
Update polkadot/node/network/gossip-support/src/tests.rs
alexggh May 12, 2024
7399dc5
Update substrate/client/authority-discovery/src/worker/tests.rs
alexggh May 12, 2024
17df838
Update assert messages
alexggh May 12, 2024
3313fd7
Address review feedback
alexggh May 12, 2024
999a710
Use a single signature
alexggh May 29, 2024
3b4650a
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Jul 12, 2024
7e81ca2
Remove unused
alexggh Jul 15, 2024
f083318
Update substrate/client/network/src/litep2p/mod.rs
alexggh Jul 18, 2024
a5fcc6b
Update substrate/client/authority-discovery/src/worker.rs
alexggh Jul 18, 2024
e54c96d
Update substrate/client/authority-discovery/src/worker.rs
alexggh Jul 18, 2024
66959dd
Update documentation
alexggh Jul 18, 2024
06285ef
Update substrate/client/authority-discovery/src/worker.rs
alexggh Jul 18, 2024
3a23fbd
Address review feedback
alexggh Jul 18, 2024
77e4061
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Jul 23, 2024
0256b3c
Add PrDoc
alexggh Jul 23, 2024
19054d7
Merge branch 'master' into alexaggh/fix_change_node_id_at_restart
alexggh Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions polkadot/node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
#[cfg(test)]
const BACKOFF_DURATION: Duration = Duration::from_millis(500);

// The authorithy_discovery queries runs every ten minutes,
// so no point in trying more often than that, so let's
// try re-resolving the authorithies every 10 minutes and force
// the reconnection to the ones that changed their address.
const TRY_RECONNECT_AFTER: Duration = Duration::from_secs(60 * 10);

/// Duration after which we consider low connectivity a problem.
///
/// Especially at startup low connectivity is expected (authority discovery cache needs to be
Expand All @@ -91,6 +97,14 @@ pub struct GossipSupport<AD> {
// `None` otherwise.
last_failure: Option<Instant>,

// Validators can restart during a session, so if they change
// their PeerID, we will connect to them in the best case after
// a session, so we need to try more often to resolved peers and
// reconnect to them. The authorithy_discovery queries runs every ten
// minutes, so no point in trying more often than that, so let's
// try reconnecting every 10 minutes here as well.
last_connection_request: Option<Instant>,

/// First time we did not reach our connectivity threshold.
///
/// This is the time of the first failed attempt to connect to >2/3 of all validators in a
Expand Down Expand Up @@ -131,6 +145,7 @@ where
keystore,
last_session_index: None,
last_failure: None,
last_connection_request: None,
failure_start: None,
resolved_authorities: HashMap::new(),
connected_authorities: HashMap::new(),
Expand Down Expand Up @@ -196,7 +211,11 @@ where
for leaf in leaves {
let current_index = util::request_session_index_for_child(leaf, sender).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let force_request = since_failure >= BACKOFF_DURATION;
let since_last_reconnect =
self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();

let force_request =
since_failure >= BACKOFF_DURATION || since_last_reconnect >= TRY_RECONNECT_AFTER;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i => None,
Expand Down Expand Up @@ -248,7 +267,7 @@ where
// connections to a much broader set of validators.
{
let mut connections = authorities_past_present_future(sender, leaf).await?;

self.last_connection_request = Some(Instant::now());
// Remove all of our locally controlled validator indices so we don't connect to
// ourself.
let connections =
Expand Down Expand Up @@ -405,10 +424,11 @@ where
.await
.into_iter()
.flat_map(|list| list.into_iter())
.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p));
.flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
.collect::<Vec<_>>();

if let Some(p) = peer_id {
authority_ids.entry(p).or_default().insert(authority);
for p in peer_id {
authority_ids.entry(p).or_default().insert(authority.clone());
}
}

Expand Down
25 changes: 18 additions & 7 deletions substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
/// Set of in-flight lookups.
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

/// Set of lookups we can still received records.
alexggh marked this conversation as resolved.
Show resolved Hide resolved
/// These are the entries in the `in_flight_lookups` for which
/// we got at least on successfull result.
alexggh marked this conversation as resolved.
Show resolved Hide resolved
known_lookups: HashMap<KademliaKey, AuthorityId>,

addr_cache: addr_cache::AddrCache,

metrics: Option<Metrics>,
Expand Down Expand Up @@ -237,6 +242,7 @@ where
query_interval,
pending_lookups: Vec::new(),
in_flight_lookups: HashMap::new(),
known_lookups: HashMap::new(),
addr_cache,
role,
metrics,
Expand Down Expand Up @@ -292,9 +298,7 @@ where
fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
match msg {
ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
let _ = sender.send(
self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
);
let _ = sender.send(self.addr_cache.get_addresses_by_authority_id(&authority));
},
ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
let _ = sender
Expand Down Expand Up @@ -408,6 +412,7 @@ where
// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
// query interval ticks are far enough apart for all lookups to succeed.
self.in_flight_lookups.clear();
self.known_lookups.clear();

if let Some(metrics) = &self.metrics {
metrics
Expand Down Expand Up @@ -500,10 +505,16 @@ where
.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)?
.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;

let authority_id: AuthorityId = self
.in_flight_lookups
.remove(&remote_key)
.ok_or(Error::ReceivingUnexpectedRecord)?;
let authority_id: AuthorityId =
if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
authority_id
} else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
authority_id.clone()
dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved
} else {
return Err(Error::ReceivingUnexpectedRecord);
};

self.known_lookups.insert(remote_key.clone(), authority_id.clone());

let local_peer_id = self.network.local_peer_id();

dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
50 changes: 38 additions & 12 deletions substrate/client/authority-discovery/src/worker/addr_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use libp2p::{
PeerId,
};
use sp_authority_discovery::AuthorityId;
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
time::Instant,
};

/// Cache for [`AuthorityId`] -> [`HashSet<Multiaddr>`] and [`PeerId`] -> [`HashSet<AuthorityId>`]
/// mappings.
Expand All @@ -33,7 +36,7 @@ pub(super) struct AddrCache {
/// Since we may store the mapping across several sessions, a single
/// `PeerId` might correspond to multiple `AuthorityId`s. However,
/// it's not expected that a single `AuthorityId` can have multiple `PeerId`s.
authority_id_to_addresses: HashMap<AuthorityId, HashSet<Multiaddr>>,
authority_id_to_addresses: HashMap<AuthorityId, HashMap<Multiaddr, Instant>>,
peer_id_to_authority_ids: HashMap<PeerId, HashSet<AuthorityId>>,
}

Expand All @@ -48,8 +51,12 @@ impl AddrCache {
/// Inserts the given [`AuthorityId`] and [`Vec<Multiaddr>`] pair for future lookups by
/// [`AuthorityId`] or [`PeerId`].
pub fn insert(&mut self, authority_id: AuthorityId, addresses: Vec<Multiaddr>) {
let addresses = addresses.into_iter().collect::<HashSet<_>>();
let peer_ids = addresses_to_peer_ids(&addresses);
let mut addresses = addresses
.into_iter()
.map(|addr| (addr, Instant::now() + std::time::Duration::from_secs(24 * 60 * 60)))
.collect::<HashMap<_, _>>();

let mut peer_ids = addresses_to_peer_ids(&addresses);

if peer_ids.is_empty() {
log::debug!(
Expand All @@ -74,9 +81,26 @@ impl AddrCache {
"Found addresses for authority {authority_id:?}: {addresses:?}",
);

let old_addresses =
self.authority_id_to_addresses.get(&authority_id).cloned().unwrap_or_default();

let time_now = Instant::now();

let to_keep_addresses = old_addresses
.iter()
.filter(|(addr, expires)| **expires >= time_now && !addresses.contains_key(addr))
.map(|(addr, expires)| (addr.clone(), *expires))
.collect::<HashMap<_, _>>();
alexggh marked this conversation as resolved.
Show resolved Hide resolved

addresses.extend(to_keep_addresses.clone());

let old_addresses = self.authority_id_to_addresses.insert(authority_id.clone(), addresses);

let old_peer_ids = addresses_to_peer_ids(&old_addresses.unwrap_or_default());

let to_kepp_peer_ids = addresses_to_peer_ids(&to_keep_addresses);
peer_ids.extend(to_kepp_peer_ids);
alexggh marked this conversation as resolved.
Show resolved Hide resolved

// Add the new peer ids
peer_ids.difference(&old_peer_ids).for_each(|new_peer_id| {
self.peer_id_to_authority_ids
Expand Down Expand Up @@ -118,8 +142,10 @@ impl AddrCache {
pub fn get_addresses_by_authority_id(
&self,
authority_id: &AuthorityId,
) -> Option<&HashSet<Multiaddr>> {
self.authority_id_to_addresses.get(authority_id)
) -> Option<HashSet<Multiaddr>> {
self.authority_id_to_addresses
.get(authority_id)
.map(|result| result.keys().map(|addr| addr.clone()).collect::<HashSet<_>>())
}

/// Returns the [`AuthorityId`]s for the given [`PeerId`].
Expand Down Expand Up @@ -170,8 +196,8 @@ fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option<PeerId> {
})
}

fn addresses_to_peer_ids(addresses: &HashSet<Multiaddr>) -> HashSet<PeerId> {
addresses.iter().filter_map(peer_id_from_multiaddr).collect::<HashSet<_>>()
fn addresses_to_peer_ids(addresses: &HashMap<Multiaddr, Instant>) -> HashSet<PeerId> {
addresses.keys().filter_map(peer_id_from_multiaddr).collect::<HashSet<_>>()
}

#[cfg(test)]
Expand Down Expand Up @@ -253,7 +279,7 @@ mod tests {
cache.insert(third.0.clone(), vec![third.1.clone()]);

assert_eq!(
Some(&HashSet::from([third.1.clone()])),
Some(HashSet::from([third.1.clone()])),
cache.get_addresses_by_authority_id(&third.0),
"Expect `get_addresses_by_authority_id` to return addresses of third authority.",
);
Expand Down Expand Up @@ -347,7 +373,7 @@ mod tests {
cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
);
assert_eq!(
&HashSet::from([multiaddr2.clone(), multiaddr3.clone(), multiaddr4.clone()]),
HashSet::from([multiaddr2.clone(), multiaddr3.clone(), multiaddr4.clone()]),
cache.get_addresses_by_authority_id(&authority1).unwrap(),
);

Expand Down Expand Up @@ -377,11 +403,11 @@ mod tests {

assert_eq!(2, addr_cache.num_authority_ids());
assert_eq!(
&HashSet::from([addr.clone()]),
HashSet::from([addr.clone()]),
addr_cache.get_addresses_by_authority_id(&authority_id0).unwrap()
);
assert_eq!(
&HashSet::from([addr]),
HashSet::from([addr]),
addr_cache.get_addresses_by_authority_id(&authority_id1).unwrap()
);
}
Expand Down
6 changes: 3 additions & 3 deletions substrate/client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ impl DhtValueFoundTester {
&mut self,
strict_record_validation: bool,
values: Vec<(KademliaKey, Vec<u8>)>,
) -> Option<&HashSet<Multiaddr>> {
) -> Option<HashSet<Multiaddr>> {
let (_dht_event_tx, dht_event_rx) = channel(1);
let local_test_api =
Arc::new(TestApi { authorities: vec![self.remote_authority_public.into()] });
Expand Down Expand Up @@ -597,7 +597,7 @@ fn strict_accept_address_with_peer_signature() {
let cached_remote_addresses = tester.process_value_found(true, kv_pairs);

assert_eq!(
Some(&HashSet::from([addr])),
Some(HashSet::from([addr])),
cached_remote_addresses,
"Expect worker to only cache `Multiaddr`s with `PeerId`s.",
);
Expand Down Expand Up @@ -675,7 +675,7 @@ fn do_not_cache_addresses_without_peer_id() {
let cached_remote_addresses = tester.process_value_found(false, kv_pairs);

assert_eq!(
Some(&HashSet::from([multiaddr_with_peer_id])),
Some(HashSet::from([multiaddr_with_peer_id])),
cached_remote_addresses,
"Expect worker to only cache `Multiaddr`s with `PeerId`s.",
);
Expand Down
11 changes: 8 additions & 3 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use libp2p::{
},
PeerId,
};
use log::{debug, info, trace, warn};
use log::{debug, error, info, trace, warn};
use sp_core::hexdisplay::HexDisplay;
use std::{
cmp,
Expand Down Expand Up @@ -784,8 +784,13 @@ impl NetworkBehaviour for DiscoveryBehaviour {
// Let's directly finish the query, as we are only interested in a
// quorum of 1.
alexggh marked this conversation as resolved.
Show resolved Hide resolved
if let Some(kad) = self.kademlia.as_mut() {
if let Some(mut query) = kad.query_mut(&id) {
query.finish();
if let Some(query) = kad.query_mut(&id) {
// Let the query continue, to increase the chances we
// discover all possible addresses, for the cases where more
// addresses might exist in DHT, for example when the node
// changes its PeerId.

// query.finish();
alexggh marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading