Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Parachain node should not recover blocks while syncing #2462

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
Expand Down Expand Up @@ -228,6 +228,7 @@ pub struct PoVRecovery<Block: BlockT, PC, RC> {
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
/// Blocks that we are retrying currently
candidates_in_retry: HashSet<Block::Hash>,
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
}

impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
Expand All @@ -244,6 +245,7 @@ where
relay_chain_interface: RCInterface,
para_id: ParaId,
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> Self {
Self {
candidates: HashMap::new(),
Expand All @@ -256,6 +258,7 @@ where
para_id,
candidates_in_retry: HashSet::new(),
recovery_chan_rx,
parachain_sync_service,
}
}

Expand Down Expand Up @@ -538,14 +541,19 @@ where
pub async fn run(mut self) {
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
let pending_candidates =
match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await {
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
return
},
};
let pending_candidates = match pending_candidates(
self.relay_chain_interface.clone(),
self.para_id,
self.parachain_sync_service.clone(),
)
.await
{
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
return
},
};

futures::pin_mut!(pending_candidates);

Expand Down Expand Up @@ -600,13 +608,24 @@ where
async fn pending_candidates(
relay_chain_client: impl RelayChainInterface + Clone,
para_id: ParaId,
sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
let import_notification_stream = relay_chain_client.import_notification_stream().await?;

let filtered_stream = import_notification_stream.filter_map(move |n| {
let client_for_closure = relay_chain_client.clone();
let sync_oracle = sync_service.clone();
async move {
let hash = n.hash();
if sync_oracle.is_major_syncing() {
altonen marked this conversation as resolved.
Show resolved Hide resolved
tracing::debug!(
target: LOG_TARGET,
relay_hash = ?hash,
"Skipping candidate due to sync.",
);
return None
}

let pending_availability_result = client_for_closure
.candidate_pending_availability(hash, para_id)
.await
Expand Down
6 changes: 6 additions & 0 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn
pub collator_key: CollatorPair,
pub relay_chain_slot_duration: Duration,
pub recovery_handle: Box<dyn RecoveryHandle>,
pub sync_service: Arc<SyncingService<Block>>,
}

/// Start a collator node for a parachain.
Expand All @@ -91,6 +92,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner
collator_key,
relay_chain_slot_duration,
recovery_handle,
sync_service,
}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>,
) -> sc_service::error::Result<()>
where
Expand Down Expand Up @@ -136,6 +138,7 @@ where
relay_chain_interface.clone(),
para_id,
recovery_chan_rx,
sync_service,
);

task_manager
Expand Down Expand Up @@ -170,6 +173,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> {
pub relay_chain_slot_duration: Duration,
pub import_queue: Box<dyn ImportQueueService<Block>>,
pub recovery_handle: Box<dyn RecoveryHandle>,
pub sync_service: Arc<SyncingService<Block>>,
}

/// Start a full node for a parachain.
Expand All @@ -186,6 +190,7 @@ pub fn start_full_node<Block, Client, Backend, RCInterface>(
relay_chain_slot_duration,
import_queue,
recovery_handle,
sync_service,
}: StartFullNodeParams<Block, Client, RCInterface>,
) -> sc_service::error::Result<()>
where
Expand Down Expand Up @@ -231,6 +236,7 @@ where
relay_chain_interface,
para_id,
recovery_chan_rx,
sync_service,
);

task_manager
Expand Down
4 changes: 3 additions & 1 deletion parachain-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ async fn start_node_impl(
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
para_id,
Expand All @@ -291,6 +291,7 @@ async fn start_node_impl(
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -304,6 +305,7 @@ async fn start_node_impl(
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down
12 changes: 9 additions & 3 deletions polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ where
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
)?;
Expand All @@ -480,6 +480,7 @@ where
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -493,6 +494,7 @@ where
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down Expand Up @@ -659,7 +661,7 @@ where
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
)?;
Expand All @@ -679,6 +681,7 @@ where
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -692,6 +695,7 @@ where
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down Expand Up @@ -1429,7 +1433,7 @@ where
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
)?;
Expand All @@ -1449,6 +1453,7 @@ where
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -1462,6 +1467,7 @@ where
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down
2 changes: 2 additions & 0 deletions test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ where
import_queue: import_queue_service,
relay_chain_slot_duration: Duration::from_secs(6),
recovery_handle,
sync_service,
};

start_collator(params).await?;
Expand All @@ -446,6 +447,7 @@ where
import_queue: import_queue_service,
relay_chain_slot_duration: Duration::from_secs(6),
recovery_handle,
sync_service,
};

start_full_node(params)?;
Expand Down