diff --git a/core/finality-grandpa/src/communication.rs b/core/finality-grandpa/src/communication.rs index 32b2cc3a7605b..3a1b8e996f8aa 100644 --- a/core/finality-grandpa/src/communication.rs +++ b/core/finality-grandpa/src/communication.rs @@ -395,14 +395,16 @@ pub(crate) struct CommitsOut { network: N, set_id: u64, _marker: ::std::marker::PhantomData, + is_voter: bool, } impl CommitsOut { /// Create a new commit output stream. - pub(crate) fn new(network: N, set_id: u64) -> Self { + pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self { CommitsOut { network, set_id, + is_voter, _marker: Default::default(), } } @@ -413,6 +415,10 @@ impl Sink for CommitsOut { type SinkError = Error; fn start_send(&mut self, input: (u64, Commit)) -> StartSend { + if !self.is_voter { + return Ok(AsyncSink::Ready); + } + let (round, commit) = input; let (precommits, auth_data) = commit.precommits.into_iter() .map(|signed| (signed.precommit, (signed.signature, signed.id))) diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 2eb6bc3023c2b..fa9703c8cd62f 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -528,10 +528,13 @@ impl, N, RA> voter::Environment( round, self.set_id, - self.config.local_key.clone(), + local_key.cloned(), self.voters.clone(), self.network.clone(), ); @@ -1407,6 +1410,7 @@ pub fn block_import, RA, PRA>( } fn committer_communication, B, E, N, RA>( + local_key: Option>, set_id: u64, voters: &Arc>, client: &Arc>, @@ -1442,9 +1446,14 @@ fn committer_communication, B, E, N, RA>( commit_in, ); + let is_voter = local_key + .map(|pair| voters.contains_key(&pair.public().into())) + .unwrap_or(false); + let commit_out = ::communication::CommitsOut::::new( network.clone(), set_id, + is_voter, ); let commit_in = commit_in.map_err(Into::into); @@ -1524,6 +1533,7 @@ pub fn run_grandpa, N, RA>( ); let committer_data = committer_communication( + config.local_key.clone(), env.set_id, &env.voters, &client, diff --git a/core/network/src/consensus_gossip.rs b/core/network/src/consensus_gossip.rs index b9eec32b30cd1..6997abd84119c 100644 --- a/core/network/src/consensus_gossip.rs +++ b/core/network/src/consensus_gossip.rs @@ -41,7 +41,6 @@ struct MessageEntry { message_hash: B::Hash, message: ConsensusMessage, broadcast: bool, - instant: Instant, } /// Consensus network protocol handler. Manages statements and candidate requests. @@ -50,6 +49,7 @@ pub struct ConsensusGossip { live_message_sinks: HashMap>>, messages: Vec>, known_messages: HashSet<(B::Hash, B::Hash)>, + message_times: HashMap<(B::Hash, B::Hash), Instant>, session_start: Option, } @@ -61,6 +61,7 @@ impl ConsensusGossip { live_message_sinks: HashMap::new(), messages: Default::default(), known_messages: Default::default(), + message_times: Default::default(), session_start: None } } @@ -155,9 +156,10 @@ impl ConsensusGossip { topic, message_hash, broadcast, - instant: Instant::now(), message: get_message(), }); + + self.message_times.insert((topic, message_hash), Instant::now()); } } @@ -174,20 +176,33 @@ impl ConsensusGossip { !sinks.is_empty() }); - let hashes = &mut self.known_messages; + let message_times = &mut self.message_times; + let known_messages = &mut self.known_messages; let before = self.messages.len(); let now = Instant::now(); + self.messages.retain(|entry| { - if entry.instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic) { - true - } else { - hashes.remove(&(entry.topic, entry.message_hash)); - false - } + message_times.get(&(entry.topic, entry.message_hash)) + .map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic)) + .unwrap_or(false) }); - trace!(target:"gossip", "Cleaned up {} stale messages, {} left", before - self.messages.len(), self.messages.len()); + + known_messages.retain(|(topic, message_hash)| { + message_times.get(&(*topic, *message_hash)) + .map(|instant| *instant + (2 * MESSAGE_LIFETIME) >= now && predicate(topic)) + .unwrap_or(false) + }); + + trace!(target:"gossip", "Cleaned up {} stale messages, {} left ({} known)", + before - self.messages.len(), + self.messages.len(), + known_messages.len(), + ); + + message_times.retain(|h, _| known_messages.contains(h)); + for (_, ref mut peer) in self.peers.iter_mut() { - peer.known_messages.retain(|h| hashes.contains(h)); + peer.known_messages.retain(|h| known_messages.contains(h)); } } @@ -318,10 +333,10 @@ mod tests { consensus.messages.push(MessageEntry { topic: $topic, message_hash: $hash, - instant: $now, message: $m, broadcast: false, - }) + }); + consensus.message_times.insert(($topic, $hash), $now); } } @@ -346,9 +361,15 @@ mod tests { assert_eq!(consensus.known_messages.len(), 1); assert!(consensus.known_messages.contains(&(best_hash, m2_hash))); - // make timestamp expired + // make timestamp expired, but the message is still kept as known consensus.messages.clear(); - push_msg!(best_hash, m2_hash, now - MESSAGE_LIFETIME, m2); + push_msg!(best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone()); + consensus.collect_garbage(|_topic| true); + assert!(consensus.messages.is_empty()); + assert_eq!(consensus.known_messages.len(), 1); + + // make timestamp expired past the known message lifetime + push_msg!(best_hash, m2_hash, now - (2 * MESSAGE_LIFETIME), m2); consensus.collect_garbage(|_topic| true); assert!(consensus.messages.is_empty()); assert!(consensus.known_messages.is_empty());