From 96ed4f29aca3f67d53fa63eafe5aa097fdeaa5ba Mon Sep 17 00:00:00 2001 From: serge Date: Mon, 11 Oct 2021 13:30:16 +0000 Subject: [PATCH] Relayer added state module (#65) * relayer: save last processed block in json * relayer: moved fs stuff to separate module * relayer: converted state module to a class, add it as dependency to Target and Source relayer: make linter happy Update relayer/src/state.ts Co-authored-by: Nazar Mokrynskyi relayer: added ChainName type import --- relayer/src/index.ts | 5 ++++- relayer/src/source.ts | 23 ++++++++++++----------- relayer/src/state.ts | 42 ++++++++++++++++++++++++++++++++++++++++++ relayer/src/target.ts | 31 +++++++++++++++---------------- 4 files changed, 73 insertions(+), 28 deletions(-) create mode 100644 relayer/src/state.ts diff --git a/relayer/src/index.ts b/relayer/src/index.ts index c6854fef3b6a6..f6c59757efcb4 100644 --- a/relayer/src/index.ts +++ b/relayer/src/index.ts @@ -8,6 +8,7 @@ import Target from "./target"; import logger from "./logger"; import { createParachainsMap } from './utils'; import { ChainName } from './types'; +import State from './state'; const config = new Config({ accountSeed: process.env.ACCOUNT_SEED, @@ -27,9 +28,10 @@ const createApi = async (url: string) => { // TODO: remove IIFE when Eslint is updated to v8.0.0 (will support top-level await) (async () => { try { + const state = new State({ folder: "./state" }); const targetApi = await createApi(config.targetChainUrl); - const target = new Target({ api: targetApi, logger }); + const target = new Target({ api: targetApi, logger, state }); const sources = await Promise.all( config.sourceChains.map(async ({ url, parachains }) => { @@ -57,6 +59,7 @@ const createApi = async (url: string) => { logger, feedId, signer: sourceSigner, + state, }); }) ); diff --git a/relayer/src/source.ts b/relayer/src/source.ts index fa7ca6b686a7b..4d79d7742c9f4 100644 --- a/relayer/src/source.ts +++ b/relayer/src/source.ts @@ -11,6 +11,7 @@ import { Logger } from "pino"; import { ParaHeadAndId, TxData, ChainName } from "./types"; import { getParaHeadAndIdFromEvent, isRelevantRecord } from './utils'; import Parachain from "./parachain"; +import State from './state'; interface SourceConstructorParams { api: ApiPromise; @@ -19,6 +20,7 @@ interface SourceConstructorParams { parachainsMap: Map; logger: Logger; signer: AddressOrPair; + state: State; } interface TxDataInput { @@ -36,6 +38,7 @@ class Source { private readonly feedId: U64; private readonly parachainsMap: Map; private readonly logger: Logger; + private readonly state: State; public readonly signer: AddressOrPair; constructor(params: SourceConstructorParams) { @@ -45,6 +48,7 @@ class Source { this.parachainsMap = params.parachainsMap; this.logger = params.logger; this.signer = params.signer; + this.state = params.state; this.getBlocksByHash = this.getBlocksByHash.bind(this); this.getParablocks = this.getParablocks.bind(this); } @@ -120,18 +124,15 @@ class Source { } private addBlockTxData({ block, number, hash, feedId, chain, signer }: TxDataInput): TxData { - const metadata = { - hash, - number, - }; - - return { feedId, block, - metadata, chain, signer, + metadata: { + hash, + number, + }, }; } @@ -139,8 +140,8 @@ class Source { const relayBlock = this.getBlock(hash); const parablocks = relayBlock.pipe(concatMap(this.getParablocks)); - const relayBlockWithMetadata = relayBlock.pipe( - map(({ block }) => { + const relayBlockWithMetadata = relayBlock + .pipe(map(({ block }) => { const blockStr = block.toString(); const number = block.header.number.toBn(); return this.addBlockTxData({ @@ -151,8 +152,8 @@ class Source { chain: this.chain, signer: this.signer }); - }) - ); + })) + .pipe(tap(({ metadata }) => this.state.saveLastProcessedBlock(this.chain, metadata.number))); // TODO: check relay block and parablocks size // const size = Buffer.byteLength(block.toString()); diff --git a/relayer/src/state.ts b/relayer/src/state.ts new file mode 100644 index 0000000000000..87df0b3641b42 --- /dev/null +++ b/relayer/src/state.ts @@ -0,0 +1,42 @@ +import * as fsp from "fs/promises"; +import { BN } from '@polkadot/util'; + +import { ChainName } from './types'; + +// TODO: consider providing fs methods to constructor +class State { + lastBlockPath: string; + feedsPath: string; + + constructor({ folder }: { folder: string; }) { + this.lastBlockPath = `${folder}/last_processed_block.json`; + this.feedsPath = `${folder}/feeds.json`; + } + + async saveLastProcessedBlock(chain: ChainName, number: BN): Promise { + const file = await fsp.readFile(this.lastBlockPath, 'utf8'); + const lastProcessedBlockRecord = JSON.parse(file); + + lastProcessedBlockRecord[chain] = number; + + await fsp.writeFile(this.lastBlockPath, JSON.stringify(lastProcessedBlockRecord, null, 4)); + } + + async getFeedIdByAddress(address: string): Promise { + const file = await fsp.readFile(this.feedsPath, 'utf8'); + const feeds = JSON.parse(file); + + return feeds[address]; + } + + async saveFeedId(address: string, feedId: BN): Promise { + const file = await fsp.readFile(this.feedsPath, 'utf8'); + const feeds = JSON.parse(file); + + feeds[address] = feedId.toBn(); + + await fsp.writeFile(this.feedsPath, JSON.stringify(feeds, null, 4)); + } +} + +export default State; diff --git a/relayer/src/target.ts b/relayer/src/target.ts index a6c77ab53ed49..a599c1e057e44 100644 --- a/relayer/src/target.ts +++ b/relayer/src/target.ts @@ -1,4 +1,3 @@ -import * as fs from 'fs'; import { ApiPromise } from "@polkadot/api"; import { Subscription, EMPTY, catchError } from "rxjs"; import { concatMap, takeWhile } from "rxjs/operators"; @@ -8,7 +7,9 @@ import { KeyringPair } from "@polkadot/keyring/types"; import { ISubmittableResult, Observable } from "@polkadot/types/types"; import { EventRecord } from "@polkadot/types/interfaces"; import { U64 } from "@polkadot/types/primitive"; + import { TxData } from "./types"; +import State from './state'; // TODO: remove hardcoded url const polkadotAppsUrl = @@ -17,15 +18,18 @@ const polkadotAppsUrl = interface TargetConstructorParams { api: ApiPromise; logger: Logger; + state: State; } class Target { private readonly api: ApiPromise; private readonly logger: Logger; + private readonly state: State; - constructor({ api, logger }: TargetConstructorParams) { + constructor({ api, logger, state }: TargetConstructorParams) { this.api = api; this.logger = logger; + this.state = state; this.sendBlockTx = this.sendBlockTx.bind(this); this.logTxResult = this.logTxResult.bind(this); } @@ -132,33 +136,28 @@ class Target { const { address } = (signer as KeyringPair); this.logger.info(`Checking feed for ${address}`); - const filePath = './state/feeds.json'; - - const file = await fs.promises.readFile(filePath, 'utf8'); - const feeds = JSON.parse(file); + const feedId = await this.state.getFeedIdByAddress(address); - if (feeds[address]) { + if (feedId) { // query chain state to check if there is an entry for this feedId - const { isEmpty } = await this.api.query.feeds.feeds(feeds[address]); + const { isEmpty } = await this.api.query.feeds.feeds(feedId); // if it's not empty that means feedId exists both locally (in the feeds.json) and on the chain and we can re-use it if (!isEmpty) { - const feedId = this.api.createType("U64", feeds[address]); - this.logger.info(`Feed already exists: ${feedId}`); - return feedId; + const feedIdAsU64 = this.api.createType("U64", feedId); + this.logger.info(`Feed already exists: ${feedIdAsU64}`); + return feedIdAsU64; } // if feedId only exists locally, but not on the chain - we have to create a new one this.logger.error('Feed does not exist on the chain'); } - const feedId = await this.sendCreateFeedTx(signer); - - feeds[address] = feedId.toBn(); + const newFeedId = await this.sendCreateFeedTx(signer); - await fs.promises.writeFile(filePath, JSON.stringify(feeds, null, 4)); + await this.state.saveFeedId(address, newFeedId); - return feedId; + return newFeedId; } processSubscriptions(subscription: Observable): Observable {