diff --git a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs index ef462a54fca5b..7121410ea109b 100644 --- a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs +++ b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs @@ -43,7 +43,7 @@ use crate::{ }; /// Response type received from network. -type Response = Result, RequestFailure>; +type Response = Result<(Vec, ProtocolName), RequestFailure>; /// Used to receive a response from the network. type ResponseReceiver = oneshot::Receiver; @@ -125,6 +125,7 @@ impl OnDemandJustificationsEngine { peer, self.protocol_name.clone(), payload, + None, tx, IfDisconnected::ImmediateError, ); @@ -204,7 +205,7 @@ impl OnDemandJustificationsEngine { }, } }) - .and_then(|encoded| { + .and_then(|(encoded, _)| { decode_and_verify_finality_proof::( &encoded[..], req_info.block, diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 745550412fc21..1f234683392f1 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -231,13 +231,20 @@ impl Behaviour { pub fn send_request( &mut self, target: &PeerId, - protocol: &str, + protocol: ProtocolName, request: Vec, - pending_response: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { - self.request_responses - .send_request(target, protocol, request, pending_response, connect) + self.request_responses.send_request( + target, + protocol, + request, + fallback_request, + pending_response, + connect, + ) } /// Returns a shared reference to the user protocol. diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 5af072aaddc62..0cd1cf06bb33e 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -56,6 +56,7 @@ use libp2p::{ use std::{ collections::{hash_map::Entry, HashMap}, io, iter, + ops::Deref, pin::Pin, task::{Context, Poll}, time::{Duration, Instant}, @@ -172,6 +173,13 @@ pub struct OutgoingResponse { pub sent_feedback: Option>, } +/// Information stored about a pending request. +struct PendingRequest { + started_at: Instant, + response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, +} + /// When sending a request, what to do on a disconnected recipient. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum IfDisconnected { @@ -264,8 +272,7 @@ pub struct RequestResponsesBehaviour { >, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. - pending_requests: - HashMap, RequestFailure>>)>, + pending_requests: HashMap, /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the /// start time and the response to send back to the remote. @@ -348,29 +355,25 @@ impl RequestResponsesBehaviour { pub fn send_request( &mut self, target: &PeerId, - protocol_name: &str, + protocol_name: ProtocolName, request: Vec, - pending_response: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len()); - if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) { - if protocol.is_connected(target) || connect.should_connect() { - let request_id = protocol.send_request(target, request); - let prev_req_id = self.pending_requests.insert( - (protocol_name.to_string().into(), request_id).into(), - (Instant::now(), pending_response), - ); - debug_assert!(prev_req_id.is_none(), "Expect request id to be unique."); - } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() { - log::debug!( - target: "sub-libp2p", - "Not connected to peer {:?}. At the same time local \ - node is no longer interested in the result.", - target, - ); - } + if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) { + Self::send_request_inner( + protocol, + &mut self.pending_requests, + target, + protocol_name, + request, + fallback_request, + pending_response, + connect, + ) } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() { log::debug!( target: "sub-libp2p", @@ -380,6 +383,37 @@ impl RequestResponsesBehaviour { ); } } + + fn send_request_inner( + behaviour: &mut Behaviour, + pending_requests: &mut HashMap, + target: &PeerId, + protocol_name: ProtocolName, + request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, + connect: IfDisconnected, + ) { + if behaviour.is_connected(target) || connect.should_connect() { + let request_id = behaviour.send_request(target, request); + let prev_req_id = pending_requests.insert( + (protocol_name.to_string().into(), request_id).into(), + PendingRequest { + started_at: Instant::now(), + response_tx: pending_response, + fallback_request, + }, + ); + debug_assert!(prev_req_id.is_none(), "Expect request id to be unique."); + } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() { + log::debug!( + target: "sub-libp2p", + "Not connected to peer {:?}. At the same time local \ + node is no longer interested in the result.", + target, + ); + } + } } impl NetworkBehaviour for RequestResponsesBehaviour { @@ -596,8 +630,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } } + let mut fallback_requests = vec![]; + // Poll request-responses protocols. - for (protocol, (behaviour, resp_builder)) in &mut self.protocols { + for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) { let ev = match ev { // Main events we are interested in. @@ -698,17 +734,21 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some((started, pending_response)) => { + Some(PendingRequest { started_at, response_tx, .. }) => { log::trace!( target: "sub-libp2p", "received response from {peer} ({protocol:?}), {} bytes", response.as_ref().map_or(0usize, |response| response.len()), ); - let delivered = pending_response - .send(response.map_err(|()| RequestFailure::Refused)) + let delivered = response_tx + .send( + response + .map_err(|()| RequestFailure::Refused) + .map(|resp| (resp, protocol.clone())), + ) .map_err(|_| RequestFailure::Obsolete); - (started, delivered) + (started_at, delivered) }, None => { log::warn!( @@ -742,8 +782,34 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some((started, pending_response)) => { - if pending_response + Some(PendingRequest { + started_at, + response_tx, + fallback_request, + }) => { + // Try using the fallback request if the protocol was not + // supported. + if let OutboundFailure::UnsupportedProtocols = error { + if let Some((fallback_request, fallback_protocol)) = + fallback_request + { + log::trace!( + target: "sub-libp2p", + "Request with id {:?} failed. Trying the fallback protocol. {}", + request_id, + fallback_protocol.deref() + ); + fallback_requests.push(( + peer, + fallback_protocol, + fallback_request, + response_tx, + )); + continue + } + } + + if response_tx .send(Err(RequestFailure::Network(error.clone()))) .is_err() { @@ -754,7 +820,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { request_id, ); } - started + started_at }, None => { log::warn!( @@ -825,6 +891,25 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } } + // Send out fallback requests. + for (peer, protocol, request, pending_response) in fallback_requests.drain(..) { + if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) { + Self::send_request_inner( + behaviour, + &mut self.pending_requests, + &peer, + protocol, + request, + None, + pending_response, + // We can error if not connected because the + // previous attempt would have tried to establish a + // connection already or errored and we wouldn't have gotten here. + IfDisconnected::ImmediateError, + ); + } + } + break Poll::Pending } } @@ -976,6 +1061,7 @@ mod tests { use super::*; use crate::mock::MockPeerStore; + use assert_matches::assert_matches; use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; use libp2p::{ core::{ @@ -1025,7 +1111,7 @@ mod tests { #[test] fn basic_request_response_works() { - let protocol_name = "/test/req-resp/1"; + let protocol_name = ProtocolName::from("/test/req-resp/1"); let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. @@ -1053,7 +1139,7 @@ mod tests { .unwrap(); let protocol_config = ProtocolConfig { - name: From::from(protocol_name), + name: protocol_name.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1102,8 +1188,9 @@ mod tests { let (sender, receiver) = oneshot::channel(); swarm.behaviour_mut().send_request( &peer_id, - protocol_name, + protocol_name.clone(), b"this is a request".to_vec(), + None, sender, IfDisconnected::ImmediateError, ); @@ -1118,13 +1205,16 @@ mod tests { } } - assert_eq!(response_receiver.unwrap().await.unwrap().unwrap(), b"this is a response"); + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name) + ); }); } #[test] fn max_response_size_exceeded() { - let protocol_name = "/test/req-resp/1"; + let protocol_name = ProtocolName::from("/test/req-resp/1"); let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. @@ -1150,7 +1240,7 @@ mod tests { .unwrap(); let protocol_config = ProtocolConfig { - name: From::from(protocol_name), + name: protocol_name.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 8, // <-- important for the test @@ -1201,8 +1291,9 @@ mod tests { let (sender, receiver) = oneshot::channel(); swarm.behaviour_mut().send_request( &peer_id, - protocol_name, + protocol_name.clone(), b"this is a request".to_vec(), + None, sender, IfDisconnected::ImmediateError, ); @@ -1236,14 +1327,14 @@ mod tests { /// See [`ProtocolRequestId`] for additional information. #[test] fn request_id_collision() { - let protocol_name_1 = "/test/req-resp-1/1"; - let protocol_name_2 = "/test/req-resp-2/1"; + let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1"); + let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1"); let mut pool = LocalPool::new(); let mut swarm_1 = { let protocol_configs = vec![ ProtocolConfig { - name: From::from(protocol_name_1), + name: protocol_name_1.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1251,7 +1342,7 @@ mod tests { inbound_queue: None, }, ProtocolConfig { - name: From::from(protocol_name_2), + name: protocol_name_2.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1269,7 +1360,7 @@ mod tests { let protocol_configs = vec![ ProtocolConfig { - name: From::from(protocol_name_1), + name: protocol_name_1.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1277,7 +1368,7 @@ mod tests { inbound_queue: Some(tx_1), }, ProtocolConfig { - name: From::from(protocol_name_2), + name: protocol_name_2.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1359,15 +1450,17 @@ mod tests { let (sender_2, receiver_2) = oneshot::channel(); swarm_1.behaviour_mut().send_request( &peer_id, - protocol_name_1, + protocol_name_1.clone(), b"this is a request".to_vec(), + None, sender_1, IfDisconnected::ImmediateError, ); swarm_1.behaviour_mut().send_request( &peer_id, - protocol_name_2, + protocol_name_2.clone(), b"this is a request".to_vec(), + None, sender_2, IfDisconnected::ImmediateError, ); @@ -1385,8 +1478,239 @@ mod tests { } } let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); - assert_eq!(response_receiver_1.await.unwrap().unwrap(), b"this is a response"); - assert_eq!(response_receiver_2.await.unwrap().unwrap(), b"this is a response"); + assert_eq!( + response_receiver_1.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_1) + ); + assert_eq!( + response_receiver_2.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_2) + ); + }); + } + + #[test] + fn request_fallback() { + let protocol_name_1 = ProtocolName::from("/test/req-resp/2"); + let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1"); + let protocol_name_2 = ProtocolName::from("/test/another"); + let mut pool = LocalPool::new(); + + let protocol_config_1 = ProtocolConfig { + name: protocol_name_1.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + inbound_queue: None, + }; + let protocol_config_1_fallback = ProtocolConfig { + name: protocol_name_1_fallback.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + inbound_queue: None, + }; + let protocol_config_2 = ProtocolConfig { + name: protocol_name_2.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + inbound_queue: None, + }; + + // This swarm only speaks protocol_name_1_fallback and protocol_name_2. + // It only responds to requests. + let mut older_swarm = { + let (tx_1, mut rx_1) = async_channel::bounded::(64); + let (tx_2, mut rx_2) = async_channel::bounded::(64); + let mut protocol_config_1_fallback = protocol_config_1_fallback.clone(); + protocol_config_1_fallback.inbound_queue = Some(tx_1); + + let mut protocol_config_2 = protocol_config_2.clone(); + protocol_config_2.inbound_queue = Some(tx_2); + + pool.spawner() + .spawn_obj( + async move { + for _ in 0..2 { + if let Some(rq) = rx_1.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok( + b"this is a response on protocol /test/req-resp/1".to_vec() + ), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + } + + if let Some(rq) = rx_2.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/other"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/other".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + } + .boxed() + .into(), + ) + .unwrap(); + + build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter()) + }; + + // This swarm speaks all protocols. + let mut new_swarm = build_swarm( + vec![ + protocol_config_1.clone(), + protocol_config_1_fallback.clone(), + protocol_config_2.clone(), + ] + .into_iter(), + ); + + { + let dial_addr = older_swarm.1.clone(); + Swarm::dial(&mut new_swarm.0, dial_addr).unwrap(); + } + + // Running `older_swarm`` in the background. + pool.spawner() + .spawn_obj({ + async move { + loop { + _ = older_swarm.0.select_next_some().await; + } + } + .boxed() + .into() + }) + .unwrap(); + + // Run the newer swarm. Attempt to make requests on all protocols. + let (mut swarm, _) = new_swarm; + let mut older_peer_id = None; + + pool.run_until(async move { + let mut response_receiver = None; + // Try the new protocol with a fallback. + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + older_peer_id = Some(peer_id); + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"request on protocol /test/req-resp/2".to_vec(), + Some(( + b"request on protocol /test/req-resp/1".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the old protocol with a useless fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1_fallback.clone(), + b"request on protocol /test/req-resp/1".to_vec(), + Some(( + b"dummy request, will fail if processed".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the new protocol with no fallback. Should fail. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1.clone(), + b"request on protocol /test/req-resp-2".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert_matches!( + result.unwrap_err(), + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) + ); + break + }, + _ => {}, + } + } + assert!(response_receiver.await.unwrap().is_err()); + // Try the other protocol with no fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_2.clone(), + b"request on protocol /test/other".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) + ); }); } } diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 06db23844d0d9..47e23337633ba 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -1048,11 +1048,12 @@ where target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Result, RequestFailure> { + ) -> Result<(Vec, ProtocolName), RequestFailure> { let (tx, rx) = oneshot::channel(); - self.start_request(target, protocol, request, tx, connect); + self.start_request(target, protocol, request, fallback_request, tx, connect); match rx.await { Ok(v) => v, @@ -1068,13 +1069,15 @@ where target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { target, protocol: protocol.into(), request, + fallback_request, pending_response: tx, connect, }); @@ -1160,7 +1163,8 @@ enum ServiceToWorkerMsg { target: PeerId, protocol: ProtocolName, request: Vec, - pending_response: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, }, NetworkStatus { @@ -1287,13 +1291,15 @@ where target, protocol, request, + fallback_request, pending_response, connect, } => { self.network_service.behaviour_mut().send_request( &target, - &protocol, + protocol, request, + fallback_request, pending_response, connect, ); diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs index d4d4a05a86f1d..74ddb986c247a 100644 --- a/substrate/client/network/src/service/traits.rs +++ b/substrate/client/network/src/service/traits.rs @@ -551,8 +551,9 @@ pub trait NetworkRequest { target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Result, RequestFailure>; + ) -> Result<(Vec, ProtocolName), RequestFailure>; /// Variation of `request` which starts a request whose response is delivered on a provided /// channel. @@ -569,7 +570,8 @@ pub trait NetworkRequest { target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ); } @@ -585,13 +587,20 @@ where target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Pin, RequestFailure>> + Send + 'async_trait>> + ) -> Pin< + Box< + dyn Future, ProtocolName), RequestFailure>> + + Send + + 'async_trait, + >, + > where 'life0: 'async_trait, Self: 'async_trait, { - T::request(self, target, protocol, request, connect) + T::request(self, target, protocol, request, fallback_request, connect) } fn start_request( @@ -599,10 +608,11 @@ where target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { - T::start_request(self, target, protocol, request, tx, connect) + T::start_request(self, target, protocol, request, fallback_request, tx, connect) } } diff --git a/substrate/client/network/sync/src/block_relay_protocol.rs b/substrate/client/network/sync/src/block_relay_protocol.rs index 7a313458bf034..b4ef72a10c6b8 100644 --- a/substrate/client/network/sync/src/block_relay_protocol.rs +++ b/substrate/client/network/sync/src/block_relay_protocol.rs @@ -18,7 +18,10 @@ use futures::channel::oneshot; use libp2p::PeerId; -use sc_network::request_responses::{ProtocolConfig, RequestFailure}; +use sc_network::{ + request_responses::{ProtocolConfig, RequestFailure}, + ProtocolName, +}; use sc_network_common::sync::message::{BlockData, BlockRequest}; use sp_runtime::traits::Block as BlockT; use std::sync::Arc; @@ -43,7 +46,7 @@ pub trait BlockDownloader: Send + Sync { &self, who: PeerId, request: BlockRequest, - ) -> Result, RequestFailure>, oneshot::Canceled>; + ) -> Result, ProtocolName), RequestFailure>, oneshot::Canceled>; /// Parses the protocol specific response to retrieve the block data. fn block_response_into_blocks( diff --git a/substrate/client/network/sync/src/block_request_handler.rs b/substrate/client/network/sync/src/block_request_handler.rs index f363dda3a2d18..f669a22cd2e94 100644 --- a/substrate/client/network/sync/src/block_request_handler.rs +++ b/substrate/client/network/sync/src/block_request_handler.rs @@ -570,7 +570,7 @@ impl BlockDownloader for FullBlockDownloader { &self, who: PeerId, request: BlockRequest, - ) -> Result, RequestFailure>, oneshot::Canceled> { + ) -> Result, ProtocolName), RequestFailure>, oneshot::Canceled> { // Build the request protobuf. let bytes = BlockRequestSchema { fields: request.fields.to_be_u32(), diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index d7b024cd801c7..952300a14d891 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -1263,7 +1263,7 @@ where let ResponseEvent { peer_id, request, response } = response_event; match response { - Ok(Ok(resp)) => match request { + Ok(Ok((resp, _))) => match request { PeerRequest::Block(req) => { match self.block_downloader.block_response_into_blocks(&req, resp) { Ok(blocks) => { diff --git a/substrate/client/network/sync/src/mock.rs b/substrate/client/network/sync/src/mock.rs index 42220096e0695..a4f5eb564c2cd 100644 --- a/substrate/client/network/sync/src/mock.rs +++ b/substrate/client/network/sync/src/mock.rs @@ -22,7 +22,7 @@ use crate::block_relay_protocol::{BlockDownloader as BlockDownloaderT, BlockResp use futures::channel::oneshot; use libp2p::PeerId; -use sc_network::RequestFailure; +use sc_network::{ProtocolName, RequestFailure}; use sc_network_common::sync::message::{BlockData, BlockRequest}; use sp_runtime::traits::Block as BlockT; @@ -35,7 +35,7 @@ mockall::mock! { &self, who: PeerId, request: BlockRequest, - ) -> Result, RequestFailure>, oneshot::Canceled>; + ) -> Result, ProtocolName), RequestFailure>, oneshot::Canceled>; fn block_response_into_blocks( &self, request: &BlockRequest, diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index 55308dfc1ea90..e21a576322508 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -28,7 +28,7 @@ use futures::{ }; use libp2p::PeerId; use log::error; -use sc_network::request_responses::RequestFailure; +use sc_network::{request_responses::RequestFailure, types::ProtocolName}; use sp_runtime::traits::Block as BlockT; use std::task::{Context, Poll, Waker}; use tokio_stream::StreamMap; @@ -37,7 +37,7 @@ use tokio_stream::StreamMap; const LOG_TARGET: &'static str = "sync"; /// Response result. -type ResponseResult = Result, RequestFailure>, oneshot::Canceled>; +type ResponseResult = Result, ProtocolName), RequestFailure>, oneshot::Canceled>; /// A future yielding [`ResponseResult`]. type ResponseFuture = BoxFuture<'static, ResponseResult>; diff --git a/substrate/client/network/sync/src/service/mock.rs b/substrate/client/network/sync/src/service/mock.rs index 6e307d8698444..420de8cd5fdcf 100644 --- a/substrate/client/network/sync/src/service/mock.rs +++ b/substrate/client/network/sync/src/service/mock.rs @@ -117,14 +117,16 @@ mockall::mock! { target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Result, RequestFailure>; + ) -> Result<(Vec, ProtocolName), RequestFailure>; fn start_request( &self, target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ); } diff --git a/substrate/client/network/sync/src/service/network.rs b/substrate/client/network/sync/src/service/network.rs index 12a47d6a9b544..07f28519afb2b 100644 --- a/substrate/client/network/sync/src/service/network.rs +++ b/substrate/client/network/sync/src/service/network.rs @@ -54,7 +54,7 @@ pub enum ToServiceCommand { PeerId, ProtocolName, Vec, - oneshot::Sender, RequestFailure>>, + oneshot::Sender, ProtocolName), RequestFailure>>, IfDisconnected, ), @@ -94,7 +94,7 @@ impl NetworkServiceHandle { who: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { let _ = self @@ -134,7 +134,7 @@ impl NetworkServiceProvider { ToServiceCommand::ReportPeer(peer, reputation_change) => service.report_peer(peer, reputation_change), ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) => - service.start_request(peer, protocol, request, tx, connect), + service.start_request(peer, protocol, request, None, tx, connect), ToServiceCommand::WriteNotification(peer, protocol, message) => service.write_notification(peer, protocol, message), ToServiceCommand::SetNotificationHandshake(protocol, handshake) =>