Skip to content

Commit

Permalink
Use NetworkWorker::next_action instead of poll in sc-network-test
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Jan 23, 2023
1 parent fb272a6 commit 4b5d851
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 293 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/network/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
tokio = "1.22.0"
tokio-scoped = "0.2.0"
async-trait = "0.1.57"
futures = "0.3.21"
futures-timer = "3.0.1"
Expand Down
210 changes: 107 additions & 103 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,11 @@ mod block_import;
#[cfg(test)]
mod sync;

use std::{
collections::HashMap,
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context as FutureContext, Poll},
time::Duration,
};
use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc, time::Duration};

use futures::{future::BoxFuture, prelude::*};
use libp2p::{build_multiaddr, PeerId};
use log::trace;
use log::{error, trace};
use parking_lot::Mutex;
use sc_block_builder::{BlockBuilder, BlockBuilderProvider};
use sc_client_api::{
Expand Down Expand Up @@ -84,7 +77,7 @@ pub use substrate_test_runtime_client::{
runtime::{Block, Extrinsic, Hash, Transfer},
TestClient, TestClientBuilder, TestClientBuilderExt,
};
use tokio::time::timeout;
use tokio::{task::JoinSet, time::timeout};

type AuthorityId = sp_consensus_babe::AuthorityId;

Expand Down Expand Up @@ -731,7 +724,7 @@ where
{
type Verifier: 'static + Verifier<Block>;
type BlockImport: BlockImport<Block, Error = ConsensusError> + Clone + Send + Sync + 'static;
type PeerData: Default;
type PeerData: Default + Send;

/// This one needs to be implemented!
fn make_verifier(&self, client: PeersClient, peer_data: &Self::PeerData) -> Self::Verifier;
Expand All @@ -743,6 +736,9 @@ where
&mut self,
closure: F,
);
async fn mut_peers_async<F>(&mut self, async_closure: F)
where
F: FnOnce(&mut Vec<Peer<Self::PeerData, Self::BlockImport>>) -> BoxFuture<'_, ()> + Send;

/// Get custom block import handle for fresh client, along with peer data.
fn make_block_import(
Expand Down Expand Up @@ -976,114 +972,106 @@ where
tokio::spawn(f);
}

/// Polls the testnet until all nodes are in sync.
///
/// Must be executed in a task context.
fn poll_until_sync(&mut self, cx: &mut FutureContext) -> Poll<()> {
self.poll(cx);

// Return `NotReady` if there's a mismatch in the highest block number.
let mut highest = None;
for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Poll::Pending
}
if peer.network.num_sync_requests() != 0 {
return Poll::Pending
}
match (highest, peer.client.info().best_hash) {
(None, b) => highest = Some(b),
(Some(ref a), ref b) if a == b => {},
(Some(_), _) => return Poll::Pending,
/// Runs the testnet until all nodes are in sync.
async fn run_until_sync_indefinitely(&mut self) {
'outer: loop {
self.next_action().await;

// Continue advancing if there's a mismatch in the highest block number.
let mut highest = None;
for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
continue 'outer
}
if peer.network.num_sync_requests() != 0 {
continue 'outer
}
match (highest, peer.client.info().best_hash) {
(None, b) => highest = Some(b),
(Some(ref a), ref b) if a == b => {},
(Some(_), _) => continue 'outer,
}
}
break
}
Poll::Ready(())
}

/// Polls the testnet until theres' no activiy of any kind.
///
/// Must be executed in a task context.
fn poll_until_idle(&mut self, cx: &mut FutureContext) -> Poll<()> {
self.poll(cx);
/// Runs the testnet until theres' no activiy of any kind.
async fn run_until_idle(&mut self) {
'outer: loop {
self.next_action().await;

for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Poll::Pending
}
if peer.network.num_sync_requests() != 0 {
return Poll::Pending
for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
continue 'outer
}
if peer.network.num_sync_requests() != 0 {
continue 'outer
}
}
}

Poll::Ready(())
break
}
}

/// Polls the testnet until all peers are connected to each other.
///
/// Must be executed in a task context.
fn poll_until_connected(&mut self, cx: &mut FutureContext) -> Poll<()> {
self.poll(cx);
/// Runs the testnet until all peers are connected to each other.
async fn run_until_connected(&mut self) {
loop {
self.next_action().await;

let num_peers = self.peers().len();
if self.peers().iter().all(|p| p.num_peers() == num_peers - 1) {
return Poll::Ready(())
let num_peers = self.peers().len();
if self.peers().iter().all(|p| p.num_peers() == num_peers - 1) {
break
}
}

Poll::Pending
}

/// Run the network until we are sync'ed.
/// Run the testnet until we are sync'ed (with a timeout).
///
/// Calls `poll_until_sync` repeatedly.
/// Calls `run_until_sync_indefinitely` with a timeout.
/// (If we've not synced within 10 mins then panic rather than hang.)
async fn run_until_sync(&mut self) {
timeout(
Duration::from_secs(10 * 60),
futures::future::poll_fn::<(), _>(|cx| self.poll_until_sync(cx)),
)
.await
.expect("sync didn't happen within 10 mins");
}

/// Run the network until there are no pending packets.
///
/// Calls `poll_until_idle` repeatedly with the runtime passed as parameter.
async fn run_until_idle(&mut self) {
futures::future::poll_fn::<(), _>(|cx| self.poll_until_idle(cx)).await;
}

/// Run the network until all peers are connected to each other.
///
/// Calls `poll_until_connected` repeatedly with the runtime passed as parameter.
async fn run_until_connected(&mut self) {
futures::future::poll_fn::<(), _>(|cx| self.poll_until_connected(cx)).await;
}

/// Polls the testnet. Processes all the pending actions.
fn poll(&mut self, cx: &mut FutureContext) {
self.mut_peers(|peers| {
for (i, peer) in peers.iter_mut().enumerate() {
trace!(target: "sync", "-- Polling {}: {}", i, peer.id());
if let Poll::Ready(()) = peer.network.poll_unpin(cx) {
panic!("NetworkWorker has terminated unexpectedly.")
}
trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id());

// We poll `imported_blocks_stream`.
while let Poll::Ready(Some(notification)) =
peer.imported_blocks_stream.as_mut().poll_next(cx)
{
peer.network.service().announce_block(notification.hash, None);
}

// We poll `finality_notification_stream`.
while let Poll::Ready(Some(notification)) =
peer.finality_notification_stream.as_mut().poll_next(cx)
{
peer.network.on_block_finalized(notification.hash, notification.header);
}
}
});
timeout(Duration::from_secs(10 * 60), self.run_until_sync_indefinitely())
.await
.expect("sync didn't happen within 10 mins");
}

/// Advances the testnet. Processes all the pending actions.
async fn next_action(&mut self) {
self.mut_peers_async(|peers| {
Box::pin(async move {
tokio_scoped::scope(|scope| {
for (i, peer) in peers.iter_mut().enumerate() {
scope.spawn({
async move {
trace!(target: "sync", "-- Next action {}: {}", i, peer.id());

timeout(Duration::from_secs(1), peer.network.next_action()).await;
//peer.network.next_action().await;
trace!(target: "sync", "-- Next action complete {}: {}", i, peer.id());

// Get next element from `imported_blocks_stream` if it's available.
while let Some(Some(notification)) =
peer.imported_blocks_stream.next().now_or_never()
{
peer.network.service().announce_block(notification.hash, None);
}

// Get next element from `finality_notification_stream` if it's
// available.
while let Some(Some(notification)) =
peer.finality_notification_stream.next().now_or_never()
{
peer.network
.on_block_finalized(notification.hash, notification.header);
}
}
});
}
})
})
})
.await;
}
}

Expand All @@ -1092,6 +1080,7 @@ pub struct TestNet {
peers: Vec<Peer<(), PeersClient>>,
}

#[async_trait::async_trait]
impl TestNetFactory for TestNet {
type Verifier = PassThroughVerifier;
type PeerData = ();
Expand Down Expand Up @@ -1123,6 +1112,13 @@ impl TestNetFactory for TestNet {
fn mut_peers<F: FnOnce(&mut Vec<Peer<(), Self::BlockImport>>)>(&mut self, closure: F) {
closure(&mut self.peers);
}

async fn mut_peers_async<F>(&mut self, closure: F)
where
F: FnOnce(&mut Vec<Peer<(), Self::BlockImport>>) -> BoxFuture<'_, ()> + Send,
{
closure(&mut self.peers).await
}
}

pub struct ForceFinalized(PeersClient);
Expand Down Expand Up @@ -1150,6 +1146,7 @@ impl JustificationImport<Block> for ForceFinalized {
#[derive(Default)]
pub struct JustificationTestNet(TestNet);

#[async_trait::async_trait]
impl TestNetFactory for JustificationTestNet {
type Verifier = PassThroughVerifier;
type PeerData = ();
Expand All @@ -1174,6 +1171,13 @@ impl TestNetFactory for JustificationTestNet {
self.0.mut_peers(closure)
}

async fn mut_peers_async<F>(&mut self, closure: F)
where
F: FnOnce(&mut Vec<Peer<(), Self::BlockImport>>) -> BoxFuture<'_, ()> + Send,
{
self.0.mut_peers_async(closure).await
}

fn make_block_import(
&self,
client: PeersClient,
Expand Down
Loading

0 comments on commit 4b5d851

Please sign in to comment.