Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Track L1 block for last L2 block body retrieved #7927

Merged
merged 2 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,21 @@ export class Archiver implements ArchiveSource {
await this.sync(blockUntilSynced);
}

this.runningPromise = new RunningPromise(() => this.sync(false), this.pollingIntervalMs);
this.runningPromise = new RunningPromise(() => this.safeSync(), this.pollingIntervalMs);
this.runningPromise.start();
}

/**
* Syncs and catches exceptions.
*/
private async safeSync() {
try {
await this.sync(false);
} catch (error) {
this.log.error('Error syncing archiver', error);
}
}

/**
* Fetches logs from L1 contracts and processes them.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
Expand All @@ -166,10 +177,14 @@ export class Archiver implements ArchiveSource {
*
* This code does not handle reorgs.
*/
const { blocksSynchedTo, messagesSynchedTo } = await this.store.getSynchPoint();
const { blockBodiesSynchedTo, blocksSynchedTo, messagesSynchedTo } = await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

if (currentL1BlockNumber <= blocksSynchedTo && currentL1BlockNumber <= messagesSynchedTo) {
if (
currentL1BlockNumber <= blocksSynchedTo &&
currentL1BlockNumber <= messagesSynchedTo &&
currentL1BlockNumber <= blockBodiesSynchedTo
) {
// chain hasn't moved forward
// or it's been rolled back
this.log.debug(`Nothing to sync`, { currentL1BlockNumber, blocksSynchedTo, messagesSynchedTo });
Expand Down Expand Up @@ -220,23 +235,27 @@ export class Archiver implements ArchiveSource {
// Read all data from chain and then write to our stores at the end
const nextExpectedL2BlockNum = BigInt((await this.store.getSynchedL2BlockNumber()) + 1);

this.log.debug(`Retrieving block bodies from ${blockBodiesSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlockBodies = await retrieveBlockBodiesFromAvailabilityOracle(
this.publicClient,
this.availabilityOracleAddress,
blockUntilSynced,
blocksSynchedTo + 1n,
blockBodiesSynchedTo + 1n,
currentL1BlockNumber,
);

const blockBodies = retrievedBlockBodies.retrievedData.map(([blockBody]) => blockBody);
await this.store.addBlockBodies(blockBodies);
this.log.debug(
`Retrieved ${retrievedBlockBodies.retrievedData.length} block bodies up to L1 block ${retrievedBlockBodies.lastProcessedL1BlockNumber}`,
);
await this.store.addBlockBodies(retrievedBlockBodies);

// Now that we have block bodies we will retrieve block metadata and build L2 blocks from the bodies and
// the metadata
let retrievedBlocks: DataRetrieval<L2Block>;
{
// @todo @LHerskind Investigate how necessary that nextExpectedL2BlockNum really is.
// Also, I would expect it to break horribly if we have a reorg.
this.log.debug(`Retrieving block metadata from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlockMetadata = await retrieveBlockMetadataFromRollup(
this.publicClient,
this.rollupAddress,
Expand Down Expand Up @@ -278,17 +297,18 @@ export class Archiver implements ArchiveSource {
} and ${currentL1BlockNumber}.`,
);

// Set the `lastProcessedL1BlockNumber` to the smallest of the header and body retrieval
const min = (a: bigint, b: bigint) => (a < b ? a : b);
retrievedBlocks = {
lastProcessedL1BlockNumber: min(
retrievedBlockMetadata.lastProcessedL1BlockNumber,
retrievedBlockBodies.lastProcessedL1BlockNumber,
),
lastProcessedL1BlockNumber: retrievedBlockMetadata.lastProcessedL1BlockNumber,
retrievedData: blocks,
};
}

this.log.debug(
`Processing retrieved blocks ${retrievedBlocks.retrievedData
.map(b => b.number)
.join(',')} with last processed L1 block ${retrievedBlocks.lastProcessedL1BlockNumber}`,
);

await Promise.all(
retrievedBlocks.retrievedData.map(block => {
const noteEncryptedLogs = block.body.noteEncryptedLogs;
Expand Down
6 changes: 4 additions & 2 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import { type DataRetrieval } from './data_retrieval.js';
* Represents the latest L1 block processed by the archiver for various objects in L2.
*/
export type ArchiverL1SynchPoint = {
/** Number of the last L1 block that added a new L2 block. */
/** Number of the last L1 block that added a new L2 block metadata. */
blocksSynchedTo: bigint;
/** Number of the last L1 block that added a new L2 block body. */
blockBodiesSynchedTo: bigint;
/** Number of the last L1 block that added L1 -> L2 messages from the Inbox. */
messagesSynchedTo: bigint;
};
Expand All @@ -53,7 +55,7 @@ export interface ArchiverDataStore {
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean>;
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean>;

/**
* Gets block bodies that have the same txsEffectsHashes as we supply.
Expand Down
35 changes: 26 additions & 9 deletions yarn-project/archiver/src/archiver/archiver_store_test_suite.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InboxLeaf, L2Block, LogId, LogType, TxHash } from '@aztec/circuit-types';
import { type Body, InboxLeaf, L2Block, LogId, LogType, TxHash } from '@aztec/circuit-types';
import '@aztec/circuit-types/jest';
import { AztecAddress, Fr, INITIAL_L2_BLOCK_NUM, L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/circuits.js';
import {
Expand All @@ -14,7 +14,7 @@ import {
SerializableContractInstance,
} from '@aztec/types/contracts';

import { type ArchiverDataStore } from './archiver_store.js';
import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js';
import { type DataRetrieval } from './data_retrieval.js';

/**
Expand All @@ -25,6 +25,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
describe(testName, () => {
let store: ArchiverDataStore;
let blocks: DataRetrieval<L2Block>;
let blockBodies: DataRetrieval<Body>;
const blockTests: [number, number, () => L2Block[]][] = [
[1, 1, () => blocks.retrievedData.slice(0, 1)],
[10, 1, () => blocks.retrievedData.slice(9, 10)],
Expand All @@ -39,11 +40,15 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
lastProcessedL1BlockNumber: 5n,
retrievedData: Array.from({ length: 10 }).map((_, i) => L2Block.random(i + 1)),
};
blockBodies = {
retrievedData: blocks.retrievedData.map(block => block.body),
lastProcessedL1BlockNumber: 4n,
};
});

describe('addBlocks', () => {
it('returns success when adding block bodies', async () => {
await expect(store.addBlockBodies(blocks.retrievedData.map(block => block.body))).resolves.toBe(true);
await expect(store.addBlockBodies(blockBodies)).resolves.toBe(true);
});

it('returns success when adding blocks', async () => {
Expand All @@ -59,7 +64,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
describe('getBlocks', () => {
beforeEach(async () => {
await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);
});

it.each(blockTests)('retrieves previously stored blocks', async (start, limit, getExpectedBlocks) => {
Expand Down Expand Up @@ -95,15 +100,26 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 0n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number in which the most recent L2 block was published', async () => {
await store.addBlocks(blocks);
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: blocks.lastProcessedL1BlockNumber,
messagesSynchedTo: 0n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number in which the most recent L2 block body was published', async () => {
await store.addBlockBodies(blockBodies);
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 0n,
blockBodiesSynchedTo: blockBodies.lastProcessedL1BlockNumber,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number that most recently added messages from inbox', async () => {
Expand All @@ -114,7 +130,8 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 1n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});
});

Expand Down Expand Up @@ -179,7 +196,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
),
);
await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);
});

it.each([
Expand Down Expand Up @@ -335,7 +352,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
};

await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);

await Promise.all(
blocks.retrievedData.map(block =>
Expand Down
20 changes: 14 additions & 6 deletions yarn-project/archiver/src/archiver/data_retrieval.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type Body, type InboxLeaf } from '@aztec/circuit-types';
import { type AppendOnlyTreeSnapshot, Fr, type Header } from '@aztec/circuits.js';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { RollupAbi } from '@aztec/l1-artifacts';

import { type PublicClient, getAbiItem } from 'viem';
Expand Down Expand Up @@ -45,6 +46,7 @@ export async function retrieveBlockMetadataFromRollup(
searchStartBlock: bigint,
searchEndBlock: bigint,
expectedNextL2BlockNum: bigint,
logger: DebugLogger = createDebugLogger('aztec:archiver'),
): Promise<DataRetrieval<[Header, AppendOnlyTreeSnapshot]>> {
const retrievedBlockMetadata: [Header, AppendOnlyTreeSnapshot][] = [];
do {
Expand All @@ -61,13 +63,18 @@ export async function retrieveBlockMetadataFromRollup(
break;
}

const lastLog = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1];
logger.debug(
`Got L2 block processed logs for ${l2BlockProcessedLogs[0].blockNumber}-${lastLog.blockNumber} between ${searchStartBlock}-${searchEndBlock} L1 blocks`,
);

const newBlockMetadata = await processL2BlockProcessedLogs(
publicClient,
expectedNextL2BlockNum,
l2BlockProcessedLogs,
);
retrievedBlockMetadata.push(...newBlockMetadata);
searchStartBlock = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1].blockNumber! + 1n;
searchStartBlock = lastLog.blockNumber! + 1n;
expectedNextL2BlockNum += BigInt(newBlockMetadata.length);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedBlockMetadata };
Expand All @@ -80,16 +87,16 @@ export async function retrieveBlockMetadataFromRollup(
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @returns A array of tuples of L2 block bodies and their associated hash as well as the next eth block to search from
* @returns A array of L2 block bodies as well as the next eth block to search from
*/
export async function retrieveBlockBodiesFromAvailabilityOracle(
publicClient: PublicClient,
availabilityOracleAddress: EthAddress,
blockUntilSynced: boolean,
searchStartBlock: bigint,
searchEndBlock: bigint,
): Promise<DataRetrieval<[Body, Buffer]>> {
const retrievedBlockBodies: [Body, Buffer][] = [];
): Promise<DataRetrieval<Body>> {
const retrievedBlockBodies: Body[] = [];

do {
if (searchStartBlock > searchEndBlock) {
Expand All @@ -106,9 +113,10 @@ export async function retrieveBlockBodiesFromAvailabilityOracle(
}

const newBlockBodies = await processTxsPublishedLogs(publicClient, l2TxsPublishedLogs);
retrievedBlockBodies.push(...newBlockBodies);
searchStartBlock = l2TxsPublishedLogs[l2TxsPublishedLogs.length - 1].blockNumber! + 1n;
retrievedBlockBodies.push(...newBlockBodies.map(([body]) => body));
searchStartBlock = l2TxsPublishedLogs[l2TxsPublishedLogs.length - 1].blockNumber + 1n;
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);

return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedBlockBodies };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ describe('Block Body Store', () => {
it('Should add and return block bodies', async () => {
const body = Body.random(1);

await archiverStore.addBlockBodies([body]);
await archiverStore.addBlockBodies({ retrievedData: [body], lastProcessedL1BlockNumber: 5n });

const txsEffectsHash = body.getTxsEffectsHash();

const [returnedBody] = await archiverStore.getBlockBodies([txsEffectsHash]);

expect(body).toStrictEqual(returnedBody);

const { blockBodiesSynchedTo } = await archiverStore.getSynchPoint();
expect(blockBodiesSynchedTo).toEqual(5n);
});
});
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
import { Body } from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap } from '@aztec/kv-store';
import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store';

import { type DataRetrieval } from '../data_retrieval.js';

export class BlockBodyStore {
/** Map block body hash to block body */
#blockBodies: AztecMap<string, Buffer>;

/** Stores L1 block number in which the last processed L2 block body was included */
#lastSynchedL1Block: AztecSingleton<bigint>;

constructor(private db: AztecKVStore, private log = createDebugLogger('aztec:archiver:block_body_store')) {
this.#blockBodies = db.openMap('archiver_block_bodies');
this.#lastSynchedL1Block = db.openSingleton('archiver_block_bodies_last_synched_l1_block');
}

/**
* Append new block bodies to the store's map.
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean> {
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean> {
return this.db.transaction(() => {
for (const body of blockBodies) {
for (const body of blockBodies.retrievedData) {
void this.#blockBodies.set(body.getTxsEffectsHash().toString('hex'), body.toBuffer());
}

void this.#lastSynchedL1Block.set(blockBodies.lastProcessedL1BlockNumber);
return true;
});
}
Expand Down Expand Up @@ -57,4 +63,12 @@ export class BlockBodyStore {

return blockBody && Body.fromBuffer(blockBody);
}

/**
* Gets the last L1 block number in which a L2 block body was included
* @returns The L1 block number
*/
getSynchedL1BlockNumber(): bigint {
return this.#lastSynchedL1Block.get() ?? 0n;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class KVArchiverDataStore implements ArchiverDataStore {
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean> {
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean> {
return this.#blockBodyStore.addBlockBodies(blockBodies);
}

Expand Down Expand Up @@ -260,6 +260,7 @@ export class KVArchiverDataStore implements ArchiverDataStore {
getSynchPoint(): Promise<ArchiverL1SynchPoint> {
return Promise.resolve({
blocksSynchedTo: this.#blockStore.getSynchedL1BlockNumber(),
blockBodiesSynchedTo: this.#blockBodyStore.getSynchedL1BlockNumber(),
messagesSynchedTo: this.#messageStore.getSynchedL1BlockNumber(),
});
}
Expand Down
Loading
Loading