diff --git a/Cargo.lock b/Cargo.lock index e4e3e1b84625c..8328185442f19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8635,6 +8635,7 @@ dependencies = [ "substrate-test-runtime", "substrate-test-runtime-client", "tokio", + "tokio-scoped", ] [[package]] @@ -11059,6 +11060,16 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "tokio-scoped" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4beb8ba13bc53ac53ce1d52b42f02e5d8060f0f42138862869beb769722b256" +dependencies = [ + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-stream" version = "0.1.11" diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 8368fa278712a..a6f1997f28364 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -14,6 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] tokio = "1.22.0" +tokio-scoped = "0.2.0" async-trait = "0.1.57" futures = "0.3.21" futures-timer = "3.0.1" diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index b3653ac7c0f88..364f2430653e4 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -22,18 +22,11 @@ mod block_import; #[cfg(test)] mod sync; -use std::{ - collections::HashMap, - marker::PhantomData, - pin::Pin, - sync::Arc, - task::{Context as FutureContext, Poll}, - time::Duration, -}; +use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc, time::Duration}; use futures::{future::BoxFuture, prelude::*}; use libp2p::{build_multiaddr, PeerId}; -use log::trace; +use log::{error, trace}; use parking_lot::Mutex; use sc_block_builder::{BlockBuilder, BlockBuilderProvider}; use sc_client_api::{ @@ -84,7 +77,7 @@ pub use substrate_test_runtime_client::{ runtime::{Block, Extrinsic, Hash, Transfer}, TestClient, TestClientBuilder, TestClientBuilderExt, }; -use tokio::time::timeout; +use tokio::{task::JoinSet, time::timeout}; type AuthorityId = sp_consensus_babe::AuthorityId; @@ -731,7 +724,7 @@ where { type Verifier: 'static + Verifier; type BlockImport: BlockImport + Clone + Send + Sync + 'static; - type PeerData: Default; + type PeerData: Default + Send; /// This one needs to be implemented! fn make_verifier(&self, client: PeersClient, peer_data: &Self::PeerData) -> Self::Verifier; @@ -743,6 +736,9 @@ where &mut self, closure: F, ); + async fn mut_peers_async(&mut self, async_closure: F) + where + F: FnOnce(&mut Vec>) -> BoxFuture<'_, ()> + Send; /// Get custom block import handle for fresh client, along with peer data. fn make_block_import( @@ -976,114 +972,106 @@ where tokio::spawn(f); } - /// Polls the testnet until all nodes are in sync. - /// - /// Must be executed in a task context. - fn poll_until_sync(&mut self, cx: &mut FutureContext) -> Poll<()> { - self.poll(cx); - - // Return `NotReady` if there's a mismatch in the highest block number. - let mut highest = None; - for peer in self.peers().iter() { - if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { - return Poll::Pending - } - if peer.network.num_sync_requests() != 0 { - return Poll::Pending - } - match (highest, peer.client.info().best_hash) { - (None, b) => highest = Some(b), - (Some(ref a), ref b) if a == b => {}, - (Some(_), _) => return Poll::Pending, + /// Runs the testnet until all nodes are in sync. + async fn run_until_sync_indefinitely(&mut self) { + 'outer: loop { + self.next_action().await; + + // Continue advancing if there's a mismatch in the highest block number. + let mut highest = None; + for peer in self.peers().iter() { + if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { + continue 'outer + } + if peer.network.num_sync_requests() != 0 { + continue 'outer + } + match (highest, peer.client.info().best_hash) { + (None, b) => highest = Some(b), + (Some(ref a), ref b) if a == b => {}, + (Some(_), _) => continue 'outer, + } } + break } - Poll::Ready(()) } - /// Polls the testnet until theres' no activiy of any kind. - /// - /// Must be executed in a task context. - fn poll_until_idle(&mut self, cx: &mut FutureContext) -> Poll<()> { - self.poll(cx); + /// Runs the testnet until theres' no activiy of any kind. + async fn run_until_idle(&mut self) { + 'outer: loop { + self.next_action().await; - for peer in self.peers().iter() { - if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { - return Poll::Pending - } - if peer.network.num_sync_requests() != 0 { - return Poll::Pending + for peer in self.peers().iter() { + if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { + continue 'outer + } + if peer.network.num_sync_requests() != 0 { + continue 'outer + } } - } - Poll::Ready(()) + break + } } - /// Polls the testnet until all peers are connected to each other. - /// - /// Must be executed in a task context. - fn poll_until_connected(&mut self, cx: &mut FutureContext) -> Poll<()> { - self.poll(cx); + /// Runs the testnet until all peers are connected to each other. + async fn run_until_connected(&mut self) { + loop { + self.next_action().await; - let num_peers = self.peers().len(); - if self.peers().iter().all(|p| p.num_peers() == num_peers - 1) { - return Poll::Ready(()) + let num_peers = self.peers().len(); + if self.peers().iter().all(|p| p.num_peers() == num_peers - 1) { + break + } } - - Poll::Pending } - /// Run the network until we are sync'ed. + /// Run the testnet until we are sync'ed (with a timeout). /// - /// Calls `poll_until_sync` repeatedly. + /// Calls `run_until_sync_indefinitely` with a timeout. /// (If we've not synced within 10 mins then panic rather than hang.) async fn run_until_sync(&mut self) { - timeout( - Duration::from_secs(10 * 60), - futures::future::poll_fn::<(), _>(|cx| self.poll_until_sync(cx)), - ) - .await - .expect("sync didn't happen within 10 mins"); - } - - /// Run the network until there are no pending packets. - /// - /// Calls `poll_until_idle` repeatedly with the runtime passed as parameter. - async fn run_until_idle(&mut self) { - futures::future::poll_fn::<(), _>(|cx| self.poll_until_idle(cx)).await; - } - - /// Run the network until all peers are connected to each other. - /// - /// Calls `poll_until_connected` repeatedly with the runtime passed as parameter. - async fn run_until_connected(&mut self) { - futures::future::poll_fn::<(), _>(|cx| self.poll_until_connected(cx)).await; - } - - /// Polls the testnet. Processes all the pending actions. - fn poll(&mut self, cx: &mut FutureContext) { - self.mut_peers(|peers| { - for (i, peer) in peers.iter_mut().enumerate() { - trace!(target: "sync", "-- Polling {}: {}", i, peer.id()); - if let Poll::Ready(()) = peer.network.poll_unpin(cx) { - panic!("NetworkWorker has terminated unexpectedly.") - } - trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id()); - - // We poll `imported_blocks_stream`. - while let Poll::Ready(Some(notification)) = - peer.imported_blocks_stream.as_mut().poll_next(cx) - { - peer.network.service().announce_block(notification.hash, None); - } - - // We poll `finality_notification_stream`. - while let Poll::Ready(Some(notification)) = - peer.finality_notification_stream.as_mut().poll_next(cx) - { - peer.network.on_block_finalized(notification.hash, notification.header); - } - } - }); + timeout(Duration::from_secs(10 * 60), self.run_until_sync_indefinitely()) + .await + .expect("sync didn't happen within 10 mins"); + } + + /// Advances the testnet. Processes all the pending actions. + async fn next_action(&mut self) { + self.mut_peers_async(|peers| { + Box::pin(async move { + tokio_scoped::scope(|scope| { + for (i, peer) in peers.iter_mut().enumerate() { + scope.spawn({ + async move { + trace!(target: "sync", "-- Next action {}: {}", i, peer.id()); + + timeout(Duration::from_secs(1), peer.network.next_action()).await; + //peer.network.next_action().await; + trace!(target: "sync", "-- Next action complete {}: {}", i, peer.id()); + + // Get next element from `imported_blocks_stream` if it's available. + while let Some(Some(notification)) = + peer.imported_blocks_stream.next().now_or_never() + { + peer.network.service().announce_block(notification.hash, None); + } + + // Get next element from `finality_notification_stream` if it's + // available. + while let Some(Some(notification)) = + peer.finality_notification_stream.next().now_or_never() + { + peer.network + .on_block_finalized(notification.hash, notification.header); + } + } + }); + } + }) + }) + }) + .await; } } @@ -1092,6 +1080,7 @@ pub struct TestNet { peers: Vec>, } +#[async_trait::async_trait] impl TestNetFactory for TestNet { type Verifier = PassThroughVerifier; type PeerData = (); @@ -1123,6 +1112,13 @@ impl TestNetFactory for TestNet { fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } + + async fn mut_peers_async(&mut self, closure: F) + where + F: FnOnce(&mut Vec>) -> BoxFuture<'_, ()> + Send, + { + closure(&mut self.peers).await + } } pub struct ForceFinalized(PeersClient); @@ -1150,6 +1146,7 @@ impl JustificationImport for ForceFinalized { #[derive(Default)] pub struct JustificationTestNet(TestNet); +#[async_trait::async_trait] impl TestNetFactory for JustificationTestNet { type Verifier = PassThroughVerifier; type PeerData = (); @@ -1174,6 +1171,13 @@ impl TestNetFactory for JustificationTestNet { self.0.mut_peers(closure) } + async fn mut_peers_async(&mut self, closure: F) + where + F: FnOnce(&mut Vec>) -> BoxFuture<'_, ()> + Send, + { + self.0.mut_peers_async(closure).await + } + fn make_block_import( &self, client: PeersClient, diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index b629574fe9874..913ec7eb4a971 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -44,16 +44,15 @@ async fn sync_peers_works() { sp_tracing::try_init_simple(); let mut net = TestNet::new(3); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; for peer in 0..3 { if net.peer(peer).num_peers() != 2 { - return Poll::Pending + continue } } - Poll::Ready(()) - }) - .await; + break + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -69,49 +68,44 @@ async fn sync_cycle_from_offline_to_syncing_to_offline() { // Generate blocks. net.peer(2).push_blocks(100, false); - // Block until all nodes are online and nodes 0 and 1 and major syncing. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + // Run until all nodes are online and nodes 0 and 1 are major syncing. + loop { + net.next_action().await; for peer in 0..3 { // Online if net.peer(peer).is_offline() { - return Poll::Pending + continue } if peer < 2 { // Major syncing. if net.peer(peer).blocks_count() < 100 && !net.peer(peer).is_major_syncing() { - return Poll::Pending + continue } } } - Poll::Ready(()) - }) - .await; + break + } - // Block until all nodes are done syncing. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + // Run until all nodes are done syncing. + loop { + net.next_action().await; for peer in 0..3 { if net.peer(peer).is_major_syncing() { - return Poll::Pending + continue } } - Poll::Ready(()) - }) - .await; + break + } // Now drop nodes 1 and 2, and check that node 0 is offline. net.peers.remove(2); net.peers.remove(1); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if !net.peer(0).is_offline() { - Poll::Pending - } else { - Poll::Ready(()) + loop { + net.next_action().await; + if net.peer(0).is_offline() { + break } - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -126,28 +120,22 @@ async fn syncing_node_not_major_syncing_when_disconnected() { assert!(!net.peer(1).is_major_syncing()); // Check that we switch to major syncing. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if !net.peer(1).is_major_syncing() { - Poll::Pending - } else { - Poll::Ready(()) + loop { + net.next_action().await; + if net.peer(1).is_major_syncing() { + break } - }) - .await; + } // Destroy two nodes, and check that we switch to non-major syncing. net.peers.remove(2); net.peers.remove(0); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if net.peer(0).is_major_syncing() { - Poll::Pending - } else { - Poll::Ready(()) + loop { + net.next_action().await; + if !net.peer(0).is_major_syncing() { + break } - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -233,15 +221,12 @@ async fn sync_no_common_longer_chain_fails() { let mut net = TestNet::new(3); net.peer(0).push_blocks(20, true); net.peer(1).push_blocks(20, false); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if net.peer(0).is_major_syncing() { - Poll::Pending - } else { - Poll::Ready(()) + loop { + net.next_action().await; + if !net.peer(0).is_major_syncing() { + break } - }) - .await; + } let peer1 = &net.peers()[1]; assert!(!net.peers()[0].blockchain_canon_equals(peer1)); } @@ -276,25 +261,24 @@ async fn sync_justifications() { net.peer(1).request_justification(&hashof15, 15); net.peer(1).request_justification(&hashof20, 20); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; for height in (10..21).step_by(5) { if net.peer(0).client().justifications(hashes[height - 1]).unwrap() != Some(Justifications::from((*b"FRNK", Vec::new()))) { - return Poll::Pending + continue } if net.peer(1).client().justifications(hashes[height - 1]).unwrap() != Some(Justifications::from((*b"FRNK", Vec::new()))) { - return Poll::Pending + continue } } - Poll::Ready(()) - }) - .await; + break + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -317,20 +301,17 @@ async fn sync_justifications_across_forks() { net.peer(1).request_justification(&f1_best, 10); net.peer(1).request_justification(&f2_best, 11); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; if net.peer(0).client().justifications(f1_best).unwrap() == Some(Justifications::from((*b"FRNK", Vec::new()))) && net.peer(1).client().justifications(f1_best).unwrap() == Some(Justifications::from((*b"FRNK", Vec::new()))) { - Poll::Ready(()) - } else { - Poll::Pending + break } - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -411,16 +392,13 @@ async fn can_sync_small_non_best_forks() { assert!(net.peer(0).client().header(small_hash).unwrap().is_some()); assert!(net.peer(1).client().header(small_hash).unwrap().is_none()); - // poll until the two nodes connect, otherwise announcing the block will not work - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if net.peer(0).num_peers() == 0 { - Poll::Pending - } else { - Poll::Ready(()) + // Run until the two nodes connect, otherwise announcing the block will not work. + loop { + net.next_action().await; + if net.peer(0).num_peers() != 0 { + break } - }) - .await; + } // synchronization: 0 synced to longer chain and 1 didn't sync to small chain. @@ -433,28 +411,24 @@ async fn can_sync_small_non_best_forks() { // after announcing, peer 1 downloads the block. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; assert!(net.peer(0).client().header(small_hash).unwrap().is_some()); - if net.peer(1).client().header(small_hash).unwrap().is_none() { - return Poll::Pending + if net.peer(1).client().header(small_hash).unwrap().is_some() { + break } - Poll::Ready(()) - }) - .await; + } net.run_until_sync().await; let another_fork = net.peer(0).push_blocks_at(BlockId::Number(35), 2, true).pop().unwrap(); net.peer(0).announce_block(another_fork, None); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if net.peer(1).client().header(another_fork).unwrap().is_none() { - return Poll::Pending + loop { + net.next_action().await; + if net.peer(1).client().header(another_fork).unwrap().is_some() { + break } - Poll::Ready(()) - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -483,15 +457,13 @@ async fn can_sync_forks_ahead_of_the_best_chain() { assert_eq!(net.peer(1).client().info().best_number, 2); // after announcing, peer 1 downloads the block. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; - if net.peer(1).client().header(fork_hash).unwrap().is_none() { - return Poll::Pending + if net.peer(1).client().header(fork_hash).unwrap().is_some() { + break } - Poll::Ready(()) - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -515,16 +487,13 @@ async fn can_sync_explicit_forks() { assert!(net.peer(0).client().header(small_hash).unwrap().is_some()); assert!(net.peer(1).client().header(small_hash).unwrap().is_none()); - // poll until the two nodes connect, otherwise announcing the block will not work - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { - Poll::Pending - } else { - Poll::Ready(()) + // Run until the two nodes connect, otherwise announcing the block will not work. + loop { + net.next_action().await; + if net.peer(0).num_peers() == 1 && net.peer(1).num_peers() == 1 { + break } - }) - .await; + } // synchronization: 0 synced to longer chain and 1 didn't sync to small chain. @@ -538,16 +507,14 @@ async fn can_sync_explicit_forks() { net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number); // peer 1 downloads the block. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; assert!(net.peer(0).client().header(small_hash).unwrap().is_some()); - if net.peer(1).client().header(small_hash).unwrap().is_none() { - return Poll::Pending + if net.peer(1).client().header(small_hash).unwrap().is_some() { + break } - Poll::Ready(()) - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -584,21 +551,13 @@ async fn does_not_sync_announced_old_best_block() { net.peer(1).push_blocks(20, true); net.peer(0).announce_block(old_hash, None); - futures::future::poll_fn::<(), _>(|cx| { - // poll once to import announcement - net.poll(cx); - Poll::Ready(()) - }) - .await; + // Drive network once to import announcement. + net.next_action().await; assert!(!net.peer(1).is_major_syncing()); net.peer(0).announce_block(old_hash_with_parent, None); - futures::future::poll_fn::<(), _>(|cx| { - // poll once to import announcement - net.poll(cx); - Poll::Ready(()) - }) - .await; + // Drive network once to import announcement. + net.next_action().await; assert!(!net.peer(1).is_major_syncing()); } @@ -610,15 +569,12 @@ async fn full_sync_requires_block_body() { net.peer(0).push_headers(1); // Wait for nodes to connect - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { - Poll::Pending - } else { - Poll::Ready(()) + loop { + net.next_action().await; + if net.peer(0).num_peers() == 1 && net.peer(1).num_peers() == 1 { + break } - }) - .await; + } net.run_until_idle().await; assert_eq!(net.peer(1).client.info().best_number, 0); } @@ -632,15 +588,12 @@ async fn imports_stale_once() { net.peer(0).announce_block(hash, None); net.peer(0).announce_block(hash, None); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; if net.peer(1).client().header(hash).unwrap().is_some() { - Poll::Ready(()) - } else { - Poll::Pending + break } - }) - .await; + } } // given the network with 2 full nodes @@ -829,15 +782,12 @@ async fn sync_to_tip_requires_that_sync_protocol_is_informed_about_best_block() ..Default::default() }); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; if net.peer(2).has_block(block_hash) { - Poll::Ready(()) - } else { - Poll::Pending + break } - }) - .await; + } // However peer 1 should still not have the block. assert!(!net.peer(1).has_block(block_hash)); @@ -914,18 +864,15 @@ async fn block_announce_data_is_propagated() { }); // Wait until peer 1 is connected to both nodes. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; if net.peer(1).num_peers() == 2 && net.peer(0).num_peers() == 1 && net.peer(2).num_peers() == 1 { - Poll::Ready(()) - } else { - Poll::Pending + break } - }) - .await; + } let block_hash = net .peer(0) @@ -1017,18 +964,15 @@ async fn multiple_requests_are_accepted_as_long_as_they_are_not_fulfilled() { .finalize_block(hashof10, Some((*b"FRNK", Vec::new())), true) .unwrap(); - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; - if net.peer(1).client().justifications(hashof10).unwrap() != + if net.peer(1).client().justifications(hashof10).unwrap() == Some(Justifications::from((*b"FRNK", Vec::new()))) { - return Poll::Pending + break } - - Poll::Ready(()) - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1045,14 +989,12 @@ async fn syncs_all_forks_from_single_peer() { let branch1 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, true).pop().unwrap(); // Wait till peer 1 starts downloading - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - if net.peer(1).network().best_seen_block() != Some(12) { - return Poll::Pending + loop { + net.next_action().await; + if net.peer(1).network().best_seen_block() == Some(12) { + break } - Poll::Ready(()) - }) - .await; + } // Peer 0 produces and announces another fork let branch2 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, false).pop().unwrap(); @@ -1140,26 +1082,20 @@ async fn syncs_state() { let hashof60 = hashes[59]; net.peer(1).client().finalize_block(hashof60, Some(just), true).unwrap(); // Wait for state sync. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; if net.peer(1).client.info().finalized_state.is_some() { - Poll::Ready(()) - } else { - Poll::Pending + break } - }) - .await; + } assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64))); // Wait for the rest of the states to be imported. - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; if net.peer(1).client().has_state_at(&BlockId::Number(64)) { - Poll::Ready(()) - } else { - Poll::Pending + break } - }) - .await; + } } } @@ -1238,15 +1174,12 @@ async fn warp_sync() { assert!(net.peer(3).client().has_state_at(&BlockId::Number(64))); // Wait for peer 1 download block history - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); + loop { + net.next_action().await; if net.peer(3).has_body(gap_end) && net.peer(3).has_body(target) { - Poll::Ready(()) - } else { - Poll::Pending + break } - }) - .await; + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)]