Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: storage override #1425

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions client/cli/src/frontier_db_cmd/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use substrate_test_runtime_client::{
TestClientBuilder,
};
// Frontier
use fp_storage::{EthereumStorageSchema, ETHEREUM_CURRENT_TRANSACTION_STATUS, PALLET_ETHEREUM};
use fp_storage::{constants::*, EthereumStorageSchema};
use frontier_template_runtime::RuntimeApi;

use crate::frontier_db_cmd::{Column, FrontierDbCmd, Operation};
Expand Down Expand Up @@ -545,7 +545,7 @@ fn commitment_create() {
let statuses = vec![t1];

// Build a block and fill the pallet-ethereum status.
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUS);
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUSES);
let chain = client.chain_info();
let mut builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(chain.best_hash)
Expand Down Expand Up @@ -628,7 +628,7 @@ fn commitment_update() {
let statuses_a1 = vec![t1.clone()];
let statuses_a2 = vec![t1, t2];

let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUS);
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUSES);

// First we create block and insert data in the offchain db.

Expand Down Expand Up @@ -756,7 +756,7 @@ fn mapping_read_works() {
let statuses = vec![t1];

// Build a block and fill the pallet-ethereum status.
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUS);
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUSES);
let chain = client.chain_info();
let mut builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(chain.best_hash)
Expand Down
2 changes: 0 additions & 2 deletions client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ sp-blockchain = { workspace = true }
sp-core = { workspace = true }
sp-database = { workspace = true }
sp-runtime = { workspace = true }
sp-storage = { workspace = true, optional = true }
# Frontier
fc-api = { workspace = true }
fc-storage = { workspace = true, optional = true }
Expand Down Expand Up @@ -61,7 +60,6 @@ sql = [
"tokio",
"sc-client-api",
"sp-api",
"sp-storage",
"fc-storage",
"fp-consensus",
"fp-rpc",
Expand Down
94 changes: 27 additions & 67 deletions client/db/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ use sp_runtime::{
};
// Frontier
use fc_api::{FilteredLog, TransactionMetadata};
use fc_storage::OverrideHandle;
use fc_storage::{StorageOverride, StorageQuerier};
use fp_consensus::{FindLogError, Hashes, Log as ConsensusLog, PostLog, PreLog};
use fp_rpc::EthereumRuntimeRPCApi;
use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
use fp_storage::EthereumStorageSchema;

/// Maximum number of topics allowed to be filtered upon
const MAX_TOPIC_COUNT: u16 = 4;
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct Backend<Block: BlockT> {
pool: SqlitePool,

/// The additional overrides for the logs handler.
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,

/// The number of allowed operations for the Sqlite filter call.
/// A value of `0` disables the timeout.
Expand All @@ -114,7 +114,7 @@ where
config: BackendConfig<'_>,
pool_size: u32,
num_ops_timeout: Option<NonZeroU32>,
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,
) -> Result<Self, Error> {
let any_pool = SqlitePoolOptions::new()
.max_connections(pool_size)
Expand All @@ -123,7 +123,7 @@ where
let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
Ok(Self {
pool: any_pool,
overrides,
storage_override,
num_ops_timeout: num_ops_timeout
.map(|n| n.get())
.unwrap_or(0)
Expand Down Expand Up @@ -232,8 +232,10 @@ where
.expect("runtime api reachable")
.expect("ethereum genesis block");

let schema =
Self::onchain_storage_schema(client.as_ref(), substrate_genesis_hash).encode();
let schema = StorageQuerier::new(client)
.storage_schema(substrate_genesis_hash)
.unwrap_or(EthereumStorageSchema::V3)
.encode();
let ethereum_block_hash = ethereum_block.header.hash().as_bytes().to_owned();
let substrate_block_hash = substrate_genesis_hash.as_bytes();
let block_number = 0i32;
Expand Down Expand Up @@ -266,7 +268,7 @@ where
fn insert_block_metadata_inner<Client, BE>(
client: Arc<Client>,
hash: H256,
overrides: Arc<OverrideHandle<Block>>,
storage_override: &dyn StorageOverride<Block>,
) -> Result<BlockMetadata, Error>
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
Expand All @@ -277,16 +279,14 @@ where
if let Ok(Some(header)) = client.header(hash) {
match fp_consensus::find_log(header.digest()) {
Ok(log) => {
let schema = Self::onchain_storage_schema(client.as_ref(), hash);
let schema = StorageQuerier::new(client.clone())
.storage_schema(hash)
.unwrap_or(EthereumStorageSchema::V3);
let log_hashes = match log {
ConsensusLog::Post(PostLog::Hashes(post_hashes)) => post_hashes,
ConsensusLog::Post(PostLog::Block(block)) => Hashes::from_block(block),
ConsensusLog::Post(PostLog::BlockHash(expect_eth_block_hash)) => {
let ethereum_block = overrides
.schemas
.get(&schema)
.unwrap_or(&overrides.fallback)
.current_block(hash);
let ethereum_block = storage_override.current_block(hash);
match ethereum_block {
Some(block) => {
let got_eth_block_hash = block.header.hash();
Expand Down Expand Up @@ -366,9 +366,9 @@ where
BE::State: StateBackend<BlakeTwo256>,
{
// Spawn a blocking task to get block metadata from substrate backend.
let overrides = self.overrides.clone();
let storage_override = self.storage_override.clone();
let metadata = tokio::task::spawn_blocking(move || {
Self::insert_block_metadata_inner(client.clone(), hash, overrides)
Self::insert_block_metadata_inner(client.clone(), hash, &*storage_override)
})
.await
.map_err(|_| Error::Protocol("tokio blocking metadata task failed".to_string()))??;
Expand Down Expand Up @@ -435,14 +435,9 @@ where
}

/// Index the logs for the newly indexed blocks up to a `max_pending_blocks` value.
pub async fn index_block_logs<Client, BE>(&self, client: Arc<Client>, block_hash: Block::Hash)
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
BE: BackendT<Block> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
pub async fn index_block_logs(&self, block_hash: Block::Hash) {
let pool = self.pool().clone();
let overrides = self.overrides.clone();
let storage_override = self.storage_override.clone();
let _ = async {
// The overarching db transaction for the task.
// Due to the async nature of this task, the same work is likely to happen
Expand All @@ -469,7 +464,7 @@ where
Ok(_) => {
// Spawn a blocking task to get log data from substrate backend.
let logs = tokio::task::spawn_blocking(move || {
Self::get_logs(client.clone(), overrides, block_hash)
Self::get_logs(storage_override, block_hash)
})
.await
.map_err(|_| Error::Protocol("tokio blocking task failed".to_string()))?;
Expand Down Expand Up @@ -512,26 +507,14 @@ where
log::debug!(target: "frontier-sql", "Batch committed");
}

fn get_logs<Client, BE>(
client: Arc<Client>,
overrides: Arc<OverrideHandle<Block>>,
fn get_logs(
storage_override: Arc<dyn StorageOverride<Block>>,
substrate_block_hash: H256,
) -> Vec<Log>
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
BE: BackendT<Block> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
) -> Vec<Log> {
let mut logs: Vec<Log> = vec![];
let mut transaction_count: usize = 0;
let mut log_count: usize = 0;
let schema = Self::onchain_storage_schema(client.as_ref(), substrate_block_hash);
let handler = overrides
.schemas
.get(&schema)
.unwrap_or(&overrides.fallback);

let receipts = handler
let receipts = storage_override
.current_receipts(substrate_block_hash)
.unwrap_or_default();

Expand Down Expand Up @@ -565,20 +548,6 @@ where
logs
}

fn onchain_storage_schema<Client, BE>(client: &Client, at: Block::Hash) -> EthereumStorageSchema
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
BE: BackendT<Block> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
match client.storage(at, &sp_storage::StorageKey(PALLET_ETHEREUM_SCHEMA.to_vec())) {
Ok(Some(bytes)) => Decode::decode(&mut &bytes.0[..])
.ok()
.unwrap_or(EthereumStorageSchema::Undefined),
_ => EthereumStorageSchema::Undefined,
}
}

/// Retrieves the status if a block has been already indexed.
pub async fn is_block_indexed(&self, block_hash: Block::Hash) -> bool {
sqlx::query("SELECT substrate_block_hash FROM sync_status WHERE substrate_block_hash = ?")
Expand Down Expand Up @@ -1034,7 +1003,7 @@ LIMIT 10001",
mod test {
use super::*;

use std::{collections::BTreeMap, path::Path};
use std::path::Path;

use maplit::hashset;
use scale_codec::Encode;
Expand All @@ -1051,7 +1020,7 @@ mod test {
};
// Frontier
use fc_api::Backend as BackendT;
use fc_storage::{OverrideHandle, SchemaV3Override, StorageOverride};
use fc_storage::SchemaV3StorageOverride;
use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};

type OpaqueBlock =
Expand Down Expand Up @@ -1129,16 +1098,7 @@ mod test {
);
let client = Arc::new(client);
// Overrides
let mut overrides_map = BTreeMap::new();
overrides_map.insert(
EthereumStorageSchema::V3,
Box::new(SchemaV3Override::new(client.clone())) as Box<dyn StorageOverride<_>>,
);
let overrides = Arc::new(OverrideHandle {
schemas: overrides_map,
fallback: Box::new(SchemaV3Override::new(client.clone())),
});

let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
// Indexer backend
let indexer_backend = Backend::new(
BackendConfig::Sqlite(SqliteBackendConfig {
Expand All @@ -1153,7 +1113,7 @@ mod test {
}),
1,
None,
overrides.clone(),
storage_override.clone(),
)
.await
.expect("indexer pool to be created");
Expand Down
29 changes: 9 additions & 20 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,17 @@ use sp_blockchain::{Backend as _, HeaderBackend};
use sp_consensus::SyncOracle;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero};
// Frontier
use fc_storage::OverrideHandle;
use fc_storage::StorageOverride;
use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
use fp_rpc::EthereumRuntimeRPCApi;

use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};

pub fn sync_block<Block: BlockT, C, BE>(
client: &C,
overrides: Arc<OverrideHandle<Block>>,
pub fn sync_block<Block: BlockT>(
storage_override: Arc<dyn StorageOverride<Block>>,
backend: &fc_db::kv::Backend<Block>,
header: &Block::Header,
) -> Result<(), String>
where
C: HeaderBackend<Block> + StorageProvider<Block, BE>,
BE: Backend<Block>,
{
) -> Result<(), String> {
let substrate_block_hash = header.hash();
match fp_consensus::find_log(header.digest()) {
Ok(log) => {
Expand Down Expand Up @@ -77,13 +72,7 @@ where
backend.mapping().write_hashes(mapping_commitment)
}
PostLog::BlockHash(expect_eth_block_hash) => {
let schema =
fc_storage::onchain_storage_schema(client, substrate_block_hash);
let ethereum_block = overrides
.schemas
.get(&schema)
.unwrap_or(&overrides.fallback)
.current_block(substrate_block_hash);
let ethereum_block = storage_override.current_block(substrate_block_hash);
match ethereum_block {
Some(block) => {
let got_eth_block_hash = block.header.hash();
Expand Down Expand Up @@ -158,7 +147,7 @@ where
pub fn sync_one_block<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
Expand Down Expand Up @@ -220,7 +209,7 @@ where
{
return Ok(false);
}
sync_block(client, overrides, frontier_backend, &operating_header)?;
sync_block(storage_override, frontier_backend, &operating_header)?;

current_syncing_tips.push(*operating_header.parent_hash());
frontier_backend
Expand All @@ -247,7 +236,7 @@ where
pub fn sync_blocks<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
limit: usize,
sync_from: <Block::Header as HeaderT>::Number,
Expand All @@ -270,7 +259,7 @@ where
|| sync_one_block(
client,
substrate_backend,
overrides.clone(),
storage_override.clone(),
frontier_backend,
sync_from,
strategy,
Expand Down
Loading
Loading