Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add explicit limits to notifications sizes and adjust yamux buffer size #7925

Merged
4 commits merged into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,8 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
pub fn grandpa_peers_set_config() -> sc_network::config::NonDefaultSetConfig {
sc_network::config::NonDefaultSetConfig {
notifications_protocol: communication::GRANDPA_PROTOCOL_NAME.into(),
// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
max_notification_size: 1024 * 1024,
set_config: sc_network::config::SetConfig {
in_peers: 25,
out_peers: 25,
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ pub struct NonDefaultSetConfig {
/// > **Note**: This field isn't present for the default set, as this is handled internally
/// > by the networking code.
pub notifications_protocol: Cow<'static, str>,
/// Maximum allowed size of single notifications.
pub max_notification_size: u64,
/// Base configuration.
pub set_config: SetConfig,
}
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: Default::default()
}
],
Expand All @@ -157,6 +158,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr,
Expand Down
13 changes: 8 additions & 5 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,16 +475,19 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
best_hash,
genesis_hash,
).encode();

GenericProto::new(
protocol_id.clone(),
versions,
build_status_message::<B>(&config, best_number, best_hash, genesis_hash),
peerset,
iter::once((block_announces_protocol, block_announces_handshake))
.chain(iter::once((transactions_protocol, vec![])))
.chain(network_config.extra_sets.iter()
.map(|s| (s.notifications_protocol.clone(), handshake_message.clone()))
),
iter::once((block_announces_protocol, block_announces_handshake, 1024 * 1024))
.chain(iter::once((transactions_protocol, vec![], 1024 * 1024)))
.chain(network_config.extra_sets.iter().map(|s| (
s.notifications_protocol.clone(),
handshake_message.clone(),
s.max_notification_size
))),
)
};

Expand Down
6 changes: 3 additions & 3 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
Expand Down Expand Up @@ -374,10 +374,10 @@ impl GenericProto {
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
.map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz))
.collect::<Vec<_>>();

assert!(!notif_protocols.is_empty());
Expand Down
31 changes: 19 additions & 12 deletions client/network/src/protocol/generic_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
pub struct NotifsHandlerProto {
/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
/// send or respond with in the handshake.
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>)>,
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>,

/// Configuration for the legacy protocol upgrade.
legacy_protocol: RegisteredProtocol,
Expand Down Expand Up @@ -161,6 +161,9 @@ struct Protocol {
/// Handshake to send when opening a substream or receiving an open request.
handshake: Arc<RwLock<Vec<u8>>>,

/// Maximum allowed size of individual notifications.
max_notification_size: u64,

/// Current state of the substreams for this protocol.
state: State,
}
Expand Down Expand Up @@ -226,22 +229,23 @@ impl IntoProtocolsHandler for NotifsHandlerProto {

fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> {
let protocols = self.protocols.iter()
.map(|(_, p, _)| p.clone())
.map(|(_, p, _, _)| p.clone())
.collect::<UpgradeCollec<_>>();

SelectUpgrade::new(protocols, self.legacy_protocol.clone())
}

fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
NotifsHandler {
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake)| {
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake, max_size)| {
Protocol {
name,
in_upgrade,
handshake,
state: State::Closed {
pending_opening: false,
}
},
max_notification_size: max_size,
}
}).collect(),
peer_id: peer_id.clone(),
Expand Down Expand Up @@ -467,18 +471,19 @@ pub enum NotifsHandlerError {
impl NotifsHandlerProto {
/// Builds a new handler.
///
/// `list` is a list of notification protocols names, and the message to send as part of the
/// handshake. At the moment, the message is always the same whether we open a substream
/// ourselves or respond to handshake from the remote.
/// `list` is a list of notification protocols names, the message to send as part of the
/// handshake, and the maximum allowed size of a notification. At the moment, the message
/// is always the same whether we open a substream ourselves or respond to handshake from
/// the remote.
pub fn new(
legacy_protocol: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>,
) -> Self {
let protocols = list
.into()
.into_iter()
.map(|(proto_name, msg)| {
(proto_name.clone(), NotificationsIn::new(proto_name), msg)
.map(|(proto_name, msg, max_notif_size)| {
(proto_name.clone(), NotificationsIn::new(proto_name, max_notif_size), msg, max_notif_size)
})
.collect();

Expand Down Expand Up @@ -624,7 +629,8 @@ impl ProtocolsHandler for NotifsHandler {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone()
protocol_info.handshake.read().clone(),
protocol_info.max_notification_size
);

self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
Expand All @@ -643,7 +649,8 @@ impl ProtocolsHandler for NotifsHandler {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
handshake_message.clone()
handshake_message.clone(),
protocol_info.max_notification_size,
);

self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/generic_proto/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(
"test", &[1], vec![], peerset,
iter::once(("/foo".into(), Vec::new()))
iter::once(("/foo".into(), Vec::new(), 1024 * 1024))
),
addrs: addrs
.iter()
Expand Down
46 changes: 31 additions & 15 deletions client/network/src/protocol/generic_proto/upgrade/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use futures::prelude::*;
use asynchronous_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use log::error;
use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, iter, mem, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;

/// Maximum allowed size of the two handshake messages, in bytes.
Expand All @@ -53,6 +53,8 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
pub struct NotificationsIn {
/// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, str>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
}

/// Upgrade that opens a substream, waits for the remote to accept by sending back a status
Expand All @@ -63,6 +65,8 @@ pub struct NotificationsOut {
protocol_name: Cow<'static, str>,
/// Message to send when we start the handshake.
initial_message: Vec<u8>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
}

/// A substream for incoming notification messages.
Expand Down Expand Up @@ -102,9 +106,10 @@ pub struct NotificationsOutSubstream<TSubstream> {

impl NotificationsIn {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, str>>, max_notification_size: u64) -> Self {
NotificationsIn {
protocol_name: protocol_name.into(),
max_notification_size,
}
}
}
Expand Down Expand Up @@ -148,8 +153,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
socket.read_exact(&mut initial_message).await?;
}

let mut codec = UviBytes::default();
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need that try_from? Why not simply make max_notification_size an usize to begin with?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size of something that is transmitted on the network should never be a usize, as we want this size limit to be the same for all participants.
However a buffer length, however, is correctly a usize.


let substream = NotificationsInSubstream {
socket: Framed::new(socket, UviBytes::default()),
socket: Framed::new(socket, codec),
handshake: NotificationsInSubstreamHandshake::NotSent,
};

Expand Down Expand Up @@ -287,7 +295,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,

impl NotificationsOut {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, str>>, initial_message: impl Into<Vec<u8>>) -> Self {
pub fn new(
protocol_name: impl Into<Cow<'static, str>>,
initial_message: impl Into<Vec<u8>>,
max_notification_size: u64,
) -> Self {
let initial_message = initial_message.into();
if initial_message.len() > MAX_HANDSHAKE_SIZE {
error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
Expand All @@ -296,6 +308,7 @@ impl NotificationsOut {
NotificationsOut {
protocol_name: protocol_name.into(),
initial_message,
max_notification_size,
}
}
}
Expand Down Expand Up @@ -342,8 +355,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
socket.read_exact(&mut handshake).await?;
}

let mut codec = UviBytes::default();
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));

Ok((handshake, NotificationsOutSubstream {
socket: Framed::new(socket, UviBytes::default()),
socket: Framed::new(socket, codec),
}))
})
}
Expand Down Expand Up @@ -436,7 +452,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let (handshake, mut substream) = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"initial message"[..]),
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();

Expand All @@ -451,7 +467,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();

assert_eq!(initial_message, b"initial message");
Expand All @@ -475,7 +491,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let (handshake, mut substream) = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, vec![]),
NotificationsOut::new(PROTO_NAME, vec![], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();

Expand All @@ -490,7 +506,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();

assert!(initial_message.is_empty());
Expand All @@ -512,7 +528,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let outcome = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"hello"[..]),
NotificationsOut::new(PROTO_NAME, &b"hello"[..], 1024 * 1024),
upgrade::Version::V1
).await;

Expand All @@ -529,7 +545,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_msg, substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();

assert_eq!(initial_msg, b"hello");
Expand All @@ -551,7 +567,7 @@ mod tests {
let ret = upgrade::apply_outbound(
socket,
// We check that an initial message that is too large gets refused.
NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>()),
NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024),
upgrade::Version::V1
).await;
assert!(ret.is_err());
Expand All @@ -564,7 +580,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await;
assert!(ret.is_err());
});
Expand All @@ -581,7 +597,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"initial message"[..]),
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await;
assert!(ret.is_err());
Expand All @@ -594,7 +610,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();
assert_eq!(initial_message, b"initial message");

Expand Down
Loading