Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Delay reputation updates #7214

Merged
merged 52 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
752e0f8
Add futures-timer
AndreiEres May 11, 2023
1155196
Make cost_or_benefit public
AndreiEres May 11, 2023
0d4cdb0
Update ReportPeer message format
AndreiEres May 11, 2023
baf0d91
Add delay to reputation updates (dirtywork)
AndreiEres May 11, 2023
c29e469
Update ReputationAggregator
AndreiEres May 17, 2023
72a8336
Update tests
AndreiEres May 24, 2023
2ce22a1
Fix flucky tests
AndreiEres May 24, 2023
19c7628
Move reputation to state
AndreiEres May 26, 2023
9f51ba7
Use the main loop for handling reputation sendings
AndreiEres May 26, 2023
c6aad01
Update
AndreiEres May 26, 2023
ca4a3da
Move reputation to utils
AndreiEres May 26, 2023
f44b6f7
Update reputation sending
AndreiEres May 26, 2023
d596fd7
Fix arguments order
AndreiEres May 26, 2023
0889400
Update state
AndreiEres May 26, 2023
f07c1b5
Remove new from state
AndreiEres May 26, 2023
99ca0eb
Add constant
AndreiEres May 26, 2023
df1f71d
Add failing test for delay
AndreiEres May 26, 2023
657436b
Change mocking approach
AndreiEres May 30, 2023
40c5a39
Fix type errors
AndreiEres May 30, 2023
4ff2bd8
Fix comments
AndreiEres May 31, 2023
45f0ccf
Add message handling to select
AndreiEres May 31, 2023
6edfe3e
Fix bitfields-distribution tests
AndreiEres May 31, 2023
a82dc2f
Add docs to reputation aggregator
AndreiEres May 31, 2023
4c1b453
Replace .into_base_rep
AndreiEres May 31, 2023
4cd0489
Use one REPUTATION_CHANGE_INTERVAL by default
AndreiEres May 31, 2023
39ea432
Add reputation change to statement-distribution
AndreiEres May 31, 2023
cbf06d6
Update polkadot-availability-bitfield-distribution
AndreiEres May 31, 2023
bed24c3
Update futures selecting in subsystems
AndreiEres Jun 1, 2023
b989d63
Update reputation adding
AndreiEres Jun 1, 2023
8ccf8fa
Send malicious changes right away without adding to state
AndreiEres Jun 1, 2023
03ab24b
Add reputation to StatementDistributionSubsystem
AndreiEres Jun 1, 2023
f1e3208
Handle reputation in statement distribution
AndreiEres Jun 1, 2023
db2d9f2
Add delay test for polkadot-statement-distribution
AndreiEres Jun 1, 2023
daa2f32
Fix collator-protocol tests before applying reputation delay
AndreiEres Jun 1, 2023
2a0cb04
Remove into_base_rep
AndreiEres Jun 1, 2023
d68fddc
Add reputation to State
AndreiEres Jun 1, 2023
adf5b08
Fix failed tests
AndreiEres Jun 1, 2023
52494fc
Add reputation delay
AndreiEres Jun 1, 2023
62c3e33
Update tests
AndreiEres Jun 1, 2023
df61a7f
Add batched network message for peer reporting
AndreiEres Jun 1, 2023
704f87b
Update approval-distribution tests
AndreiEres Jun 2, 2023
f7d4905
Update bitfield-distribution tests
AndreiEres Jun 2, 2023
86b3b63
Update statement-distribution tests
AndreiEres Jun 2, 2023
0c637ee
Update collator-protocol tests
AndreiEres Jun 5, 2023
f1ea5d4
Remove levels in matching
AndreiEres Jun 5, 2023
6cc8fc9
Address clippy errors
AndreiEres Jun 5, 2023
a60e449
Fix overseer test
AndreiEres Jun 6, 2023
0a8738e
Add a metric for original count of rep changes
AndreiEres Jun 7, 2023
2ee603f
Update Reputation
AndreiEres Jun 7, 2023
3f8b6e5
Revert "Add a metric for original count of rep changes"
AndreiEres Jun 7, 2023
375230c
Update node/subsystem-util/src/reputation.rs
AndreiEres Jun 9, 2023
de9e834
Remove redundant vec
AndreiEres Jun 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion node/network/approval-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
rand = "0.8"

futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }

[dev-dependencies]
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }

polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" }

Expand Down
192 changes: 147 additions & 45 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

#![warn(missing_docs)]

use futures::{channel::oneshot, FutureExt as _};
use futures::{channel::oneshot, select, FutureExt as _};
use polkadot_node_jaeger as jaeger;
use polkadot_node_network_protocol::{
self as net_protocol,
Expand All @@ -38,11 +38,15 @@ use polkadot_node_subsystem::{
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL};
use polkadot_primitives::{
BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature,
};
use rand::{CryptoRng, Rng, SeedableRng};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque};
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
time::Duration,
};

use self::metrics::Metrics;

Expand Down Expand Up @@ -187,6 +191,9 @@ struct State {

/// Current approval checking finality lag.
approval_checking_lag: BlockNumber,

/// Aggregated reputation change
reputation: ReputationAggregator,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -755,7 +762,13 @@ impl State {
"Unexpected assignment",
);
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
}
}
return
Expand All @@ -780,7 +793,13 @@ impl State {
?message_subject,
"Duplicate assignment",
);
modify_reputation(ctx.sender(), peer_id, COST_DUPLICATE_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
return
}
Expand All @@ -792,13 +811,25 @@ impl State {
?message_subject,
"Assignment from a peer is out of view",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
},
}

// if the assignment is known to be valid, reward the peer
if entry.knowledge.contains(&message_subject, message_kind) {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE,
)
.await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
peer_knowledge.received.insert(message_subject, message_kind);
Expand Down Expand Up @@ -834,7 +865,13 @@ impl State {
);
match result {
AssignmentCheckResult::Accepted => {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE_FIRST).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;
entry.knowledge.known_messages.insert(message_subject.clone(), message_kind);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
Expand Down Expand Up @@ -862,8 +899,13 @@ impl State {
?peer_id,
"Got an assignment too far in the future",
);
modify_reputation(ctx.sender(), peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE)
.await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
)
.await;
return
},
AssignmentCheckResult::Bad(error) => {
Expand All @@ -874,7 +916,13 @@ impl State {
%error,
"Got a bad assignment from peer",
);
modify_reputation(ctx.sender(), peer_id, COST_INVALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_INVALID_MESSAGE,
)
.await;
return
},
}
Expand Down Expand Up @@ -1024,7 +1072,13 @@ impl State {
_ => {
if let Some(peer_id) = source.peer_id() {
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
}
}
return
Expand All @@ -1043,7 +1097,13 @@ impl State {
?message_subject,
"Unknown approval assignment",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
}

Expand All @@ -1060,7 +1120,13 @@ impl State {
"Duplicate approval",
);

modify_reputation(ctx.sender(), peer_id, COST_DUPLICATE_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
return
}
Expand All @@ -1072,14 +1138,26 @@ impl State {
?message_subject,
"Approval from a peer is out of view",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
},
}

// if the approval is known to be valid, reward the peer
if entry.knowledge.contains(&message_subject, message_kind) {
gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known approval");
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE,
)
.await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
}
Expand Down Expand Up @@ -1110,15 +1188,27 @@ impl State {
);
match result {
ApprovalCheckResult::Accepted => {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE_FIRST).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;

entry.knowledge.insert(message_subject.clone(), message_kind);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
}
},
ApprovalCheckResult::Bad(error) => {
modify_reputation(ctx.sender(), peer_id, COST_INVALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_INVALID_MESSAGE,
)
.await;
gum::info!(
target: LOG_TARGET,
?peer_id,
Expand Down Expand Up @@ -1669,6 +1759,7 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi

/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::ApprovalDistributionSenderTrait,
peer_id: PeerId,
rep: Rep,
Expand All @@ -1679,8 +1770,7 @@ async fn modify_reputation(
?peer_id,
"Reputation change for peer",
);

sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer_id, rep)).await;
reputation.modify(sender, peer_id, rep).await;
}

#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
Expand All @@ -1696,45 +1786,57 @@ impl ApprovalDistribution {
// According to the docs of `rand`, this is a ChaCha12 RNG in practice
// and will always be chosen for strong performance and security properties.
let mut rng = rand::rngs::StdRng::from_entropy();
self.run_inner(ctx, &mut state, &mut rng).await
self.run_inner(ctx, &mut state, REPUTATION_CHANGE_INTERVAL, &mut rng).await
}

/// Used for testing.
async fn run_inner<Context>(
self,
mut ctx: Context,
state: &mut State,
reputation_interval: Duration,
rng: &mut (impl CryptoRng + Rng),
) {
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();

loop {
let message = match ctx.recv().await {
Ok(message) => message,
Err(e) => {
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
};
match message {
FromOrchestra::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet
// actived and deactivated heads hence are irrelevant to this subsystem, other than
// for tracing purposes.
if let Some(activated) = update.activated {
let head = activated.hash;
let approval_distribution_span =
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
message = ctx.recv().fuse() => {
let message = match message {
Ok(message) => message,
Err(e) => {
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return
},
};
match message {
FromOrchestra::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet
// actived and deactivated heads hence are irrelevant to this subsystem, other than
// for tracing purposes.
if let Some(activated) = update.activated {
let head = activated.hash;
let approval_distribution_span =
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
state.handle_block_finalized(&mut ctx, &self.metrics, number).await;
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return,
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
state.handle_block_finalized(&mut ctx, &self.metrics, number).await;
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return,
}
}
}
Expand Down
Loading