Skip to content

Commit

Permalink
BlockId removal: tx-pool refactor (#1678)
Browse files Browse the repository at this point in the history
It changes following APIs:
- trait `ChainApi`
-- `validate_transaction`

- trait `TransactionPool` 
--`submit_at`
--`submit_one`
--`submit_and_watch`

and some implementation details, in particular:
- impl `Pool` 
--`submit_at`
--`resubmit_at`
--`submit_one`
--`submit_and_watch`
--`prune_known`
--`prune`
--`prune_tags`
--`resolve_block_number`
--`verify`
--`verify_one`

- revalidation queue

All tests are also adjusted.

---------

Co-authored-by: command-bot <>
Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
michalkucharczyk and bkchr authored Sep 27, 2023
1 parent a846b74 commit ab3a3bc
Show file tree
Hide file tree
Showing 20 changed files with 609 additions and 460 deletions.
4 changes: 2 additions & 2 deletions cumulus/test/service/benches/transaction_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cumulus_test_runtime::{AccountId, BalancesCall, ExistentialDeposit, SudoCall
use futures::{future, StreamExt};
use sc_transaction_pool_api::{TransactionPool as _, TransactionSource, TransactionStatus};
use sp_core::{crypto::Pair, sr25519};
use sp_runtime::{generic::BlockId, OpaqueExtrinsic};
use sp_runtime::OpaqueExtrinsic;

use cumulus_primitives_core::ParaId;
use cumulus_test_service::{
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn submit_tx_and_wait_for_inclusion(
let best_hash = client.chain_info().best_hash;

let mut watch = tx_pool
.submit_and_watch(&BlockId::Hash(best_hash), TransactionSource::External, tx.clone())
.submit_and_watch(best_hash, TransactionSource::External, tx.clone())
.await
.expect("Submits tx to pool")
.fuse();
Expand Down
8 changes: 4 additions & 4 deletions substrate/bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use sc_transaction_pool_api::{
};
use sp_consensus::{Environment, Proposer};
use sp_inherents::InherentDataProvider;
use sp_runtime::{generic::BlockId, traits::NumberFor, OpaqueExtrinsic};
use sp_runtime::{traits::NumberFor, OpaqueExtrinsic};

use crate::{
common::SizeType,
Expand Down Expand Up @@ -233,7 +233,7 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
/// Returns a future that imports a bunch of unverified transactions to the pool.
fn submit_at(
&self,
_at: &BlockId<Self::Block>,
_at: Self::Hash,
_source: TransactionSource,
_xts: Vec<TransactionFor<Self>>,
) -> PoolFuture<Vec<Result<node_primitives::Hash, Self::Error>>, Self::Error> {
Expand All @@ -243,7 +243,7 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
/// Returns a future that imports one unverified transaction to the pool.
fn submit_one(
&self,
_at: &BlockId<Self::Block>,
_at: Self::Hash,
_source: TransactionSource,
_xt: TransactionFor<Self>,
) -> PoolFuture<TxHash<Self>, Self::Error> {
Expand All @@ -252,7 +252,7 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {

fn submit_and_watch(
&self,
_at: &BlockId<Self::Block>,
_at: Self::Hash,
_source: TransactionSource,
_xt: TransactionFor<Self>,
) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
Expand Down
6 changes: 3 additions & 3 deletions substrate/bin/node/bench/src/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes};

use sc_transaction_pool::BasicPool;
use sc_transaction_pool_api::{TransactionPool, TransactionSource};
use sp_runtime::generic::BlockId;

use crate::core::{self, Mode, Path};

Expand Down Expand Up @@ -58,10 +57,11 @@ impl core::BenchmarkDescription for PoolBenchmarkDescription {
impl core::Benchmark for PoolBenchmark {
fn run(&mut self, mode: Mode) -> std::time::Duration {
let context = self.database.create_context();
let genesis_hash = context.client.chain_info().genesis_hash;

let _ = context
.client
.runtime_version_at(context.client.chain_info().genesis_hash)
.runtime_version_at(genesis_hash)
.expect("Failed to get runtime version")
.spec_version;

Expand Down Expand Up @@ -90,7 +90,7 @@ impl core::Benchmark for PoolBenchmark {
let start = std::time::Instant::now();
let submissions = generated_transactions
.into_iter()
.map(|tx| txpool.submit_one(&BlockId::Number(0), TransactionSource::External, tx));
.map(|tx| txpool.submit_one(genesis_hash, TransactionSource::External, tx));
futures::executor::block_on(futures::future::join_all(submissions));
let elapsed = start.elapsed();

Expand Down
4 changes: 2 additions & 2 deletions substrate/bin/node/cli/benches/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use sc_transaction_pool::PoolLimit;
use sc_transaction_pool_api::{TransactionPool as _, TransactionSource, TransactionStatus};
use sp_core::{crypto::Pair, sr25519};
use sp_keyring::Sr25519Keyring;
use sp_runtime::{generic::BlockId, OpaqueExtrinsic};
use sp_runtime::OpaqueExtrinsic;
use tokio::runtime::Handle;

fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase {
Expand Down Expand Up @@ -191,7 +191,7 @@ async fn submit_tx_and_wait_for_inclusion(
let best_hash = client.chain_info().best_hash;

let mut watch = tx_pool
.submit_and_watch(&BlockId::Hash(best_hash), TransactionSource::External, tx.clone())
.submit_and_watch(best_hash, TransactionSource::External, tx.clone())
.await
.expect("Submits tx to pool")
.fuse();
Expand Down
47 changes: 20 additions & 27 deletions substrate/client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,8 @@ mod tests {
client.clone(),
);

block_on(txpool.submit_at(&BlockId::number(0), SOURCE, vec![extrinsic(0), extrinsic(1)]))
.unwrap();
let hashof0 = client.info().genesis_hash;
block_on(txpool.submit_at(hashof0, SOURCE, vec![extrinsic(0), extrinsic(1)])).unwrap();

block_on(
txpool.maintain(chain_event(
Expand All @@ -658,7 +658,7 @@ mod tests {

let cell = Mutex::new((false, time::Instant::now()));
let proposer = proposer_factory.init_with_now(
&client.expect_header(client.info().genesis_hash).unwrap(),
&client.expect_header(hashof0).unwrap(),
Box::new(move || {
let mut value = cell.lock();
if !value.0 {
Expand Down Expand Up @@ -736,7 +736,7 @@ mod tests {

let genesis_hash = client.info().best_hash;

block_on(txpool.submit_at(&BlockId::number(0), SOURCE, vec![extrinsic(0)])).unwrap();
block_on(txpool.submit_at(genesis_hash, SOURCE, vec![extrinsic(0)])).unwrap();

block_on(
txpool.maintain(chain_event(
Expand Down Expand Up @@ -800,7 +800,7 @@ mod tests {
};

block_on(txpool.submit_at(
&BlockId::number(0),
client.info().genesis_hash,
SOURCE,
vec![medium(0), medium(1), huge(2), medium(3), huge(4), medium(5), medium(6)],
))
Expand Down Expand Up @@ -897,9 +897,8 @@ mod tests {
spawner.clone(),
client.clone(),
);
let genesis_header = client
.expect_header(client.info().genesis_hash)
.expect("there should be header");
let genesis_hash = client.info().genesis_hash;
let genesis_header = client.expect_header(genesis_hash).expect("there should be header");

let extrinsics_num = 5;
let extrinsics = std::iter::once(
Expand All @@ -922,7 +921,7 @@ mod tests {
.sum::<usize>() +
Vec::<Extrinsic>::new().encoded_size();

block_on(txpool.submit_at(&BlockId::number(0), SOURCE, extrinsics.clone())).unwrap();
block_on(txpool.submit_at(genesis_hash, SOURCE, extrinsics.clone())).unwrap();

block_on(txpool.maintain(chain_event(genesis_header.clone())));

Expand Down Expand Up @@ -999,6 +998,7 @@ mod tests {
spawner.clone(),
client.clone(),
);
let genesis_hash = client.info().genesis_hash;

let tiny = |nonce| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY)).nonce(nonce).build()
Expand All @@ -1011,7 +1011,7 @@ mod tests {

block_on(
txpool.submit_at(
&BlockId::number(0),
genesis_hash,
SOURCE,
// add 2 * MAX_SKIPPED_TRANSACTIONS that exhaust resources
(0..MAX_SKIPPED_TRANSACTIONS * 2)
Expand All @@ -1024,21 +1024,17 @@ mod tests {
)
.unwrap();

block_on(
txpool.maintain(chain_event(
client
.expect_header(client.info().genesis_hash)
.expect("there should be header"),
)),
);
block_on(txpool.maintain(chain_event(
client.expect_header(genesis_hash).expect("there should be header"),
)));
assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 3);

let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);

let cell = Mutex::new(time::Instant::now());
let proposer = proposer_factory.init_with_now(
&client.expect_header(client.info().genesis_hash).unwrap(),
&client.expect_header(genesis_hash).unwrap(),
Box::new(move || {
let mut value = cell.lock();
let old = *value;
Expand Down Expand Up @@ -1071,6 +1067,7 @@ mod tests {
spawner.clone(),
client.clone(),
);
let genesis_hash = client.info().genesis_hash;

let tiny = |who| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY))
Expand All @@ -1086,7 +1083,7 @@ mod tests {

block_on(
txpool.submit_at(
&BlockId::number(0),
genesis_hash,
SOURCE,
(0..MAX_SKIPPED_TRANSACTIONS + 2)
.into_iter()
Expand All @@ -1098,13 +1095,9 @@ mod tests {
)
.unwrap();

block_on(
txpool.maintain(chain_event(
client
.expect_header(client.info().genesis_hash)
.expect("there should be header"),
)),
);
block_on(txpool.maintain(chain_event(
client.expect_header(genesis_hash).expect("there should be header"),
)));
assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 2 + 4);

let mut proposer_factory =
Expand All @@ -1114,7 +1107,7 @@ mod tests {
let cell = Arc::new(Mutex::new((0, time::Instant::now())));
let cell2 = cell.clone();
let proposer = proposer_factory.init_with_now(
&client.expect_header(client.info().genesis_hash).unwrap(),
&client.expect_header(genesis_hash).unwrap(),
Box::new(move || {
let mut value = cell.lock();
let (called, old) = *value;
Expand Down
23 changes: 13 additions & 10 deletions substrate/client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ mod tests {
use sc_transaction_pool::{BasicPool, FullChainApi, Options, RevalidationType};
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionSource};
use sp_inherents::InherentData;
use sp_runtime::generic::{BlockId, Digest, DigestItem};
use sp_runtime::generic::{Digest, DigestItem};
use substrate_test_runtime_client::{
AccountKeyring::*, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
};
Expand Down Expand Up @@ -400,10 +400,11 @@ mod tests {
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
api(),
pool_api,
None,
RevalidationType::Full,
spawner.clone(),
Expand Down Expand Up @@ -444,7 +445,7 @@ mod tests {
rt.block_on(future);
});
// submit a transaction to pool.
let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
// assert that it was successfully imported
assert!(result.is_ok());
// assert that the background task returns ok
Expand Down Expand Up @@ -475,10 +476,11 @@ mod tests {
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
api(),
pool_api,
None,
RevalidationType::Full,
spawner.clone(),
Expand Down Expand Up @@ -535,7 +537,7 @@ mod tests {

let mut finality_stream = client.finality_notification_stream();
// submit a transaction to pool.
let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
// assert that it was successfully imported
assert!(result.is_ok());
// assert that the background task returns ok
Expand Down Expand Up @@ -571,10 +573,11 @@ mod tests {
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
api(),
pool_api,
None,
RevalidationType::Full,
spawner.clone(),
Expand Down Expand Up @@ -602,7 +605,7 @@ mod tests {
rt.block_on(future);
});
// submit a transaction to pool.
let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
// assert that it was successfully imported
assert!(result.is_ok());
let (tx, rx) = futures::channel::oneshot::channel();
Expand Down Expand Up @@ -688,7 +691,7 @@ mod tests {
rt.block_on(future);
});
// submit a transaction to pool.
let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
// assert that it was successfully imported
assert!(result.is_ok());

Expand Down Expand Up @@ -719,7 +722,7 @@ mod tests {
}
);

assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());
assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Alice, 1)).await.is_ok());

let header = client.header(created_block.hash).expect("db error").expect("imported above");
assert_eq!(header.number, 1);
Expand All @@ -741,7 +744,7 @@ mod tests {
.is_ok());
assert_matches::assert_matches!(rx1.await.expect("should be no error receiving"), Ok(_));

assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Bob, 0)).await.is_ok());
assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Bob, 0)).await.is_ok());
let (tx2, rx2) = futures::channel::oneshot::channel();
assert!(sink
.send(EngineCommand::SealNewBlock {
Expand Down
8 changes: 2 additions & 6 deletions substrate/client/rpc-spec-v2/src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use std::sync::Arc;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_runtime::{generic, traits::Block as BlockT};
use sp_runtime::traits::Block as BlockT;

use codec::Decode;
use futures::{FutureExt, StreamExt, TryFutureExt};
Expand Down Expand Up @@ -110,11 +110,7 @@ where

let submit = self
.pool
.submit_and_watch(
&generic::BlockId::hash(best_block_hash),
TX_SOURCE,
decoded_extrinsic,
)
.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic)
.map_err(|e| {
e.into_pool_error()
.map(Error::from)
Expand Down
Loading

0 comments on commit ab3a3bc

Please sign in to comment.