Skip to content

Commit

Permalink
feat: add EtlConfig as well as setting the directory to datadir (#7124)
Browse files Browse the repository at this point in the history

Co-authored-by: Mikhail Sozin <mikhail.sozin@chainstack.com>
Co-authored-by: Misha <mikawamp@gmail.com>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
  • Loading branch information
4 people committed Mar 13, 2024
1 parent 5d6ac4c commit 28f3a2e
Show file tree
Hide file tree
Showing 18 changed files with 118 additions and 43 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl Command {
header_downloader,
body_downloader,
factory.clone(),
stage_conf.etl.etl_file_size,
stage_conf.etl.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
Expand Down
4 changes: 1 addition & 3 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ impl ImportCommand {

let max_block = file_client.max_block().unwrap_or(0);

let etl_file_size = config.stages.etl.etl_file_size;

let mut pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
Expand All @@ -193,7 +191,7 @@ impl ImportCommand {
header_downloader,
body_downloader,
factory.clone(),
etl_file_size,
config.stages.etl,
)
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
Expand Down
15 changes: 12 additions & 3 deletions bin/reth/src/commands/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};
use clap::Parser;
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_config::{config::EtlConfig, Config};
use reth_db::init_db;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_node_ethereum::EthEvmConfig;
Expand Down Expand Up @@ -86,6 +86,10 @@ pub struct Command {
#[arg(long)]
etl_file_size: Option<usize>,

/// Directory in which ETL files are collected
#[arg(long)]
etl_dir: Option<PathBuf>,

/// Normally, running the stage requires unwinding for stages that already
/// have been run, in order to not rewrite to the same database slots.
///
Expand Down Expand Up @@ -155,7 +159,12 @@ impl Command {

let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1);

let etl_file_size = self.etl_file_size.unwrap_or(500 * 1024 * 1024);
let etl_config = EtlConfig::new(
Some(
self.etl_dir.unwrap_or_else(|| EtlConfig::from_datadir(&data_dir.data_dir_path())),
),
self.etl_file_size.unwrap_or(EtlConfig::default_file_size()),
);

let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
Expand Down Expand Up @@ -235,7 +244,7 @@ impl Command {
)
}
StageEnum::TxLookup => {
(Box::new(TransactionLookupStage::new(batch_size, etl_file_size, None)), None)
(Box::new(TransactionLookupStage::new(batch_size, etl_config, None)), None)
}
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
Expand Down
14 changes: 14 additions & 0 deletions book/cli/reth/recover/storage-tries.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ Options:
-h, --help
Print help (see a summary with '-h')
Database:
--db.log-level <LOG_LEVEL>
Database logging level. Levels higher than "notice" require a debug build
Possible values:
- fatal: Enables logging for critical conditions, i.e. assertion failures
- error: Enables logging for error conditions
- warn: Enables logging for warning conditions
- notice: Enables logging for normal but significant condition
- verbose: Enables logging for verbose informational
- debug: Enables logging for debug-level messages
- trace: Enables logging for trace debug-level messages
- extra: Enables logging for extra debug-level messages
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
Expand Down
5 changes: 4 additions & 1 deletion book/cli/reth/stage/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ Options:
Batch size for stage execution and unwind

--etl-file-size <ETL_FILE_SIZE>
Size for temporary file during ETL stages
The maximum size in bytes of data held in memory before being flushed to disk as a file

--etl-dir <ETL_DIR>
Directory where to collect ETL files

-s, --skip-unwind
Normally, running the stage requires unwinding for stages that already have been run, in order to not rewrite to the same database slots.
Expand Down
31 changes: 27 additions & 4 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use reth_network::{NetworkConfigBuilder, PeersConfig, SessionsConfig};
use reth_primitives::PruneModes;
use secp256k1::SecretKey;
use serde::{Deserialize, Deserializer, Serialize};
use std::{path::PathBuf, time::Duration};
use std::{
path::{Path, PathBuf},
time::Duration,
};

/// Configuration for the reth node.
#[derive(Debug, Clone, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -238,16 +241,36 @@ impl Default for TransactionLookupConfig {
}

/// Common ETL related configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct EtlConfig {
/// Data directory where temporary files are created.
pub dir: Option<PathBuf>,
/// The maximum size in bytes of data held in memory before being flushed to disk as a file.
pub etl_file_size: usize,
pub file_size: usize,
}

impl Default for EtlConfig {
fn default() -> Self {
Self { etl_file_size: 500 * (1024 * 1024) }
Self { dir: None, file_size: Self::default_file_size() }
}
}

impl EtlConfig {
/// Creates an ETL configuration
pub fn new(dir: Option<PathBuf>, file_size: usize) -> Self {
Self { dir, file_size }
}

/// Return default ETL directory from datadir path.
pub fn from_datadir(path: &Path) -> PathBuf {
path.join("etl-tmp")
}

/// Default size in bytes of data held in memory before being flushed to disk as a file.
pub const fn default_file_size() -> usize {
// 500 MB
500 * (1024 * 1024)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ reth-tracing.workspace = true
reth-revm.workspace = true
reth-downloaders.workspace = true
reth-node-ethereum.workspace = true
reth-config.workspace = true

assert_matches.workspace = true

Expand Down
3 changes: 2 additions & 1 deletion crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
use reth_config::config::EtlConfig;
use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE};
type DatabaseEnv = TempDatabase<DE>;
use reth_downloaders::{
Expand Down Expand Up @@ -406,7 +407,7 @@ where
header_downloader,
body_downloader,
executor_factory.clone(),
500 * (1024 * 1024),
EtlConfig::default(),
))
}
};
Expand Down
19 changes: 15 additions & 4 deletions crates/etl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
cmp::Reverse,
collections::BinaryHeap,
io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
path::Path,
path::{Path, PathBuf},
};

use rayon::prelude::*;
Expand All @@ -42,6 +42,8 @@ where
<K as Encode>::Encoded: std::fmt::Debug,
<V as Compress>::Compressed: std::fmt::Debug,
{
/// Parent directory in which the temporary ETL directory is created
parent_dir: Option<PathBuf>,
/// Directory for temporary file storage
dir: Option<TempDir>,
/// Collection of temporary ETL files
Expand All @@ -66,8 +68,9 @@ where
/// Create a new collector with some capacity.
///
/// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk.
pub fn new(buffer_capacity_bytes: usize) -> Self {
pub fn new(buffer_capacity_bytes: usize, parent_dir: Option<PathBuf>) -> Self {
Self {
parent_dir,
dir: None,
buffer_size_bytes: 0,
files: Vec::new(),
Expand Down Expand Up @@ -115,7 +118,15 @@ where
/// doesn't exist, it will be created.
fn dir(&mut self) -> io::Result<&TempDir> {
if self.dir.is_none() {
self.dir = Some(TempDir::new()?);
self.dir = match &self.parent_dir {
Some(dir) => {
if !dir.exists() {
std::fs::create_dir_all(dir)?;
}
Some(TempDir::new_in(dir)?)
}
None => Some(TempDir::new()?),
};
}
Ok(self.dir.as_ref().unwrap())
}
Expand Down Expand Up @@ -273,7 +284,7 @@ mod tests {
let mut entries: Vec<_> =
(0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();

let mut collector = Collector::new(1024);
let mut collector = Collector::new(1024, None);
assert!(collector.dir.is_none());

for (k, v) in entries.clone() {
Expand Down
8 changes: 7 additions & 1 deletion crates/node-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use reth_beacon_consensus::{
BeaconConsensusEngine,
};
use reth_blockchain_tree::{BlockchainTreeConfig, ShareableBlockchainTree};
use reth_config::config::EtlConfig;
use reth_db::{
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetrics},
Expand Down Expand Up @@ -512,7 +513,7 @@ where
executor,
data_dir,
mut config,
reth_config,
mut reth_config,
..
} = ctx;

Expand Down Expand Up @@ -556,6 +557,11 @@ where
hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone())));
info!(target: "reth::cli", "StaticFileProducer initialized");

// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
if reth_config.stages.etl.dir.is_none() {
reth_config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path()));
}

// Configure the pipeline
let (mut pipeline, client) = if config.dev.dev {
info!(target: "reth::cli", "Starting Reth in dev mode");
Expand Down
4 changes: 2 additions & 2 deletions crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ impl NodeConfig {
header_downloader,
body_downloader,
factory.clone(),
stage_config.etl.etl_file_size,
stage_config.etl.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
Expand Down Expand Up @@ -871,7 +871,7 @@ impl NodeConfig {
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.chunk_size,
stage_config.etl.etl_file_size,
stage_config.etl.clone(),
prune_modes.transaction_lookup,
))
.set(IndexAccountHistoryStage::new(
Expand Down
1 change: 1 addition & 0 deletions crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ reth-trie = { workspace = true, features = ["metrics"] }
reth-tokio-util.workspace = true
reth-etl.workspace = true
reth-static-file.workspace = true
reth-config.workspace = true

# async
tokio = { workspace = true, features = ["sync"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/stages/benches/criterion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use criterion::{
BenchmarkGroup, Criterion,
};
use pprof::criterion::{Output, PProfProfiler};
use reth_config::config::EtlConfig;
use reth_db::{test_utils::TempDatabase, DatabaseEnv};

use reth_primitives::{stage::StageCheckpoint, BlockNumber};
Expand Down Expand Up @@ -57,7 +58,7 @@ fn transaction_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, 500 * 1024 * 1024, None);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, EtlConfig::default(), None);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

Expand Down
3 changes: 2 additions & 1 deletion crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//! # use reth_provider::HeaderSyncMode;
//! # use reth_provider::test_utils::create_test_provider_factory;
//! # use reth_static_file::StaticFileProducer;
//! # use reth_config::config::EtlConfig;
//! #
//! # let chain_spec = MAINNET.clone();
//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
Expand Down Expand Up @@ -59,7 +60,7 @@
//! headers_downloader,
//! bodies_downloader,
//! executor_factory,
//! 500*1024*1024,
//! EtlConfig::default(),
//! )
//! )
//! .build(provider_factory, static_file_producer);
Expand Down

0 comments on commit 28f3a2e

Please sign in to comment.