Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
implement custom proposer (#1320)
Browse files Browse the repository at this point in the history
* network bridge skeleton

* move some primitives around and add debug impls

* protocol registration glue & abstract network interface

* add send_msgs to subsystemctx

* select logic

* transform different events into actions and handle

* implement remaining network bridge state machine

* start test skeleton

* make network methods asynchronous

* extract subsystem out to subsystem crate

* port over overseer to subsystem context trait

* fix minimal example

* fix overseer doc test

* update network-bridge crate

* write a subsystem test-helpers crate

* write a network test helper for network-bridge

* set up (broken) view test

* Revamp network to be more async-friendly and not require Sync

* fix spacing

* fix test compilation

* insert side-channel for actions

* Add some more message types to AllMessages

* introduce a test harness

* impl ProvideInherent for InclusionInherent

* reduce import churn; correct expect message

* move inclusion inherent identifier into primitives

It's not clear precisely why this is desired, but it's a pattern
I've seen in several places, so I'm going this to be on the
safe side. Worst case, we can revert this commit pretty easily.

* bump kusama spec_version to placate CI

* copy sc_basic_authorship::{ProposerFactory, Proposer}

We have from the problem description:

> This Proposer will require an OverseerHandle to make requests via.

That's next on the plate.

* use polkadot custom proposer instead of basic-authorship one

* add some tests

* ensure service compiles and passes tests

* fix typo

* fix service-new compilation

* Subsystem test helpers send messages synchronously

* remove smelly action inspector

* remove superfluous let binding

* fix warnings

* add license header

* empty commit; maybe github will notice the one with changes

* Update node/network/bridge/src/lib.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* add sanity check to only include valid inherents

* stub: encapsulate block production mechanics instead of copying them

The goal is to end up with something like what's in
validation::block_production::*, which encapsulates
basic block production mechanics. This is a better idea than
just straight-up copying those mechanics.

* partial implementation of propose fn

Doesn't actually compile yet; need to bring in some other
commits to ensure ProvisionerMessage is a thing, and also
figure out how to get the block hash given the current
context.

* fix compilation

* clear a few more compile errors

* finish fn propose

* broken: add timeout to proposal

* add timeout to proposal

* guide: provisioner is responsible for selecting parachain candidates

* implement ProvisionerMessage::RequestInherentData & update fn propose

* impl CreateProposer::init; clean up

* impl std::error::Error for Error

* document error-handling rationale

* cause polkadot-service-new to compile correctly

* Move potentially-blocking call from fn init -> fn propose

This means that we can wrap the delayed call into the same
timeout check used elsewhere.

* document struct Proposer

* extract provisioner data fetch

This satisfies two requirements:

- only applies the timeout to actually fetching the provisioner data,
  not to constructing the block after
- simplifies the problem of injecting default data if we could not
  get the real provisioner data in time.

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
Co-authored-by: Gavin Wood <gavin@parity.io>
  • Loading branch information
3 people authored Jul 5, 2020
1 parent 78e6e08 commit 69ce9ff
Show file tree
Hide file tree
Showing 10 changed files with 352 additions and 7 deletions.
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ members = [
"service",
"validation",

"node/core/proposer",
"node/network/bridge",
"node/overseer",
"node/primitives",
Expand Down
27 changes: 27 additions & 0 deletions node/core/proposer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "polkadot-node-core-proposer"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

[dependencies]
futures = "0.3.4"
futures-timer = "3.0.1"
log = "0.4.8"
parity-scale-codec = "1.3.0"
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-overseer = { path = "../../overseer" }
polkadot-primitives = { path = "../../../primitives" }
sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-block-builder = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
wasm-timer = "0.2.4"
262 changes: 262 additions & 0 deletions node/core/proposer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
use futures::prelude::*;
use futures::select;
use polkadot_node_subsystem::{messages::{AllMessages, ProvisionerInherentData, ProvisionerMessage}, SubsystemError};
use polkadot_overseer::OverseerHandler;
use polkadot_primitives::{
inclusion_inherent,
parachain::ParachainHost,
Block, Hash, Header,
};
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_consensus::{Proposal, RecordProof};
use sp_inherents::InherentData;
use sp_runtime::traits::{DigestFor, HashFor};
use sp_transaction_pool::TransactionPool;
use std::{fmt, pin::Pin, sync::Arc, time};

/// How long proposal can take before we give up and err out
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_secs(2);

/// Custom Proposer factory for Polkadot
pub struct ProposerFactory<TxPool, Backend, Client> {
inner: sc_basic_authorship::ProposerFactory<TxPool, Backend, Client>,
overseer: OverseerHandler,
}

impl<TxPool, Backend, Client> ProposerFactory<TxPool, Backend, Client> {
pub fn new(
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
overseer: OverseerHandler,
) -> Self {
ProposerFactory {
inner: sc_basic_authorship::ProposerFactory::new(
client,
transaction_pool,
None,
),
overseer,
}
}
}

impl<TxPool, Backend, Client> sp_consensus::Environment<Block>
for ProposerFactory<TxPool, Backend, Client>
where
TxPool: 'static + TransactionPool<Block = Block>,
Client: 'static
+ BlockBuilderProvider<Backend, Block, Client>
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ Send
+ Sync,
Client::Api:
ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend:
'static + sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
type CreateProposer = Pin<Box<
dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + 'static,
>>;
type Proposer = Proposer<TxPool, Backend, Client>;
type Error = Error;

fn init(&mut self, parent_header: &Header) -> Self::CreateProposer {
// create the inner proposer
let proposer = self.inner.init(parent_header).into_inner();

// data to be moved into the future
let overseer = self.overseer.clone();
let parent_header_hash = parent_header.hash();

async move {
Ok(Proposer {
inner: proposer?,
overseer,
parent_header_hash,
})
}.boxed()
}
}

/// Custom Proposer for Polkadot.
///
/// This proposer gets the ProvisionerInherentData and injects it into the wrapped
/// proposer's inherent data, then delegates the actual proposal generation.
pub struct Proposer<TxPool: TransactionPool<Block = Block>, Backend, Client> {
inner: sc_basic_authorship::Proposer<Backend, Block, Client, TxPool>,
overseer: OverseerHandler,
parent_header_hash: Hash,
}

// This impl has the same generic bounds as the Proposer impl.
impl<TxPool, Backend, Client> Proposer<TxPool, Backend, Client>
where
TxPool: 'static + TransactionPool<Block = Block>,
Client: 'static
+ BlockBuilderProvider<Backend, Block, Client>
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ Send
+ Sync,
Client::Api:
ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend:
'static + sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
/// Get provisioner inherent data
///
/// This function has a constant timeout: `PROPOSE_TIMEOUT`.
fn get_provisioner_data(&self) -> impl Future<Output = Result<ProvisionerInherentData, Error>> {
// clone this (lightweight) data because we're going to move it into the future
let mut overseer = self.overseer.clone();
let parent_header_hash = self.parent_header_hash.clone();

let mut provisioner_inherent_data = async move {
let (sender, receiver) = futures::channel::oneshot::channel();

// strictly speaking, we don't _have_ to .await this send_msg before opening the
// receiver; it's possible that the response there would be ready slightly before
// this call completes. IMO it's not worth the hassle or overhead of spawning a
// distinct task for that kind of miniscule efficiency improvement.
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?;

receiver.await.map_err(Error::ClosedChannelFromProvisioner)
}
.boxed()
.fuse();

let mut timeout = wasm_timer::Delay::new(PROPOSE_TIMEOUT).fuse();

async move {
select! {
pid = provisioner_inherent_data => pid,
_ = timeout => Err(Error::Timeout),
}
}
}
}

impl<TxPool, Backend, Client> sp_consensus::Proposer<Block> for Proposer<TxPool, Backend, Client>
where
TxPool: 'static + TransactionPool<Block = Block>,
Client: 'static
+ BlockBuilderProvider<Backend, Block, Client>
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ Send
+ Sync,
Client::Api:
ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend:
'static + sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
type Transaction = sc_client_api::TransactionFor<Backend, Block>;
type Proposal = Pin<Box<
dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>> + Send,
>>;
type Error = Error;

fn propose(
self,
mut inherent_data: InherentData,
inherent_digests: DigestFor<Block>,
max_duration: time::Duration,
record_proof: RecordProof,
) -> Self::Proposal {
let provisioner_data = self.get_provisioner_data();

async move {
let provisioner_data = match provisioner_data.await {
Ok(pd) => pd,
Err(err) => {
log::warn!("could not get provisioner inherent data; injecting default data: {}", err);
Default::default()
}
};

inherent_data.put_data(
inclusion_inherent::INHERENT_IDENTIFIER,
&provisioner_data,
)?;

self.inner
.propose(inherent_data, inherent_digests, max_duration, record_proof)
.await
.map_err(Into::into)
}
.boxed()
}
}

// It would have been more ergonomic to use thiserror to derive the
// From implementations, Display, and std::error::Error, but unfortunately
// two of the wrapped errors (sp_inherents::Error, SubsystemError) also
// don't impl std::error::Error, which breaks the thiserror derive.
#[derive(Debug)]
pub enum Error {
Consensus(sp_consensus::Error),
Blockchain(sp_blockchain::Error),
Inherent(sp_inherents::Error),
Timeout,
ClosedChannelFromProvisioner(futures::channel::oneshot::Canceled),
Subsystem(SubsystemError)
}

impl From<sp_consensus::Error> for Error {
fn from(e: sp_consensus::Error) -> Error {
Error::Consensus(e)
}
}

impl From<sp_blockchain::Error> for Error {
fn from(e: sp_blockchain::Error) -> Error {
Error::Blockchain(e)
}
}

impl From<sp_inherents::Error> for Error {
fn from(e: sp_inherents::Error) -> Error {
Error::Inherent(e)
}
}

impl From<SubsystemError> for Error {
fn from(e: SubsystemError) -> Error {
Error::Subsystem(e)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Consensus(err) => write!(f, "consensus error: {}", err),
Self::Blockchain(err) => write!(f, "blockchain error: {}", err),
Self::Inherent(err) => write!(f, "inherent error: {:?}", err),
Self::Timeout => write!(f, "timeout: provisioner did not return inherent data after {:?}", PROPOSE_TIMEOUT),
Self::ClosedChannelFromProvisioner(err) => write!(f, "provisioner closed inherent data channel before sending: {}", err),
Self::Subsystem(err) => write!(f, "subsystem error: {:?}", err),
}
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Consensus(err) => Some(err),
Self::Blockchain(err) => Some(err),
Self::ClosedChannelFromProvisioner(err) => Some(err),
_ => None
}
}
}
2 changes: 1 addition & 1 deletion node/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ kusama-runtime = { path = "../../runtime/kusama" }
westend-runtime = { path = "../../runtime/westend" }
polkadot-network = { path = "../../network", optional = true }
polkadot-rpc = { path = "../../rpc" }
polkadot-node-core-proposer = { path = "../core/proposer" }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down Expand Up @@ -57,7 +58,6 @@ sp-session = { git = "https://github.com/paritytech/substrate", branch = "master
sp-offchain = { package = "sp-offchain", git = "https://github.com/paritytech/substrate", branch = "master" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/paritytech/substrate", branch = "master" }
frame-benchmarking = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" }

[dev-dependencies]
polkadot-test-runtime-client = { path = "../../runtime/test-runtime/client" }
Expand Down
7 changes: 4 additions & 3 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use polkadot_subsystem::{
Subsystem, SubsystemContext, SpawnedSubsystem,
messages::{CandidateValidationMessage, CandidateBackingMessage},
};
use polkadot_node_core_proposer::ProposerFactory;
use sp_trie::PrefixedMemoryDB;
pub use service::{
Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis,
Expand Down Expand Up @@ -362,6 +363,7 @@ macro_rules! new_full {
.collect();

let (overseer, handler) = real_overseer(leaves, spawner)?;
let handler_clone = handler.clone();

task_manager.spawn_essential_handle().spawn_blocking("overseer", Box::pin(async move {
use futures::{pin_mut, select, FutureExt};
Expand All @@ -388,11 +390,10 @@ macro_rules! new_full {
let can_author_with =
consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());

// TODO: custom proposer (https://github.com/paritytech/polkadot/issues/1248)
let proposer = sc_basic_authorship::ProposerFactory::new(
let proposer = ProposerFactory::new(
client.clone(),
transaction_pool,
None,
handler_clone,
);

let babe_config = babe::BabeParams {
Expand Down
Loading

0 comments on commit 69ce9ff

Please sign in to comment.