diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml
index 39312cc4b327..a1ea39a852fc 100644
--- a/substrate/client/network/sync/Cargo.toml
+++ b/substrate/client/network/sync/Cargo.toml
@@ -30,6 +30,7 @@ schnellru = "0.2.1"
smallvec = "1.11.0"
thiserror = "1.0"
tokio-stream = "0.1.14"
+tokio = { version = "1.32.0", features = ["time", "macros"] }
fork-tree = { path = "../../../utils/fork-tree" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus" }
sc-client-api = { path = "../../api" }
diff --git a/substrate/client/network/sync/src/block_announce_validator.rs b/substrate/client/network/sync/src/block_announce_validator.rs
index f083f9e29e44..961b581cddce 100644
--- a/substrate/client/network/sync/src/block_announce_validator.rs
+++ b/substrate/client/network/sync/src/block_announce_validator.rs
@@ -16,10 +16,11 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
-//! `BlockAnnounceValidator` is responsible for async validation of block announcements.
+//! [`BlockAnnounceValidator`] is responsible for async validation of block announcements.
+//! The [`Stream`] implemented by [`BlockAnnounceValidator`] never terminates.
use crate::futures_stream::FuturesStream;
-use futures::{Future, FutureExt, Stream, StreamExt};
+use futures::{stream::FusedStream, Future, FutureExt, Stream, StreamExt};
use libp2p::PeerId;
use log::{debug, error, trace, warn};
use sc_network_common::sync::message::BlockAnnounce;
@@ -300,6 +301,13 @@ impl Stream for BlockAnnounceValidator {
}
}
+// As [`BlockAnnounceValidator`] never terminates, we can easily implement [`FusedStream`] for it.
+impl FusedStream for BlockAnnounceValidator {
+ fn is_terminated(&self) -> bool {
+ false
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs
index 0f689742bc58..02a0dc2e9151 100644
--- a/substrate/client/network/sync/src/engine.rs
+++ b/substrate/client/network/sync/src/engine.rs
@@ -47,7 +47,6 @@ use futures::{
future::{BoxFuture, Fuse},
FutureExt, StreamExt,
};
-use futures_timer::Delay;
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, trace};
use prometheus_endpoint::{
@@ -56,6 +55,7 @@ use prometheus_endpoint::{
};
use prost::Message;
use schnellru::{ByLength, LruMap};
+use tokio::time::{Interval, MissedTickBehavior};
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
@@ -85,7 +85,6 @@ use std::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
- task::Poll,
time::{Duration, Instant},
};
@@ -254,7 +253,7 @@ pub struct SyncingEngine {
service_rx: TracingUnboundedReceiver>,
/// Channel for receiving inbound connections from `Protocol`.
- rx: sc_utils::mpsc::TracingUnboundedReceiver>,
+ sync_events_rx: sc_utils::mpsc::TracingUnboundedReceiver>,
/// Assigned roles.
roles: Roles,
@@ -266,7 +265,7 @@ pub struct SyncingEngine {
event_streams: Vec>,
/// Interval at which we call `tick`.
- tick_timeout: Delay,
+ tick_timeout: Interval,
/// All connected peers. Contains both full and light node peers.
peers: HashMap>,
@@ -304,7 +303,7 @@ pub struct SyncingEngine {
boot_node_ids: HashSet,
/// A channel to get target block header if we skip over proofs downloading during warp sync.
- warp_sync_target_block_header_rx:
+ warp_sync_target_block_header_rx_fused:
Fuse>>,
/// Protocol name used for block announcements
@@ -363,7 +362,7 @@ where
block_downloader: Arc>,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option,
- rx: sc_utils::mpsc::TracingUnboundedReceiver>,
+ sync_events_rx: sc_utils::mpsc::TracingUnboundedReceiver>,
) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> {
let mode = net_config.network_config.sync_mode;
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
@@ -436,7 +435,7 @@ where
// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
- let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx
+ let warp_sync_target_block_header_rx_fused = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());
let block_announce_config = Self::get_block_announce_proto_config(
@@ -478,6 +477,12 @@ where
let max_out_peers = net_config.network_config.default_peers_set.out_peers;
let max_in_peers = (max_full_peers - max_out_peers) as usize;
+ let tick_timeout = {
+ let mut interval = tokio::time::interval(TICK_TIMEOUT);
+ interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+ interval
+ };
+
Ok((
Self {
roles,
@@ -493,11 +498,11 @@ where
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
service_rx,
- rx,
+ sync_events_rx,
genesis_hash,
important_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
- warp_sync_target_block_header_rx,
+ warp_sync_target_block_header_rx_fused,
boot_node_ids,
default_peers_set_no_slot_peers,
default_peers_set_num_full,
@@ -505,7 +510,7 @@ where
num_in_peers: 0usize,
max_in_peers,
event_streams: Vec::new(),
- tick_timeout: Delay::new(TICK_TIMEOUT),
+ tick_timeout,
syncing_started: None,
last_notification_io: Instant::now(),
metrics: if let Some(r) = metrics_registry {
@@ -691,230 +696,222 @@ where
self.syncing_started = Some(Instant::now());
loop {
- futures::future::poll_fn(|cx| self.poll(cx)).await;
+ tokio::select! {
+ _ = self.tick_timeout.tick() => self.perform_periodic_actions(),
+ command = self.service_rx.select_next_some() =>
+ self.process_service_command(command),
+ sync_event = self.sync_events_rx.select_next_some() =>
+ self.process_sync_event(sync_event),
+ warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
+ self.pass_warp_sync_target_block_header(warp_target_block_header),
+ response_event = self.pending_responses.select_next_some() =>
+ self.process_response_event(response_event),
+ validation_result = self.block_announce_validator.select_next_some() =>
+ self.process_block_announce_validation_result(validation_result),
+ }
+
+ // Update atomic variables
+ self.num_connected.store(self.peers.len(), Ordering::Relaxed);
+ self.is_major_syncing
+ .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
+
+ // Send outbound requests on `ChainSync`'s behalf.
+ self.send_chain_sync_requests();
}
}
- pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> {
- self.num_connected.store(self.peers.len(), Ordering::Relaxed);
- self.is_major_syncing
- .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
-
- while let Poll::Ready(()) = self.tick_timeout.poll_unpin(cx) {
- self.report_metrics();
- self.tick_timeout.reset(TICK_TIMEOUT);
-
- // if `SyncingEngine` has just started, don't evict seemingly inactive peers right away
- // as they may not have produced blocks not because they've disconnected but because
- // they're still waiting to receive enough relaychain blocks to start producing blocks.
- if let Some(started) = self.syncing_started {
- if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD {
- continue
- }
+ fn perform_periodic_actions(&mut self) {
+ self.report_metrics();
- self.syncing_started = None;
- self.last_notification_io = Instant::now();
+ // if `SyncingEngine` has just started, don't evict seemingly inactive peers right away
+ // as they may not have produced blocks not because they've disconnected but because
+ // they're still waiting to receive enough relaychain blocks to start producing blocks.
+ if let Some(started) = self.syncing_started {
+ if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD {
+ return
}
- // if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`,
- // it means the local node has stalled and is connected to peers who either don't
- // consider it connected or are also all stalled. In order to unstall the node,
- // disconnect all peers and allow `ProtocolController` to establish new connections.
- if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
- log::debug!(
- target: LOG_TARGET,
- "syncing has halted due to inactivity, evicting all peers",
- );
+ self.syncing_started = None;
+ self.last_notification_io = Instant::now();
+ }
- for peer in self.peers.keys() {
- self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
- self.network_service
- .disconnect_peer(*peer, self.block_announce_protocol_name.clone());
- }
+ // if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`,
+ // it means the local node has stalled and is connected to peers who either don't
+ // consider it connected or are also all stalled. In order to unstall the node,
+ // disconnect all peers and allow `ProtocolController` to establish new connections.
+ if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
+ log::debug!(
+ target: LOG_TARGET,
+ "syncing has halted due to inactivity, evicting all peers",
+ );
- // after all the peers have been evicted, start timer again to prevent evicting
- // new peers that join after the old peer have been evicted
- self.last_notification_io = Instant::now();
+ for peer in self.peers.keys() {
+ self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
+ self.network_service
+ .disconnect_peer(*peer, self.block_announce_protocol_name.clone());
}
+
+ // after all the peers have been evicted, start timer again to prevent evicting
+ // new peers that join after the old peers have been evicted
+ self.last_notification_io = Instant::now();
}
+ }
- while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) {
- match event {
- ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
- self.chain_sync.set_sync_fork_request(peers, &hash, number);
- },
- ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
- ToServiceCommand::RequestJustification(hash, number) =>
- self.chain_sync.request_justification(&hash, number),
- ToServiceCommand::ClearJustificationRequests =>
- self.chain_sync.clear_justification_requests(),
- ToServiceCommand::BlocksProcessed(imported, count, results) => {
- for result in self.chain_sync.on_blocks_processed(imported, count, results) {
- match result {
- Ok(action) => match action {
- BlockRequestAction::SendRequest { peer_id, request } => {
- // drop obsolete pending response first
- self.pending_responses.remove(&peer_id);
- self.send_block_request(peer_id, request);
- },
- BlockRequestAction::RemoveStale { peer_id } => {
- self.pending_responses.remove(&peer_id);
- },
+ fn process_service_command(&mut self, command: ToServiceCommand) {
+ match command {
+ ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
+ self.chain_sync.set_sync_fork_request(peers, &hash, number);
+ },
+ ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
+ ToServiceCommand::RequestJustification(hash, number) =>
+ self.chain_sync.request_justification(&hash, number),
+ ToServiceCommand::ClearJustificationRequests =>
+ self.chain_sync.clear_justification_requests(),
+ ToServiceCommand::BlocksProcessed(imported, count, results) => {
+ for result in self.chain_sync.on_blocks_processed(imported, count, results) {
+ match result {
+ Ok(action) => match action {
+ BlockRequestAction::SendRequest { peer_id, request } => {
+ // drop obsolete pending response first
+ self.pending_responses.remove(&peer_id);
+ self.send_block_request(peer_id, request);
},
- Err(BadPeer(peer_id, repu)) => {
+ BlockRequestAction::RemoveStale { peer_id } => {
self.pending_responses.remove(&peer_id);
- self.network_service.disconnect_peer(
- peer_id,
- self.block_announce_protocol_name.clone(),
- );
- self.network_service.report_peer(peer_id, repu)
},
- }
- }
- },
- ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
- self.chain_sync.on_justification_import(hash, number, success);
- if !success {
- log::info!(
- target: LOG_TARGET,
- "💔 Invalid justification provided by {peer_id} for #{hash}",
- );
- self.network_service
- .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
- self.network_service.report_peer(
- peer_id,
- ReputationChange::new_fatal("Invalid justification"),
- );
+ },
+ Err(BadPeer(peer_id, repu)) => {
+ self.pending_responses.remove(&peer_id);
+ self.network_service.disconnect_peer(
+ peer_id,
+ self.block_announce_protocol_name.clone(),
+ );
+ self.network_service.report_peer(peer_id, repu)
+ },
}
- },
- ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
- ToServiceCommand::NewBestBlockImported(hash, number) =>
- self.new_best_block_imported(hash, number),
- ToServiceCommand::Status(tx) => {
- let mut status = self.chain_sync.status();
- status.num_connected_peers = self.peers.len() as u32;
- let _ = tx.send(status);
- },
- ToServiceCommand::NumActivePeers(tx) => {
- let _ = tx.send(self.num_active_peers());
- },
- ToServiceCommand::SyncState(tx) => {
- let _ = tx.send(self.chain_sync.status());
- },
- ToServiceCommand::BestSeenBlock(tx) => {
- let _ = tx.send(self.chain_sync.status().best_seen_block);
- },
- ToServiceCommand::NumSyncPeers(tx) => {
- let _ = tx.send(self.chain_sync.status().num_peers);
- },
- ToServiceCommand::NumQueuedBlocks(tx) => {
- let _ = tx.send(self.chain_sync.status().queued_blocks);
- },
- ToServiceCommand::NumDownloadedBlocks(tx) => {
- let _ = tx.send(self.chain_sync.num_downloaded_blocks());
- },
- ToServiceCommand::NumSyncRequests(tx) => {
- let _ = tx.send(self.chain_sync.num_sync_requests());
- },
- ToServiceCommand::PeersInfo(tx) => {
- let peers_info = self
- .peers
- .iter()
- .map(|(peer_id, peer)| (*peer_id, peer.info.clone()))
- .collect();
- let _ = tx.send(peers_info);
- },
- ToServiceCommand::OnBlockFinalized(hash, header) =>
- self.chain_sync.on_block_finalized(&hash, *header.number()),
- }
+ }
+ },
+ ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
+ self.chain_sync.on_justification_import(hash, number, success);
+ if !success {
+ log::info!(
+ target: LOG_TARGET,
+ "💔 Invalid justification provided by {peer_id} for #{hash}",
+ );
+ self.network_service
+ .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
+ self.network_service
+ .report_peer(peer_id, ReputationChange::new_fatal("Invalid justification"));
+ }
+ },
+ ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
+ ToServiceCommand::NewBestBlockImported(hash, number) =>
+ self.new_best_block_imported(hash, number),
+ ToServiceCommand::Status(tx) => {
+ let mut status = self.chain_sync.status();
+ status.num_connected_peers = self.peers.len() as u32;
+ let _ = tx.send(status);
+ },
+ ToServiceCommand::NumActivePeers(tx) => {
+ let _ = tx.send(self.num_active_peers());
+ },
+ ToServiceCommand::SyncState(tx) => {
+ let _ = tx.send(self.chain_sync.status());
+ },
+ ToServiceCommand::BestSeenBlock(tx) => {
+ let _ = tx.send(self.chain_sync.status().best_seen_block);
+ },
+ ToServiceCommand::NumSyncPeers(tx) => {
+ let _ = tx.send(self.chain_sync.status().num_peers);
+ },
+ ToServiceCommand::NumQueuedBlocks(tx) => {
+ let _ = tx.send(self.chain_sync.status().queued_blocks);
+ },
+ ToServiceCommand::NumDownloadedBlocks(tx) => {
+ let _ = tx.send(self.chain_sync.num_downloaded_blocks());
+ },
+ ToServiceCommand::NumSyncRequests(tx) => {
+ let _ = tx.send(self.chain_sync.num_sync_requests());
+ },
+ ToServiceCommand::PeersInfo(tx) => {
+ let peers_info = self
+ .peers
+ .iter()
+ .map(|(peer_id, peer)| (*peer_id, peer.info.clone()))
+ .collect();
+ let _ = tx.send(peers_info);
+ },
+ ToServiceCommand::OnBlockFinalized(hash, header) =>
+ self.chain_sync.on_block_finalized(&hash, *header.number()),
}
+ }
- while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) {
- match event {
- sc_network::SyncEvent::NotificationStreamOpened {
- remote,
- received_handshake,
- sink,
- inbound,
- tx,
- } => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) {
- Ok(()) => {
- let _ = tx.send(true);
- },
- Err(()) => {
- log::debug!(
- target: LOG_TARGET,
- "Failed to register peer {remote:?}: {received_handshake:?}",
- );
- let _ = tx.send(false);
- },
+ fn process_sync_event(&mut self, event: sc_network::SyncEvent) {
+ match event {
+ sc_network::SyncEvent::NotificationStreamOpened {
+ remote,
+ received_handshake,
+ sink,
+ inbound,
+ tx,
+ } => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) {
+ Ok(()) => {
+ let _ = tx.send(true);
},
- sc_network::SyncEvent::NotificationStreamClosed { remote } => {
- if self.on_sync_peer_disconnected(remote).is_err() {
- log::trace!(
- target: LOG_TARGET,
- "Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
- remote
- );
- }
+ Err(()) => {
+ log::debug!(
+ target: LOG_TARGET,
+ "Failed to register peer {remote:?}: {received_handshake:?}",
+ );
+ let _ = tx.send(false);
},
- sc_network::SyncEvent::NotificationsReceived { remote, messages } => {
- for message in messages {
- if self.peers.contains_key(&remote) {
- if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
- self.last_notification_io = Instant::now();
- self.push_block_announce_validation(remote, announce);
- } else {
- log::warn!(target: "sub-libp2p", "Failed to decode block announce");
- }
+ },
+ sc_network::SyncEvent::NotificationStreamClosed { remote } => {
+ if self.on_sync_peer_disconnected(remote).is_err() {
+ log::trace!(
+ target: LOG_TARGET,
+ "Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
+ remote
+ );
+ }
+ },
+ sc_network::SyncEvent::NotificationsReceived { remote, messages } => {
+ for message in messages {
+ if self.peers.contains_key(&remote) {
+ if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
+ self.last_notification_io = Instant::now();
+ self.push_block_announce_validation(remote, announce);
} else {
- log::trace!(
- target: LOG_TARGET,
- "Received sync for peer earlier refused by sync layer: {remote}",
- );
+ log::warn!(target: "sub-libp2p", "Failed to decode block announce");
}
+ } else {
+ log::trace!(
+ target: LOG_TARGET,
+ "Received sync for peer earlier refused by sync layer: {remote}",
+ );
}
- },
- sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => {
- if let Some(peer) = self.peers.get_mut(&remote) {
- peer.sink = sink;
- }
- },
- }
+ }
+ },
+ sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => {
+ if let Some(peer) = self.peers.get_mut(&remote) {
+ peer.sink = sink;
+ }
+ },
}
+ }
- // Retreive warp sync target block header just before polling `ChainSync`
- // to make progress as soon as we receive it.
- match self.warp_sync_target_block_header_rx.poll_unpin(cx) {
- Poll::Ready(Ok(target)) => {
- self.chain_sync.set_warp_sync_target_block(target);
+ fn pass_warp_sync_target_block_header(&mut self, header: Result) {
+ match header {
+ Ok(header) => {
+ self.chain_sync.set_warp_sync_target_block(header);
},
- Poll::Ready(Err(err)) => {
+ Err(err) => {
log::error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
},
- Poll::Pending => {},
}
-
- // Send outbound requests on `ChanSync`'s behalf.
- self.send_chain_sync_requests();
-
- // Poll & process pending responses.
- while let Poll::Ready(Some(event)) = self.pending_responses.poll_next_unpin(cx) {
- self.process_response_event(event);
- }
-
- // Poll block announce validations last, because if a block announcement was received
- // through the event stream between `SyncingEngine` and `Protocol` and the validation
- // finished right after it is queued, the resulting block request (if any) can be sent
- // right away.
- while let Poll::Ready(Some(result)) = self.block_announce_validator.poll_next_unpin(cx) {
- self.process_block_announce_validation_result(result);
- }
-
- Poll::Pending
}
/// Called by peer when it is disconnecting.
diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs
index 9e2fd5cfd674..55308dfc1ea9 100644
--- a/substrate/client/network/sync/src/pending_responses.rs
+++ b/substrate/client/network/sync/src/pending_responses.rs
@@ -17,20 +17,20 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! [`PendingResponses`] is responsible for keeping track of pending responses and
-//! polling them.
+//! polling them. The [`Stream`] implemented by [`PendingResponses`] never terminates.
use crate::types::PeerRequest;
use futures::{
channel::oneshot,
future::BoxFuture,
- stream::{BoxStream, Stream},
+ stream::{BoxStream, FusedStream, Stream},
FutureExt, StreamExt,
};
use libp2p::PeerId;
use log::error;
use sc_network::request_responses::RequestFailure;
use sp_runtime::traits::Block as BlockT;
-use std::task::{Context, Poll};
+use std::task::{Context, Poll, Waker};
use tokio_stream::StreamMap;
/// Log target for this file.
@@ -53,11 +53,13 @@ pub(crate) struct ResponseEvent {
pub(crate) struct PendingResponses {
/// Pending responses
pending_responses: StreamMap, ResponseResult)>>,
+ /// Waker used to implement a never-terminating stream
+ waker: Option,
}
impl PendingResponses {
pub fn new() -> Self {
- Self { pending_responses: StreamMap::new() }
+ Self { pending_responses: StreamMap::new(), waker: None }
}
pub fn insert(
@@ -82,6 +84,10 @@ impl PendingResponses {
);
debug_assert!(false);
}
+
+ if let Some(waker) = self.waker.take() {
+ waker.wake();
+ }
}
pub fn remove(&mut self, peer_id: &PeerId) -> bool {
@@ -93,8 +99,6 @@ impl PendingResponses {
}
}
-impl Unpin for PendingResponses {}
-
impl Stream for PendingResponses {
type Item = ResponseEvent;
@@ -102,8 +106,8 @@ impl Stream for PendingResponses {
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll