From bde0bbe5017a9c08a1a62e04de71e2add54af5ae Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Mon, 12 Feb 2024 15:23:55 +0200 Subject: [PATCH] rpc-v2/tx: Implement `transaction_unstable_broadcast` and `transaction_unstable_stop` (#3079) This PR implements the [transaction_unstable_broadcast](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_broadcast.md) and [transaction_unstable_stop](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_stop.md). The [transaction_unstable_broadcast](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_broadcast.md) submits the provided transaction at the best block of the chain. If the transaction is dropped or declared invalid, the API tries to resubmit the transaction at the next available best block. ### Broadcasting The broadcasting operation continues until either: - the user called `transaction_unstable_stop` with the operation ID that identifies the broadcasting operation - the transaction state is one of the following: - Finalized: the transaction is part of the chain - FinalizedTimeout: we have waited for 256 finalized blocks and timedout - Usurped the transaction has been replaced in the tx pool The broadcasting retires to submit the transaction when the transaction state is: - Invalid: the transaction might become valid at a later time - Dropped: the transaction pool's capacity is full at the moment, but might clear when other transactions are finalized/dropped ### Stopping The `transaction_unstable_broadcast` spawns an abortable future and tracks the abort handler. When the [transaction_unstable_stop](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_stop.md) is called with a valid operation ID; the abort handler of the corresponding `transaction_unstable_broadcast` future is called. This behavior ensures the broadcast future is finishes on the next polling. When the `transaction_unstable_stop` is called with an invalid operation ID, an invalid jsonrpc specific error object is returned. ### Testing This PR adds the testing harness of the transaction API and validates two basic scenarios: - transaction enters and exits the transaction pool - transaction stop returns appropriate values when called with valid and invalid operation IDs Closes: https://github.com/paritytech/polkadot-sdk/issues/3039 Note that the API should be enabled after: https://github.com/paritytech/polkadot-sdk/issues/3084. cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile Co-authored-by: Sebastian Kunert --- Cargo.lock | 3 + prdoc/pr_3079.prdoc | 15 ++ substrate/client/rpc-spec-v2/Cargo.toml | 3 + .../client/rpc-spec-v2/src/chain_head/mod.rs | 2 +- .../client/rpc-spec-v2/src/transaction/api.rs | 27 +- .../rpc-spec-v2/src/transaction/error.rs | 27 ++ .../client/rpc-spec-v2/src/transaction/mod.rs | 7 +- .../rpc-spec-v2/src/transaction/tests.rs | 238 +++++++++++++++++ .../src/transaction/transaction.rs | 13 +- .../src/transaction/transaction_broadcast.rs | 251 ++++++++++++++++++ 10 files changed, 574 insertions(+), 12 deletions(-) create mode 100644 prdoc/pr_3079.prdoc create mode 100644 substrate/client/rpc-spec-v2/src/transaction/tests.rs create mode 100644 substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs diff --git a/Cargo.lock b/Cargo.lock index f5b272d11756..68d399f78a77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16433,11 +16433,13 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.12.1", "pretty_assertions", + "rand", "sc-block-builder", "sc-chain-spec", "sc-client-api", "sc-rpc", "sc-service", + "sc-transaction-pool", "sc-transaction-pool-api", "sc-utils", "serde", @@ -16453,6 +16455,7 @@ dependencies = [ "sp-version", "substrate-test-runtime", "substrate-test-runtime-client", + "substrate-test-runtime-transaction-pool", "thiserror", "tokio", "tokio-stream", diff --git a/prdoc/pr_3079.prdoc b/prdoc/pr_3079.prdoc new file mode 100644 index 000000000000..c745c1ffbfe5 --- /dev/null +++ b/prdoc/pr_3079.prdoc @@ -0,0 +1,15 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Implement transaction_unstable_broadcast and transaction_unstable_stop + +doc: + - audience: Node Dev + description: | + A new RPC class is added to handle transactions. The `transaction_unstable_broadcast` broadcasts + the provided transaction to the peers of the node, until the `transaction_unstable_stop` is called. + The APIs are marked as unstable and subject to change in the future. + To know if the transaction was added to the chain, users can decode the bodies of announced finalized blocks. + This is a low-level approach for `transactionWatch_unstable_submitAndWatch`. + +crates: [ ] diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index 12a02e0b4508..1b7870764dc3 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -41,12 +41,14 @@ tokio = { version = "1.22.0", features = ["sync"] } array-bytes = "6.1" log = { workspace = true, default-features = true } futures-util = { version = "0.3.30", default-features = false } +rand = "0.8.5" [dev-dependencies] serde_json = "1.0.111" tokio = { version = "1.22.0", features = ["macros"] } substrate-test-runtime-client = { path = "../../test-utils/runtime/client" } substrate-test-runtime = { path = "../../test-utils/runtime" } +substrate-test-runtime-transaction-pool = { path = "../../test-utils/runtime/transaction-pool" } sp-consensus = { path = "../../primitives/consensus/common" } sp-externalities = { path = "../../primitives/externalities" } sp-maybe-compressed-blob = { path = "../../primitives/maybe-compressed-blob" } @@ -54,3 +56,4 @@ sc-block-builder = { path = "../block-builder" } sc-service = { path = "../service", features = ["test-helpers"] } assert_matches = "1.3.0" pretty_assertions = "1.2.1" +sc-transaction-pool = { path = "../transaction-pool" } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index 4cbbd00f64f3..c9fe19aca2b1 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -23,7 +23,7 @@ //! Methods are prefixed by `chainHead`. #[cfg(test)] -mod test_utils; +pub mod test_utils; #[cfg(test)] mod tests; diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index 53c83b662a35..33af9c953338 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -18,8 +18,8 @@ //! API trait for transactions. -use crate::transaction::event::TransactionEvent; -use jsonrpsee::proc_macros::rpc; +use crate::transaction::{error::ErrorBroadcast, event::TransactionEvent}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use sp_core::Bytes; #[rpc(client, server)] @@ -28,6 +28,10 @@ pub trait TransactionApi { /// /// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on /// transaction life cycle. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. #[subscription( name = "transactionWatch_unstable_submitAndWatch" => "transactionWatch_unstable_watchEvent", unsubscribe = "transactionWatch_unstable_unwatch", @@ -35,3 +39,22 @@ pub trait TransactionApi { )] fn submit_and_watch(&self, bytes: Bytes); } + +#[rpc(client, server)] +pub trait TransactionBroadcastApi { + /// Broadcast an extrinsic to the chain. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "transaction_unstable_broadcast")] + fn broadcast(&self, bytes: Bytes) -> RpcResult>; + + /// Broadcast an extrinsic to the chain. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "transaction_unstable_stop")] + fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>; +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/error.rs b/substrate/client/rpc-spec-v2/src/transaction/error.rs index d2de07afd595..116977af6600 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/error.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/error.rs @@ -21,6 +21,7 @@ //! Errors are interpreted as transaction events for subscriptions. use crate::transaction::event::{TransactionError, TransactionEvent}; +use jsonrpsee::types::error::ErrorObject; use sc_transaction_pool_api::error::Error as PoolError; use sp_runtime::transaction_validity::InvalidTransaction; @@ -98,3 +99,29 @@ impl From for TransactionEvent { } } } + +/// TransactionBroadcast error. +#[derive(Debug, thiserror::Error)] +pub enum ErrorBroadcast { + /// The provided operation ID is invalid. + #[error("Invalid operation id")] + InvalidOperationID, +} + +/// General purpose errors, as defined in +/// . +pub mod json_rpc_spec { + /// Invalid parameter error. + pub const INVALID_PARAM_ERROR: i32 = -32602; +} + +impl From for ErrorObject<'static> { + fn from(e: ErrorBroadcast) -> Self { + let msg = e.to_string(); + + match e { + ErrorBroadcast::InvalidOperationID => + ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>), + } + } +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/mod.rs index 212912ba1c72..74268a5372a3 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/mod.rs @@ -25,14 +25,19 @@ //! //! Methods are prefixed by `transaction`. +#[cfg(test)] +mod tests; + pub mod api; pub mod error; pub mod event; pub mod transaction; +pub mod transaction_broadcast; -pub use api::TransactionApiServer; +pub use api::{TransactionApiServer, TransactionBroadcastApiServer}; pub use event::{ TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError, TransactionEvent, }; pub use transaction::Transaction; +pub use transaction_broadcast::TransactionBroadcast; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs new file mode 100644 index 000000000000..45477494768a --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -0,0 +1,238 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; +use crate::{ + chain_head::test_utils::ChainHeadMockClient, hex_string, + transaction::TransactionBroadcast as RpcTransactionBroadcast, +}; +use assert_matches::assert_matches; +use codec::Encode; +use futures::Future; +use jsonrpsee::{core::error::Error, rpc_params, RpcModule}; +use sc_transaction_pool::*; +use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; +use sp_core::{testing::TaskExecutor, traits::SpawnNamed}; +use std::{pin::Pin, sync::Arc, time::Duration}; +use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client}; +use substrate_test_runtime_transaction_pool::{uxt, TestApi}; +use tokio::sync::mpsc; + +type Block = substrate_test_runtime_client::runtime::Block; + +/// Wrap the `TaskExecutor` to know when the broadcast future is dropped. +#[derive(Clone)] +struct TaskExecutorBroadcast { + executor: TaskExecutor, + sender: mpsc::UnboundedSender<()>, +} + +/// The channel that receives events when the broadcast futures are dropped. +type TaskExecutorRecv = mpsc::UnboundedReceiver<()>; + +impl TaskExecutorBroadcast { + /// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures + /// are dropped. + fn new() -> (Self, TaskExecutorRecv) { + let (sender, recv) = mpsc::unbounded_channel(); + + (Self { executor: TaskExecutor::new(), sender }, recv) + } +} + +impl SpawnNamed for TaskExecutorBroadcast { + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn(name, group, future) + } + + fn spawn_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn_blocking(name, group, future) + } +} + +/// Initial Alice account nonce. +const ALICE_NONCE: u64 = 209; + +fn create_basic_pool_with_genesis( + test_api: Arc, +) -> (BasicPool, Pin + Send>>) { + let genesis_hash = { + test_api + .chain() + .read() + .block_by_number + .get(&0) + .map(|blocks| blocks[0].0.header.hash()) + .expect("there is block 0. qed") + }; + BasicPool::new_test(test_api, genesis_hash, genesis_hash) +} + +fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { + let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE)); + let (pool, background_task) = create_basic_pool_with_genesis(api.clone()); + + let thread_pool = futures::executor::ThreadPool::new().unwrap(); + thread_pool.spawn_ok(background_task); + (pool, api, thread_pool) +} + +fn setup_api() -> ( + Arc, + Arc>, + Arc>>, + RpcModule< + TransactionBroadcast, ChainHeadMockClient>>, + >, + TaskExecutorRecv, +) { + let (pool, api, _) = maintained_pool(); + let pool = Arc::new(pool); + + let builder = TestClientBuilder::new(); + let client = Arc::new(builder.build()); + let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); + + let (task_executor, executor_recv) = TaskExecutorBroadcast::new(); + + let tx_api = + RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)) + .into_rpc(); + + (api, pool, client_mock, tx_api, executor_recv) +} + +#[tokio::test] +async fn tx_broadcast_enters_pool() { + let (api, pool, client_mock, tx_api, _) = setup_api(); + + // Start at block 1. + let block_1_header = api.push_block(1, vec![], true); + + let uxt = uxt(Alice, ALICE_NONCE); + let xt = hex_string(&uxt.encode()); + + let operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + // Announce block 1 to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_1_header).await; + + // Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool. + + // TODO: Improve testability by extending the `transaction_unstable_broadcast` with + // a middleware trait that intercepts the transaction status for testing. + let mut num_retries = 12; + while num_retries > 0 && pool.status().ready != 1 { + tokio::time::sleep(Duration::from_secs(5)).await; + num_retries -= 1; + } + assert_eq!(1, pool.status().ready); + assert_eq!(uxt.encode().len(), pool.status().ready_bytes); + + // Import block 2 with the transaction included. + let block_2_header = api.push_block(2, vec![uxt.clone()], true); + let block_2 = block_2_header.hash(); + + // Announce block 2 to the pool. + let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; + pool.maintain(event).await; + + assert_eq!(0, pool.status().ready); + + // Stop call can still be made. + let _: () = tx_api + .call("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap(); +} + +#[tokio::test] +async fn tx_broadcast_invalid_tx() { + let (_, pool, _, tx_api, mut exec_recv) = setup_api(); + + // Invalid parameters. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params" + ); + + assert_eq!(0, pool.status().ready); + + // Invalid transaction that cannot be decoded. The broadcast silently exits. + let xt = "0xdeadbeef"; + let operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + assert_eq!(0, pool.status().ready); + + // Await the broadcast future to exit. + // Without this we'd be subject to races, where we try to call the stop before the tx is + // dropped. + exec_recv.recv().await.unwrap(); + + // The broadcast future was dropped, and the operation is no longer active. + // When the operation is not active, either from the tx being finalized or a + // terminal error; the stop method should return an error. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + ); +} + +#[tokio::test] +async fn tx_invalid_stop() { + let (_, _, _, tx_api, _) = setup_api(); + + // Make an invalid stop call. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + ); +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index b2cfa36c9c99..17889b3bad2a 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -29,21 +29,18 @@ use crate::{ }, SubscriptionTaskExecutor, }; +use codec::Decode; +use futures::{StreamExt, TryFutureExt}; use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink}; +use sc_rpc::utils::pipe_from_stream; use sc_transaction_pool_api::{ error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, }; -use std::sync::Arc; - -use sc_rpc::utils::pipe_from_stream; -use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; - -use codec::Decode; -use futures::{StreamExt, TryFutureExt}; +use std::sync::Arc; /// An API for transaction RPC calls. pub struct Transaction { @@ -82,7 +79,7 @@ where Pool: TransactionPool + Sync + Send + 'static, Pool::Hash: Unpin, ::Hash: Unpin, - Client: HeaderBackend + ProvideRuntimeApi + Send + Sync + 'static, + Client: HeaderBackend + Send + Sync + 'static, { fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) { let client = self.client.clone(); diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs new file mode 100644 index 000000000000..92c838261874 --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -0,0 +1,251 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! API implementation for broadcasting transactions. + +use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor}; +use codec::Decode; +use futures::{FutureExt, Stream, StreamExt}; +use futures_util::stream::AbortHandle; +use jsonrpsee::core::{async_trait, RpcResult}; +use parking_lot::RwLock; +use rand::{distributions::Alphanumeric, Rng}; +use sc_client_api::BlockchainEvents; +use sc_transaction_pool_api::{ + error::IntoPoolError, TransactionFor, TransactionPool, TransactionSource, +}; +use sp_blockchain::HeaderBackend; +use sp_core::Bytes; +use sp_runtime::traits::Block as BlockT; +use std::{collections::HashMap, sync::Arc}; + +use super::error::ErrorBroadcast; + +/// An API for transaction RPC calls. +pub struct TransactionBroadcast { + /// Substrate client. + client: Arc, + /// Transactions pool. + pool: Arc, + /// Executor to spawn subscriptions. + executor: SubscriptionTaskExecutor, + /// The brodcast operation IDs. + broadcast_ids: Arc>>, +} + +/// The state of a broadcast operation. +struct BroadcastState { + /// Handle to abort the running future that broadcasts the transaction. + handle: AbortHandle, +} + +impl TransactionBroadcast { + /// Creates a new [`TransactionBroadcast`]. + pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { + TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() } + } + + /// Generate an unique operation ID for the `transaction_broadcast` RPC method. + pub fn generate_unique_id(&self) -> String { + let generate_operation_id = || { + // The length of the operation ID. + const OPERATION_ID_LEN: usize = 16; + + rand::thread_rng() + .sample_iter(Alphanumeric) + .take(OPERATION_ID_LEN) + .map(char::from) + .collect::() + }; + + let mut id = generate_operation_id(); + + let broadcast_ids = self.broadcast_ids.read(); + + while broadcast_ids.contains_key(&id) { + id = generate_operation_id(); + } + + id + } +} + +/// Currently we treat all RPC transactions as externals. +/// +/// Possibly in the future we could allow opt-in for special treatment +/// of such transactions, so that the block authors can inject +/// some unique transactions via RPC and have them included in the pool. +const TX_SOURCE: TransactionSource = TransactionSource::External; + +#[async_trait] +impl TransactionBroadcastApiServer for TransactionBroadcast +where + Pool: TransactionPool + Sync + Send + 'static, + Pool::Error: IntoPoolError, + ::Hash: Unpin, + Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, +{ + fn broadcast(&self, bytes: Bytes) -> RpcResult> { + let pool = self.pool.clone(); + + // The unique ID of this operation. + let id = self.generate_unique_id(); + + let mut best_block_import_stream = + Box::pin(self.client.import_notification_stream().filter_map( + |notification| async move { notification.is_new_best.then_some(notification.hash) }, + )); + + let broadcast_transaction_fut = async move { + // There is nothing we could do with an extrinsic of invalid format. + let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { + return; + }; + + // Flag to determine if the we should broadcast the transaction again. + let mut is_done = false; + + while !is_done { + // Wait for the last block to become available. + let Some(best_block_hash) = + last_stream_element(&mut best_block_import_stream).await + else { + return; + }; + + let mut stream = match pool + .submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()) + .await + { + Ok(stream) => stream, + // The transaction was not included to the pool. + Err(e) => { + let Ok(pool_err) = e.into_pool_error() else { return }; + + if pool_err.is_retriable() { + // Try to resubmit the transaction at a later block for + // recoverable errors. + continue + } else { + return; + } + }, + }; + + while let Some(event) = stream.next().await { + // Check if the transaction could be submitted again + // at a later time. + if event.is_retriable() { + break; + } + + // Stop if this is the final event of the transaction stream + // and the event is not retriable. + if event.is_final() { + is_done = true; + break; + } + } + } + }; + + // Convert the future into an abortable future, for easily terminating it from the + // `transaction_stop` method. + let (fut, handle) = futures::future::abortable(broadcast_transaction_fut); + let broadcast_ids = self.broadcast_ids.clone(); + let drop_id = id.clone(); + // The future expected by the executor must be `Future` instead of + // `Future>`. + let fut = fut.map(move |_| { + // Remove the entry from the broadcast IDs map. + broadcast_ids.write().remove(&drop_id); + }); + + // Keep track of this entry and the abortable handle. + { + let mut broadcast_ids = self.broadcast_ids.write(); + broadcast_ids.insert(id.clone(), BroadcastState { handle }); + } + + sc_rpc::utils::spawn_subscription_task(&self.executor, fut); + + Ok(Some(id)) + } + + fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> { + let mut broadcast_ids = self.broadcast_ids.write(); + + let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { + return Err(ErrorBroadcast::InvalidOperationID) + }; + + broadcast_state.handle.abort(); + + Ok(()) + } +} + +/// Returns the last element of the providided stream, or `None` if the stream is closed. +async fn last_stream_element(stream: &mut S) -> Option +where + S: Stream + Unpin, +{ + let Some(mut element) = stream.next().await else { return None }; + + // We are effectively polling the stream for the last available item at this time. + // The `now_or_never` returns `None` if the stream is `Pending`. + // + // If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`. + while let Some(next) = stream.next().now_or_never() { + let Some(next) = next else { + // Nothing to do if the stream terminated. + return None + }; + element = next; + } + + Some(element) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio_stream::wrappers::ReceiverStream; + + #[tokio::test] + async fn check_last_stream_element() { + let (tx, rx) = tokio::sync::mpsc::channel(16); + + let mut stream = ReceiverStream::new(rx); + // Check the stream with one element queued. + tx.send(1).await.unwrap(); + assert_eq!(last_stream_element(&mut stream).await, Some(1)); + + // Check the stream with multiple elements. + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); + assert_eq!(last_stream_element(&mut stream).await, Some(3)); + + // Drop the stream with some elements + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + drop(tx); + assert_eq!(last_stream_element(&mut stream).await, None); + } +}