Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Relayer added state module (paritytech#65)
Browse files Browse the repository at this point in the history
* relayer: save last processed block in json

* relayer: moved fs stuff to separate module

* relayer: converted state module to a class, add it as dependency to Target and Source

relayer: make linter happy

Update relayer/src/state.ts

Co-authored-by: Nazar Mokrynskyi <nazar@mokrynskyi.com>

relayer: added ChainName type import
  • Loading branch information
isSerge authored Oct 11, 2021
1 parent f318e72 commit 96ed4f2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 28 deletions.
5 changes: 4 additions & 1 deletion relayer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Target from "./target";
import logger from "./logger";
import { createParachainsMap } from './utils';
import { ChainName } from './types';
import State from './state';

const config = new Config({
accountSeed: process.env.ACCOUNT_SEED,
Expand All @@ -27,9 +28,10 @@ const createApi = async (url: string) => {
// TODO: remove IIFE when Eslint is updated to v8.0.0 (will support top-level await)
(async () => {
try {
const state = new State({ folder: "./state" });
const targetApi = await createApi(config.targetChainUrl);

const target = new Target({ api: targetApi, logger });
const target = new Target({ api: targetApi, logger, state });

const sources = await Promise.all(
config.sourceChains.map(async ({ url, parachains }) => {
Expand Down Expand Up @@ -57,6 +59,7 @@ const createApi = async (url: string) => {
logger,
feedId,
signer: sourceSigner,
state,
});
})
);
Expand Down
23 changes: 12 additions & 11 deletions relayer/src/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Logger } from "pino";
import { ParaHeadAndId, TxData, ChainName } from "./types";
import { getParaHeadAndIdFromEvent, isRelevantRecord } from './utils';
import Parachain from "./parachain";
import State from './state';

interface SourceConstructorParams {
api: ApiPromise;
Expand All @@ -19,6 +20,7 @@ interface SourceConstructorParams {
parachainsMap: Map<string, Parachain>;
logger: Logger;
signer: AddressOrPair;
state: State;
}

interface TxDataInput {
Expand All @@ -36,6 +38,7 @@ class Source {
private readonly feedId: U64;
private readonly parachainsMap: Map<string, Parachain>;
private readonly logger: Logger;
private readonly state: State;
public readonly signer: AddressOrPair;

constructor(params: SourceConstructorParams) {
Expand All @@ -45,6 +48,7 @@ class Source {
this.parachainsMap = params.parachainsMap;
this.logger = params.logger;
this.signer = params.signer;
this.state = params.state;
this.getBlocksByHash = this.getBlocksByHash.bind(this);
this.getParablocks = this.getParablocks.bind(this);
}
Expand Down Expand Up @@ -120,27 +124,24 @@ class Source {
}

private addBlockTxData({ block, number, hash, feedId, chain, signer }: TxDataInput): TxData {
const metadata = {
hash,
number,
};


return {
feedId,
block,
metadata,
chain,
signer,
metadata: {
hash,
number,
},
};
}

private getBlocksByHash(hash: Hash): Observable<TxData> {
const relayBlock = this.getBlock(hash);
const parablocks = relayBlock.pipe(concatMap(this.getParablocks));

const relayBlockWithMetadata = relayBlock.pipe(
map(({ block }) => {
const relayBlockWithMetadata = relayBlock
.pipe(map(({ block }) => {
const blockStr = block.toString();
const number = block.header.number.toBn();
return this.addBlockTxData({
Expand All @@ -151,8 +152,8 @@ class Source {
chain: this.chain,
signer: this.signer
});
})
);
}))
.pipe(tap(({ metadata }) => this.state.saveLastProcessedBlock(this.chain, metadata.number)));

// TODO: check relay block and parablocks size
// const size = Buffer.byteLength(block.toString());
Expand Down
42 changes: 42 additions & 0 deletions relayer/src/state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import * as fsp from "fs/promises";
import { BN } from '@polkadot/util';

import { ChainName } from './types';

// TODO: consider providing fs methods to constructor
class State {
lastBlockPath: string;
feedsPath: string;

constructor({ folder }: { folder: string; }) {
this.lastBlockPath = `${folder}/last_processed_block.json`;
this.feedsPath = `${folder}/feeds.json`;
}

async saveLastProcessedBlock(chain: ChainName, number: BN): Promise<void> {
const file = await fsp.readFile(this.lastBlockPath, 'utf8');
const lastProcessedBlockRecord = JSON.parse(file);

lastProcessedBlockRecord[chain] = number;

await fsp.writeFile(this.lastBlockPath, JSON.stringify(lastProcessedBlockRecord, null, 4));
}

async getFeedIdByAddress(address: string): Promise<string> {
const file = await fsp.readFile(this.feedsPath, 'utf8');
const feeds = JSON.parse(file);

return feeds[address];
}

async saveFeedId(address: string, feedId: BN): Promise<void> {
const file = await fsp.readFile(this.feedsPath, 'utf8');
const feeds = JSON.parse(file);

feeds[address] = feedId.toBn();

await fsp.writeFile(this.feedsPath, JSON.stringify(feeds, null, 4));
}
}

export default State;
31 changes: 15 additions & 16 deletions relayer/src/target.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import * as fs from 'fs';
import { ApiPromise } from "@polkadot/api";
import { Subscription, EMPTY, catchError } from "rxjs";
import { concatMap, takeWhile } from "rxjs/operators";
Expand All @@ -8,7 +7,9 @@ import { KeyringPair } from "@polkadot/keyring/types";
import { ISubmittableResult, Observable } from "@polkadot/types/types";
import { EventRecord } from "@polkadot/types/interfaces";
import { U64 } from "@polkadot/types/primitive";

import { TxData } from "./types";
import State from './state';

// TODO: remove hardcoded url
const polkadotAppsUrl =
Expand All @@ -17,15 +18,18 @@ const polkadotAppsUrl =
interface TargetConstructorParams {
api: ApiPromise;
logger: Logger;
state: State;
}

class Target {
private readonly api: ApiPromise;
private readonly logger: Logger;
private readonly state: State;

constructor({ api, logger }: TargetConstructorParams) {
constructor({ api, logger, state }: TargetConstructorParams) {
this.api = api;
this.logger = logger;
this.state = state;
this.sendBlockTx = this.sendBlockTx.bind(this);
this.logTxResult = this.logTxResult.bind(this);
}
Expand Down Expand Up @@ -132,33 +136,28 @@ class Target {
const { address } = (signer as KeyringPair);
this.logger.info(`Checking feed for ${address}`);

const filePath = './state/feeds.json';

const file = await fs.promises.readFile(filePath, 'utf8');
const feeds = JSON.parse(file);
const feedId = await this.state.getFeedIdByAddress(address);

if (feeds[address]) {
if (feedId) {
// query chain state to check if there is an entry for this feedId
const { isEmpty } = await this.api.query.feeds.feeds(feeds[address]);
const { isEmpty } = await this.api.query.feeds.feeds(feedId);

// if it's not empty that means feedId exists both locally (in the feeds.json) and on the chain and we can re-use it
if (!isEmpty) {
const feedId = this.api.createType("U64", feeds[address]);
this.logger.info(`Feed already exists: ${feedId}`);
return feedId;
const feedIdAsU64 = this.api.createType("U64", feedId);
this.logger.info(`Feed already exists: ${feedIdAsU64}`);
return feedIdAsU64;
}

// if feedId only exists locally, but not on the chain - we have to create a new one
this.logger.error('Feed does not exist on the chain');
}

const feedId = await this.sendCreateFeedTx(signer);

feeds[address] = feedId.toBn();
const newFeedId = await this.sendCreateFeedTx(signer);

await fs.promises.writeFile(filePath, JSON.stringify(feeds, null, 4));
await this.state.saveFeedId(address, newFeedId);

return feedId;
return newFeedId;
}

processSubscriptions(subscription: Observable<TxData>): Observable<Subscription> {
Expand Down

0 comments on commit 96ed4f2

Please sign in to comment.