Skip to content

Commit

Permalink
Merge 8b7828e into 014b5f0
Browse files Browse the repository at this point in the history
  • Loading branch information
spalladino authored Aug 13, 2024
2 parents 014b5f0 + 8b7828e commit 094dd8f
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 41 deletions.
44 changes: 32 additions & 12 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,21 @@ export class Archiver implements ArchiveSource {
await this.sync(blockUntilSynced);
}

this.runningPromise = new RunningPromise(() => this.sync(false), this.pollingIntervalMs);
this.runningPromise = new RunningPromise(() => this.safeSync(), this.pollingIntervalMs);
this.runningPromise.start();
}

/**
 * Runs a single sync iteration, catching and logging any exception instead of rethrowing.
 * Passed to the polling RunningPromise so that one failed iteration does not kill the
 * periodic sync loop — the next tick will retry.
 */
private async safeSync() {
  try {
    // Non-blocking sync: `false` means do not block until fully synced.
    await this.sync(false);
  } catch (error) {
    this.log.error('Error syncing archiver', error);
  }
}

/**
* Fetches logs from L1 contracts and processes them.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
Expand All @@ -166,10 +177,14 @@ export class Archiver implements ArchiveSource {
*
* This code does not handle reorgs.
*/
const { blocksSynchedTo, messagesSynchedTo } = await this.store.getSynchPoint();
const { blockBodiesSynchedTo, blocksSynchedTo, messagesSynchedTo } = await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

if (currentL1BlockNumber <= blocksSynchedTo && currentL1BlockNumber <= messagesSynchedTo) {
if (
currentL1BlockNumber <= blocksSynchedTo &&
currentL1BlockNumber <= messagesSynchedTo &&
currentL1BlockNumber <= blockBodiesSynchedTo
) {
// chain hasn't moved forward
// or it's been rolled back
this.log.debug(`Nothing to sync`, { currentL1BlockNumber, blocksSynchedTo, messagesSynchedTo });
Expand Down Expand Up @@ -220,23 +235,27 @@ export class Archiver implements ArchiveSource {
// Read all data from chain and then write to our stores at the end
const nextExpectedL2BlockNum = BigInt((await this.store.getSynchedL2BlockNumber()) + 1);

this.log.debug(`Retrieving block bodies from ${blockBodiesSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlockBodies = await retrieveBlockBodiesFromAvailabilityOracle(
this.publicClient,
this.availabilityOracleAddress,
blockUntilSynced,
blocksSynchedTo + 1n,
blockBodiesSynchedTo + 1n,
currentL1BlockNumber,
);

const blockBodies = retrievedBlockBodies.retrievedData.map(([blockBody]) => blockBody);
await this.store.addBlockBodies(blockBodies);
this.log.debug(
`Retrieved ${retrievedBlockBodies.retrievedData.length} block bodies up to L1 block ${retrievedBlockBodies.lastProcessedL1BlockNumber}`,
);
await this.store.addBlockBodies(retrievedBlockBodies);

// Now that we have block bodies we will retrieve block metadata and build L2 blocks from the bodies and
// the metadata
let retrievedBlocks: DataRetrieval<L2Block>;
{
// @todo @LHerskind Investigate how necessary that nextExpectedL2BlockNum really is.
// Also, I would expect it to break horribly if we have a reorg.
this.log.debug(`Retrieving block metadata from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlockMetadata = await retrieveBlockMetadataFromRollup(
this.publicClient,
this.rollupAddress,
Expand Down Expand Up @@ -278,17 +297,18 @@ export class Archiver implements ArchiveSource {
} and ${currentL1BlockNumber}.`,
);

// Set the `lastProcessedL1BlockNumber` to the smallest of the header and body retrieval
const min = (a: bigint, b: bigint) => (a < b ? a : b);
retrievedBlocks = {
lastProcessedL1BlockNumber: min(
retrievedBlockMetadata.lastProcessedL1BlockNumber,
retrievedBlockBodies.lastProcessedL1BlockNumber,
),
lastProcessedL1BlockNumber: retrievedBlockMetadata.lastProcessedL1BlockNumber,
retrievedData: blocks,
};
}

this.log.debug(
`Processing retrieved blocks ${retrievedBlocks.retrievedData
.map(b => b.number)
.join(',')} with last processed L1 block ${retrievedBlocks.lastProcessedL1BlockNumber}`,
);

await Promise.all(
retrievedBlocks.retrievedData.map(block => {
const noteEncryptedLogs = block.body.noteEncryptedLogs;
Expand Down
6 changes: 4 additions & 2 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import { type DataRetrieval } from './data_retrieval.js';
* Represents the latest L1 block processed by the archiver for various objects in L2.
*/
export type ArchiverL1SynchPoint = {
  /** Number of the last L1 block that added new L2 block metadata. */
  blocksSynchedTo: bigint;
  /** Number of the last L1 block that added a new L2 block body. */
  blockBodiesSynchedTo: bigint;
  /** Number of the last L1 block that added L1 -> L2 messages from the Inbox. */
  messagesSynchedTo: bigint;
};
Expand All @@ -53,7 +55,7 @@ export interface ArchiverDataStore {
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean>;
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean>;

/**
* Gets block bodies that have the same txsEffectsHashes as we supply.
Expand Down
35 changes: 26 additions & 9 deletions yarn-project/archiver/src/archiver/archiver_store_test_suite.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InboxLeaf, L2Block, LogId, LogType, TxHash } from '@aztec/circuit-types';
import { type Body, InboxLeaf, L2Block, LogId, LogType, TxHash } from '@aztec/circuit-types';
import '@aztec/circuit-types/jest';
import { AztecAddress, Fr, INITIAL_L2_BLOCK_NUM, L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/circuits.js';
import {
Expand All @@ -14,7 +14,7 @@ import {
SerializableContractInstance,
} from '@aztec/types/contracts';

import { type ArchiverDataStore } from './archiver_store.js';
import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js';
import { type DataRetrieval } from './data_retrieval.js';

/**
Expand All @@ -25,6 +25,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
describe(testName, () => {
let store: ArchiverDataStore;
let blocks: DataRetrieval<L2Block>;
let blockBodies: DataRetrieval<Body>;
const blockTests: [number, number, () => L2Block[]][] = [
[1, 1, () => blocks.retrievedData.slice(0, 1)],
[10, 1, () => blocks.retrievedData.slice(9, 10)],
Expand All @@ -39,11 +40,15 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
lastProcessedL1BlockNumber: 5n,
retrievedData: Array.from({ length: 10 }).map((_, i) => L2Block.random(i + 1)),
};
blockBodies = {
retrievedData: blocks.retrievedData.map(block => block.body),
lastProcessedL1BlockNumber: 4n,
};
});

describe('addBlocks', () => {
it('returns success when adding block bodies', async () => {
await expect(store.addBlockBodies(blocks.retrievedData.map(block => block.body))).resolves.toBe(true);
await expect(store.addBlockBodies(blockBodies)).resolves.toBe(true);
});

it('returns success when adding blocks', async () => {
Expand All @@ -59,7 +64,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
describe('getBlocks', () => {
beforeEach(async () => {
await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);
});

it.each(blockTests)('retrieves previously stored blocks', async (start, limit, getExpectedBlocks) => {
Expand Down Expand Up @@ -95,15 +100,26 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 0n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number in which the most recent L2 block was published', async () => {
await store.addBlocks(blocks);
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: blocks.lastProcessedL1BlockNumber,
messagesSynchedTo: 0n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number in which the most recent L2 block body was published', async () => {
await store.addBlockBodies(blockBodies);
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 0n,
blockBodiesSynchedTo: blockBodies.lastProcessedL1BlockNumber,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number that most recently added messages from inbox', async () => {
Expand All @@ -114,7 +130,8 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 1n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});
});

Expand Down Expand Up @@ -179,7 +196,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
),
);
await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);
});

it.each([
Expand Down Expand Up @@ -335,7 +352,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
};

await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);

await Promise.all(
blocks.retrievedData.map(block =>
Expand Down
20 changes: 14 additions & 6 deletions yarn-project/archiver/src/archiver/data_retrieval.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type Body, type InboxLeaf } from '@aztec/circuit-types';
import { type AppendOnlyTreeSnapshot, Fr, type Header } from '@aztec/circuits.js';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { RollupAbi } from '@aztec/l1-artifacts';

import { type PublicClient, getAbiItem } from 'viem';
Expand Down Expand Up @@ -45,6 +46,7 @@ export async function retrieveBlockMetadataFromRollup(
searchStartBlock: bigint,
searchEndBlock: bigint,
expectedNextL2BlockNum: bigint,
logger: DebugLogger = createDebugLogger('aztec:archiver'),
): Promise<DataRetrieval<[Header, AppendOnlyTreeSnapshot]>> {
const retrievedBlockMetadata: [Header, AppendOnlyTreeSnapshot][] = [];
do {
Expand All @@ -61,13 +63,18 @@ export async function retrieveBlockMetadataFromRollup(
break;
}

const lastLog = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1];
logger.debug(
`Got L2 block processed logs for ${l2BlockProcessedLogs[0].blockNumber}-${lastLog.blockNumber} between ${searchStartBlock}-${searchEndBlock} L1 blocks`,
);

const newBlockMetadata = await processL2BlockProcessedLogs(
publicClient,
expectedNextL2BlockNum,
l2BlockProcessedLogs,
);
retrievedBlockMetadata.push(...newBlockMetadata);
searchStartBlock = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1].blockNumber! + 1n;
searchStartBlock = lastLog.blockNumber! + 1n;
expectedNextL2BlockNum += BigInt(newBlockMetadata.length);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedBlockMetadata };
Expand All @@ -80,16 +87,16 @@ export async function retrieveBlockMetadataFromRollup(
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @returns A array of tuples of L2 block bodies and their associated hash as well as the next eth block to search from
 * @returns An array of L2 block bodies as well as the next eth block to search from
*/
export async function retrieveBlockBodiesFromAvailabilityOracle(
publicClient: PublicClient,
availabilityOracleAddress: EthAddress,
blockUntilSynced: boolean,
searchStartBlock: bigint,
searchEndBlock: bigint,
): Promise<DataRetrieval<[Body, Buffer]>> {
const retrievedBlockBodies: [Body, Buffer][] = [];
): Promise<DataRetrieval<Body>> {
const retrievedBlockBodies: Body[] = [];

do {
if (searchStartBlock > searchEndBlock) {
Expand All @@ -106,9 +113,10 @@ export async function retrieveBlockBodiesFromAvailabilityOracle(
}

const newBlockBodies = await processTxsPublishedLogs(publicClient, l2TxsPublishedLogs);
retrievedBlockBodies.push(...newBlockBodies);
searchStartBlock = l2TxsPublishedLogs[l2TxsPublishedLogs.length - 1].blockNumber! + 1n;
retrievedBlockBodies.push(...newBlockBodies.map(([body]) => body));
searchStartBlock = l2TxsPublishedLogs[l2TxsPublishedLogs.length - 1].blockNumber + 1n;
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);

return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedBlockBodies };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ describe('Block Body Store', () => {
it('Should add and return block bodies', async () => {
const body = Body.random(1);

await archiverStore.addBlockBodies([body]);
await archiverStore.addBlockBodies({ retrievedData: [body], lastProcessedL1BlockNumber: 5n });

const txsEffectsHash = body.getTxsEffectsHash();

const [returnedBody] = await archiverStore.getBlockBodies([txsEffectsHash]);

expect(body).toStrictEqual(returnedBody);

const { blockBodiesSynchedTo } = await archiverStore.getSynchPoint();
expect(blockBodiesSynchedTo).toEqual(5n);
});
});
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
import { Body } from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap } from '@aztec/kv-store';
import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store';

import { type DataRetrieval } from '../data_retrieval.js';

export class BlockBodyStore {
/** Map block body hash to block body */
#blockBodies: AztecMap<string, Buffer>;

/** Stores L1 block number in which the last processed L2 block body was included */
#lastSynchedL1Block: AztecSingleton<bigint>;

/**
 * @param db - Key-value store backing the block body map and the last-synched pointer.
 * @param log - Debug logger; defaults to the block body store channel.
 */
constructor(private db: AztecKVStore, private log = createDebugLogger('aztec:archiver:block_body_store')) {
  // Map of txsEffectsHash (hex string) -> serialized block body.
  this.#blockBodies = db.openMap('archiver_block_bodies');
  // Last L1 block number whose published bodies have been processed.
  this.#lastSynchedL1Block = db.openSingleton('archiver_block_bodies_last_synched_l1_block');
}

/**
* Append new block bodies to the store's map.
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean> {
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean> {
return this.db.transaction(() => {
for (const body of blockBodies) {
for (const body of blockBodies.retrievedData) {
void this.#blockBodies.set(body.getTxsEffectsHash().toString('hex'), body.toBuffer());
}

void this.#lastSynchedL1Block.set(blockBodies.lastProcessedL1BlockNumber);
return true;
});
}
Expand Down Expand Up @@ -57,4 +63,12 @@ export class BlockBodyStore {

return blockBody && Body.fromBuffer(blockBody);
}

/**
 * Returns the number of the last L1 block in which an L2 block body was included.
 * @returns The last synched L1 block number, or 0n if no body has been synched yet.
 */
getSynchedL1BlockNumber(): bigint {
  const lastSynched = this.#lastSynchedL1Block.get();
  return lastSynched ?? 0n;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class KVArchiverDataStore implements ArchiverDataStore {
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean> {
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean> {
  // Delegates to the block body store, which writes the bodies and the
  // last-processed L1 block number inside a single transaction.
  return this.#blockBodyStore.addBlockBodies(blockBodies);
}

Expand Down Expand Up @@ -260,6 +260,7 @@ export class KVArchiverDataStore implements ArchiverDataStore {
/**
 * Reports how far the archiver has synched against L1, per data category.
 * @returns The last L1 block numbers processed for block metadata, block bodies,
 *          and Inbox messages respectively.
 */
getSynchPoint(): Promise<ArchiverL1SynchPoint> {
  const synchPoint: ArchiverL1SynchPoint = {
    blocksSynchedTo: this.#blockStore.getSynchedL1BlockNumber(),
    blockBodiesSynchedTo: this.#blockBodyStore.getSynchedL1BlockNumber(),
    messagesSynchedTo: this.#messageStore.getSynchedL1BlockNumber(),
  };
  return Promise.resolve(synchPoint);
}
Expand Down
Loading

0 comments on commit 094dd8f

Please sign in to comment.