Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Change best effort queue behaviour in dispute-coordinator #6275

Merged
merged 17 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 88 additions & 97 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
};
use std::{cmp::Ordering, collections::BTreeMap};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -50,25 +47,14 @@ const PRIORITY_QUEUE_SIZE: usize = 20_000;
#[cfg(test)]
const PRIORITY_QUEUE_SIZE: usize = 2;

/// Type for counting how often a candidate was added to the best effort queue.
type BestEffortCount = u32;

/// Queues for dispute participation.
/// In both queues we have a strict ordering of candidates and participation will
/// happen in that order. Refer to `CandidateComparator` for details on the ordering.
pub struct Queues {
/// Set of best effort participation requests.
///
/// Note that as size is limited to `BEST_EFFORT_QUEUE_SIZE` we simply do a linear search for
/// the entry with the highest `added_count` to determine what dispute to participate next in.
///
/// This mechanism leads to an amplifying effect - the more validators already participated,
/// the more likely it becomes that more validators will participate soon, which should lead to
/// a quick resolution of disputes, even in the best effort queue.
best_effort: HashMap<CandidateHash, BestEffortEntry>,
best_effort: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Priority queue.
///
/// In the priority queue, we have a strict ordering of candidates and participation will
/// happen in that order.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,
}

Expand Down Expand Up @@ -143,14 +129,13 @@ impl ParticipationRequest {
impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: HashMap::new(), priority: BTreeMap::new() }
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
}

/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if it considered priority now, otherwise the `added_count` on the best effort queue will be
/// bumped.
/// if it is considered priority now.
///
/// Returns error in case a queue was found full already.
pub async fn queue(
Expand All @@ -159,94 +144,76 @@ impl Queues {
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let comparator = match priority {
ParticipationPriority::BestEffort => None,
ParticipationPriority::Priority =>
CandidateComparator::new(sender, &req.candidate_receipt).await?,
};
self.queue_with_comparator(comparator, req)?;
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req)?;
Ok(())
}

/// Get the next best request for dispute participation
///
/// if any. Priority queue is always considered first, then the best effort queue based on
/// `added_count`.
/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
// In case a candidate became best effort over time, we might have it also queued in
// the best effort queue - get rid of any such entry:
self.best_effort.remove(req.candidate_hash());
return Some(req)
return Some(req.1)
}
self.pop_best_effort()
self.pop_best_effort().map(|d| d.1)
}

fn queue_with_comparator(
&mut self,
comparator: Option<CandidateComparator>,
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
) -> std::result::Result<(), QueueError> {
if let Some(comparator) = comparator {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&req.candidate_hash);
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
// add in in best effort too.
return Ok(())
}
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull)
}
// Note: The request might have been added to priority in a previous call already, we
// take care of that case in `dequeue` (more efficient).
self.best_effort
.entry(req.candidate_hash)
.or_insert(BestEffortEntry { req, added_count: 0 })
.added_count += 1;
self.best_effort.insert(comparator, req);
}
Ok(())
}

/// Get the next best from the best effort queue.
///
/// If there are multiple best - just pick one.
fn pop_best_effort(&mut self) -> Option<ParticipationRequest> {
let best = self.best_effort.iter().reduce(|(hash1, entry1), (hash2, entry2)| {
if entry1.added_count > entry2.added_count {
(hash1, entry1)
} else {
(hash2, entry2)
}
});
if let Some((best_hash, _)) = best {
let best_hash = best_hash.clone();
self.best_effort.remove(&best_hash).map(|e| e.req)
} else {
None
}
/// Get best from the best effort queue.
fn pop_best_effort(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.best_effort)
}

/// Get best priority queue entry.
fn pop_priority(&mut self) -> Option<ParticipationRequest> {
fn pop_priority(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.priority)
}

// `pop_best_effort` and `pop_priority` do the same but on different `BTreeMap`s. This function has
// the extracted implementation
fn pop_impl(
target: &mut BTreeMap<CandidateComparator, ParticipationRequest>,
) -> Option<(CandidateComparator, ParticipationRequest)> {
// Once https://github.com/rust-lang/rust/issues/62924 is there, we can use a simple:
// priority.pop_first().
if let Some((comparator, _)) = self.priority.iter().next() {
// target.pop_first().
if let Some((comparator, _)) = target.iter().next() {
let comparator = comparator.clone();
self.priority.remove(&comparator)
target
.remove(&comparator)
.map(|participation_request| (comparator, participation_request))
} else {
None
}
}
}

/// Entry for the best effort queue.
struct BestEffortEntry {
req: ParticipationRequest,
/// How often was the above request added to the queue.
added_count: BestEffortCount,
}

/// `Comparator` for ordering of disputes for candidates.
///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
Expand All @@ -266,9 +233,12 @@ struct BestEffortEntry {
#[derive(Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
struct CandidateComparator {
/// Block number of the relay parent.
/// Block number of the relay parent. It's wrapped in an `Option<>` because there are cases when
/// it can't be obtained. For example when the node is lagging behind and new leaves are received
/// with a slight delay. Candidates with unknown relay parent are treated with the lowest priority.
///
/// Important, so we will be participating in oldest disputes first.
/// The order enforced by `CandidateComparator` is important because we want to participate in
/// the oldest disputes first.
///
/// Note: In theory it would make more sense to use the `BlockNumber` of the including
/// block, as inclusion time is the actual relevant event when it comes to ordering. The
Expand All @@ -277,8 +247,10 @@ struct CandidateComparator {
/// just using the lowest `BlockNumber` of all available including blocks - the problem is,
/// that is not stable. If a new fork appears after the fact, we would start ordering the same
/// candidate differently, which would result in the same candidate getting queued twice.
relay_parent_block_number: BlockNumber,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates.
relay_parent_block_number: Option<BlockNumber>,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates with the
/// same relay parent block number. Candidates without `relay_parent_block_number` are ordered by
/// the `candidate_hash` (and treated with the lowest priority, as already mentioned).
candidate_hash: CandidateHash,
}

Expand All @@ -287,33 +259,35 @@ impl CandidateComparator {
///
/// Useful for testing.
#[cfg(test)]
pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self {
pub fn new_dummy(block_number: Option<BlockNumber>, candidate_hash: CandidateHash) -> Self {
Self { relay_parent_block_number: block_number, candidate_hash }
}

/// Create a candidate comparator for a given candidate.
///
/// Returns:
/// `Ok(None)` in case we could not lookup the candidate's relay parent, returns a
/// `FatalError` in case the chain API call fails with an unexpected error.
/// - `Ok(CandidateComparator{Some(relay_parent_block_number), candidate_hash})` when the
/// relay parent can be obtained. This is the happy case.
/// - `Ok(CandidateComparator{None, candidate_hash})` in case the candidate's relay parent
/// can't be obtained.
/// - `FatalError` in case the chain API call fails with an unexpected error.
pub async fn new(
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
candidate: &CandidateReceipt,
) -> FatalResult<Option<Self>> {
) -> FatalResult<Self> {
let candidate_hash = candidate.hash();
let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? {
None => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator could not be provided!"
);
return Ok(None)
},
Some(n) => n,
};
let n = get_block_number(sender, candidate.descriptor().relay_parent).await?;

if n.is_none() {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator` \
with an empty relay parent block number will be provided!"
);
}

Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash }))
Ok(CandidateComparator { relay_parent_block_number: n, candidate_hash })
}
}

Expand All @@ -333,11 +307,28 @@ impl PartialOrd for CandidateComparator {

impl Ord for CandidateComparator {
fn cmp(&self, other: &Self) -> Ordering {
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
Ordering::Equal => (),
o => return o,
return match (self.relay_parent_block_number, other.relay_parent_block_number) {
(None, None) => {
// No relay parents for both -> compare hashes
self.candidate_hash.cmp(&other.candidate_hash)
},
(Some(self_relay_parent_block_num), Some(other_relay_parent_block_num)) => {
match self_relay_parent_block_num.cmp(&other_relay_parent_block_num) {
// if the relay parent is the same for both -> compare hashes
Ordering::Equal => self.candidate_hash.cmp(&other.candidate_hash),
// if not - return the result from comparing the relay parent block numbers
o => return o,
}
},
(Some(_), None) => {
// Candidates with known relay parents are always with priority
Ordering::Less
},
(None, Some(_)) => {
// Ditto
Ordering::Greater
},
}
self.candidate_hash.cmp(&other.candidate_hash)
}
}

Expand Down
Loading