Skip to content

Commit

Permalink
Sync tx submission with chainHead_follow (#1305)
Browse files Browse the repository at this point in the history
* sync tx submission with chainHead_follow

* make it compile

* add small comment
  • Loading branch information
jsdw authored Dec 4, 2023
1 parent 14b7127 commit c3b4331
Showing 1 changed file with 47 additions and 13 deletions.
60 changes: 47 additions & 13 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,26 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
// First, subscribe to all new block hashes
let mut new_blocks = self.follow_handle.subscribe().events().filter_map(|ev| {
// We care about new and finalized block hashes.
enum SeenBlock<Ref> {
New(Ref),
Finalized(Vec<Ref>),
}
enum SeenBlockMarker {
New,
Finalized,
}

// First, subscribe to all new and finalized block refs.
// - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we
// can try to return a block ref for the best block.
// - we subscribe to finalized refs so that when we see `Finalized`, we can
// guarantee that when we return here, the finalized block we report has been
// reported from chainHead_follow already.
let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| {
std::future::ready(match ev {
FollowEvent::NewBlock(ev) => Some(ev.block_hash),
FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)),
FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)),
_ => None,
})
});
Expand All @@ -453,8 +469,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
let mut seen_blocks = HashMap::new();
let mut done = false;

// If we see the finalized event, we start waiting until we find a block that
// matches, so we can guarantee to return a pinned block hash.
// If we see the finalized event, we start waiting until we find a finalized block that
// matches, so we can guarantee to return a pinned block hash and be properly in sync
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;

// Now we can attempt to associate tx events with pinned blocks.
Expand All @@ -465,25 +482,42 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
return Poll::Ready(None);
}

// Save any pinned blocks. Keep doing this until no more, so that we always have the most uptodate
// pinned blocks when we are looking at our tx events.
if let Poll::Ready(Some(block_ref)) = new_blocks.poll_next_unpin(cx) {
seen_blocks.insert(block_ref.hash(), block_ref);
// Make a note of new or finalized blocks that have come in since we started the TX.
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
match seen_block {
SeenBlock::New(block_ref) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
if finalized_hash.is_none() {
seen_blocks
.insert(block_ref.hash(), (SeenBlockMarker::New, block_ref));
}
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
seen_blocks.insert(
block_ref.hash(),
(SeenBlockMarker::Finalized, block_ref),
);
}
}
}
continue;
}

// If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash {
if let Some(block_ref) = seen_blocks.remove(hash) {
if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash)
{
// Found it! Hand back the event with a pinned block. We're done.
done = true;
let ev = TransactionStatus::InFinalizedBlock {
hash: block_ref.into(),
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Keep waiting for more new blocks until we find it (get rid of any other block refs
// Keep waiting for more finalized blocks until we find it (get rid of any other block refs
// now, since none of them were what we were looking for anyway).
seen_blocks.clear();
continue;
Expand Down Expand Up @@ -517,8 +551,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the
// chainHead_follow subscription.
let block_ref = match seen_blocks.get(&block.hash).cloned() {
Some(block_ref) => block_ref.into(),
let block_ref = match seen_blocks.get(&block.hash) {
Some((_, block_ref)) => block_ref.clone().into(),
None => BlockRef::from_hash(block.hash),
};
TransactionStatus::InBestBlock { hash: block_ref }
Expand Down

0 comments on commit c3b4331

Please sign in to comment.