diff --git a/client/consensus/rhd/src/lib.rs b/client/consensus/rhd/src/lib.rs
index 8521d96ba317b..4f63193d709da 100644
--- a/client/consensus/rhd/src/lib.rs
+++ b/client/consensus/rhd/src/lib.rs
@@ -9,23 +9,105 @@
 use tokio::runtime::TaskExecutor;
 use tokio::timer::Delay;
 use parking_lot::{RwLock, Mutex};
-use sp_core::crypto::Pair;
-use sp_runtime::traits::{Block as BlockT, Header};
-use sp_consensus::{Environment, Proposer};
+use codec::{Encode, Decode, Codec};
+
+use sp_core::{
+	Blake2Hasher,
+	H256,
+	Pair,
+	// TODO: need to add RHD to key_types
+	crypto::key_types::RHD,
+};
+use sp_runtime::{
+	generic::{
+		BlockId,
+		OpaqueDigestItemId
+	},
+	traits::{
+		Block as BlockT,
+		Header,
+		DigestItemFor,
+		ProvideRuntimeApi,
+		Zero,
+	},
+	Justification,
+	ConsensusEngineId,
+};
+use sp_consensus::{
+	self,
+	BlockImport,
+	Environment,
+	Proposer,
+	BlockCheckParams,
+	ForkChoiceStrategy,
+	BlockImportParams,
+	BlockOrigin,
+	ImportResult,
+	Error as ConsensusError,
+	SelectChain,
+	SyncOracle,
+	CanAuthorWith,
+	import_queue::{
+		Verifier,
+		BasicQueue,
+		CacheKeyId
+	},
+};
+// TODO: need to supply, if we want to export api
+use sp_consensus_rhd::{
+	RhdApi,
+	RhdPreDigest,
+	CompatibleDigestItem,
+	AuthorityId
+};
+use sc_client_api::{
+	backend::{
+		AuxStore,
+		Backend
+	},
+	call_executor::CallExecutor,
+	BlockchainEvents,
+	ProvideUncles,
+};
+use sc_keystore::KeyStorePtr;
+use sc_client::Client;
+use sp_block_builder::BlockBuilder as BlockBuilderApi;
+use sp_blockchain::{
+	Result as ClientResult,
+	Error as ClientError,
+	HeaderBackend,
+	ProvideCache,
+	HeaderMetadata,
+	well_known_cache_keys::{
+		self,
+		Id as CacheKeyId
+	},
+};
+use sp_api::ApiExt;
+
+
+
+mod app {
+	use sp_application_crypto::{
+		app_crypto,
+		sr25519
+	};
+	app_crypto!(sr25519, RHD);
+}
+
+#[cfg(feature = "std")]
+pub type AuthorityPair = app::Pair;
+pub type AuthoritySignature = app::Signature;
+pub type AuthorityId = app::Public;
+
+pub const RHD_ENGINE_ID: ConsensusEngineId = *b"RHDE";

-type AuthorityId<C> = <C as Pair>::Public;
-/// Result of a committed round of BFT
-pub type Committed<B> = rhododendron::Committed<B, <B as BlockT>::Hash, LocalizedSignature>;
+pub type Committed<B> = rhododendron::Committed<B, <B as BlockT>::Hash, LocalizedSignature>;
-/// Communication between BFT participants.
-pub type Communication<B> = rhododendron::Communication<B, <B as BlockT>::Hash, AuthorityId, LocalizedSignature>;
+pub type Communication<B> = rhododendron::Communication<B, <B as BlockT>::Hash, AuthorityId, LocalizedSignature>;
-/// Misbehavior observed from BFT participants.
 pub type Misbehavior<H> = rhododendron::Misbehavior<H, LocalizedSignature>;
-/// Shared offline validator tracker.
 pub type SharedOfflineTracker = Arc<RwLock<OfflineTracker>>;
@@ -65,65 +147,36 @@ impl Drop for AgreementHandle {
 }
-pub struct RhdService<B, P, I> {
-	// TODO: Use consensus common authority key
-	key: Arc<Pair>,
-	client: Arc<I>,
-	live_agreement: Mutex<Option<(B::Header, AgreementHandle)>>,
-	round_cache: Arc<Mutex<RoundCache<B::Hash>>>,
-	round_timeout_multiplier: u64,
-	factory: P,
-}
-impl<B, P, I> RhdService<B, P, I> where
-	C: Pair,
+
+///
+pub struct RhdWorker<B, P, I, InStream, OutSink> where
 	B: BlockT + Clone + Eq,
-	P: Environment,
-	P::Proposer: Proposer,
-	// TODO: need modify
-	I: BlockImport + Authorities,
+	B::Hash: ::std::hash::Hash,
+	P: Proposer<B>,
+	InStream: Stream<Item=Communication<B>, Error=Error>,
+	OutSink: Sink<SinkItem=Communication<B>, SinkError=Error>,
 {
-	pub fn new(client: Arc<I>, key: Arc<Pair>, factory: P) -> RhdService {
-		RhdService {
-			key: key,
-			client: client,
-			live_agreement: Mutex::new(None),
-			round_cache: Arc::new(Mutex::new(RoundCache {
-				hash: None,
-				start_round: 0,
-			})),
-			round_timeout_multiplier: 10,
-			factory,
-		}
-	}
-
-	pub fn build_upon(&self, header: &B::Header, input: In, output: Out)
-		-> Result<Option<BftFuture<B, <P as Environment<B>>::Proposer, I, In, Out>>, P::Error>
-	where
-		In: Stream<Item=Communication<B>, Error=Error>,
-		Out: Sink<SinkItem=Communication<B>, SinkError=Error> {
-
-
-	}
-}
-///
-pub struct RhdWorker where
+impl<B, P, I, InStream, OutSink> RhdWorker<B, P, I, InStream, OutSink> where
 	B: BlockT + Clone + Eq,
 	B::Hash: ::std::hash::Hash,
 	P: Proposer<B>,
+	I: BlockImport<B>,
 	InStream: Stream<Item=Communication<B>, Error=Error>,
-	OutSink: Sink<SinkItem=Communication<B>, SinkError=Error>,
-{
-	inner: rhododendron::Agreement<RhdInstance<B, P>, InStream, OutSink>,
-	status: Arc<AtomicUsize>,
-	cancel: oneshot::Receiver<()>,
-	import: Arc<I>,
+	OutSink: Sink<SinkItem=Communication<B>, SinkError=Error> {
+
+	pub fn new() {
+
+
+	}
+}
+
 impl Future for RhdWorker where
 	B: BlockT + Clone + Eq,
 	B::Hash: ::std::hash::Hash,
@@ -148,7 +201,7 @@ impl Drop for RhdWorker
 /// Instance of BFT agreement.
-struct RhdInstance<B: BlockT, P> {
+struct RhdContext<B: BlockT, P> {
 	key: Arc<Pair>,
 	authorities: Vec<AuthorityId>,
 	parent_hash: B::Hash,
@@ -157,23 +210,503 @@ struct RhdInstance {
 	proposer: P,
 }
-impl<B: BlockT, P: Proposer<B>> rhododendron::Context for RhdInstance<B, P> where
+impl<B: BlockT, P: Proposer<B>> rhododendron::Context for RhdContext<B, P> where
 	B: Clone + Eq,
 	B::Hash: ::std::hash::Hash,
 {
-	type Error = P::Error;
-	type AuthorityId = AuthorityId;
-	type Digest = B::Hash;
-	type Signature = LocalizedSignature;
-	type Candidate = B;
-	type RoundTimeout = Box<Future<Item=(), Error=Self::Error>>;
-	type CreateProposal = <P::Create as IntoFuture>::Future;
-	type EvaluateProposal = <P::Evaluate as IntoFuture>::Future;
+	type Error = P::Error;
+	type AuthorityId = AuthorityId;
+	type Digest = B::Hash;
+	// TODO: how to replace LocalizedSignature
+	type Signature = LocalizedSignature;
+	type Candidate = B;
+	type RoundTimeout = Box<Future<Item=(), Error=Self::Error>>;
+	type CreateProposal = <P::Create as IntoFuture>::Future;
+	type EvaluateProposal = <P::Evaluate as IntoFuture>::Future;
+
+
+	// fn generate_round_communication_entities()
+	// generate round_in and round_out here
+
+
+
+}
+
+
+#[allow(deprecated)]
+fn authorities<A, B, C>(client: &C, at: &BlockId<B>) -> Result<Vec<A>, ConsensusError> where
+	A: Codec,
+	B: BlockT,
+	C: ProvideRuntimeApi + BlockOf + ProvideCache<B>,
+	C::Api: AuraApi<B, A>,
+{
+	client
+		.cache()
+		.and_then(|cache| cache
+			.get_at(&well_known_cache_keys::AUTHORITIES, at)
+			.and_then(|(_, _, v)| Decode::decode(&mut &v[..]).ok())
+		)
+		.or_else(|| AuraApi::authorities(&*client.runtime_api(), at).ok())
+		.ok_or_else(|| sp_consensus::Error::InvalidAuthoritiesSet.into())
+}
+
+
+pub enum CheckedHeader<H, S> {
+	Checked(H, S),
+}
+
+struct VerificationParams<B: BlockT> {
+	pub header: B::Header,
+	pub pre_digest: Option<BabePreDigest>,
+}
+
+struct VerifiedHeaderInfo<B: BlockT> {
+	pub pre_digest: DigestItemFor<B>,
+	pub seal: DigestItemFor<B>,
+	pub author: AuthorityId,
+}
+
+fn check_header<B: BlockT>(
+	params: VerificationParams<B>,
+) -> Result<CheckedHeader<B::Header, VerifiedHeaderInfo<B>>, Error> where
+	DigestItemFor<B>: CompatibleDigestItem,
+{
+	let VerificationParams {
+		mut header,
+		pre_digest,
+	} = params;
+
+	let authorities = authorities(self.client.as_ref(), &BlockId::Hash(parent_hash))
+		.map_err(|e| format!("Could not fetch authorities at {:?}: {:?}", parent_hash, e))?;
+	let author = match authorities.get(pre_digest.authority_index() as usize) {
+		Some(author) => author.0.clone(),
+		None => return Err(babe_err(Error::SlotAuthorNotFound)),
+	};
+
+	let seal = match header.digest_mut().pop() {
+		Some(x) => x,
+		None => return Err(babe_err(Error::HeaderUnsealed(header.hash()))),
+	};
+
+	let info = VerifiedHeaderInfo {
+		pre_digest: CompatibleDigestItem::babe_pre_digest(pre_digest),
+		seal,
+		author,
+	};
+	Ok(CheckedHeader::Checked(header, info))
+}
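+
+// TODO: `check_header` above pops the seal but does not verify it yet. A rough sketch of the
+// missing step, in the style of the BABE verifier; the `as_rhd_seal` helper on
+// `CompatibleDigestItem` and the error variants are assumptions here, not part of this patch:
+//
+// let pre_hash = header.hash();
+// let sig = seal.as_rhd_seal().ok_or_else(|| babe_err(Error::HeaderBadSeal(header.hash())))?;
+// if !AuthorityPair::verify(&sig, pre_hash.as_ref(), &author) {
+//	return Err(babe_err(Error::BadSignature(header.hash())));
+// }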
+
+
+pub struct RhdVerifier<B, E, Block, RA, PRA> {
+	client: Arc<Client<B, E, Block, RA>>,
+	api: Arc<PRA>,
+}
+
+impl<B, E, Block, RA, PRA> Verifier<Block> for RhdVerifier<B, E, Block, RA, PRA> where
+	Block: BlockT,
+	B: Backend + 'static,
+	E: CallExecutor + 'static + Clone + Send + Sync,
+	RA: Send + Sync,
+	PRA: ProvideRuntimeApi + Send + Sync + AuxStore + ProvideCache,
+	PRA::Api: BlockBuilderApi + BabeApi,
+{
+	fn verify(
+		&mut self,
+		origin: BlockOrigin,
+		header: Block::Header,
+		justification: Option<Justification>,
+		mut body: Option<Vec<Block::Extrinsic>>,
+	) -> Result<(BlockImportParams<Block>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
+
+		let pre_digest = find_pre_digest::<Block>(&header)?;
+
+		let v_params = VerificationParams {
+			header: header.clone(),
+			pre_digest: Some(pre_digest.clone()),
+		};
+
+		let checked_result = check_header::<Block>(v_params)?;
+		match checked_result {
+			CheckedHeader::Checked(pre_header, verified_info) => {
+				let block_import_params = BlockImportParams {
+					origin,
+					header: pre_header,
+					post_digests: vec![verified_info.seal],
+					body,
+					// TODO: need to set true for instant finalization?
+					finalized: false,
+					justification,
+					auxiliary: Vec::new(),
+					fork_choice: ForkChoiceStrategy::LongestChain,
+					allow_missing_state: false,
+					import_existing: false,
+				};
+
+				Ok((block_import_params, Default::default()))
+			},
+			// TODO: we'd better add this branch
+			// CheckedHeader::NotChecked => {}
+
+		}
+
+
+	}
+}
+
+
+
+pub(crate) enum VoterCommand {
+	Start,
+	Pause(String),
+//	ChangeAuthorities(NewAuthoritySet),
+}
+
+pub struct RhdBlockImport<B, E, Block, I, RA, PRA> {
+	inner: I,
+	client: Arc<Client<B, E, Block, RA>>,
+	api: Arc<PRA>,
+	voter_commands_tx: mpsc::UnboundedSender<VoterCommand>,
+}
+
+impl<B, E, Block, I: Clone, RA, PRA> Clone for RhdBlockImport<B, E, Block, I, RA, PRA> {
+	fn clone(&self) -> Self {
+		RhdBlockImport {
+			inner: self.inner.clone(),
+			client: self.client.clone(),
+			api: self.api.clone(),
+			voter_commands_tx: self.voter_commands_tx.clone()
+		}
+	}
+}
+
+impl<B, E, Block, I, RA, PRA> RhdBlockImport<B, E, Block, I, RA, PRA> {
+	fn new(
+		client: Arc<Client<B, E, Block, RA>>,
+		api: Arc<PRA>,
+		block_import: I,
+		voter_commands_tx: mpsc::UnboundedSender<VoterCommand>
+	) -> Self {
+		RhdBlockImport {
+			client,
+			api,
+			inner: block_import,
+			voter_commands_tx
+		}
+	}
+}
+
+impl<B, E, Block, I, RA, PRA> BlockImport<Block> for RhdBlockImport<B, E, Block, I, RA, PRA> where
+	Block: BlockT,
+	I: BlockImport + Send + Sync,
+	I::Error: Into<ConsensusError>,
+	B: Backend + 'static,
+	E: CallExecutor + 'static + Clone + Send + Sync,
+	RA: Send + Sync,
+	PRA: ProvideRuntimeApi + ProvideCache,
+	PRA::Api: BabeApi,
+{
+	type Error = ConsensusError;
+
+	fn check_block(
+		&mut self,
+		block: BlockCheckParams,
+	) -> Result<ImportResult, Self::Error> {
+		self.inner.check_block(block)
+		//.map_err(Into::into)
+	}
+
+	fn import_block(
+		&mut self,
+		mut block: BlockImportParams,
+		new_cache: HashMap<CacheKeyId, Vec<u8>>,
+	) -> Result<ImportResult, Self::Error> {
+
+
+
+
+
+	}
+
+
+}
+
+
+
+pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> {
+	client: Arc<Client<B, E, Block, RA>>,
+	voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand>,
+}
+
+pub fn generate_block_import_object<B, E, Block: BlockT<Hash=H256>, I, RA, PRA>(
+//	config: Config,
+//	wrapped_block_import: I,
+	client: Arc<Client<B, E, Block, RA>>,
+	api: Arc<PRA>,
+) -> ClientResult<(RhdBlockImport, LinkHalf)> where
+	B: Backend,
+	E: CallExecutor + Send + Sync,
+	RA: Send + Sync,
+{
+
+	let default_block_import = client.clone();
+	let (voter_commands_tx, voter_commands_rx) = mpsc::unbounded();
+
+	let import = RhdBlockImport::new(
+		client.clone(),
+		api,
+		default_block_import,
+		voter_commands_tx
+	);
+	let link = LinkHalf {
+		client: client.clone(),
+		voter_commands_rx,
+	};
+
+	Ok((import, link))
+}
+
+
+
+/// The RHD import queue type.
+pub type RhdImportQueue<B> = BasicQueue<B>;
+
+pub fn generate_import_queue<B, E, Block: BlockT<Hash=H256>, I, RA, PRA>(
+//	babe_link: BabeLink,
+	block_import: I,
+	justification_import: Option<BoxJustificationImport<Block>>,
+	finality_proof_import: Option<BoxFinalityProofImport<Block>>,
+	client: Arc<Client<B, E, Block, RA>>,
+	api: Arc<PRA>,
+//	inherent_data_providers: InherentDataProviders,
+) -> ClientResult<RhdImportQueue<Block>> where
+	B: Backend + 'static,
+	I: BlockImport + Send + Sync + 'static,
+	E: CallExecutor + Clone + Send + Sync + 'static,
+	RA: Send + Sync + 'static,
+	PRA: ProvideRuntimeApi + ProvideCache + Send + Sync + AuxStore + 'static,
+	PRA::Api: BlockBuilderApi + BabeApi + ApiExt,
+{
+
+	let verifier = RhdVerifier {
+		client: client.clone(),
+		api,
+	};
+
+	Ok(BasicQueue::new(
+		verifier,
+		Box::new(block_import),
+		justification_import,
+		finality_proof_import,
+	))
+}
+
+
+// let proposer = sc_basic_authority::ProposerFactory {
+//	client: service.client(),
+//	transaction_pool: service.transaction_pool(),
+// };
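+
+// A rough end-to-end wiring sketch in the same spirit as the snippet above; `service`,
+// `select_chain`, `force_authoring`, `can_author_with` and the double use of `service.client()`
+// as both client and runtime-api provider are assumptions, not part of this patch:
+//
+// let (block_import, link_half) = generate_block_import_object(
+//	service.client(),
+//	service.client(),
+// )?;
+// let import_queue = generate_import_queue(
+//	block_import.clone(),
+//	None,
+//	None,
+//	service.client(),
+//	service.client(),
+// )?;
+// let rhd_worker = run_rhd_worker(RhdParams {
+//	keystore: service.keystore(),
+//	client: service.client(),
+//	select_chain,
+//	env: proposer,
+//	block_import,
+//	sync_oracle: service.network(),
+//	force_authoring,
+//	can_author_with,
+// })?;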
+
+
+pub struct RhdParams<C, SC, E, I, SO, CAW> {
+	pub keystore: KeyStorePtr,
+	pub client: Arc<C>,
+	pub select_chain: SC,
+	/// The environment we are producing blocks for.
+	pub env: E,
+	pub block_import: I,
+	pub sync_oracle: SO,
+	/// Force authoring of blocks even if we are offline
+	pub force_authoring: bool,
+	/// Checks if the current native implementation can author with a runtime at a given block.
+	pub can_author_with: CAW,
+}
+
+pub fn run_rhd_worker<B, C, SC, E, I, SO, CAW, Error>(RhdParams {
+	keystore,
+	client,
+	select_chain,
+	env,
+	block_import,
+	sync_oracle,
+	force_authoring,
+	can_author_with,
+}: RhdParams<C, SC, E, I, SO, CAW>)
+	-> Result<impl Future<Item=(), Error=()>, sp_consensus::Error,> where
+	B: BlockT,
+	C: ProvideRuntimeApi + ProvideCache + ProvideUncles + BlockchainEvents + HeaderBackend + HeaderMetadata + Send + Sync + 'static,
+	C::Api: BabeApi,
+	SC: SelectChain + 'static,
+	E: Environment + Send + Sync,
+	E::Proposer: Proposer,
+	<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
+	I: BlockImport + Send + Sync + 'static,
+	Error: std::error::Error + Send + From<::sp_consensus::Error> + From<I::Error> + 'static,
+	SO: SyncOracle + Send + Sync + Clone,
+	CAW: CanAuthorWith + Send,
+{
+	let rhd_worker = RhdWorker::new(
+		client.clone(),
+		Arc::new(Mutex::new(block_import)),
+		// env here is a proposer
+		env,
+		sync_oracle.clone(),
+		force_authoring,
+		keystore,
+	);
+
+	Ok(rhd_worker)
+}
+
+
+struct RhdVoterWorker<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
+	voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
+	env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
+	voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
+}
+
+impl<B, E, Block, N, RA, SC, VR> RhdVoterWorker<B, E, Block, N, RA, SC, VR>
+where
+	Block: BlockT,
+	N: NetworkT + Sync,
+	NumberFor<Block>: BlockNumberOps,
+	RA: 'static + Send + Sync,
+	E: CallExecutor + Send + Sync + 'static,
+	B: Backend + 'static,
+	SC: SelectChain + 'static,
+	VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
+{
+	fn new(
+		client: Arc<Client<B, E, Block, RA>>,
+		config: Config,
+		network: NetworkBridge,
+		select_chain: SC,
+		voting_rule: VR,
+		persistent_data: PersistentData,
+		voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
+	) -> Self {
+
+		// When creating a new voter instance, create a channel pair and pass the tx end to the voter.
+		// voter_commands_rx is used to receive command directives from substrate: start, pause, ...
+		// voter_commitout_tx is used to send commit messages back to substrate, indicating that a
+		// round has finished on this local node.
+
+
+	}
+
+	fn handle_voter_command(
+		&mut self,
+		command: VoterCommand<Block::Hash, NumberFor<Block>>
+	) -> Result<(), Error> {
+
+	}
+
+}
+
+impl<B, E, Block, N, RA, SC, VR> Future for RhdVoterWorker<B, E, Block, N, RA, SC, VR>
+where
+	Block: BlockT,
+	N: NetworkT + Sync,
+	NumberFor<Block>: BlockNumberOps,
+	RA: 'static + Send + Sync,
+	E: CallExecutor + Send + Sync + 'static,
+	B: Backend + 'static,
+	SC: SelectChain + 'static,
+	VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
+{
+	type Item = ();
+	type Error = Error;
+
+	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+
+	}
+}
+
+
+
+pub fn run_rhd_voter_worker<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X, Sp>(
+	grandpa_params: GrandpaParams,
+) -> sp_blockchain::Result<impl Future<Item=(), Error=()> + Send + 'static> where
+	Block::Hash: Ord,
+	B: Backend + 'static,
+	E: CallExecutor + Send + Sync + 'static,
+	N: NetworkT + Send + Sync + Clone + 'static,
+	SC: SelectChain + 'static,
+	VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
+	NumberFor<Block>: BlockNumberOps,
+	DigestFor<Block>: Encode,
+	RA: Send + Sync + 'static,
+	X: futures03::Future + Clone + Send + Unpin + 'static,
+	Sp: futures03::task::Spawn + 'static,
+{
+
+
+}
+
+
+// use gossip_engine to generate RoundIncomingStream
+// let incoming = Compat::new(self.gossip_engine.messages_for(topic)
+// type RoundIncomingStream = mpsc::UnboundedReceiver
+//
+struct RoundOutgoingSink<Block: BlockT> {
+	round: RoundNumber,
+	set_id: SetIdNumber,
+	locals: Option<(AuthorityPair, AuthorityId)>,
+	sender: mpsc::UnboundedSender<SignedMessage<Block>>,
+	network: GossipEngine<Block>,
+	has_voted: HasVoted<Block>,
+}
+
+impl<Block: BlockT> Sink for RoundOutgoingSink<Block> {
+	type SinkItem = Message<Block>;
+	type SinkError = Error;
+
+}
+
+
+
+
+
+
+
+fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<BabePreDigest, Error<B>>
+{
+	// genesis block doesn't contain a pre digest so let's generate a
+	// dummy one to not break any invariants in the rest of the code
+	if header.number().is_zero() {
+		return Ok(BabePreDigest::Secondary {
+			slot_number: 0,
+			authority_index: 0,
+		});
+	}
+
+	let mut pre_digest: Option<_> = None;
+	for log in header.digest().logs() {
+		trace!(target: "babe", "Checking log {:?}, looking for pre runtime digest", log);
+		match (log.as_babe_pre_digest(), pre_digest.is_some()) {
+			(Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
+			(None, _) => trace!(target: "babe", "Ignoring digest not meant for us"),
+			(s, false) => pre_digest = s,
+		}
+	}
+	pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
+}
+
+
+
 #[cfg(test)]