Skip to content

Commit

Permalink
Bridge: drop subscriptions when they are no longer required (parityte…
Browse files Browse the repository at this point in the history
…ch#4481)

The bridge relay is **not** using `tokio`, while `jsonrpsee` does. To
make it work together, we are spawning a separate tokio task for every
jsonrpsee subscription, which holds a subscription reference. It looks
like we are not stopping those tasks when we no longer need it and when
there are more than `1024` active subscriptions, `jsonrpsee` stops
opening new subscriptions. This PR adds an `cancel` signal that is sent
to the background task when we no longer need a subscription.
  • Loading branch information
svyatonik authored and TarekkMA committed Aug 2, 2024
1 parent 7047bf1 commit 0fb23b0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 14 deletions.
66 changes: 54 additions & 12 deletions bridges/relays/client-substrate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ pub fn is_ancient_block<N: From<u32> + PartialOrd + Saturating>(block: N, best:
}

/// Opaque justifications subscription type.
pub struct Subscription<T>(pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>);
pub struct Subscription<T>(
pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>,
// The following field is not explicitly used by the code. But when it is dropped,
// the bakground task receives a shutdown signal.
#[allow(dead_code)] pub(crate) futures::channel::oneshot::Sender<()>,
);

/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
Expand Down Expand Up @@ -621,6 +626,7 @@ impl<C: Chain> Client<C> {
e
})??;

let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel();
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
let (tracker, subscription) = self
.jsonrpsee_execute(move |client| async move {
Expand All @@ -639,7 +645,7 @@ impl<C: Chain> Client<C> {
self_clone,
stall_timeout,
tx_hash,
Subscription(Mutex::new(receiver)),
Subscription(Mutex::new(receiver), cancel_sender),
);
Ok((tracker, subscription))
})
Expand All @@ -649,6 +655,7 @@ impl<C: Chain> Client<C> {
"extrinsic".into(),
subscription,
sender,
cancel_receiver,
));
Ok(tracker)
}
Expand Down Expand Up @@ -790,14 +797,16 @@ impl<C: Chain> Client<C> {
Ok(FC::subscribe_justifications(&client).await?)
})
.await?;
let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel();
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"justification".into(),
subscription,
sender,
cancel_receiver,
));
Ok(Subscription(Mutex::new(receiver)))
Ok(Subscription(Mutex::new(receiver), cancel_sender))
}

/// Generates a proof of key ownership for the given authority in the given set.
Expand Down Expand Up @@ -843,9 +852,17 @@ impl<C: Chain> Client<C> {
impl<T: DeserializeOwned> Subscription<T> {
/// Consumes subscription and returns future statuses stream.
pub fn into_stream(self) -> impl futures::Stream<Item = T> {
futures::stream::unfold(self, |this| async {
futures::stream::unfold(Some(self), |mut this| async move {
let Some(this) = this.take() else { return None };
let item = this.0.lock().await.next().await.unwrap_or(None);
item.map(|i| (i, this))
match item {
Some(item) => Some((item, Some(this))),
None => {
// let's make it explicit here
let _ = this.1.send(());
None
},
}
})
}

Expand All @@ -860,36 +877,61 @@ impl<T: DeserializeOwned> Subscription<T> {
async fn background_worker(
chain_name: String,
item_type: String,
mut subscription: jsonrpsee::core::client::Subscription<T>,
subscription: jsonrpsee::core::client::Subscription<T>,
mut sender: futures::channel::mpsc::Sender<Option<T>>,
cancel_receiver: futures::channel::oneshot::Receiver<()>,
) {
log::trace!(
target: "bridge",
"Starting background worker for {} {} subscription stream.",
chain_name,
item_type,
);

futures::pin_mut!(subscription, cancel_receiver);
loop {
match subscription.next().await {
Some(Ok(item)) =>
match futures::future::select(subscription.next(), &mut cancel_receiver).await {
futures::future::Either::Left((Some(Ok(item)), _)) =>
if sender.send(Some(item)).await.is_err() {
log::trace!(
target: "bridge",
"{} {} subscription stream: no listener. Stopping background worker.",
chain_name,
item_type,
);

break
},
Some(Err(e)) => {
futures::future::Either::Left((Some(Err(e)), _)) => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned '{:?}'. Stream needs to be restarted.",
"{} {} subscription stream has returned '{:?}'. Stream needs to be restarted. Stopping background worker.",
chain_name,
item_type,
e,
);
let _ = sender.send(None).await;
break
},
None => {
futures::future::Either::Left((None, _)) => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned None. Stream needs to be restarted.",
"{} {} subscription stream has returned None. Stream needs to be restarted. Stopping background worker.",
chain_name,
item_type,
);
let _ = sender.send(None).await;
break
},
futures::future::Either::Right((_, _)) => {
log::trace!(
target: "bridge",
"{} {} subscription stream: listener has been dropped. Stopping background worker.",
chain_name,
item_type,
);
break;
},
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions bridges/relays/client-substrate/src/transaction_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,13 @@ mod tests {
TrackedTransactionStatus<HeaderIdOf<TestChain>>,
InvalidationStatus<HeaderIdOf<TestChain>>,
)> {
let (cancel_sender, _cancel_receiver) = futures::channel::oneshot::channel();
let (mut sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
Subscription(async_std::sync::Mutex::new(receiver), cancel_sender),
);

let wait_for_stall_timeout = futures::future::pending();
Expand Down Expand Up @@ -428,12 +429,13 @@ mod tests {

#[async_std::test]
async fn lost_on_timeout_when_waiting_for_invalidation_status() {
let (cancel_sender, _cancel_receiver) = futures::channel::oneshot::channel();
let (_sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
Subscription(async_std::sync::Mutex::new(receiver), cancel_sender),
);

let wait_for_stall_timeout = futures::future::ready(()).shared();
Expand Down

0 comments on commit 0fb23b0

Please sign in to comment.