Skip to content

Commit

Permalink
collator-side: send parent head data (#3521)
Browse files Browse the repository at this point in the history
On top of #3302.

We want the validators to upgrade first before we add changes to the
collation side to send the new variants, which is why this part is
extracted into a separate PR.

The detection of when to send the parent head is based on the core
assignments at the relay parent of the candidate. We probably want to
make it more flexible in the future, but for now, it will work for a
simple use case when a para always has multiple cores assigned to it.

---------

Signed-off-by: Matteo Muraca <mmuraca247@gmail.com>
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Matteo Muraca <56828990+muraca@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Juan Ignacio Rios <54085674+JuaniRios@users.noreply.github.com>
Co-authored-by: Branislav Kontur <bkontur@gmail.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
6 people authored Mar 19, 2024
1 parent 923f27c commit 5fd72a1
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 47 deletions.
10 changes: 6 additions & 4 deletions cumulus/polkadot-parachain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ jsonrpsee = { version = "0.22", features = ["server"] }
people-rococo-runtime = { path = "../parachains/runtimes/people/people-rococo" }
people-westend-runtime = { path = "../parachains/runtimes/people/people-westend" }
parachains-common = { path = "../parachains/common" }
testnet-parachains-constants = { path = "../parachains/runtimes/constants", default-features = false, features = ["rococo", "westend"] }
testnet-parachains-constants = { path = "../parachains/runtimes/constants", default-features = false, features = [
"rococo",
"westend",
] }

# Substrate
frame-benchmarking = { path = "../../substrate/frame/benchmarking" }
Expand Down Expand Up @@ -168,6 +171,5 @@ try-runtime = [
"shell-runtime/try-runtime",
"sp-runtime/try-runtime",
]
fast-runtime = [
"bridge-hub-rococo-runtime/fast-runtime",
]
fast-runtime = ["bridge-hub-rococo-runtime/fast-runtime"]
elastic-scaling-experimental = ["polkadot-service/elastic-scaling-experimental"]
4 changes: 4 additions & 0 deletions polkadot/node/network/collator-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ parity-scale-codec = { version = "3.6.1", features = ["std"] }

polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" }

[features]
default = []
elastic-scaling-experimental = []
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use polkadot_node_network_protocol::{
PeerId,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, HeadData, Id as ParaId};
use polkadot_node_subsystem::messages::ParentHeadData;
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};

/// The status of a collation as seen from the collator.
pub enum CollationStatus {
Expand Down Expand Up @@ -59,12 +60,10 @@ impl CollationStatus {
pub struct Collation {
/// Candidate receipt.
pub receipt: CandidateReceipt,
/// Parent head-data hash.
pub parent_head_data_hash: Hash,
/// Proof to verify the state transition of the parachain.
pub pov: PoV,
/// Parent head-data needed for elastic scaling.
pub parent_head_data: HeadData,
/// Parent head-data (or just hash).
pub parent_head_data: ParentHeadData,
/// Collation status.
pub status: CollationStatus,
}
Expand Down
85 changes: 52 additions & 33 deletions polkadot/node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement};
use polkadot_node_subsystem::{
jaeger,
messages::{
CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, RuntimeApiMessage,
CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData,
RuntimeApiMessage,
},
overseer, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal, PerLeafSpan,
};
Expand Down Expand Up @@ -395,12 +396,11 @@ async fn distribute_collation<Context>(
return Ok(())
}

// Determine which core the para collated-on is assigned to.
// Determine which core(s) the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_core, num_cores) =
match determine_core(ctx.sender(), id, candidate_relay_parent, relay_parent_mode).await? {
Some(core) => core,
None => {
let (our_cores, num_cores) =
match determine_cores(ctx.sender(), id, candidate_relay_parent, relay_parent_mode).await? {
(cores, _num_cores) if cores.is_empty() => {
gum::warn!(
target: LOG_TARGET,
para_id = %id,
Expand All @@ -409,8 +409,20 @@ async fn distribute_collation<Context>(

return Ok(())
},
(cores, num_cores) => (cores, num_cores),
};

let elastic_scaling = our_cores.len() > 1;
if elastic_scaling {
gum::debug!(
target: LOG_TARGET,
para_id = %id,
cores = ?our_cores,
"{} is assigned to {} cores at {}", id, our_cores.len(), candidate_relay_parent,
);
}

let our_core = our_cores[0];
// Determine the group on that core.
//
// When prospective parachains are disabled, candidate relay parent here is
Expand Down Expand Up @@ -464,15 +476,15 @@ async fn distribute_collation<Context>(
state.collation_result_senders.insert(candidate_hash, result_sender);
}

let parent_head_data = if elastic_scaling {
ParentHeadData::WithData { hash: parent_head_data_hash, head_data: parent_head_data }
} else {
ParentHeadData::OnlyHash(parent_head_data_hash)
};

per_relay_parent.collations.insert(
candidate_hash,
Collation {
receipt,
parent_head_data_hash,
pov,
parent_head_data,
status: CollationStatus::Created,
},
Collation { receipt, pov, parent_head_data, status: CollationStatus::Created },
);

// If prospective parachains are disabled, a leaf should be known to peer.
Expand Down Expand Up @@ -513,15 +525,17 @@ async fn distribute_collation<Context>(
Ok(())
}

/// Get the Id of the Core that is assigned to the para being collated on if any
/// Get the core indices that are assigned to the para being collated on if any
/// and the total number of cores.
async fn determine_core(
async fn determine_cores(
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
para_id: ParaId,
relay_parent: Hash,
relay_parent_mode: ProspectiveParachainsMode,
) -> Result<Option<(CoreIndex, usize)>> {
) -> Result<(Vec<CoreIndex>, usize)> {
let cores = get_availability_cores(sender, relay_parent).await?;
let n_cores = cores.len();
let mut assigned_cores = Vec::new();

for (idx, core) in cores.iter().enumerate() {
let core_para_id = match core {
Expand All @@ -538,11 +552,11 @@ async fn determine_core(
};

if core_para_id == Some(para_id) {
return Ok(Some(((idx as u32).into(), cores.len())))
assigned_cores.push(CoreIndex::from(idx as u32));
}
}

Ok(None)
Ok((assigned_cores, n_cores))
}

/// Validators of a particular group index.
Expand Down Expand Up @@ -725,7 +739,7 @@ async fn advertise_collation<Context>(
let wire_message = protocol_v2::CollatorProtocolMessage::AdvertiseCollation {
relay_parent,
candidate_hash: *candidate_hash,
parent_head_data_hash: collation.parent_head_data_hash,
parent_head_data_hash: collation.parent_head_data.hash(),
};
Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol(wire_message))
},
Expand Down Expand Up @@ -849,28 +863,33 @@ async fn send_collation(
request: VersionedCollationRequest,
receipt: CandidateReceipt,
pov: PoV,
_parent_head_data: HeadData,
parent_head_data: ParentHeadData,
) {
let (tx, rx) = oneshot::channel();

let relay_parent = request.relay_parent();
let peer_id = request.peer_id();
let candidate_hash = receipt.hash();

// The response payload is the same for v1 and v2 versions of protocol
// and doesn't have v2 alias for simplicity.
// For now, we don't send parent head data to the collation requester.
let result =
// if assigned_multiple_cores {
// Ok(request_v1::CollationFetchingResponse::CollationWithParentHeadData {
// receipt,
// pov,
// parent_head_data,
// })
// } else {
#[cfg(feature = "elastic-scaling-experimental")]
let result = match parent_head_data {
ParentHeadData::WithData { head_data, .. } =>
Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
receipt,
pov,
parent_head_data: head_data,
}),
ParentHeadData::OnlyHash(_) =>
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov)),
};
#[cfg(not(feature = "elastic-scaling-experimental"))]
let result = {
// suppress unused warning
let _parent_head_data = parent_head_data;

Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov))
// }
;
};

let response =
OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ impl Default for TestState {
}

impl TestState {
/// Adds a few more scheduled cores to the state for the same para id
/// compared to the default.
#[cfg(feature = "elastic-scaling-experimental")]
pub fn with_elastic_scaling() -> Self {
let mut state = Self::default();
let para_id = state.para_id;
state
.availability_cores
.push(CoreState::Scheduled(ScheduledCore { para_id, collator: None }));
state
.availability_cores
.push(CoreState::Scheduled(ScheduledCore { para_id, collator: None }));
state
}

fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
let core_num = self.availability_cores.len();
let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,119 @@ fn distribute_collation_up_to_limit() {
)
}

/// Tests that collator send the parent head data in
/// case the para is assigned to multiple cores (elastic scaling).
#[test]
#[cfg(feature = "elastic-scaling-experimental")]
fn send_parent_head_data_for_elastic_scaling() {
let test_state = TestState::with_elastic_scaling();

let local_peer_id = test_state.local_peer_id;
let collator_pair = test_state.collator_pair.clone();

test_harness(
local_peer_id,
collator_pair,
ReputationAggregator::new(|_| true),
|test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let req_v1_cfg = test_harness.req_v1_cfg;
let mut req_v2_cfg = test_harness.req_v2_cfg;

let head_b = Hash::from_low_u64_be(129);
let head_b_num: u32 = 63;

// Set collating para id.
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollateOn(test_state.para_id),
)
.await;
update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await;

let pov_data = PoV { block_data: BlockData(vec![1 as u8]) };
let candidate = TestCandidateBuilder {
para_id: test_state.para_id,
relay_parent: head_b,
pov_hash: pov_data.hash(),
..Default::default()
}
.build();

let phd = HeadData(vec![1, 2, 3]);
let phdh = phd.hash();

distribute_collation_with_receipt(
&mut virtual_overseer,
&test_state,
head_b,
true,
candidate.clone(),
pov_data.clone(),
phdh,
)
.await;

let peer = test_state.validator_peer_id[0];
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
connect_peer(
&mut virtual_overseer,
peer,
CollationVersion::V2,
Some(validator_id.clone()),
)
.await;
expect_declare_msg_v2(&mut virtual_overseer, &test_state, &peer).await;

send_peer_view_change(&mut virtual_overseer, &peer, vec![head_b]).await;
let hashes: Vec<_> = vec![candidate.hash()];
expect_advertise_collation_msg(&mut virtual_overseer, &peer, head_b, Some(hashes))
.await;

let (pending_response, rx) = oneshot::channel();
req_v2_cfg
.inbound_queue
.as_mut()
.unwrap()
.send(RawIncomingRequest {
peer,
payload: request_v2::CollationFetchingRequest {
relay_parent: head_b,
para_id: test_state.para_id,
candidate_hash: candidate.hash(),
}
.encode(),
pending_response,
})
.await
.unwrap();

assert_matches!(
rx.await,
Ok(full_response) => {
let response: request_v2::CollationFetchingResponse =
request_v2::CollationFetchingResponse::decode(&mut
full_response.result
.expect("We should have a proper answer").as_ref()
).expect("Decoding should work");
assert_matches!(
response,
request_v1::CollationFetchingResponse::CollationWithParentHeadData {
receipt, pov, parent_head_data
} => {
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_data);
assert_eq!(parent_head_data, phd);
}
);
}
);

TestHarness { virtual_overseer, req_v1_cfg, req_v2_cfg }
},
)
}

/// Tests that collator correctly handles peer V2 requests.
#[test]
fn advertise_and_send_collation_by_hash() {
Expand Down
9 changes: 5 additions & 4 deletions polkadot/node/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,15 @@ try-runtime = [
"sp-runtime/try-runtime",
"westend-runtime?/try-runtime",
]
fast-runtime = [
"rococo-runtime?/fast-runtime",
"westend-runtime?/fast-runtime",
]
fast-runtime = ["rococo-runtime?/fast-runtime", "westend-runtime?/fast-runtime"]

malus = ["full-node"]
runtime-metrics = [
"polkadot-runtime-parachains/runtime-metrics",
"rococo-runtime?/runtime-metrics",
"westend-runtime?/runtime-metrics",
]

elastic-scaling-experimental = [
"polkadot-collator-protocol?/elastic-scaling-experimental",
]
Loading

0 comments on commit 5fd72a1

Please sign in to comment.