Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request paritytech#285 from subspace/partial-replica-impl
Browse files Browse the repository at this point in the history
Partial replica support
  • Loading branch information
i1i1 authored Mar 25, 2022
2 parents 3042853 + 0b0645a commit 2b9d699
Show file tree
Hide file tree
Showing 12 changed files with 686 additions and 249 deletions.
4 changes: 1 addition & 3 deletions crates/subspace-core-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl AsMut<[u8]> for Piece {
}

/// Flat representation of multiple pieces concatenated for higher efficient for processing.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)]
#[derive(Debug, Default, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))]
pub struct FlatPieces(Vec<u8>);
Expand Down Expand Up @@ -457,8 +457,6 @@ impl RootBlock {
}
}

/// Index of piece on disk
pub type PieceOffset = u64;
/// Piece index in consensus
pub type PieceIndex = u64;

Expand Down
7 changes: 4 additions & 3 deletions crates/subspace-farmer/benches/plot-write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ use tempfile::TempDir;
#[tokio::main]
async fn main() {
let batch_size = 4096; // 16M
let piece_count = 2u64.pow(30); // 1G
let piece_count = 2u64.pow(20); // 4G
let base_directory = TempDir::new_in(std::env::current_dir().unwrap()).unwrap();

let mut pieces = Vec::with_capacity(batch_size as usize * PIECE_SIZE);
pieces.resize(batch_size as usize * PIECE_SIZE, 0u8);
rand::thread_rng().fill(&mut pieces[..]);
let pieces = Arc::new(pieces.try_into().unwrap());

let plot = Plot::open_or_create(&base_directory).unwrap();
let plot = Plot::open_or_create(&base_directory, [0; 32].into(), Some(piece_count)).unwrap();

let start = std::time::Instant::now();

for index in (0..piece_count / batch_size).map(|i| i * batch_size) {
plot.write_many(Arc::clone(&pieces), index as u64).unwrap();
plot.write_many(Arc::clone(&pieces), (index..index + batch_size).collect())
.unwrap();
}
drop(plot);

Expand Down
52 changes: 28 additions & 24 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,55 @@ use anyhow::{anyhow, Result};
use jsonrpsee::ws_server::WsServerBuilder;
use log::info;
use std::mem;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::PublicKey;
use subspace_core_primitives::PIECE_SIZE;
use subspace_farmer::ws_rpc_server::{RpcServer, RpcServerImpl};
use subspace_farmer::{
Commitments, FarmerData, Farming, Identity, ObjectMappings, Plot, Plotting, RpcClient, WsRpc,
};
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::libp2p::Multiaddr;
use subspace_networking::multimess::MultihashCode;
use subspace_networking::Config;
use subspace_solving::SubspaceCodec;

use crate::FarmingArgs;

/// Start farming by using plot in specified path and connecting to WebSocket server at specified
/// address.
pub(crate) async fn farm(
base_directory: PathBuf,
bootstrap_nodes: Vec<Multiaddr>,
listen_on: Vec<Multiaddr>,
node_rpc_url: &str,
ws_server_listen_addr: SocketAddr,
reward_address: Option<PublicKey>,
FarmingArgs {
bootstrap_nodes,
custom_path,
listen_on,
node_rpc_url,
ws_server_listen_addr,
reward_address,
plot_size,
}: FarmingArgs,
best_block_number_check_interval: Duration,
) -> Result<(), anyhow::Error> {
let base_directory = crate::utils::get_path(custom_path);

let identity = Identity::open_or_create(&base_directory)?;
let address = identity.public_key().to_bytes().into();

let reward_address = reward_address.unwrap_or(address);

// TODO: This doesn't account for the fact that node can
// have a completely different history to what farmer expects
info!("Opening plot");
let plot_fut = tokio::task::spawn_blocking({
let base_directory = base_directory.clone();

move || Plot::open_or_create(&base_directory)
// TODO: Piece count should account for database overhead of various additional databases
move || {
Plot::open_or_create(
&base_directory,
address,
plot_size.map(|plot_size| plot_size / PIECE_SIZE as u64),
)
}
});
let plot = plot_fut.await.unwrap()?;

Expand All @@ -55,25 +71,13 @@ pub(crate) async fn farm(
.await??;

info!("Connecting to node at {}", node_rpc_url);
let client = WsRpc::new(node_rpc_url).await?;
let client = WsRpc::new(&node_rpc_url).await?;

let farmer_metadata = client
.farmer_metadata()
.await
.map_err(|error| anyhow::Error::msg(error.to_string()))?;

let identity = Identity::open_or_create(&base_directory)?;

let reward_address = reward_address.unwrap_or_else(|| {
identity
.public_key()
.as_ref()
.to_vec()
.try_into()
.map(From::<[u8; 32]>::from)
.expect("Length of public key is always correct")
});

let subspace_codec = SubspaceCodec::new(identity.public_key());

// Start RPC server
Expand Down
81 changes: 41 additions & 40 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,34 @@ enum IdentityCommand {
},
}

/// Arguments for farmer
#[derive(Debug, Parser)]
struct FarmingArgs {
/// Multiaddrs of bootstrap nodes to connect to on startup, multiple are supported
#[clap(long)]
bootstrap_nodes: Vec<Multiaddr>,
/// Custom path for data storage instead of platform-specific default
#[clap(long, value_hint = ValueHint::FilePath)]
custom_path: Option<PathBuf>,
/// Multiaddr to listen on for subspace networking, for instance `/ip4/0.0.0.0/tcp/0`,
/// multiple are supported, subspace networking is disabled when none specified
#[clap(long)]
listen_on: Vec<Multiaddr>,
/// WebSocket RPC URL of the Subspace node to connect to
#[clap(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")]
node_rpc_url: String,
/// Host and port where built-in WebSocket RPC server should listen for incoming connections
#[clap(long, short, default_value = "127.0.0.1:9955")]
ws_server_listen_addr: SocketAddr,
/// Address for farming rewards
#[clap(long, parse(try_from_str = parse_reward_address))]
reward_address: Option<PublicKey>,
// TODO: Should we require user to always set plot size?
/// Maximum plot size in human readable format (e.g. 10G, 2T) or just bytes (e.g. 4096).
#[clap(long, parse(try_from_str = parse_human_readable))]
plot_size: Option<u64>,
}

#[derive(Debug, Parser)]
#[clap(about, version)]
enum Command {
Expand All @@ -60,27 +88,17 @@ enum Command {
custom_path: Option<PathBuf>,
},
/// Start a farmer using previously created plot
Farm {
/// Multiaddrs of bootstrap nodes to connect to on startup, multiple are supported
#[clap(long)]
bootstrap_node: Vec<Multiaddr>,
/// Custom path for data storage instead of platform-specific default
#[clap(long, value_hint = ValueHint::FilePath)]
custom_path: Option<PathBuf>,
/// Multiaddr to listen on for subspace networking, for instance `/ip4/0.0.0.0/tcp/0`,
/// multiple are supported, subspace networking is disabled when none specified
#[clap(long)]
listen_on: Vec<Multiaddr>,
/// WebSocket RPC URL of the Subspace node to connect to
#[clap(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")]
node_rpc_url: String,
/// Host and port where built-in WebSocket RPC server should listen for incoming connections
#[clap(long, short, default_value = "127.0.0.1:9955")]
ws_server_listen_addr: SocketAddr,
/// Address for farming rewards
#[clap(long, parse(try_from_str = parse_reward_address))]
reward_address: Option<PublicKey>,
},
Farm(FarmingArgs),
}

fn parse_human_readable(s: &str) -> Result<u64, std::num::ParseIntError> {
const SUFFIXES: &[(&str, u64)] = &[("G", 10u64.pow(9)), ("T", 10u64.pow(12))];

SUFFIXES
.iter()
.find_map(|(suf, mul)| s.strip_suffix(suf).map(|s| (s, mul)))
.map(|(s, mul)| s.parse::<u64>().map(|num| num * mul))
.unwrap_or_else(|| s.parse::<u64>())
}

fn parse_reward_address(s: &str) -> Result<PublicKey, PublicError> {
Expand All @@ -106,25 +124,8 @@ async fn main() -> Result<()> {
commands::wipe(&path)?;
info!("Done");
}
Command::Farm {
bootstrap_node,
custom_path,
listen_on,
node_rpc_url,
ws_server_listen_addr,
reward_address,
} => {
let path = utils::get_path(custom_path);
commands::farm(
path,
bootstrap_node,
listen_on,
&node_rpc_url,
ws_server_listen_addr,
reward_address,
BEST_BLOCK_NUMBER_CHECK_INTERVAL,
)
.await?;
Command::Farm(args) => {
commands::farm(args, BEST_BLOCK_NUMBER_CHECK_INTERVAL).await?;
}
}
Ok(())
Expand Down
67 changes: 48 additions & 19 deletions crates/subspace-farmer/src/commitments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod commitment_databases;
#[cfg(test)]
mod tests;

use crate::plot::Plot;
use crate::plot::{PieceOffset, Plot};
use arc_swap::ArcSwapOption;
use commitment_databases::{CommitmentDatabases, CreateDbEntryResult, DbEntry};
use event_listener_primitives::{Bag, HandlerId};
Expand All @@ -13,7 +13,7 @@ use rocksdb::DB;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use subspace_core_primitives::{FlatPieces, PieceOffset, Salt, Tag, PIECE_SIZE};
use subspace_core_primitives::{Piece, Salt, Tag, PIECE_SIZE};
use thiserror::Error;

const BATCH_SIZE: u64 = (16 * 1024 * 1024 / PIECE_SIZE) as u64;
Expand Down Expand Up @@ -170,12 +170,7 @@ impl Commitments {
Ok(())
}

/// Create commitments for all salts for specified pieces
pub(crate) fn create_for_pieces(
&self,
pieces: &Arc<FlatPieces>,
start_offset: u64,
) -> Result<(), CommitmentError> {
pub(crate) fn remove_pieces(&self, pieces: &[Piece]) -> Result<(), CommitmentError> {
let salts = self.inner.commitment_databases.lock().get_salts();

for salt in salts {
Expand All @@ -194,28 +189,62 @@ impl Commitments {

let db_guard = db_entry.lock();

let db = match db_guard.clone() {
Some(db) => db,
if let Some(db) = db_guard.as_ref() {
for piece in pieces {
let tag = subspace_solving::create_tag(piece, salt);
db.delete(tag).map_err(CommitmentError::CommitmentDb)?;
}
}
}

Ok(())
}

/// Create commitments for all salts for specified pieces
pub(crate) fn create_for_pieces<'a, 'iter, F, Iter>(
&'a self,
pieces_with_offsets: F,
) -> Result<(), CommitmentError>
where
F: Fn() -> Iter,
Iter: Iterator<Item = (PieceOffset, &'iter [u8])>,
{
let salts = self.inner.commitment_databases.lock().get_salts();

for salt in salts {
let db_entry = match self
.inner
.commitment_databases
.lock()
.get_db_entry(&salt)
.cloned()
{
Some(db_entry) => db_entry,
None => {
continue;
}
};

let tags: Vec<Tag> = pieces
.par_chunks_exact(PIECE_SIZE)
.map(|piece| subspace_solving::create_tag(piece, salt))
.collect();
let db_guard = db_entry.lock();

for (tag, offset) in tags.iter().zip(start_offset..) {
db.put(tag, offset.to_le_bytes())
.map_err(CommitmentError::CommitmentDb)?;
}
if let Some(db) = db_guard.as_ref() {
let tags_with_offset: Vec<(PieceOffset, Tag)> = pieces_with_offsets()
.map(|(piece_offset, piece)| {
(piece_offset, subspace_solving::create_tag(piece, salt))
})
.collect();

for (piece_offset, tag) in tags_with_offset {
db.put(tag, piece_offset.to_le_bytes())
.map_err(CommitmentError::CommitmentDb)?;
}
};
}

Ok(())
}

/// Finds the commitment/s falling in the range of the challenge
/// Finds the commitment falling in the range of the challenge
pub(crate) fn find_by_range(
&self,
target: Tag,
Expand Down
Loading

0 comments on commit 2b9d699

Please sign in to comment.