From 0f9b4509d36e387ce62a90db3647282d66a8d160 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 8 Mar 2021 13:18:53 +0300 Subject: [PATCH] Relay Millau && Rialto headers using (future) finality verifier API (#761) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * finality proofs relay * SyncHeader::is_mandatory * empty ancestry proof * logs * fixed submit condition * fixed wrong split index * tick comment * recent_finality_proofs * basic finality loop tests * removed obsolete files * rename files in substrate relay * fmt * clippy * fixed TODOs * clippy * stop syncing if target node is out of sync * more clippy * more clippy * Update relays/finality-relay/src/finality_loop.rs Co-authored-by: Hernando Castano * Update relays/finality-relay/src/finality_loop.rs Co-authored-by: Hernando Castano * Update relays/finality-relay/src/finality_loop.rs Co-authored-by: Hernando Castano * docs * moved doc * typo * Update relays/finality-relay/src/finality_loop_tests.rs Co-authored-by: Hernando Castano * Update relays/finality-relay/src/finality_loop_tests.rs Co-authored-by: Hernando Castano * header_and_finality_proof_by_number -> header_and_finality_proof * VecDeque isn't required (because of make_contiguous) * fixed wrong expect * Update relays/finality-relay/src/finality_loop.rs Co-authored-by: Hernando Castano * Update relays/substrate/src/rialto_headers_to_millau.rs Co-authored-by: Hernando Castano * Update relays/substrate/src/rialto_headers_to_millau.rs Co-authored-by: Hernando Castano * RialtoSyncHeader * Update relays/finality-relay/src/finality_loop.rs Co-authored-by: Tomasz Drwięga * Update relays/finality-relay/src/finality_loop.rs Co-authored-by: Tomasz Drwięga * removed wrong comment * Update relays/finality-relay/src/finality_loop.rs Co-authored-by: Tomasz Drwięga * fix used runtime methods names * fix for new jsonrpsee * fix comment * initialize finality verifier pallet * fmt Co-authored-by: Hernando Castano Co-authored-by: Tomasz Drwięga --- bridges/bin/millau/runtime/src/lib.rs | 1 + bridges/bin/rialto/runtime/src/lib.rs | 1 + bridges/modules/substrate/src/lib.rs | 2 +- bridges/modules/substrate/src/verifier.rs | 21 +- bridges/primitives/header-chain/src/lib.rs | 20 +- bridges/primitives/millau/src/lib.rs | 13 +- bridges/primitives/rialto/src/lib.rs | 11 - bridges/relays/finality-relay/Cargo.toml | 20 + .../finality-relay/src/finality_loop.rs | 581 ++++++++++++++++++ .../finality-relay/src/finality_loop_tests.rs | 339 ++++++++++ bridges/relays/finality-relay/src/lib.rs | 60 ++ .../headers-relay/src/sync_loop_metrics.rs | 22 +- bridges/relays/substrate-client/Cargo.toml | 2 + bridges/relays/substrate-client/src/chain.rs | 18 +- .../substrate-client/src/finality_source.rs | 147 +++++ bridges/relays/substrate-client/src/guard.rs | 3 +- bridges/relays/substrate-client/src/lib.rs | 1 + .../substrate-client/src/sync_header.rs | 14 +- bridges/relays/substrate/Cargo.toml | 3 +- .../relays/substrate/src/finality_pipeline.rs | 130 ++++ .../relays/substrate/src/finality_target.rs | 91 +++ .../substrate/src/headers_initialize.rs | 8 +- .../relays/substrate/src/headers_maintain.rs | 458 -------------- .../relays/substrate/src/headers_pipeline.rs | 166 ----- .../relays/substrate/src/headers_target.rs | 168 ----- bridges/relays/substrate/src/main.rs | 9 +- .../relays/substrate/src/messages_target.rs | 2 +- .../substrate/src/millau_headers_to_rialto.rs | 58 +- .../src/millau_messages_to_rialto.rs | 4 +- .../substrate/src/rialto_headers_to_millau.rs | 57 +- .../src/rialto_messages_to_millau.rs | 4 +- 31 files changed, 1490 insertions(+), 944 deletions(-) create mode 100644 bridges/relays/finality-relay/Cargo.toml create mode 100644 bridges/relays/finality-relay/src/finality_loop.rs create mode 100644 bridges/relays/finality-relay/src/finality_loop_tests.rs create mode 100644 bridges/relays/finality-relay/src/lib.rs create mode 100644 bridges/relays/substrate-client/src/finality_source.rs create mode 100644 bridges/relays/substrate/src/finality_pipeline.rs create mode 100644 bridges/relays/substrate/src/finality_target.rs delete mode 100644 bridges/relays/substrate/src/headers_maintain.rs delete mode 100644 bridges/relays/substrate/src/headers_pipeline.rs delete mode 100644 bridges/relays/substrate/src/headers_target.rs diff --git a/bridges/bin/millau/runtime/src/lib.rs b/bridges/bin/millau/runtime/src/lib.rs index 090257419d549..ee6f1d483b0ea 100644 --- a/bridges/bin/millau/runtime/src/lib.rs +++ b/bridges/bin/millau/runtime/src/lib.rs @@ -61,6 +61,7 @@ pub use frame_support::{ pub use frame_system::Call as SystemCall; pub use pallet_balances::Call as BalancesCall; +pub use pallet_finality_verifier::Call as FinalityBridgeRialtoCall; pub use pallet_message_lane::Call as MessageLaneCall; pub use pallet_substrate_bridge::Call as BridgeRialtoCall; pub use pallet_sudo::Call as SudoCall; diff --git a/bridges/bin/rialto/runtime/src/lib.rs b/bridges/bin/rialto/runtime/src/lib.rs index eed087bdf82cf..9f289d2c87f79 100644 --- a/bridges/bin/rialto/runtime/src/lib.rs +++ b/bridges/bin/rialto/runtime/src/lib.rs @@ -69,6 +69,7 @@ pub use frame_system::Call as SystemCall; pub use pallet_balances::Call as BalancesCall; pub use pallet_bridge_currency_exchange::Call as BridgeCurrencyExchangeCall; pub use pallet_bridge_eth_poa::Call as BridgeEthPoACall; +pub use pallet_finality_verifier::Call as FinalityBridgeMillauCall; pub use pallet_message_lane::Call as MessageLaneCall; pub use pallet_substrate_bridge::Call as BridgeMillauCall; pub use pallet_sudo::Call as SudoCall; diff --git a/bridges/modules/substrate/src/lib.rs b/bridges/modules/substrate/src/lib.rs index 1ae23fdf91b1c..90213e5c9b9ab 100644 --- a/bridges/modules/substrate/src/lib.rs +++ b/bridges/modules/substrate/src/lib.rs @@ -417,7 +417,7 @@ where } else { // We don't have a scheduled change in storage at the moment. Let's check if the current // header signals an authority set change. - if let Some(change) = verifier::find_scheduled_change(&header) { + if let Some(change) = bp_header_chain::find_grandpa_authorities_scheduled_change(&header) { let next_set = AuthoritySet { authorities: change.next_authorities, set_id: storage.current_authority_set().set_id + 1, diff --git a/bridges/modules/substrate/src/verifier.rs b/bridges/modules/substrate/src/verifier.rs index 0c3bd1b5ddd49..effb5761b650c 100644 --- a/bridges/modules/substrate/src/verifier.rs +++ b/bridges/modules/substrate/src/verifier.rs @@ -25,10 +25,8 @@ use crate::storage::{ImportedHeader, ScheduledChange}; use crate::BridgeStorage; -use bp_header_chain::{justification::verify_justification, AuthoritySet}; +use bp_header_chain::{find_grandpa_authorities_scheduled_change, justification::verify_justification, AuthoritySet}; use finality_grandpa::voter_set::VoterSet; -use sp_finality_grandpa::{ConsensusLog, GRANDPA_ENGINE_ID}; -use sp_runtime::generic::OpaqueDigestItemId; use sp_runtime::traits::{CheckedAdd, Header as HeaderT, One}; use sp_runtime::RuntimeDebug; use sp_std::{prelude::Vec, vec}; @@ -142,7 +140,7 @@ where // time. While this is not strictly true of GRANDPA (it can have multiple pending changes, // even across forks), this assumption simplifies our tracking of authority set changes. let mut signal_hash = parent_header.signal_hash; - let scheduled_change = find_scheduled_change(&header); + let scheduled_change = find_grandpa_authorities_scheduled_change(&header); // Check if our fork is expecting an authority set change let requires_justification = if let Some(hash) = signal_hash { @@ -339,19 +337,6 @@ where Some(ancestors) } -pub(crate) fn find_scheduled_change(header: &H) -> Option> { - let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID); - - let filter_log = |log: ConsensusLog| match log { - ConsensusLog::ScheduledChange(change) => Some(change), - _ => None, - }; - - // find the first consensus digest with the right ID which converts to - // the right kind of consensus log. - header.digest().convert_first(|l| l.try_to(id).and_then(filter_log)) -} - #[cfg(test)] mod tests { use super::*; @@ -361,7 +346,7 @@ mod tests { use codec::Encode; use frame_support::{assert_err, assert_ok}; use frame_support::{StorageMap, StorageValue}; - use sp_finality_grandpa::{AuthorityId, SetId}; + use sp_finality_grandpa::{AuthorityId, ConsensusLog, SetId, GRANDPA_ENGINE_ID}; use sp_runtime::{Digest, DigestItem}; fn schedule_next_change( diff --git a/bridges/primitives/header-chain/src/lib.rs b/bridges/primitives/header-chain/src/lib.rs index a2d8574b6dc48..65181df316859 100644 --- a/bridges/primitives/header-chain/src/lib.rs +++ b/bridges/primitives/header-chain/src/lib.rs @@ -26,9 +26,9 @@ use core::default::Default; use core::fmt::Debug; #[cfg(feature = "std")] use serde::{Deserialize, Serialize}; -use sp_finality_grandpa::{AuthorityList, SetId}; -use sp_runtime::traits::Header as HeaderT; +use sp_finality_grandpa::{AuthorityList, ConsensusLog, SetId, GRANDPA_ENGINE_ID}; use sp_runtime::RuntimeDebug; +use sp_runtime::{generic::OpaqueDigestItemId, traits::Header as HeaderT}; use sp_std::vec::Vec; pub mod justification; @@ -140,6 +140,22 @@ impl AncestryChecker> for LinearAncestryChecker { } } +/// Find header digest that schedules next GRANDPA authorities set. +pub fn find_grandpa_authorities_scheduled_change( + header: &H, +) -> Option> { + let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID); + + let filter_log = |log: ConsensusLog| match log { + ConsensusLog::ScheduledChange(change) => Some(change), + _ => None, + }; + + // find the first consensus digest with the right ID which converts to + // the right kind of consensus log. + header.digest().convert_first(|l| l.try_to(id).and_then(filter_log)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/bridges/primitives/millau/src/lib.rs b/bridges/primitives/millau/src/lib.rs index 23acef4701a65..afaf36de32e75 100644 --- a/bridges/primitives/millau/src/lib.rs +++ b/bridges/primitives/millau/src/lib.rs @@ -229,19 +229,8 @@ pub fn max_extrinsic_size() -> u32 { *BlockLength::get().max.get(DispatchClass::Normal) } -/// Name of the `MillauHeaderApi::best_block` runtime method. -pub const BEST_MILLAU_BLOCKS_METHOD: &str = "MillauHeaderApi_best_blocks"; -/// Name of the `MillauHeaderApi::finalized_block` runtime method. -pub const FINALIZED_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_finalized_block"; -/// Name of the `MillauHeaderApi::is_known_block` runtime method. -pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block"; -/// Name of the `MillauHeaderApi::incomplete_headers` runtime method. -pub const INCOMPLETE_MILLAU_HEADERS_METHOD: &str = "MillauHeaderApi_incomplete_headers"; - -/// Name of the `RialtoFinalityApi::best_finalized` runtime method. +/// Name of the `MillauFinalityApi::best_finalized` runtime method. pub const BEST_FINALIZED_MILLAU_HEADER_METHOD: &str = "MillauFinalityApi_best_finalized"; -/// Name of the `RialtoFinalityApi::is_known_header` runtime method. -pub const IS_KNOW_MILLAU_HEADER_METHOD: &str = "MillauFinalityApi_is_known_header"; /// Name of the `ToMillauOutboundLaneApi::estimate_message_delivery_and_dispatch_fee` runtime method. pub const TO_MILLAU_ESTIMATE_MESSAGE_FEE_METHOD: &str = diff --git a/bridges/primitives/rialto/src/lib.rs b/bridges/primitives/rialto/src/lib.rs index 856dd8b7fbfed..c598b7ee8379b 100644 --- a/bridges/primitives/rialto/src/lib.rs +++ b/bridges/primitives/rialto/src/lib.rs @@ -190,19 +190,8 @@ pub fn max_extrinsic_size() -> u32 { *BlockLength::get().max.get(DispatchClass::Normal) } -/// Name of the `RialtoHeaderApi::best_blocks` runtime method. -pub const BEST_RIALTO_BLOCKS_METHOD: &str = "RialtoHeaderApi_best_blocks"; -/// Name of the `RialtoHeaderApi::finalized_block` runtime method. -pub const FINALIZED_RIALTO_BLOCK_METHOD: &str = "RialtoHeaderApi_finalized_block"; -/// Name of the `RialtoHeaderApi::is_known_block` runtime method. -pub const IS_KNOWN_RIALTO_BLOCK_METHOD: &str = "RialtoHeaderApi_is_known_block"; -/// Name of the `RialtoHeaderApi::incomplete_headers` runtime method. -pub const INCOMPLETE_RIALTO_HEADERS_METHOD: &str = "RialtoHeaderApi_incomplete_headers"; - /// Name of the `RialtoFinalityApi::best_finalized` runtime method. pub const BEST_FINALIZED_RIALTO_HEADER_METHOD: &str = "RialtoFinalityApi_best_finalized"; -/// Name of the `RialtoFinalityApi::is_known_header` runtime method. -pub const IS_KNOW_RIALTO_HEADER_METHOD: &str = "RialtoFinalityApi_is_known_header"; /// Name of the `ToRialtoOutboundLaneApi::estimate_message_delivery_and_dispatch_fee` runtime method. pub const TO_RIALTO_ESTIMATE_MESSAGE_FEE_METHOD: &str = diff --git a/bridges/relays/finality-relay/Cargo.toml b/bridges/relays/finality-relay/Cargo.toml new file mode 100644 index 0000000000000..9667ba2fa674c --- /dev/null +++ b/bridges/relays/finality-relay/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "finality-relay" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +description = "Finality proofs relay" + +[dependencies] +async-std = "1.6.5" +async-trait = "0.1.40" +backoff = "0.2" +futures = "0.3.5" +headers-relay = { path = "../headers-relay" } +log = "0.4.11" +num-traits = "0.2" +relay-utils = { path = "../utils" } + +[dev-dependencies] +parking_lot = "0.11.0" diff --git a/bridges/relays/finality-relay/src/finality_loop.rs b/bridges/relays/finality-relay/src/finality_loop.rs new file mode 100644 index 0000000000000..50c23b3757233 --- /dev/null +++ b/bridges/relays/finality-relay/src/finality_loop.rs @@ -0,0 +1,581 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! The loop basically reads all missing headers and their finality proofs from the source client. +//! The proof for the best possible header is then submitted to the target node. The only exception +//! is the mandatory headers, which we always submit to the target node. For such headers, we +//! assume that the persistent proof either exists, or will eventually become available. + +use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader}; + +use async_trait::async_trait; +use backoff::backoff::Backoff; +use futures::{select, Future, FutureExt, Stream, StreamExt}; +use headers_relay::sync_loop_metrics::SyncLoopMetrics; +use num_traits::{One, Saturating}; +use relay_utils::{ + metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, + relay_loop::Client as RelayClient, + retry_backoff, FailedClient, MaybeConnectionError, +}; +use std::{ + pin::Pin, + time::{Duration, Instant}, +}; + +/// Finality proof synchronization loop parameters. +#[derive(Debug, Clone)] +pub struct FinalitySyncParams { + /// Interval at which we check updates on both clients. Normally should be larger than + /// `min(source_block_time, target_block_time)`. + /// + /// This parameter may be used to limit transactions rate. Increase the value && you'll get + /// infrequent updates => sparse headers => potential slow down of bridge applications, but pallet storage + /// won't be super large. Decrease the value to near `source_block_time` and you'll get + /// transaction for (almost) every block of the source chain => all source headers will be known + /// to the target chain => bridge applications will run faster, but pallet storage may explode + /// (but if pruning is there, then it's fine). + pub tick: Duration, + /// Number of finality proofs to keep in internal buffer between loop wakeups. + /// + /// While in "major syncing" state, we still read finality proofs from the stream. They're stored + /// in the internal buffer between loop wakeups. When we're close to the tip of the chain, we may + /// meet finality delays if headers are not finalized frequently. So instead of waiting for next + /// finality proof to appear in the stream, we may use existing proof from that buffer. + pub recent_finality_proofs_limit: usize, + /// Timeout before we treat our transactions as lost and restart the whole sync process. + pub stall_timeout: Duration, +} + +/// Source client used in finality synchronization loop. +#[async_trait] +pub trait SourceClient: RelayClient { + /// Stream of new finality proofs. The stream is allowed to miss proofs for some + /// headers, even if those headers are mandatory. + type FinalityProofsStream: Stream; + + /// Get best finalized block number. + async fn best_finalized_block_number(&self) -> Result; + + /// Get canonical header and its finality proof by number. + async fn header_and_finality_proof( + &self, + number: P::Number, + ) -> Result<(P::Header, Option), Self::Error>; + + /// Subscribe to new finality proofs. + async fn finality_proofs(&self) -> Result; +} + +/// Target client used in finality synchronization loop. +#[async_trait] +pub trait TargetClient: RelayClient { + /// Get best finalized source block number. + async fn best_finalized_source_block_number(&self) -> Result; + + /// Submit header finality proof. + async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), Self::Error>; +} + +/// Run finality proofs synchronization loop. +pub fn run( + source_client: impl SourceClient

, + target_client: impl TargetClient

, + sync_params: FinalitySyncParams, + metrics_params: Option, + exit_signal: impl Future, +) { + let exit_signal = exit_signal.shared(); + + let metrics_global = GlobalMetrics::default(); + let metrics_sync = SyncLoopMetrics::default(); + let metrics_enabled = metrics_params.is_some(); + metrics_start( + format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), + metrics_params, + &metrics_global, + &metrics_sync, + ); + + relay_utils::relay_loop::run( + relay_utils::relay_loop::RECONNECT_DELAY, + source_client, + target_client, + |source_client, target_client| { + run_until_connection_lost( + source_client, + target_client, + sync_params.clone(), + if metrics_enabled { + Some(metrics_global.clone()) + } else { + None + }, + if metrics_enabled { + Some(metrics_sync.clone()) + } else { + None + }, + exit_signal.clone(), + ) + }, + ); +} + +/// Unjustified headers container. Ordered by header number. +pub(crate) type UnjustifiedHeaders

= Vec<

::Header>; +/// Finality proofs container. Ordered by target header number. +pub(crate) type FinalityProofs

= Vec<( +

::Number, +

::FinalityProof, +)>; + +/// Error that may happen inside finality synchronization loop. +#[derive(Debug)] +enum Error { + /// Source client request has failed with given error. + Source(SourceError), + /// Target client request has failed with given error. + Target(TargetError), + /// Finality proof for mandatory header is missing from the source node. + MissingMandatoryFinalityProof(P::Number), + /// The synchronization has stalled. + Stalled, +} + +impl Error +where + P: FinalitySyncPipeline, + SourceError: MaybeConnectionError, + TargetError: MaybeConnectionError, +{ + fn fail_if_connection_error(&self) -> Result<(), FailedClient> { + match *self { + Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source), + Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target), + Error::Stalled => Err(FailedClient::Both), + _ => Ok(()), + } + } +} + +/// Information about transaction that we have submitted. +#[derive(Debug, Clone)] +struct Transaction { + /// Time when we have submitted this transaction. + pub time: Instant, + /// The number of the header we have submitted. + pub submitted_header_number: Number, +} + +/// Finality proofs stream that may be restarted. +struct RestartableFinalityProofsStream { + /// Flag that the stream needs to be restarted. + needs_restart: bool, + /// The stream itself. + stream: Pin>, +} + +/// Finality synchronization loop state. +struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { + /// Synchronization loop progress. + progress: &'a mut (Instant, Option), + /// Finality proofs stream. + finality_proofs_stream: &'a mut RestartableFinalityProofsStream, + /// Recent finality proofs that we have read from the stream. + recent_finality_proofs: &'a mut FinalityProofs

, + /// Last transaction that we have submitted to the target node. + last_transaction: Option>, +} + +async fn run_until_connection_lost( + source_client: impl SourceClient

, + target_client: impl TargetClient

, + sync_params: FinalitySyncParams, + metrics_global: Option, + metrics_sync: Option, + exit_signal: impl Future, +) -> Result<(), FailedClient> { + let restart_finality_proofs_stream = || async { + source_client.finality_proofs().await.map_err(|error| { + log::error!( + target: "bridge", + "Failed to subscribe to {} justifications: {:?}. Going to reconnect", + P::SOURCE_NAME, + error, + ); + + FailedClient::Source + }) + }; + + let exit_signal = exit_signal.fuse(); + futures::pin_mut!(exit_signal); + + let mut finality_proofs_stream = RestartableFinalityProofsStream { + needs_restart: false, + stream: Box::pin(restart_finality_proofs_stream().await?), + }; + let mut recent_finality_proofs = Vec::new(); + + let mut progress = (Instant::now(), None); + let mut retry_backoff = retry_backoff(); + let mut last_transaction = None; + + loop { + // run loop iteration + let iteration_result = run_loop_iteration( + &source_client, + &target_client, + FinalityLoopState { + progress: &mut progress, + finality_proofs_stream: &mut finality_proofs_stream, + recent_finality_proofs: &mut recent_finality_proofs, + last_transaction: last_transaction.clone(), + }, + &sync_params, + &metrics_sync, + ) + .await; + + // update global metrics + if let Some(ref metrics_global) = metrics_global { + metrics_global.update().await; + } + + // deal with errors + let next_tick = match iteration_result { + Ok(updated_last_transaction) => { + last_transaction = updated_last_transaction; + retry_backoff.reset(); + sync_params.tick + } + Err(error) => { + log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error); + error.fail_if_connection_error()?; + retry_backoff + .next_backoff() + .unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY) + } + }; + if finality_proofs_stream.needs_restart { + finality_proofs_stream.needs_restart = false; + finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?); + } + + // wait till exit signal, or new source block + select! { + _ = async_std::task::sleep(next_tick).fuse() => {}, + _ = exit_signal => return Ok(()), + } + } +} + +async fn run_loop_iteration( + source_client: &SC, + target_client: &TC, + state: FinalityLoopState<'_, P, SC::FinalityProofsStream>, + sync_params: &FinalitySyncParams, + metrics_sync: &Option, +) -> Result>, Error> +where + P: FinalitySyncPipeline, + SC: SourceClient

, + TC: TargetClient

, +{ + // read best source headers ids from source and target nodes + let best_number_at_source = source_client + .best_finalized_block_number() + .await + .map_err(Error::Source)?; + let best_number_at_target = target_client + .best_finalized_source_block_number() + .await + .map_err(Error::Target)?; + if let Some(ref metrics_sync) = *metrics_sync { + metrics_sync.update_best_block_at_source(best_number_at_source); + metrics_sync.update_best_block_at_target(best_number_at_target); + } + *state.progress = print_sync_progress::

(*state.progress, best_number_at_source, best_number_at_target); + + // if we have already submitted header, then we just need to wait for it + // if we're waiting too much, then we believe our transaction has been lost and restart sync + if let Some(last_transaction) = state.last_transaction { + if best_number_at_target >= last_transaction.submitted_header_number { + // transaction has been mined && we can continue + } else if last_transaction.time.elapsed() > sync_params.stall_timeout { + log::error!( + target: "bridge", + "Finality synchronization from {} to {} has stalled. Going to restart", + P::SOURCE_NAME, + P::TARGET_NAME, + ); + + return Err(Error::Stalled); + } else { + return Ok(Some(last_transaction)); + } + } + + // submit new header if we have something new + match select_header_to_submit( + source_client, + target_client, + state.finality_proofs_stream, + state.recent_finality_proofs, + best_number_at_source, + best_number_at_target, + sync_params, + ) + .await? + { + Some((header, justification)) => { + let new_transaction = Transaction { + time: Instant::now(), + submitted_header_number: header.number(), + }; + + log::debug!( + target: "bridge", + "Going to submit finality proof of {} header #{:?} to {}", + P::SOURCE_NAME, + new_transaction.submitted_header_number, + P::TARGET_NAME, + ); + + target_client + .submit_finality_proof(header, justification) + .await + .map_err(Error::Target)?; + Ok(Some(new_transaction)) + } + None => Ok(None), + } +} + +async fn select_header_to_submit( + source_client: &SC, + _target_client: &TC, + finality_proofs_stream: &mut RestartableFinalityProofsStream, + recent_finality_proofs: &mut FinalityProofs

, + best_number_at_source: P::Number, + best_number_at_target: P::Number, + sync_params: &FinalitySyncParams, +) -> Result, Error> +where + P: FinalitySyncPipeline, + SC: SourceClient

, + TC: TargetClient

, +{ + let mut selected_finality_proof = None; + let mut unjustified_headers = Vec::new(); + + // to see that the loop is progressing + log::trace!( + target: "bridge", + "Considering range of headers ({:?}; {:?}]", + best_number_at_target, + best_number_at_source, + ); + + // read missing headers. if we see that the header schedules GRANDPA change, we need to + // submit this header + let mut header_number = best_number_at_target + One::one(); + while header_number <= best_number_at_source { + let (header, finality_proof) = source_client + .header_and_finality_proof(header_number) + .await + .map_err(Error::Source)?; + let is_mandatory = header.is_mandatory(); + + match (is_mandatory, finality_proof) { + (true, Some(finality_proof)) => { + log::trace!(target: "bridge", "Header {:?} is mandatory", header_number); + return Ok(Some((header, finality_proof))); + } + (true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())), + (false, Some(finality_proof)) => { + log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number); + selected_finality_proof = Some((header, finality_proof)); + prune_unjustified_headers::

(header_number, &mut unjustified_headers); + } + (false, None) => { + unjustified_headers.push(header); + } + } + + header_number = header_number + One::one(); + } + + // see if we can improve finality by using recent finality proofs + if !unjustified_headers.is_empty() && !recent_finality_proofs.is_empty() { + const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed"; + + // we need proofs for headers in range unjustified_range_begin..=unjustified_range_end + let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number(); + let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number(); + + // we have proofs for headers in range buffered_range_begin..=buffered_range_end + let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0; + let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0; + + // we have two ranges => find intersection + let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin); + let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end); + let intersection = intersection_begin..=intersection_end; + + // find last proof from intersection + let selected_finality_proof_index = recent_finality_proofs + .binary_search_by_key(intersection.end(), |(number, _)| *number) + .unwrap_or_else(|index| index.saturating_sub(1)); + let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index]; + if intersection.contains(selected_header_number) { + // now remove all obsolete headers and extract selected header + let selected_header = prune_unjustified_headers::

(*selected_header_number, &mut unjustified_headers) + .expect("unjustified_headers contain all headers from intersection; qed"); + selected_finality_proof = Some((selected_header, finality_proof.clone())); + } + } + + // read all proofs from the stream, probably selecting updated proof that we're going to submit + loop { + let next_proof = finality_proofs_stream.stream.next(); + let finality_proof = match next_proof.now_or_never() { + Some(Some(finality_proof)) => finality_proof, + Some(None) => { + finality_proofs_stream.needs_restart = true; + break; + } + None => break, + }; + let finality_proof_target_header_number = match finality_proof.target_header_number() { + Some(target_header_number) => target_header_number, + None => { + continue; + } + }; + + let justified_header = + prune_unjustified_headers::

(finality_proof_target_header_number, &mut unjustified_headers); + if let Some(justified_header) = justified_header { + recent_finality_proofs.clear(); + selected_finality_proof = Some((justified_header, finality_proof)); + } else { + // the number of proofs read during single wakeup is expected to be low, so we aren't pruning + // `recent_finality_proofs` collection too often + recent_finality_proofs.push((finality_proof_target_header_number, finality_proof)); + } + } + + // remove obsolete 'recent' finality proofs + keep its size under certain limit + let oldest_finality_proof_to_keep = selected_finality_proof + .as_ref() + .map(|(header, _)| header.number()) + .unwrap_or(best_number_at_target); + prune_recent_finality_proofs::

( + oldest_finality_proof_to_keep, + recent_finality_proofs, + sync_params.recent_finality_proofs_limit, + ); + + Ok(selected_finality_proof) +} + +/// Remove headers from `unjustified_headers` collection with number lower or equal than `justified_header_number`. +/// +/// Returns the header that matches `justified_header_number` (if any). +pub(crate) fn prune_unjustified_headers( + justified_header_number: P::Number, + unjustified_headers: &mut UnjustifiedHeaders

, +) -> Option { + prune_ordered_vec(justified_header_number, unjustified_headers, usize::MAX, |header| { + header.number() + }) +} + +pub(crate) fn prune_recent_finality_proofs( + justified_header_number: P::Number, + recent_finality_proofs: &mut FinalityProofs

, + recent_finality_proofs_limit: usize, +) { + prune_ordered_vec( + justified_header_number, + recent_finality_proofs, + recent_finality_proofs_limit, + |(header_number, _)| *header_number, + ); +} + +fn prune_ordered_vec( + header_number: Number, + ordered_vec: &mut Vec, + maximal_vec_size: usize, + extract_header_number: impl Fn(&T) -> Number, +) -> Option { + let position = ordered_vec.binary_search_by_key(&header_number, extract_header_number); + + // first extract element we're interested in + let extracted_element = match position { + Ok(position) => { + let updated_vec = ordered_vec.split_off(position + 1); + let extracted_element = ordered_vec.pop().expect( + "binary_search_by_key has returned Ok(); so there's element at `position`;\ + we're splitting vec at `position+1`; so we have pruned at least 1 element;\ + qed", + ); + *ordered_vec = updated_vec; + Some(extracted_element) + } + Err(position) => { + *ordered_vec = ordered_vec.split_off(position); + None + } + }; + + // now - limit vec by size + let split_index = ordered_vec.len().saturating_sub(maximal_vec_size); + *ordered_vec = ordered_vec.split_off(split_index); + + extracted_element +} + +fn print_sync_progress( + progress_context: (Instant, Option), + best_number_at_source: P::Number, + best_number_at_target: P::Number, +) -> (Instant, Option) { + let (prev_time, prev_best_number_at_target) = progress_context; + let now = Instant::now(); + + let need_update = now - prev_time > Duration::from_secs(10) + || prev_best_number_at_target + .map(|prev_best_number_at_target| { + best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into() + }) + .unwrap_or(true); + + if !need_update { + return (prev_time, prev_best_number_at_target); + } + + log::info!( + target: "bridge", + "Synced {:?} of {:?} headers", + best_number_at_target, + best_number_at_source, + ); + (now, Some(best_number_at_target)) +} diff --git a/bridges/relays/finality-relay/src/finality_loop_tests.rs b/bridges/relays/finality-relay/src/finality_loop_tests.rs new file mode 100644 index 0000000000000..5dfe8edd2124a --- /dev/null +++ b/bridges/relays/finality-relay/src/finality_loop_tests.rs @@ -0,0 +1,339 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Tests for finality synchronization loop. + +#![cfg(test)] + +use crate::finality_loop::{ + prune_recent_finality_proofs, prune_unjustified_headers, run, FinalityProofs, FinalitySyncParams, SourceClient, + TargetClient, UnjustifiedHeaders, +}; +use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader}; + +use async_trait::async_trait; +use futures::{FutureExt, Stream, StreamExt}; +use parking_lot::Mutex; +use relay_utils::{relay_loop::Client as RelayClient, MaybeConnectionError}; +use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; + +type IsMandatory = bool; +type TestNumber = u64; + +#[derive(Debug, Clone)] +enum TestError { + NonConnection, +} + +impl MaybeConnectionError for TestError { + fn is_connection_error(&self) -> bool { + false + } +} + +#[derive(Debug, Clone)] +struct TestFinalitySyncPipeline; + +impl FinalitySyncPipeline for TestFinalitySyncPipeline { + const SOURCE_NAME: &'static str = "TestSource"; + const TARGET_NAME: &'static str = "TestTarget"; + + type Hash = u64; + type Number = TestNumber; + type Header = TestSourceHeader; + type FinalityProof = TestFinalityProof; +} + +#[derive(Debug, Clone, PartialEq)] +struct TestSourceHeader(IsMandatory, TestNumber); + +impl SourceHeader for TestSourceHeader { + fn number(&self) -> TestNumber { + self.1 + } + + fn is_mandatory(&self) -> bool { + self.0 + } +} + +#[derive(Debug, Clone, PartialEq)] +struct TestFinalityProof(Option); + +impl FinalityProof for TestFinalityProof { + fn target_header_number(&self) -> Option { + self.0 + } +} + +#[derive(Debug, Clone, Default)] +struct ClientsData { + source_best_block_number: TestNumber, + source_headers: HashMap)>, + source_proofs: Vec, + + target_best_block_number: TestNumber, + target_headers: Vec<(TestSourceHeader, TestFinalityProof)>, +} + +#[derive(Clone)] +struct TestSourceClient { + on_method_call: Arc, + data: Arc>, +} + +#[async_trait] +impl RelayClient for TestSourceClient { + type Error = TestError; + + async fn reconnect(&mut self) -> Result<(), TestError> { + unreachable!() + } +} + +#[async_trait] +impl SourceClient for TestSourceClient { + type FinalityProofsStream = Pin>>; + + async fn best_finalized_block_number(&self) -> Result { + let mut data = self.data.lock(); + (self.on_method_call)(&mut *data); + Ok(data.source_best_block_number) + } + + async fn header_and_finality_proof( + &self, + number: TestNumber, + ) -> Result<(TestSourceHeader, Option), TestError> { + let mut data = self.data.lock(); + (self.on_method_call)(&mut *data); + data.source_headers + .get(&number) + .cloned() + .ok_or(TestError::NonConnection) + } + + async fn finality_proofs(&self) -> Result { + let mut data = self.data.lock(); + (self.on_method_call)(&mut *data); + Ok(futures::stream::iter(data.source_proofs.clone()).boxed()) + } +} + +#[derive(Clone)] +struct TestTargetClient { + on_method_call: Arc, + data: Arc>, +} + +#[async_trait] +impl RelayClient for TestTargetClient { + type Error = TestError; + + async fn reconnect(&mut self) -> Result<(), TestError> { + unreachable!() + } +} + +#[async_trait] +impl TargetClient for TestTargetClient { + async fn best_finalized_source_block_number(&self) -> Result { + let mut data = self.data.lock(); + (self.on_method_call)(&mut *data); + Ok(data.target_best_block_number) + } + + async fn submit_finality_proof(&self, header: TestSourceHeader, proof: TestFinalityProof) -> Result<(), TestError> { + let mut data = self.data.lock(); + (self.on_method_call)(&mut *data); + data.target_best_block_number = header.number(); + data.target_headers.push((header, proof)); + Ok(()) + } +} + +fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static) -> ClientsData { + let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); + let internal_state_function: Arc = Arc::new(move |data| { + if state_function(data) { + exit_sender.unbounded_send(()).unwrap(); + } + }); + let clients_data = Arc::new(Mutex::new(ClientsData { + source_best_block_number: 10, + source_headers: vec![ + (6, (TestSourceHeader(false, 6), None)), + (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(Some(7))))), + (8, (TestSourceHeader(true, 8), Some(TestFinalityProof(Some(8))))), + (9, (TestSourceHeader(false, 9), Some(TestFinalityProof(Some(9))))), + (10, (TestSourceHeader(false, 10), None)), + ] + .into_iter() + .collect(), + source_proofs: vec![TestFinalityProof(Some(12)), TestFinalityProof(Some(14))], + + target_best_block_number: 5, + target_headers: vec![], + })); + let source_client = TestSourceClient { + on_method_call: internal_state_function.clone(), + data: clients_data.clone(), + }; + let target_client = TestTargetClient { + on_method_call: internal_state_function, + data: clients_data.clone(), + }; + let sync_params = FinalitySyncParams { + tick: Duration::from_secs(0), + recent_finality_proofs_limit: 1024, + stall_timeout: Duration::from_secs(1), + }; + + run( + source_client, + target_client, + sync_params, + None, + exit_receiver.into_future().map(|(_, _)| ()), + ); + + let clients_data = clients_data.lock().clone(); + clients_data +} + +#[test] +fn finality_sync_loop_works() { + let client_data = run_sync_loop(|data| { + // header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, because + // header#8 has persistent finality proof && it is mandatory => it is submitted + // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, because + // there are no more persistent finality proofs + // + // once this ^^^ is done, we generate more blocks && read proof for blocks 12, 14 and 16 from the stream + // but we only submit proof for 16 + // + // proof for block 15 is ignored - we haven't managed to decode it + if data.target_best_block_number == 9 { + data.source_best_block_number = 17; + data.source_headers.insert(11, (TestSourceHeader(false, 11), None)); + data.source_headers + .insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(Some(12))))); + data.source_headers.insert(13, (TestSourceHeader(false, 13), None)); + data.source_headers + .insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(Some(14))))); + data.source_headers + .insert(15, (TestSourceHeader(false, 15), Some(TestFinalityProof(None)))); + data.source_headers + .insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(Some(16))))); + data.source_headers.insert(17, (TestSourceHeader(false, 17), None)); + } + + data.target_best_block_number == 16 + }); + + assert_eq!( + client_data.target_headers, + vec![ + (TestSourceHeader(true, 8), TestFinalityProof(Some(8))), + (TestSourceHeader(false, 9), TestFinalityProof(Some(9))), + (TestSourceHeader(false, 16), TestFinalityProof(Some(16))), + ], + ); +} + +#[test] +fn prune_unjustified_headers_works() { + let original_unjustified_headers: UnjustifiedHeaders = vec![ + TestSourceHeader(false, 10), + TestSourceHeader(false, 13), + TestSourceHeader(false, 15), + TestSourceHeader(false, 17), + TestSourceHeader(false, 19), + ] + .into_iter() + .collect(); + + // when header is in the collection + let mut unjustified_headers = original_unjustified_headers.clone(); + assert_eq!( + prune_unjustified_headers::(10, &mut unjustified_headers), + Some(TestSourceHeader(false, 10)), + ); + assert_eq!(&original_unjustified_headers[1..], unjustified_headers,); + + // when the header doesn't exist in the collection + let mut unjustified_headers = original_unjustified_headers.clone(); + assert_eq!( + prune_unjustified_headers::(11, &mut unjustified_headers), + None, + ); + assert_eq!(&original_unjustified_headers[1..], unjustified_headers,); + + // when last entry is pruned + let mut unjustified_headers = original_unjustified_headers.clone(); + assert_eq!( + prune_unjustified_headers::(19, &mut unjustified_headers), + Some(TestSourceHeader(false, 19)), + ); + + assert_eq!(&original_unjustified_headers[5..], unjustified_headers,); + + // when we try and prune past last entry + let mut unjustified_headers = original_unjustified_headers.clone(); + assert_eq!( + prune_unjustified_headers::(20, &mut unjustified_headers), + None, + ); + assert_eq!(&original_unjustified_headers[5..], unjustified_headers,); +} + +#[test] +fn prune_recent_finality_proofs_works() { + let original_recent_finality_proofs: FinalityProofs = vec![ + (10, TestFinalityProof(Some(10))), + (13, TestFinalityProof(Some(13))), + (15, TestFinalityProof(Some(15))), + (17, TestFinalityProof(Some(17))), + (19, TestFinalityProof(Some(19))), + ] + .into_iter() + .collect(); + + // when there's proof for justified header in the vec + let mut recent_finality_proofs = original_recent_finality_proofs.clone(); + prune_recent_finality_proofs::(10, &mut recent_finality_proofs, 1024); + assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,); + + // when there are no proof for justified header in the vec + let mut recent_finality_proofs = original_recent_finality_proofs.clone(); + prune_recent_finality_proofs::(11, &mut recent_finality_proofs, 1024); + assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,); + + // when there are too many entries after initial prune && they also need to be pruned + let mut recent_finality_proofs = original_recent_finality_proofs.clone(); + prune_recent_finality_proofs::(10, &mut recent_finality_proofs, 2); + assert_eq!(&original_recent_finality_proofs[3..], recent_finality_proofs,); + + // when last entry is pruned + let mut recent_finality_proofs = original_recent_finality_proofs.clone(); + prune_recent_finality_proofs::(19, &mut recent_finality_proofs, 2); + assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,); + + // when post-last entry is pruned + let mut recent_finality_proofs = original_recent_finality_proofs.clone(); + prune_recent_finality_proofs::(20, &mut recent_finality_proofs, 2); + assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,); +} diff --git a/bridges/relays/finality-relay/src/lib.rs b/bridges/relays/finality-relay/src/lib.rs new file mode 100644 index 0000000000000..e9d946e27f189 --- /dev/null +++ b/bridges/relays/finality-relay/src/lib.rs @@ -0,0 +1,60 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! This crate has single entrypoint to run synchronization loop that is built around finality +//! proofs, as opposed to headers synchronization loop, which is built around headers. The headers +//! are still submitted to the target node, but are treated as auxiliary data as we are not trying +//! to submit all source headers to the target node. + +pub use crate::finality_loop::{run, FinalitySyncParams, SourceClient, TargetClient}; + +use std::fmt::Debug; + +mod finality_loop; +mod finality_loop_tests; + +/// Finality proofs synchronization pipeline. +pub trait FinalitySyncPipeline: Clone + Debug + Send + Sync { + /// Name of the finality proofs source. + const SOURCE_NAME: &'static str; + /// Name of the finality proofs target. + const TARGET_NAME: &'static str; + + /// Headers we're syncing are identified by this hash. + type Hash: Eq + Clone + Copy + Send + Sync + Debug; + /// Headers we're syncing are identified by this number. + type Number: relay_utils::BlockNumberBase; + /// Type of header that we're syncing. + type Header: SourceHeader; + /// Finality proof type. + type FinalityProof: FinalityProof; +} + +/// Header that we're receiving from source node. +pub trait SourceHeader: Clone + Debug + PartialEq + Send + Sync { + /// Returns number of header. + fn number(&self) -> Number; + /// Returns true if this header needs to be submitted to target node. + fn is_mandatory(&self) -> bool; +} + +/// Abstract finality proof that is justifying block finality. +pub trait FinalityProof: Clone + Send + Sync + Debug { + /// Return header id that this proof is generated for. + /// + /// None is returned if proof is invalid from relayer PoV. + fn target_header_number(&self) -> Option; +} diff --git a/bridges/relays/headers-relay/src/sync_loop_metrics.rs b/bridges/relays/headers-relay/src/sync_loop_metrics.rs index 456aa0a6b0522..9cec0e891dc81 100644 --- a/bridges/relays/headers-relay/src/sync_loop_metrics.rs +++ b/bridges/relays/headers-relay/src/sync_loop_metrics.rs @@ -57,18 +57,28 @@ impl Default for SyncLoopMetrics { } impl SyncLoopMetrics { - /// Update metrics. - pub fn update(&self, sync: &HeadersSync

) { - let headers = sync.headers(); - let source_best_number = sync.source_best_number().unwrap_or_else(Zero::zero); - let target_best_number = sync.target_best_header().map(|id| id.0).unwrap_or_else(Zero::zero); - + /// Update best block number at source. + pub fn update_best_block_at_source>(&self, source_best_number: Number) { self.best_block_numbers .with_label_values(&["source"]) .set(source_best_number.into()); + } + + /// Update best block number at target. + pub fn update_best_block_at_target>(&self, target_best_number: Number) { self.best_block_numbers .with_label_values(&["target"]) .set(target_best_number.into()); + } + + /// Update metrics. + pub fn update(&self, sync: &HeadersSync

) { + let headers = sync.headers(); + let source_best_number = sync.source_best_number().unwrap_or_else(Zero::zero); + let target_best_number = sync.target_best_header().map(|id| id.0).unwrap_or_else(Zero::zero); + + self.update_best_block_at_source(source_best_number); + self.update_best_block_at_target(target_best_number); self.blocks_in_state .with_label_values(&["maybe_orphan"]) diff --git a/bridges/relays/substrate-client/Cargo.toml b/bridges/relays/substrate-client/Cargo.toml index 18df0dba874b5..8a794c86081b6 100644 --- a/bridges/relays/substrate-client/Cargo.toml +++ b/bridges/relays/substrate-client/Cargo.toml @@ -18,8 +18,10 @@ rand = "0.7" # Bridge dependencies +bp-header-chain = { path = "../../primitives/header-chain" } bp-message-lane = { path = "../../primitives/message-lane" } bp-runtime = { path = "../../primitives/runtime" } +finality-relay = { path = "../finality-relay" } headers-relay = { path = "../headers-relay" } relay-utils = { path = "../utils" } diff --git a/bridges/relays/substrate-client/src/chain.rs b/bridges/relays/substrate-client/src/chain.rs index 352a63dc48884..60a3cc5d17491 100644 --- a/bridges/relays/substrate-client/src/chain.rs +++ b/bridges/relays/substrate-client/src/chain.rs @@ -23,7 +23,9 @@ use num_traits::{CheckedSub, Zero}; use sp_core::{storage::StorageKey, Pair}; use sp_runtime::{ generic::SignedBlock, - traits::{AtLeast32Bit, Dispatchable, MaybeDisplay, MaybeSerialize, MaybeSerializeDeserialize, Member}, + traits::{ + AtLeast32Bit, Block as BlockT, Dispatchable, MaybeDisplay, MaybeSerialize, MaybeSerializeDeserialize, Member, + }, Justification, }; use std::{fmt::Debug, time::Duration}; @@ -51,7 +53,7 @@ pub trait Chain: ChainBase { + AtLeast32Bit + Copy; /// Block type. - type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification; + type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification; /// The aggregated `Call` type. type Call: Dispatchable + Debug; } @@ -67,7 +69,9 @@ pub trait ChainWithBalances: Chain { } /// Block with justification. -pub trait BlockWithJustification { +pub trait BlockWithJustification

{ + /// Return block header. + fn header(&self) -> Header; /// Return block justification, if known. fn justification(&self) -> Option<&Justification>; } @@ -90,13 +94,11 @@ pub trait TransactionSignScheme { ) -> Self::SignedTransaction; } -impl BlockWithJustification for () { - fn justification(&self) -> Option<&Justification> { - None +impl BlockWithJustification for SignedBlock { + fn header(&self) -> Block::Header { + self.block.header().clone() } -} -impl BlockWithJustification for SignedBlock { fn justification(&self) -> Option<&Justification> { self.justification.as_ref() } diff --git a/bridges/relays/substrate-client/src/finality_source.rs b/bridges/relays/substrate-client/src/finality_source.rs new file mode 100644 index 0000000000000..2c76619e867fa --- /dev/null +++ b/bridges/relays/substrate-client/src/finality_source.rs @@ -0,0 +1,147 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Default generic implementation of finality source for basic Substrate client. + +use crate::chain::{BlockWithJustification, Chain}; +use crate::client::Client; +use crate::error::Error; +use crate::sync_header::SyncHeader; + +use async_trait::async_trait; +use finality_relay::{FinalityProof, FinalitySyncPipeline, SourceClient, SourceHeader}; +use futures::stream::{unfold, Stream, StreamExt}; +use relay_utils::relay_loop::Client as RelayClient; +use sp_runtime::traits::Header as HeaderT; +use std::{marker::PhantomData, pin::Pin}; + +/// Wrapped raw Justification. +#[derive(Debug, Clone)] +pub struct Justification
{ + raw_justification: sp_runtime::Justification, + _phantom: PhantomData
, +} + +impl
Justification
{ + /// Extract raw justification. + pub fn into_inner(self) -> sp_runtime::Justification { + self.raw_justification + } +} + +impl
FinalityProof for Justification
+where + Header: HeaderT, +{ + fn target_header_number(&self) -> Option { + bp_header_chain::justification::decode_justification_target::
(&self.raw_justification) + .ok() + .map(|(_, number)| number) + } +} + +/// Substrate node as finality source. +pub struct FinalitySource { + client: Client, + _phantom: PhantomData

, +} + +impl FinalitySource { + /// Create new headers source using given client. + pub fn new(client: Client) -> Self { + FinalitySource { + client, + _phantom: Default::default(), + } + } +} + +impl Clone for FinalitySource { + fn clone(&self) -> Self { + FinalitySource { + client: self.client.clone(), + _phantom: Default::default(), + } + } +} + +#[async_trait] +impl RelayClient for FinalitySource { + type Error = Error; + + async fn reconnect(&mut self) -> Result<(), Error> { + self.client.reconnect().await + } +} + +#[async_trait] +impl SourceClient

for FinalitySource +where + C: Chain, + C::BlockNumber: relay_utils::BlockNumberBase, + P: FinalitySyncPipeline< + Hash = C::Hash, + Number = C::BlockNumber, + Header = SyncHeader, + FinalityProof = Justification, + >, + P::Header: SourceHeader, +{ + type FinalityProofsStream = Pin>>>; + + async fn best_finalized_block_number(&self) -> Result { + // we **CAN** continue to relay finality proofs if source node is out of sync, because + // target node may be missing proofs that are already available at the source + let finalized_header_hash = self.client.best_finalized_header_hash().await?; + let finalized_header = self.client.header_by_hash(finalized_header_hash).await?; + Ok(*finalized_header.number()) + } + + async fn header_and_finality_proof( + &self, + number: P::Number, + ) -> Result<(P::Header, Option), Error> { + let header_hash = self.client.block_hash_by_number(number).await?; + let signed_block = self.client.get_block(Some(header_hash)).await?; + Ok(( + signed_block.header().into(), + signed_block + .justification() + .cloned() + .map(|raw_justification| Justification { + raw_justification, + _phantom: Default::default(), + }), + )) + } + + async fn finality_proofs(&self) -> Result { + Ok(unfold( + self.client.clone().subscribe_justifications().await?, + move |mut subscription| async move { + let next_justification = subscription.next().await?; + Some(( + Justification { + raw_justification: next_justification.0, + _phantom: Default::default(), + }, + subscription, + )) + }, + ) + .boxed()) + } +} diff --git a/bridges/relays/substrate-client/src/guard.rs b/bridges/relays/substrate-client/src/guard.rs index d439ec890763d..e924b94b6f79f 100644 --- a/bridges/relays/substrate-client/src/guard.rs +++ b/bridges/relays/substrate-client/src/guard.rs @@ -187,7 +187,8 @@ mod tests { type AccountId = u32; type Index = u32; - type SignedBlock = (); + type SignedBlock = + sp_runtime::generic::SignedBlock>; type Call = (); } diff --git a/bridges/relays/substrate-client/src/lib.rs b/bridges/relays/substrate-client/src/lib.rs index c6d077b21c562..6a21e3bffa207 100644 --- a/bridges/relays/substrate-client/src/lib.rs +++ b/bridges/relays/substrate-client/src/lib.rs @@ -24,6 +24,7 @@ mod error; mod rpc; mod sync_header; +pub mod finality_source; pub mod guard; pub mod headers_source; diff --git a/bridges/relays/substrate-client/src/sync_header.rs b/bridges/relays/substrate-client/src/sync_header.rs index fd1c582b9480e..30e33a39279a2 100644 --- a/bridges/relays/substrate-client/src/sync_header.rs +++ b/bridges/relays/substrate-client/src/sync_header.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +use bp_header_chain::find_grandpa_authorities_scheduled_change; +use finality_relay::SourceHeader as FinalitySourceHeader; use headers_relay::sync_types::SourceHeader; use num_traits::{CheckedSub, One}; use relay_utils::HeaderId; @@ -47,7 +49,7 @@ impl

From
for SyncHeader
{ impl SourceHeader for SyncHeader
{ fn id(&self) -> HeaderId { - relay_utils::HeaderId(*self.number(), self.hash()) + relay_utils::HeaderId(*self.0.number(), self.hash()) } fn parent_id(&self) -> HeaderId { @@ -59,3 +61,13 @@ impl SourceHeader for SyncHeader< ) } } + +impl FinalitySourceHeader for SyncHeader
{ + fn number(&self) -> Header::Number { + *self.0.number() + } + + fn is_mandatory(&self) -> bool { + find_grandpa_authorities_scheduled_change(&self.0).is_some() + } +} diff --git a/bridges/relays/substrate/Cargo.toml b/bridges/relays/substrate/Cargo.toml index 6ea5e839d8ef0..08cf40c383521 100644 --- a/bridges/relays/substrate/Cargo.toml +++ b/bridges/relays/substrate/Cargo.toml @@ -26,12 +26,13 @@ bp-polkadot = { path = "../../primitives/polkadot" } bp-runtime = { path = "../../primitives/runtime" } bp-rialto = { path = "../../primitives/rialto" } bridge-runtime-common = { path = "../../bin/runtime-common" } +finality-relay = { path = "../finality-relay" } headers-relay = { path = "../headers-relay" } messages-relay = { path = "../messages-relay" } millau-runtime = { path = "../../bin/millau/runtime" } pallet-bridge-call-dispatch = { path = "../../modules/call-dispatch" } +pallet-finality-verifier = { path = "../../modules/finality-verifier" } pallet-message-lane = { path = "../../modules/message-lane" } -pallet-substrate-bridge = { path = "../../modules/substrate" } relay-kusama-client = { path = "../kusama-client" } relay-millau-client = { path = "../millau-client" } relay-polkadot-client = { path = "../polkadot-client" } diff --git a/bridges/relays/substrate/src/finality_pipeline.rs b/bridges/relays/substrate/src/finality_pipeline.rs new file mode 100644 index 0000000000000..7ec592d5dee1a --- /dev/null +++ b/bridges/relays/substrate/src/finality_pipeline.rs @@ -0,0 +1,130 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Substrate-to-Substrate headers sync entrypoint. + +use crate::finality_target::SubstrateFinalityTarget; + +use async_trait::async_trait; +use codec::Encode; +use finality_relay::{FinalitySyncParams, FinalitySyncPipeline}; +use relay_substrate_client::{ + finality_source::{FinalitySource, Justification}, + BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, SyncHeader, +}; +use relay_utils::BlockNumberBase; +use std::{fmt::Debug, marker::PhantomData, time::Duration}; + +/// Default synchronization loop timeout. +const STALL_TIMEOUT: Duration = Duration::from_secs(120); +/// Default limit of recent finality proofs. +/// +/// Finality delay of 4096 blocks is unlikely to happen in practice in +/// Substrate+GRANDPA based chains (good to know). +const RECENT_FINALITY_PROOFS_LIMIT: usize = 4096; + +/// Headers sync pipeline for Substrate <-> Substrate relays. +#[async_trait] +pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline { + /// Name of the runtime method that returns id of best finalized source header at target chain. + const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str; + + /// Signed transaction type. + type SignedTransaction: Send + Sync + Encode; + + /// Make submit header transaction. + async fn make_submit_finality_proof_transaction( + &self, + header: Self::Header, + proof: Self::FinalityProof, + ) -> Result; +} + +/// Substrate-to-Substrate finality proof pipeline. +#[derive(Debug, Clone)] +pub struct SubstrateFinalityToSubstrate { + /// Client for the target chain. + pub(crate) target_client: Client, + /// Data required to sign target chain transactions. + pub(crate) target_sign: TargetSign, + /// Unused generic arguments dump. + _marker: PhantomData, +} + +impl SubstrateFinalityToSubstrate { + /// Create new Substrate-to-Substrate headers pipeline. + pub fn new(target_client: Client, target_sign: TargetSign) -> Self { + SubstrateFinalityToSubstrate { + target_client, + target_sign, + _marker: Default::default(), + } + } +} + +impl FinalitySyncPipeline + for SubstrateFinalityToSubstrate +where + SourceChain: Clone + Chain + Debug, + BlockNumberOf: BlockNumberBase, + TargetChain: Clone + Chain + Debug, + TargetSign: Clone + Send + Sync + Debug, +{ + const SOURCE_NAME: &'static str = SourceChain::NAME; + const TARGET_NAME: &'static str = TargetChain::NAME; + + type Hash = HashOf; + type Number = BlockNumberOf; + type Header = SyncHeader; + type FinalityProof = Justification; +} + +/// Run Substrate-to-Substrate finality sync. +pub async fn run( + pipeline: P, + source_client: Client, + target_client: Client, + metrics_params: Option, +) where + P: SubstrateFinalitySyncPipeline< + Hash = HashOf, + Number = BlockNumberOf, + Header = SyncHeader, + FinalityProof = Justification, + >, + SourceChain: Clone + Chain, + BlockNumberOf: BlockNumberBase, + TargetChain: Clone + Chain, +{ + log::info!( + target: "bridge", + "Starting {} -> {} finality proof relay", + SourceChain::NAME, + TargetChain::NAME, + ); + + finality_relay::run( + FinalitySource::new(source_client), + SubstrateFinalityTarget::new(target_client, pipeline), + FinalitySyncParams { + tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL), + recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT, + stall_timeout: STALL_TIMEOUT, + }, + metrics_params, + futures::future::pending(), + ); +} diff --git a/bridges/relays/substrate/src/finality_target.rs b/bridges/relays/substrate/src/finality_target.rs new file mode 100644 index 0000000000000..18312556f9e17 --- /dev/null +++ b/bridges/relays/substrate/src/finality_target.rs @@ -0,0 +1,91 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Substrate client as Substrate finality proof target. The chain we connect to should have +//! runtime that implements `FinalityApi` to allow bridging with +//! chain. + +use crate::finality_pipeline::SubstrateFinalitySyncPipeline; + +use async_trait::async_trait; +use codec::{Decode, Encode}; +use finality_relay::TargetClient; +use futures::TryFutureExt; +use relay_substrate_client::{Chain, Client, Error as SubstrateError}; +use relay_utils::relay_loop::Client as RelayClient; +use sp_core::Bytes; + +/// Substrate client as Substrate finality target. +pub struct SubstrateFinalityTarget { + client: Client, + pipeline: P, +} + +impl SubstrateFinalityTarget { + /// Create new Substrate headers target. + pub fn new(client: Client, pipeline: P) -> Self { + SubstrateFinalityTarget { client, pipeline } + } +} + +impl Clone for SubstrateFinalityTarget { + fn clone(&self) -> Self { + SubstrateFinalityTarget { + client: self.client.clone(), + pipeline: self.pipeline.clone(), + } + } +} + +#[async_trait] +impl RelayClient for SubstrateFinalityTarget { + type Error = SubstrateError; + + async fn reconnect(&mut self) -> Result<(), SubstrateError> { + self.client.reconnect().await + } +} + +#[async_trait] +impl TargetClient

for SubstrateFinalityTarget +where + C: Chain, + P::Number: Decode, + P::Hash: Decode, + P: SubstrateFinalitySyncPipeline, +{ + async fn best_finalized_source_block_number(&self) -> Result { + // we can't continue to relay finality if target node is out of sync, because + // it may have already received (some of) headers that we're going to relay + self.client.ensure_synced().await?; + + Ok(crate::messages_source::read_client_state::( + &self.client, + P::BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET, + ) + .await? + .best_finalized_peer_at_best_self + .0) + } + + async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), SubstrateError> { + self.pipeline + .make_submit_finality_proof_transaction(header, proof) + .and_then(|tx| self.client.submit_extrinsic(Bytes(tx.encode()))) + .await + .map(drop) + } +} diff --git a/bridges/relays/substrate/src/headers_initialize.rs b/bridges/relays/substrate/src/headers_initialize.rs index 6b66a2e9bd7fb..2386f28f1b1ac 100644 --- a/bridges/relays/substrate/src/headers_initialize.rs +++ b/bridges/relays/substrate/src/headers_initialize.rs @@ -17,12 +17,12 @@ //! Initialize Substrate -> Substrate headers bridge. //! //! Initialization is a transaction that calls `initialize()` function of the -//! `pallet-substrate-bridge` pallet. This transaction brings initial header +//! `pallet-finality-verifier` pallet. This transaction brings initial header //! and authorities set from source to target chain. The headers sync starts //! with this header. use codec::Decode; -use pallet_substrate_bridge::InitializationData; +use pallet_finality_verifier::InitializationData; use relay_substrate_client::{Chain, Client}; use sp_core::Bytes; use sp_finality_grandpa::{AuthorityList as GrandpaAuthoritiesSet, SetId as GrandpaAuthoritiesSetId}; @@ -132,10 +132,6 @@ async fn prepare_initialization_data( header: initial_header, authority_list: initial_authorities_set, set_id: initial_authorities_set_id.unwrap_or(0), - // There may be multiple scheduled changes, so on real chains we should select proper - // moment, when there's nothing scheduled. On ephemeral (temporary) chains, it is ok to - // start with genesis. - scheduled_change: None, is_halted: false, }) } diff --git a/bridges/relays/substrate/src/headers_maintain.rs b/bridges/relays/substrate/src/headers_maintain.rs deleted file mode 100644 index fb609a2ef0705..0000000000000 --- a/bridges/relays/substrate/src/headers_maintain.rs +++ /dev/null @@ -1,458 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Substrate-to-Substrate headers synchronization maintain procedure. -//! -//! Regular headers synchronization only depends on persistent justifications -//! that are generated when authorities set changes. This happens rarely on -//! real-word chains. So some other way to finalize headers is required. -//! -//! Full nodes are listening to GRANDPA messages, so they may have track authorities -//! votes on their own. They're returning both persistent and ephemeral justifications -//! (justifications that are not stored in the database and not broadcasted over network) -//! throught `grandpa_subscribeJustifications` RPC subscription. -//! -//! The idea of this maintain procedure is that when we see justification that 'improves' -//! best finalized header on the target chain, we submit this justification to the target -//! node. - -use crate::headers_pipeline::SubstrateHeadersSyncPipeline; - -use async_std::sync::{Arc, Mutex}; -use async_trait::async_trait; -use codec::{Decode, Encode}; -use futures::future::{poll_fn, FutureExt, TryFutureExt}; -use headers_relay::{ - sync::HeadersSync, - sync_loop::SyncMaintain, - sync_types::{HeaderIdOf, HeaderStatus}, -}; -use relay_substrate_client::{Chain, Client, Error as SubstrateError, JustificationsSubscription}; -use relay_utils::HeaderId; -use sp_core::Bytes; -use sp_runtime::{traits::Header as HeaderT, Justification}; -use std::{collections::VecDeque, marker::PhantomData, task::Poll}; - -/// Substrate-to-Substrate headers synchronization maintain procedure. -pub struct SubstrateHeadersToSubstrateMaintain -{ - pipeline: P, - source_client: Client, - target_client: Client, - justifications: Arc>>, - _marker: PhantomData, -} - -/// Future and already received justifications from the source chain. -struct Justifications { - /// Justifications stream. None if it hasn't been initialized yet, or it has been dropped - /// by the rpc library. - stream: Option, - /// Justifications that we have read from the stream but have not sent to the - /// target node, because their targets were still not synced. - queue: VecDeque<(HeaderIdOf

, Justification)>, -} - -impl - SubstrateHeadersToSubstrateMaintain -{ - /// Create new maintain procedure. - pub async fn new(pipeline: P, source_client: Client, target_client: Client) -> Self { - let justifications = subscribe_justifications(&source_client).await; - SubstrateHeadersToSubstrateMaintain { - pipeline, - source_client, - target_client, - justifications: Arc::new(Mutex::new(Justifications { - stream: justifications, - queue: VecDeque::new(), - })), - _marker: Default::default(), - } - } -} - -#[async_trait] -impl Clone - for SubstrateHeadersToSubstrateMaintain -{ - fn clone(&self) -> Self { - SubstrateHeadersToSubstrateMaintain { - pipeline: self.pipeline.clone(), - source_client: self.source_client.clone(), - target_client: self.target_client.clone(), - justifications: self.justifications.clone(), - _marker: Default::default(), - } - } -} - -#[async_trait] -impl SyncMaintain

for SubstrateHeadersToSubstrateMaintain -where - SourceChain: Chain, - ::Number: Into, - ::Hash: Into, - TargetChain: Chain, - P::Number: Decode, - P::Hash: Decode, - P: SubstrateHeadersSyncPipeline, -{ - async fn maintain(&self, sync: &mut HeadersSync

) { - // lock justifications before doing anything else - let mut justifications = match self.justifications.try_lock() { - Some(justifications) => justifications, - None => { - // this should never happen, as we use single-thread executor - log::warn!(target: "bridge", "Failed to acquire {} justifications lock", P::SOURCE_NAME); - return; - } - }; - - // we need to read best finalized header from the target node to be able to - // choose justification to submit - let best_finalized = match best_finalized_header_id::(&self.target_client).await { - Ok(best_finalized) => best_finalized, - Err(error) => { - log::warn!( - target: "bridge", - "Failed to read best finalized {} block from maintain: {:?}", - P::SOURCE_NAME, - error, - ); - return; - } - }; - - log::debug!( - target: "bridge", - "Read best finalized {} block from {}: {:?}", - P::SOURCE_NAME, - P::TARGET_NAME, - best_finalized, - ); - - // Select justification to submit to the target node. We're submitting at most one justification - // on every maintain call. So maintain rate directly affects finalization rate. - let (resubscribe, justification_to_submit) = poll_fn(|context| { - // read justifications from the stream and push to the queue - let resubscribe = !justifications.read_from_stream::(context); - - // remove all obsolete justifications from the queue - remove_obsolete::

(&mut justifications.queue, best_finalized); - - // select justification to submit - Poll::Ready((resubscribe, select_justification(&mut justifications.queue, sync))) - }) - .await; - - // if justifications subscription has been dropped, resubscribe - if resubscribe { - justifications.stream = subscribe_justifications(&self.source_client).await; - } - - // finally - submit selected justification - if let Some((target, justification)) = justification_to_submit { - let submit_result = self - .pipeline - .make_complete_header_transaction(target, justification) - .and_then(|tx| self.target_client.submit_extrinsic(Bytes(tx.encode()))) - .await; - - match submit_result { - Ok(_) => log::debug!( - target: "bridge", - "Submitted justification received over {} subscription. Target: {:?}", - P::SOURCE_NAME, - target, - ), - Err(error) => log::warn!( - target: "bridge", - "Failed to submit justification received over {} subscription for {:?}: {:?}", - P::SOURCE_NAME, - target, - error, - ), - } - } - } -} - -impl

Justifications

-where - P::Number: Decode, - P::Hash: Decode, - P: SubstrateHeadersSyncPipeline, -{ - /// Read justifications from the subscription stream without blocking. - /// - /// Returns `true` if justifications stream is still readable and `false` if it has been - /// dropped by the RPC crate && we need to resubscribe. - #[must_use] - fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>) -> bool - where - SourceHeader: HeaderT, - SourceHeader::Number: Into, - SourceHeader::Hash: Into, - { - let stream = match self.stream.as_mut() { - Some(stream) => stream, - None => return false, - }; - - loop { - let maybe_next_justification = stream.next(); - futures::pin_mut!(maybe_next_justification); - - let maybe_next_justification = maybe_next_justification.poll_unpin(context); - let justification = match maybe_next_justification { - Poll::Ready(justification) => justification, - Poll::Pending => return true, - }; - - let justification = match justification { - Some(justification) => justification, - None => { - log::warn!( - target: "bridge", - "{} justifications stream has been dropped. Will be trying to resubscribe", - P::SOURCE_NAME, - ); - - return false; - } - }; - - // decode justification target - let target = bp_header_chain::justification::decode_justification_target::(&justification); - let target = match target { - Ok((target_hash, target_number)) => HeaderId(target_number.into(), target_hash.into()), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to decode justification from {} subscription: {:?}", - P::SOURCE_NAME, - error, - ); - continue; - } - }; - - log::debug!( - target: "bridge", - "Received {} justification over subscription. Target: {:?}", - P::SOURCE_NAME, - target, - ); - - self.queue.push_back((target, justification.0)); - } - } -} - -/// Clean queue of all justifications that are justifying already finalized blocks. -fn remove_obsolete( - queue: &mut VecDeque<(HeaderIdOf

, Justification)>, - best_finalized: HeaderIdOf

, -) { - while queue - .front() - .map(|(target, _)| target.0 <= best_finalized.0) - .unwrap_or(false) - { - queue.pop_front(); - } -} - -/// Select appropriate justification that would improve best finalized block on target node. -/// -/// It is assumed that the selected justification will be submitted to the target node. The -/// justification itself and all preceeding justifications are removed from the queue. -fn select_justification

( - queue: &mut VecDeque<(HeaderIdOf

, Justification)>, - sync: &mut HeadersSync

, -) -> Option<(HeaderIdOf

, Justification)> -where - P: SubstrateHeadersSyncPipeline, -{ - let mut selected_justification = None; - while let Some((target, justification)) = queue.pop_front() { - // if we're waiting for this justification, report it - if sync.headers().requires_completion_data(&target) { - sync.headers_mut().completion_response(&target, Some(justification)); - // we won't submit previous justifications as we going to submit justification for - // next header - selected_justification = None; - // we won't submit next justifications as we need to submit previous justifications - // first - break; - } - - // if we know that the header is already synced (it is known to the target node), let's - // select it for submission. We still may select better justification on the next iteration. - if sync.headers().status(&target) == HeaderStatus::Synced { - selected_justification = Some((target, justification)); - continue; - } - - // finally - return justification back to the queue - queue.push_back((target, justification)); - break; - } - - selected_justification -} - -/// Returns best finalized source header on the target chain. -async fn best_finalized_header_id(client: &Client) -> Result, SubstrateError> -where - P: SubstrateHeadersSyncPipeline, - P::Number: Decode, - P::Hash: Decode, - C: Chain, -{ - let call = P::FINALIZED_BLOCK_METHOD.into(); - let data = Bytes(Vec::new()); - - let encoded_response = client.state_call(call, data, None).await?; - let decoded_response: (P::Number, P::Hash) = - Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; - - let best_header_id = HeaderId(decoded_response.0, decoded_response.1); - Ok(best_header_id) -} - -/// Subscribe to justifications stream at source node. -async fn subscribe_justifications(client: &Client) -> Option { - match client.subscribe_justifications().await { - Ok(source_justifications) => { - log::debug!( - target: "bridge", - "Successfully (re)subscribed to {} justifications", - C::NAME, - ); - - Some(source_justifications) - } - Err(error) => { - log::warn!( - target: "bridge", - "Failed to subscribe to {} justifications: {:?}", - C::NAME, - error, - ); - - None - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::headers_pipeline::sync_params; - use crate::millau_headers_to_rialto::MillauHeadersToRialto; - - fn parent_hash(index: u8) -> bp_millau::Hash { - if index == 1 { - Default::default() - } else { - header(index - 1).hash() - } - } - - fn header_hash(index: u8) -> bp_millau::Hash { - header(index).hash() - } - - fn header(index: u8) -> bp_millau::Header { - bp_millau::Header::new( - index as _, - Default::default(), - Default::default(), - parent_hash(index), - Default::default(), - ) - } - - #[test] - fn obsolete_justifications_are_removed() { - let mut queue = vec![ - (HeaderId(1, header_hash(1)), vec![1]), - (HeaderId(2, header_hash(2)), vec![2]), - (HeaderId(3, header_hash(3)), vec![3]), - ] - .into_iter() - .collect(); - - remove_obsolete::(&mut queue, HeaderId(2, header_hash(2))); - - assert_eq!( - queue, - vec![(HeaderId(3, header_hash(3)), vec![3])] - .into_iter() - .collect::>(), - ); - } - - #[test] - fn latest_justification_is_selected() { - let mut queue = vec![ - (HeaderId(1, header_hash(1)), vec![1]), - (HeaderId(2, header_hash(2)), vec![2]), - (HeaderId(3, header_hash(3)), vec![3]), - ] - .into_iter() - .collect(); - let mut sync = HeadersSync::::new(sync_params()); - sync.headers_mut().header_response(header(1).into()); - sync.headers_mut().header_response(header(2).into()); - sync.headers_mut().header_response(header(3).into()); - sync.target_best_header_response(HeaderId(2, header_hash(2))); - - assert_eq!( - select_justification(&mut queue, &mut sync), - Some((HeaderId(2, header_hash(2)), vec![2])), - ); - } - - #[test] - fn required_justification_is_reported() { - let mut queue = vec![ - (HeaderId(1, header_hash(1)), vec![1]), - (HeaderId(2, header_hash(2)), vec![2]), - (HeaderId(3, header_hash(3)), vec![3]), - ] - .into_iter() - .collect(); - let mut sync = HeadersSync::::new(sync_params()); - sync.headers_mut().header_response(header(1).into()); - sync.headers_mut().header_response(header(2).into()); - sync.headers_mut().header_response(header(3).into()); - sync.headers_mut() - .incomplete_headers_response(vec![HeaderId(2, header_hash(2))].into_iter().collect()); - sync.target_best_header_response(HeaderId(2, header_hash(2))); - - assert_eq!(sync.headers_mut().header_to_complete(), None,); - - assert_eq!(select_justification(&mut queue, &mut sync), None,); - - assert_eq!( - sync.headers_mut().header_to_complete(), - Some((HeaderId(2, header_hash(2)), &vec![2])), - ); - } -} diff --git a/bridges/relays/substrate/src/headers_pipeline.rs b/bridges/relays/substrate/src/headers_pipeline.rs deleted file mode 100644 index 541a613735c52..0000000000000 --- a/bridges/relays/substrate/src/headers_pipeline.rs +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Substrate-to-Substrate headers sync entrypoint. - -use crate::{headers_maintain::SubstrateHeadersToSubstrateMaintain, headers_target::SubstrateHeadersTarget}; - -use async_trait::async_trait; -use codec::Encode; -use headers_relay::{ - sync::{HeadersSyncParams, TargetTransactionMode}, - sync_types::{HeaderIdOf, HeadersSyncPipeline, QueuedHeader, SourceHeader}, -}; -use relay_substrate_client::{ - headers_source::HeadersSource, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, -}; -use relay_utils::BlockNumberBase; -use sp_runtime::Justification; -use std::marker::PhantomData; - -/// Headers sync pipeline for Substrate <-> Substrate relays. -#[async_trait] -pub trait SubstrateHeadersSyncPipeline: HeadersSyncPipeline { - /// Name of the `best_block` runtime method. - const BEST_BLOCK_METHOD: &'static str; - /// Name of the `finalized_block` runtime method. - const FINALIZED_BLOCK_METHOD: &'static str; - /// Name of the `is_known_block` runtime method. - const IS_KNOWN_BLOCK_METHOD: &'static str; - /// Name of the `incomplete_headers` runtime method. - const INCOMPLETE_HEADERS_METHOD: &'static str; - - /// Signed transaction type. - type SignedTransaction: Send + Sync + Encode; - - /// Make submit header transaction. - async fn make_submit_header_transaction( - &self, - header: QueuedHeader, - ) -> Result; - - /// Make completion transaction for the header. - async fn make_complete_header_transaction( - &self, - id: HeaderIdOf, - completion: Justification, - ) -> Result; -} - -/// Substrate-to-Substrate headers pipeline. -#[derive(Debug, Clone)] -pub struct SubstrateHeadersToSubstrate { - /// Client for the target chain. - pub(crate) target_client: Client, - /// Data required to sign target chain transactions. - pub(crate) target_sign: TargetSign, - /// Unused generic arguments dump. - _marker: PhantomData<(SourceChain, SourceSyncHeader)>, -} - -impl - SubstrateHeadersToSubstrate -{ - /// Create new Substrate-to-Substrate headers pipeline. - pub fn new(target_client: Client, target_sign: TargetSign) -> Self { - SubstrateHeadersToSubstrate { - target_client, - target_sign, - _marker: Default::default(), - } - } -} - -impl HeadersSyncPipeline - for SubstrateHeadersToSubstrate -where - SourceChain: Clone + Chain, - BlockNumberOf: BlockNumberBase, - SourceSyncHeader: - SourceHeader, BlockNumberOf> + std::ops::Deref, - TargetChain: Clone + Chain, - TargetSign: Clone + Send + Sync, -{ - const SOURCE_NAME: &'static str = SourceChain::NAME; - const TARGET_NAME: &'static str = TargetChain::NAME; - - type Hash = HashOf; - type Number = BlockNumberOf; - type Header = SourceSyncHeader; - type Extra = (); - type Completion = Justification; - - fn estimate_size(source: &QueuedHeader) -> usize { - source.header().encode().len() - } -} - -/// Return sync parameters for Substrate-to-Substrate headers sync. -pub fn sync_params() -> HeadersSyncParams { - HeadersSyncParams { - max_future_headers_to_download: 32, - max_headers_in_submitted_status: 8, - max_headers_in_single_submit: 1, - max_headers_size_in_single_submit: 1024 * 1024, - prune_depth: 256, - target_tx_mode: TargetTransactionMode::Signed, - } -} - -/// Run Substrate-to-Substrate headers sync. -pub async fn run( - pipeline: P, - source_client: Client, - target_client: Client, - metrics_params: Option, -) where - P: SubstrateHeadersSyncPipeline< - Hash = HashOf, - Number = BlockNumberOf, - Completion = Justification, - Extra = (), - >, - P::Header: SourceHeader, BlockNumberOf>, - SourceChain: Clone + Chain, - SourceChain::Header: Into, - BlockNumberOf: BlockNumberBase, - TargetChain: Clone + Chain, -{ - let sync_maintain = SubstrateHeadersToSubstrateMaintain::<_, SourceChain, _>::new( - pipeline.clone(), - source_client.clone(), - target_client.clone(), - ) - .await; - - log::info!( - target: "bridge", - "Starting {} -> {} headers relay", - SourceChain::NAME, - TargetChain::NAME, - ); - - headers_relay::sync_loop::run( - HeadersSource::new(source_client), - SourceChain::AVERAGE_BLOCK_INTERVAL, - SubstrateHeadersTarget::new(target_client, pipeline), - TargetChain::AVERAGE_BLOCK_INTERVAL, - sync_maintain, - sync_params(), - metrics_params, - futures::future::pending(), - ); -} diff --git a/bridges/relays/substrate/src/headers_target.rs b/bridges/relays/substrate/src/headers_target.rs deleted file mode 100644 index 2b5f63a7feae3..0000000000000 --- a/bridges/relays/substrate/src/headers_target.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Substrate client as Substrate headers target. The chain we connect to should have -//! runtime that implements `HeaderApi` to allow bridging with -//! chain. - -use crate::headers_pipeline::SubstrateHeadersSyncPipeline; - -use async_trait::async_trait; -use codec::{Decode, Encode}; -use futures::TryFutureExt; -use headers_relay::{ - sync_loop::TargetClient, - sync_types::{HeaderIdOf, QueuedHeader, SubmittedHeaders}, -}; -use relay_substrate_client::{Chain, Client, Error as SubstrateError}; -use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; -use sp_core::Bytes; -use sp_runtime::Justification; -use std::collections::HashSet; - -/// Substrate client as Substrate headers target. -pub struct SubstrateHeadersTarget { - client: Client, - pipeline: P, -} - -impl SubstrateHeadersTarget { - /// Create new Substrate headers target. - pub fn new(client: Client, pipeline: P) -> Self { - SubstrateHeadersTarget { client, pipeline } - } -} - -impl Clone for SubstrateHeadersTarget { - fn clone(&self) -> Self { - SubstrateHeadersTarget { - client: self.client.clone(), - pipeline: self.pipeline.clone(), - } - } -} - -#[async_trait] -impl RelayClient for SubstrateHeadersTarget { - type Error = SubstrateError; - - async fn reconnect(&mut self) -> Result<(), SubstrateError> { - self.client.reconnect().await - } -} - -#[async_trait] -impl TargetClient

for SubstrateHeadersTarget -where - C: Chain, - P::Number: Decode, - P::Hash: Decode + Encode, - P: SubstrateHeadersSyncPipeline, -{ - async fn best_header_id(&self) -> Result, SubstrateError> { - // we can't continue to relay headers if target node is out of sync, because - // it may have already received (some of) headers that we're going to relay - self.client.ensure_synced().await?; - - let call = P::BEST_BLOCK_METHOD.into(); - let data = Bytes(Vec::new()); - - let encoded_response = self.client.state_call(call, data, None).await?; - let decoded_response: Vec<(P::Number, P::Hash)> = - Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; - - // If we parse an empty list of headers it means that bridge pallet has not been initalized - // yet. Otherwise we expect to always have at least one header. - decoded_response - .last() - .ok_or(SubstrateError::UninitializedBridgePallet) - .map(|(num, hash)| HeaderId(*num, *hash)) - } - - async fn is_known_header(&self, id: HeaderIdOf

) -> Result<(HeaderIdOf

, bool), SubstrateError> { - let call = P::IS_KNOWN_BLOCK_METHOD.into(); - let data = Bytes(id.1.encode()); - - let encoded_response = self.client.state_call(call, data, None).await?; - let is_known_block: bool = - Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; - - Ok((id, is_known_block)) - } - - async fn submit_headers( - &self, - mut headers: Vec>, - ) -> SubmittedHeaders, SubstrateError> { - debug_assert_eq!( - headers.len(), - 1, - "Substrate pallet only supports single header / transaction" - ); - - let header = headers.remove(0); - let id = header.id(); - let submit_transaction_result = self - .pipeline - .make_submit_header_transaction(header) - .and_then(|tx| self.client.submit_extrinsic(Bytes(tx.encode()))) - .await; - - match submit_transaction_result { - Ok(_) => SubmittedHeaders { - submitted: vec![id], - incomplete: Vec::new(), - rejected: Vec::new(), - fatal_error: None, - }, - Err(error) => SubmittedHeaders { - submitted: Vec::new(), - incomplete: Vec::new(), - rejected: vec![id], - fatal_error: Some(error), - }, - } - } - - async fn incomplete_headers_ids(&self) -> Result>, SubstrateError> { - let call = P::INCOMPLETE_HEADERS_METHOD.into(); - let data = Bytes(Vec::new()); - - let encoded_response = self.client.state_call(call, data, None).await?; - let decoded_response: Vec<(P::Number, P::Hash)> = - Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; - - let incomplete_headers = decoded_response - .into_iter() - .map(|(number, hash)| HeaderId(number, hash)) - .collect(); - Ok(incomplete_headers) - } - - async fn complete_header( - &self, - id: HeaderIdOf

, - completion: Justification, - ) -> Result, SubstrateError> { - let tx = self.pipeline.make_complete_header_transaction(id, completion).await?; - self.client.submit_extrinsic(Bytes(tx.encode())).await?; - Ok(id) - } - - async fn requires_extra(&self, header: QueuedHeader

) -> Result<(HeaderIdOf

, bool), SubstrateError> { - Ok((header.id(), false)) - } -} diff --git a/bridges/relays/substrate/src/main.rs b/bridges/relays/substrate/src/main.rs index 3a3c3620aeb25..722f850d89ef7 100644 --- a/bridges/relays/substrate/src/main.rs +++ b/bridges/relays/substrate/src/main.rs @@ -38,10 +38,9 @@ pub type MillauClient = relay_substrate_client::Client; pub type RialtoClient = relay_substrate_client::Client; mod cli; +mod finality_pipeline; +mod finality_target; mod headers_initialize; -mod headers_maintain; -mod headers_pipeline; -mod headers_target; mod messages_lane; mod messages_source; mod messages_target; @@ -101,7 +100,7 @@ async fn run_init_bridge(command: cli::InitBridge) -> Result<(), String> { &rialto_sign.signer, rialto_signer_next_index, rialto_runtime::SudoCall::sudo(Box::new( - rialto_runtime::BridgeMillauCall::initialize(initialization_data).into(), + rialto_runtime::FinalityBridgeMillauCall::initialize(initialization_data).into(), )) .into(), ) @@ -137,7 +136,7 @@ async fn run_init_bridge(command: cli::InitBridge) -> Result<(), String> { &millau_sign.signer, millau_signer_next_index, millau_runtime::SudoCall::sudo(Box::new( - millau_runtime::BridgeRialtoCall::initialize(initialization_data).into(), + millau_runtime::FinalityBridgeRialtoCall::initialize(initialization_data).into(), )) .into(), ) diff --git a/bridges/relays/substrate/src/messages_target.rs b/bridges/relays/substrate/src/messages_target.rs index e5ac8880c845d..db12be361f653 100644 --- a/bridges/relays/substrate/src/messages_target.rs +++ b/bridges/relays/substrate/src/messages_target.rs @@ -1,4 +1,4 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Parity Bridges Common. // Parity Bridges Common is free software: you can redistribute it and/or modify diff --git a/bridges/relays/substrate/src/millau_headers_to_rialto.rs b/bridges/relays/substrate/src/millau_headers_to_rialto.rs index 8b77e71657a09..7b9df18d8a92c 100644 --- a/bridges/relays/substrate/src/millau_headers_to_rialto.rs +++ b/bridges/relays/substrate/src/millau_headers_to_rialto.rs @@ -17,70 +17,52 @@ //! Millau-to-Rialto headers sync entrypoint. use crate::{ - headers_pipeline::{SubstrateHeadersSyncPipeline, SubstrateHeadersToSubstrate}, + finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate}, MillauClient, RialtoClient, }; use async_trait::async_trait; -use bp_millau::{ - BEST_MILLAU_BLOCKS_METHOD, FINALIZED_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD, - IS_KNOWN_MILLAU_BLOCK_METHOD, -}; -use headers_relay::sync_types::QueuedHeader; -use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SyncHeader as MillauSyncHeader}; -use relay_rialto_client::{BridgeMillauCall, Rialto, SigningParams as RialtoSigningParams}; -use relay_substrate_client::{Error as SubstrateError, TransactionSignScheme}; +use relay_millau_client::{Millau, SyncHeader as MillauSyncHeader}; +use relay_rialto_client::{Rialto, SigningParams as RialtoSigningParams}; +use relay_substrate_client::{finality_source::Justification, Error as SubstrateError, TransactionSignScheme}; use sp_core::Pair; -use sp_runtime::Justification; -/// Millau-to-Rialto headers sync pipeline. -pub(crate) type MillauHeadersToRialto = - SubstrateHeadersToSubstrate; -/// Millau header in-the-queue. -type QueuedMillauHeader = QueuedHeader; +/// Millau-to-Rialto finality sync pipeline. +pub(crate) type MillauFinalityToRialto = SubstrateFinalityToSubstrate; #[async_trait] -impl SubstrateHeadersSyncPipeline for MillauHeadersToRialto { - const BEST_BLOCK_METHOD: &'static str = BEST_MILLAU_BLOCKS_METHOD; - const FINALIZED_BLOCK_METHOD: &'static str = FINALIZED_MILLAU_BLOCK_METHOD; - const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_MILLAU_BLOCK_METHOD; - const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_MILLAU_HEADERS_METHOD; +impl SubstrateFinalitySyncPipeline for MillauFinalityToRialto { + const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD; type SignedTransaction = ::SignedTransaction; - async fn make_submit_header_transaction( - &self, - header: QueuedMillauHeader, - ) -> Result { - let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); - let nonce = self.target_client.next_account_index(account_id).await?; - let call = BridgeMillauCall::import_signed_header(header.header().clone().into_inner()).into(); - let transaction = Rialto::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call); - Ok(transaction) - } - - async fn make_complete_header_transaction( + async fn make_submit_finality_proof_transaction( &self, - id: MillauHeaderId, - completion: Justification, + header: MillauSyncHeader, + proof: Justification, ) -> Result { let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); let nonce = self.target_client.next_account_index(account_id).await?; - let call = BridgeMillauCall::finalize_header(id.1, completion).into(); + let call = rialto_runtime::FinalityBridgeMillauCall::submit_finality_proof( + header.into_inner(), + proof.into_inner(), + (), + ) + .into(); let transaction = Rialto::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call); Ok(transaction) } } -/// Run Millau-to-Rialto headers sync. +/// Run Millau-to-Rialto finality sync. pub async fn run( millau_client: MillauClient, rialto_client: RialtoClient, rialto_sign: RialtoSigningParams, metrics_params: Option, ) { - crate::headers_pipeline::run( - MillauHeadersToRialto::new(rialto_client.clone(), rialto_sign), + crate::finality_pipeline::run( + MillauFinalityToRialto::new(rialto_client.clone(), rialto_sign), millau_client, rialto_client, metrics_params, diff --git a/bridges/relays/substrate/src/millau_messages_to_rialto.rs b/bridges/relays/substrate/src/millau_messages_to_rialto.rs index ebab5cfb381d3..ecd59e146fa13 100644 --- a/bridges/relays/substrate/src/millau_messages_to_rialto.rs +++ b/bridges/relays/substrate/src/millau_messages_to_rialto.rs @@ -51,8 +51,8 @@ impl SubstrateMessageLane for MillauMessagesToRialto { bp_millau::FROM_MILLAU_LATEST_CONFIRMED_NONCE_METHOD; const INBOUND_LANE_UNREWARDED_RELAYERS_STATE: &'static str = bp_millau::FROM_MILLAU_UNREWARDED_RELAYERS_STATE; - const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::FINALIZED_MILLAU_BLOCK_METHOD; - const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_rialto::FINALIZED_RIALTO_BLOCK_METHOD; + const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD; + const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD; type SourceSignedTransaction = ::SignedTransaction; type TargetSignedTransaction = ::SignedTransaction; diff --git a/bridges/relays/substrate/src/rialto_headers_to_millau.rs b/bridges/relays/substrate/src/rialto_headers_to_millau.rs index 3a13c6e148c63..6560d83c8c265 100644 --- a/bridges/relays/substrate/src/rialto_headers_to_millau.rs +++ b/bridges/relays/substrate/src/rialto_headers_to_millau.rs @@ -17,69 +17,52 @@ //! Rialto-to-Millau headers sync entrypoint. use crate::{ - headers_pipeline::{SubstrateHeadersSyncPipeline, SubstrateHeadersToSubstrate}, + finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate}, MillauClient, RialtoClient, }; use async_trait::async_trait; -use bp_rialto::{ - BEST_RIALTO_BLOCKS_METHOD, FINALIZED_RIALTO_BLOCK_METHOD, INCOMPLETE_RIALTO_HEADERS_METHOD, - IS_KNOWN_RIALTO_BLOCK_METHOD, -}; -use headers_relay::sync_types::QueuedHeader; -use relay_millau_client::{BridgeRialtoCall, Millau, SigningParams as MillauSigningParams}; -use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SyncHeader as RialtoSyncHeader}; -use relay_substrate_client::{Error as SubstrateError, TransactionSignScheme}; +use relay_millau_client::{Millau, SigningParams as MillauSigningParams}; +use relay_rialto_client::{Rialto, SyncHeader as RialtoSyncHeader}; +use relay_substrate_client::{finality_source::Justification, Error as SubstrateError, TransactionSignScheme}; use sp_core::Pair; -use sp_runtime::Justification; -/// Rialto-to-Millau headers sync pipeline. -type RialtoHeadersToMillau = SubstrateHeadersToSubstrate; -/// Rialto header in-the-queue. -type QueuedRialtoHeader = QueuedHeader; +/// Rialto-to-Millau finality sync pipeline. +pub(crate) type RialtoFinalityToMillau = SubstrateFinalityToSubstrate; #[async_trait] -impl SubstrateHeadersSyncPipeline for RialtoHeadersToMillau { - const BEST_BLOCK_METHOD: &'static str = BEST_RIALTO_BLOCKS_METHOD; - const FINALIZED_BLOCK_METHOD: &'static str = FINALIZED_RIALTO_BLOCK_METHOD; - const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_RIALTO_BLOCK_METHOD; - const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_RIALTO_HEADERS_METHOD; +impl SubstrateFinalitySyncPipeline for RialtoFinalityToMillau { + const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD; type SignedTransaction = ::SignedTransaction; - async fn make_submit_header_transaction( - &self, - header: QueuedRialtoHeader, - ) -> Result { - let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); - let nonce = self.target_client.next_account_index(account_id).await?; - let call = BridgeRialtoCall::import_signed_header(header.header().clone().into_inner()).into(); - let transaction = Millau::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call); - Ok(transaction) - } - - async fn make_complete_header_transaction( + async fn make_submit_finality_proof_transaction( &self, - id: RialtoHeaderId, - completion: Justification, + header: RialtoSyncHeader, + proof: Justification, ) -> Result { let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); let nonce = self.target_client.next_account_index(account_id).await?; - let call = BridgeRialtoCall::finalize_header(id.1, completion).into(); + let call = millau_runtime::FinalityBridgeRialtoCall::submit_finality_proof( + header.into_inner(), + proof.into_inner(), + (), + ) + .into(); let transaction = Millau::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call); Ok(transaction) } } -/// Run Rialto-to-Millau headers sync. +/// Run Rialto-to-Millau finality sync. pub async fn run( rialto_client: RialtoClient, millau_client: MillauClient, millau_sign: MillauSigningParams, metrics_params: Option, ) { - crate::headers_pipeline::run( - RialtoHeadersToMillau::new(millau_client.clone(), millau_sign), + crate::finality_pipeline::run( + RialtoFinalityToMillau::new(millau_client.clone(), millau_sign), rialto_client, millau_client, metrics_params, diff --git a/bridges/relays/substrate/src/rialto_messages_to_millau.rs b/bridges/relays/substrate/src/rialto_messages_to_millau.rs index 1c11a111413c9..07af9ce82866d 100644 --- a/bridges/relays/substrate/src/rialto_messages_to_millau.rs +++ b/bridges/relays/substrate/src/rialto_messages_to_millau.rs @@ -51,8 +51,8 @@ impl SubstrateMessageLane for RialtoMessagesToMillau { bp_rialto::FROM_RIALTO_LATEST_CONFIRMED_NONCE_METHOD; const INBOUND_LANE_UNREWARDED_RELAYERS_STATE: &'static str = bp_rialto::FROM_RIALTO_UNREWARDED_RELAYERS_STATE; - const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::FINALIZED_RIALTO_BLOCK_METHOD; - const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_millau::FINALIZED_MILLAU_BLOCK_METHOD; + const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD; + const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD; type SourceSignedTransaction = ::SignedTransaction; type TargetSignedTransaction = ::SignedTransaction;