Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
alindima committed May 24, 2024
1 parent c721152 commit 6faa1ee
Showing 1 changed file with 75 additions and 53 deletions.
128 changes: 75 additions & 53 deletions polkadot/node/network/availability-recovery/src/task/strategy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use polkadot_node_subsystem::{
overseer, RecoveryError,
};
use polkadot_primitives::{AuthorityDiscoveryId, BlakeTwo256, ChunkIndex, HashT, ValidatorIndex};
use sc_network::{IfDisconnected, OutboundFailure, RequestFailure};
use sc_network::{IfDisconnected, OutboundFailure, ProtocolName, RequestFailure};
use std::{
collections::{BTreeMap, HashMap, VecDeque},
time::Duration,
Expand Down Expand Up @@ -79,7 +79,7 @@ pub const REGULAR_CHUNKS_REQ_RETRY_LIMIT: u32 = 5;
type OngoingRequests = FuturesUndead<(
AuthorityDiscoveryId,
ValidatorIndex,
Result<Option<ErasureChunk>, RequestError>,
Result<(Option<ErasureChunk>, ProtocolName), RequestError>,
)>;

const fn is_unavailable(
Expand Down Expand Up @@ -384,15 +384,14 @@ impl State {
let res = match res.await {
Ok((bytes, protocol)) =>
if v2_protocol_name == protocol {
params.metrics.on_chunk_response_v2();
match req_res::v2::ChunkFetchingResponse::decode(&mut &bytes[..]) {
Ok(req_res::v2::ChunkFetchingResponse::Chunk(chunk)) =>
Ok(Some(chunk.into())),
Ok(req_res::v2::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
Ok((Some(chunk.into()), protocol)),
Ok(req_res::v2::ChunkFetchingResponse::NoSuchChunk) =>
Ok((None, protocol)),
Err(e) => Err(RequestError::InvalidResponse(e)),
}
} else if v1_protocol_name == protocol {
params.metrics.on_chunk_response_v1();
// V1 protocol version must not be used when chunk mapping node
// feature is enabled, because we can't know the real index of the
// returned chunk.
Expand All @@ -410,9 +409,12 @@ impl State {
}

match req_res::v1::ChunkFetchingResponse::decode(&mut &bytes[..]) {
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
Ok(Some(chunk.recombine_into_chunk(&raw_request_v1))),
Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) => Ok((
Some(chunk.recombine_into_chunk(&raw_request_v1)),
protocol,
)),
Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) =>
Ok((None, protocol)),
Err(e) => Err(RequestError::InvalidResponse(e)),
}
} else {
Expand Down Expand Up @@ -480,42 +482,54 @@ impl State {
let mut is_error = false;

match request_result {
Ok(Some(chunk)) =>
if is_chunk_valid(params, &chunk) {
metrics.on_chunk_request_succeeded(strategy_type);
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?authority_id,
?validator_index,
"Received valid chunk",
);
self.insert_chunk(
chunk.index,
Chunk { chunk: chunk.chunk, validator_index },
);
} else {
metrics.on_chunk_request_invalid(strategy_type);
error_count += 1;
// Record that we got an invalid chunk so that subsequent strategies don't
// try requesting this again.
self.record_error_fatal(authority_id.clone(), validator_index);
is_error = true;
},
Ok(None) => {
metrics.on_chunk_request_no_such_chunk(strategy_type);
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?authority_id,
?validator_index,
"Validator did not have the chunk",
);
error_count += 1;
// Record that the validator did not have this chunk so that subsequent
// strategies don't try requesting this again.
self.record_error_fatal(authority_id.clone(), validator_index);
is_error = true;
Ok((maybe_chunk, protocol)) => {
match protocol {
name if name == params.req_v1_protocol_name =>
params.metrics.on_chunk_response_v1(),
name if name == params.req_v2_protocol_name =>
params.metrics.on_chunk_response_v2(),
_ => {},
}

match maybe_chunk {
Some(chunk) =>
if is_chunk_valid(params, &chunk) {
metrics.on_chunk_request_succeeded(strategy_type);
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?authority_id,
?validator_index,
"Received valid chunk",
);
self.insert_chunk(
chunk.index,
Chunk { chunk: chunk.chunk, validator_index },
);
} else {
metrics.on_chunk_request_invalid(strategy_type);
error_count += 1;
// Record that we got an invalid chunk so that subsequent strategies
// don't try requesting this again.
self.record_error_fatal(authority_id.clone(), validator_index);
is_error = true;
},
None => {
metrics.on_chunk_request_no_such_chunk(strategy_type);
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?authority_id,
?validator_index,
"Validator did not have the chunk",
);
error_count += 1;
// Record that the validator did not have this chunk so that subsequent
// strategies don't try requesting this again.
self.record_error_fatal(authority_id.clone(), validator_index);
is_error = true;
},
}
},
Err(err) => {
error_count += 1;
Expand Down Expand Up @@ -1061,7 +1075,7 @@ mod tests {
future::ready((
params.validator_authority_keys[0].clone(),
0.into(),
Ok(Some(chunks[0].clone())),
Ok((Some(chunks[0].clone()), "".into())),
))
.boxed(),
);
Expand All @@ -1070,13 +1084,17 @@ mod tests {
future::ready((
params.validator_authority_keys[1].clone(),
1.into(),
Ok(Some(chunks[1].clone())),
Ok((Some(chunks[1].clone()), "".into())),
))
.boxed(),
);
ongoing_reqs.push(
future::ready((params.validator_authority_keys[2].clone(), 2.into(), Ok(None)))
.boxed(),
future::ready((
params.validator_authority_keys[2].clone(),
2.into(),
Ok((None, "".into())),
))
.boxed(),
);
ongoing_reqs.push(
future::ready((
Expand Down Expand Up @@ -1201,7 +1219,7 @@ mod tests {
future::ready((
params.validator_authority_keys[0].clone(),
0.into(),
Ok(Some(chunks[0].clone())),
Ok((Some(chunks[0].clone()), "".into())),
))
.boxed(),
);
Expand All @@ -1210,13 +1228,17 @@ mod tests {
future::ready((
params.validator_authority_keys[1].clone(),
1.into(),
Ok(Some(chunks[1].clone())),
Ok((Some(chunks[1].clone()), "".into())),
))
.boxed(),
);
ongoing_reqs.push(
future::ready((params.validator_authority_keys[2].clone(), 2.into(), Ok(None)))
.boxed(),
future::ready((
params.validator_authority_keys[2].clone(),
2.into(),
Ok((None, "".into())),
))
.boxed(),
);
ongoing_reqs.push(
future::ready((
Expand Down

0 comments on commit 6faa1ee

Please sign in to comment.