From 0adb3a59bd14a75f51c5dbc5f0ea656f723492c4 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 3 Nov 2023 22:57:01 +0800 Subject: [PATCH 01/14] add run migration in background --- Cargo.lock | 3 + db-migration/Cargo.toml | 3 + db-migration/src/lib.rs | 184 +++++++++++++++++- shared/src/shared_builder.rs | 11 +- util/migrate/src/migrate.rs | 28 ++- util/migrate/src/migrations/mod.rs | 2 + .../migrations/set_2019_block_cycle_zero.rs | 98 ++++++++++ util/migrate/src/migrations/test_migrate.rs | 62 ++++++ 8 files changed, 378 insertions(+), 13 deletions(-) create mode 100644 util/migrate/src/migrations/set_2019_block_cycle_zero.rs create mode 100644 util/migrate/src/migrations/test_migrate.rs diff --git a/Cargo.lock b/Cargo.lock index b3a05d75ad..9fca0ac1f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -748,12 +748,15 @@ name = "ckb-db-migration" version = "0.114.0-pre" dependencies = [ "ckb-app-config", + "ckb-channel", "ckb-db", "ckb-db-schema", "ckb-error", "ckb-logger", + "ckb-stop-handler", "console", "indicatif", + "once_cell", "tempfile", ] diff --git a/db-migration/Cargo.toml b/db-migration/Cargo.toml index 3fec4f0d9e..b93c936d90 100644 --- a/db-migration/Cargo.toml +++ b/db-migration/Cargo.toml @@ -15,6 +15,9 @@ ckb-db = { path = "../db", version = "= 0.114.0-pre" } ckb-logger = { path = "../util/logger", version = "= 0.114.0-pre" } ckb-error = { path = "../error", version = "= 0.114.0-pre" } ckb-db-schema = { path = "../db-schema", version = "= 0.114.0-pre" } +ckb-channel = { path = "../util/channel", version = "= 0.114.0-pre" } +ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.114.0-pre" } +once_cell = "1.8.0" indicatif = "0.16" console = ">=0.9.1, <1.0.0" diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index e2243f216f..585e4e2c2a 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -1,13 +1,25 @@ //! TODO(doc): @quake +use ckb_channel::{unbounded, Receiver}; use ckb_db::{ReadOnlyDB, RocksDB}; use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY}; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::{debug, error, info}; +use ckb_stop_handler::register_thread; use console::Term; pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle}; use std::cmp::Ordering; use std::collections::BTreeMap; +use std::collections::VecDeque; +use std::fs::OpenOptions; +use std::io::Write; +use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::sync::Mutex; +use std::thread; +use std::thread::JoinHandle; + +pub static SHUTDOWN_BACKGROUND_MIGRATION: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(|| AtomicBool::new(false)); #[cfg(test)] mod tests; @@ -19,7 +31,59 @@ fn internal_error(reason: String) -> Error { /// TODO(doc): @quake #[derive(Default)] pub struct Migrations { - migrations: BTreeMap>, + migrations: BTreeMap>, +} + +/// Commands +#[derive(PartialEq, Eq)] +enum Command { + Start, + //Stop, +} + +struct MigrationWorker { + tasks: Arc)>>>, + db: RocksDB, + inbox: Receiver, +} + +impl MigrationWorker { + pub fn new( + tasks: Arc)>>>, + db: RocksDB, + inbox: Receiver, + ) -> Self { + Self { tasks, db, inbox } + } + + pub fn start(self) -> JoinHandle<()> { + thread::spawn(move || { + let msg = match self.inbox.recv() { + Ok(msg) => Some(msg), + Err(_err) => return, + }; + + if let Some(Command::Start) = msg { + // Progress Bar is no need in background, but here we fake one to keep the trait API + // consistent with the foreground migration. + loop { + let db = self.db.clone(); + let pb = move |_count: u64| -> ProgressBar { ProgressBar::new(0) }; + if let Some((name, task)) = self.tasks.lock().unwrap().pop_front() { + eprintln!("start to run migrate: {}", name); + let db = task.migrate(db, Arc::new(pb)).unwrap(); + db.put_default(MIGRATION_VERSION_KEY, task.version()) + .map_err(|err| { + internal_error(format!("failed to migrate the database: {err}")) + }) + .unwrap(); + } else { + break; + } + } + } + }) + } } impl Migrations { @@ -31,7 +95,7 @@ impl Migrations { } /// TODO(doc): @quake - pub fn add_migration(&mut self, migration: Box) { + pub fn add_migration(&mut self, migration: Arc) { self.migrations .insert(migration.version().to_string(), migration); } @@ -97,6 +161,27 @@ impl Migrations { .any(|m| m.expensive()) } + pub fn run_in_background(&self, db: &ReadOnlyDB) -> bool { + let db_version = match db + .get_pinned_default(MIGRATION_VERSION_KEY) + .expect("get the version of database") + { + Some(version_bytes) => { + String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8") + } + None => { + // if version is none, but db is not empty + // patch 220464f + return self.is_non_empty_rdb(db); + } + }; + + self.migrations + .values() + .skip_while(|m| m.version() <= db_version.as_str()) + .all(|m| m.run_in_background()) + } + fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool { if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) { if v.is_some() { @@ -139,6 +224,34 @@ impl Migrations { Ok(db) } + fn run_migrate_async(&self, db: RocksDB, v: &str) { + let migrations: VecDeque<(String, Arc)> = self + .migrations + .iter() + .filter(|(mv, _)| mv.as_str() > v) + .map(|(mv, m)| (mv.to_string(), Arc::clone(m))) + .collect::>(); + + let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume()); + let tasks = Arc::new(Mutex::new(migrations)); + let (tx, rx) = unbounded(); + let worker = MigrationWorker::new(tasks, db.clone(), rx); + + let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx(); + thread::spawn(move || { + let _ = exit_signal.recv(); + SHUTDOWN_BACKGROUND_MIGRATION.store(true, std::sync::atomic::Ordering::SeqCst); + eprintln!("set shutdown flat to true"); + }); + + let handler = worker.start(); + tx.send(Command::Start).expect("send start command"); + if all_can_resume { + eprintln!("register thread: migration ...."); + register_thread("migration", handler); + } + } + fn get_migration_version(&self, db: &RocksDB) -> Result, Error> { let raw = db .get_pinned_default(MIGRATION_VERSION_KEY) @@ -199,6 +312,38 @@ impl Migrations { } } + /// TODO(doc): @quake + pub fn migrate_async(&self, db: RocksDB) -> Result { + let db_version = self.get_migration_version(&db)?; + match db_version { + Some(ref v) => { + info!("Current database version {}", v); + if let Some(m) = self.migrations.values().last() { + if m.version() < v.as_str() { + error!( + "Database downgrade detected. \ + The database schema version is newer than client schema version,\ + please upgrade to the newer version" + ); + return Err(internal_error( + "Database downgrade is not supported".to_string(), + )); + } + } + self.run_migrate_async(db.clone(), v.as_str()); + Ok(db) + } + None => { + // if version is none, but db is not empty + // patch 220464f + if self.is_non_empty_db(&db) { + return self.patch_220464f(db); + } + Ok(db) + } + } + } + fn patch_220464f(&self, db: RocksDB) -> Result { const V: &str = "20210609195048"; // AddExtraDataHash - 1 self.run_migrate(db, V) @@ -206,7 +351,7 @@ impl Migrations { } /// TODO(doc): @quake -pub trait Migration { +pub trait Migration: Send + Sync { /// TODO(doc): @quake fn migrate( &self, @@ -223,6 +368,29 @@ pub trait Migration { fn expensive(&self) -> bool { true } + + fn run_in_background(&self) -> bool { + false + } + + /// Check if the background migration should be stopped. + /// If a migration need to implement the recovery logic, it should check this flag periodically, + /// store the migration progress when exiting and recover from the current progress when restarting. + fn stop_background(&self) -> bool { + SHUTDOWN_BACKGROUND_MIGRATION.load(std::sync::atomic::Ordering::SeqCst) + } + + /// Check if the background migration can be resumed. + /// + /// If a migration can be resumed, it should implement the recovery logic in `migrate` function. + /// and the `MigirateWorker` will add the migration's handler with `register_thread`, so that then + /// main thread can wait for the background migration to store the progress and exit. + /// + /// Otherwise, the migration will be restarted from the beginning. + /// + fn can_resume(&self) -> bool { + false + } } /// TODO(doc): @quake @@ -256,3 +424,13 @@ impl Migration for DefaultMigration { false } } + +pub fn append_to_file(path: &str, data: &str) -> std::io::Result<()> { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(path)?; + + writeln!(file, "{}", data) +} diff --git a/shared/src/shared_builder.rs b/shared/src/shared_builder.rs index 7f5a0673bb..f3563295a5 100644 --- a/shared/src/shared_builder.rs +++ b/shared/src/shared_builder.rs @@ -71,7 +71,8 @@ pub fn open_or_create_db( } Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)), Ordering::Less => { - if migrate.require_expensive(&db) { + let can_run_in_background = migrate.can_run_in_background(&db); + if migrate.require_expensive(&db) && !can_run_in_background { eprintln!( "For optimal performance, CKB recommends migrating your data into a new format.\n\ If you prefer to stick with the older version, \n\ @@ -82,6 +83,14 @@ pub fn open_or_create_db( root_dir.display() ); Err(ExitCode::Failure) + } else if can_run_in_background { + info!("process migrations in background ..."); + let db = RocksDB::open(config, COLUMNS); + migrate.migrate_async(db.clone()).map_err(|err| { + eprintln!("Run error: {err:?}"); + ExitCode::Failure + })?; + Ok(db) } else { info!("Processing fast migrations ..."); diff --git a/util/migrate/src/migrate.rs b/util/migrate/src/migrate.rs index 99d6a65b7e..578d5801a0 100644 --- a/util/migrate/src/migrate.rs +++ b/util/migrate/src/migrate.rs @@ -7,6 +7,7 @@ use ckb_db_schema::{COLUMNS, COLUMN_META}; use ckb_error::Error; use std::cmp::Ordering; use std::path::PathBuf; +use std::sync::Arc; const INIT_DB_VERSION: &str = "20191127135521"; @@ -20,15 +21,16 @@ impl Migrate { /// Construct new migrate pub fn new>(path: P) -> Self { let mut migrations = Migrations::default(); - migrations.add_migration(Box::new(DefaultMigration::new(INIT_DB_VERSION))); - migrations.add_migration(Box::new(migrations::ChangeMoleculeTableToStruct)); // since v0.35.0 - migrations.add_migration(Box::new(migrations::CellMigration)); // since v0.37.0 - migrations.add_migration(Box::new(migrations::AddNumberHashMapping)); // since v0.40.0 - migrations.add_migration(Box::new(migrations::AddExtraDataHash)); // since v0.43.0 - migrations.add_migration(Box::new(migrations::AddBlockExtensionColumnFamily)); // since v0.100.0 - migrations.add_migration(Box::new(migrations::AddChainRootMMR)); // TODO(light-client) update the comment: which version? - migrations.add_migration(Box::new(migrations::AddBlockFilterColumnFamily)); // since v0.105.0 - migrations.add_migration(Box::new(migrations::AddBlockFilterHash)); // since v0.108.0 + migrations.add_migration(Arc::new(DefaultMigration::new(INIT_DB_VERSION))); + migrations.add_migration(Arc::new(migrations::ChangeMoleculeTableToStruct)); // since v0.35.0 + migrations.add_migration(Arc::new(migrations::CellMigration)); // since v0.37.0 + migrations.add_migration(Arc::new(migrations::AddNumberHashMapping)); // since v0.40.0 + migrations.add_migration(Arc::new(migrations::AddExtraDataHash)); // since v0.43.0 + migrations.add_migration(Arc::new(migrations::AddBlockExtensionColumnFamily)); // since v0.100.0 + migrations.add_migration(Arc::new(migrations::AddChainRootMMR)); // TODO(light-client) update the comment: which version? + migrations.add_migration(Arc::new(migrations::AddBlockFilterColumnFamily)); // since v0.105.0 + migrations.add_migration(Arc::new(migrations::AddBlockFilterHash)); // since v0.108.0 + migrations.add_migration(Arc::new(migrations::BlockExt2019ToZero::new(hardforks))); // since v0.111.1 Migrate { migrations, @@ -59,6 +61,10 @@ impl Migrate { self.migrations.expensive(db) } + pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool { + self.migrations.run_in_background(db) + } + /// Open bulk load db. pub fn open_bulk_load_db(&self) -> Result, Error> { RocksDB::prepare_for_bulk_load_open(&self.path, COLUMNS) @@ -69,6 +75,10 @@ impl Migrate { self.migrations.migrate(db) } + pub fn migrate_async(self, db: RocksDB) -> Result { + self.migrations.migrate_async(db) + } + /// Perform init_db_version. pub fn init_db_version(self, db: &RocksDB) -> Result<(), Error> { self.migrations.init_db_version(db) diff --git a/util/migrate/src/migrations/mod.rs b/util/migrate/src/migrations/mod.rs index 37e6136371..e78bac582c 100644 --- a/util/migrate/src/migrations/mod.rs +++ b/util/migrate/src/migrations/mod.rs @@ -6,6 +6,7 @@ mod add_extra_data_hash; mod add_number_hash_mapping; mod cell; mod table_to_struct; +mod test_migrate; pub use add_block_extension_cf::AddBlockExtensionColumnFamily; pub use add_block_filter::AddBlockFilterColumnFamily; @@ -15,3 +16,4 @@ pub use add_extra_data_hash::AddExtraDataHash; pub use add_number_hash_mapping::AddNumberHashMapping; pub use cell::CellMigration; pub use table_to_struct::ChangeMoleculeTableToStruct; +pub use test_migrate::TestMigration; diff --git a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs new file mode 100644 index 0000000000..1b25e4a3c9 --- /dev/null +++ b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs @@ -0,0 +1,98 @@ +use ckb_app_config::StoreConfig; +//use ckb_db_migration::SHUTDOWN_BACKGROUND_MIGRATION; +use ckb_db_migration::{Migration, ProgressBar, ProgressStyle}; +use ckb_db_schema::COLUMN_EPOCH; +use ckb_store::{ChainDB, ChainStore}; +use ckb_types::{ + core::hardfork::HardForks, + packed, + prelude::{Entity, FromSliceShouldBeOk, Pack, Reader}, +}; + +const VERSION: &str = "20231101000000"; +pub struct BlockExt2019ToZero { + hardforks: HardForks, +} + +impl BlockExt2019ToZero { + pub fn new(hardforks: HardForks) -> Self { + BlockExt2019ToZero { hardforks } + } +} + +impl Migration for BlockExt2019ToZero { + fn run_in_background(&self) -> bool { + true + } + + fn migrate( + &self, + db: ckb_db::RocksDB, + pb: std::sync::Arc ProgressBar + Send + Sync>, + ) -> Result { + let chain_db = ChainDB::new(db, StoreConfig::default()); + let limit_epoch = self.hardforks.ckb2021.rfc_0032(); + + eprintln!("begin to run block_ext 2019 to zero migrate..."); + if limit_epoch == 0 { + return Ok(chain_db.into_inner()); + } + + eprintln!("now limit epoch is {}", limit_epoch); + let epoch_number: packed::Uint64 = limit_epoch.pack(); + + if let Some(epoch_hash) = chain_db.get(COLUMN_EPOCH, epoch_number.as_slice()) { + let epoch_ext = chain_db + .get_epoch_ext( + &packed::Byte32Reader::from_slice_should_be_ok(epoch_hash.as_ref()).to_entity(), + ) + .expect("db must have epoch ext"); + let mut header = chain_db + .get_block_header(&epoch_ext.last_block_hash_in_previous_epoch()) + .expect("db must have header"); + + let pb = ::std::sync::Arc::clone(&pb); + let pbi = pb(header.number() + 1); + pbi.set_style( + ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} {spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}", + ) + .progress_chars("#>-"), + ); + pbi.set_position(0); + pbi.enable_steady_tick(5000); + + loop { + let db_txn = chain_db.begin_transaction(); + for _ in 0..10000 { + let hash = header.hash(); + + let mut old_block_ext = db_txn.get_block_ext(&hash).unwrap(); + old_block_ext.cycles = None; + db_txn.insert_block_ext(&hash, &old_block_ext)?; + + header = db_txn + .get_block_header(&header.parent_hash()) + .expect("db must have header"); + + pbi.inc(1); + + if header.is_genesis() { + break; + } + } + db_txn.commit()?; + + if header.is_genesis() { + break; + } + } + } + + Ok(chain_db.into_inner()) + } + fn version(&self) -> &str { + VERSION + } +} diff --git a/util/migrate/src/migrations/test_migrate.rs b/util/migrate/src/migrations/test_migrate.rs new file mode 100644 index 0000000000..df56f6bee2 --- /dev/null +++ b/util/migrate/src/migrations/test_migrate.rs @@ -0,0 +1,62 @@ +use ckb_app_config::StoreConfig; +use ckb_db::{Direction, IteratorMode, Result, RocksDB}; +use ckb_db_migration::{Migration, ProgressBar, ProgressStyle}; +use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_INDEX, COLUMN_NUMBER_HASH}; +use ckb_migration_template::multi_thread_migration; +use ckb_store::{ChainDB, ChainStore}; +use ckb_types::{molecule::io::Write, packed, prelude::*}; +use std::sync::Arc; +pub struct TestMigration; + +const VERSION: &str = "20231104000000"; + +impl Migration for TestMigration { + fn run_in_background(&self) -> bool { + true + } + + fn migrate( + &self, + db: RocksDB, + pb: Arc ProgressBar + Send + Sync>, + ) -> Result { + multi_thread_migration! { + { + for number in i * chunk_size..end { + let block_number: packed::Uint64 = number.pack(); + let raw_hash = chain_db.get(COLUMN_INDEX, block_number.as_slice()).expect("DB data integrity"); + let txs_len = chain_db.get_iter( + COLUMN_BLOCK_BODY, + IteratorMode::From(&raw_hash, Direction::Forward), + ) + .take_while(|(key, _)| key.starts_with(&raw_hash)) + .count(); + + let raw_txs_len: packed::Uint32 = (txs_len as u32).pack(); + + let mut raw_key = Vec::with_capacity(40); + raw_key.write_all(block_number.as_slice()).expect("write_all block_number"); + raw_key.write_all(&raw_hash).expect("write_all hash"); + let key = packed::NumberHash::new_unchecked(raw_key.into()); + + wb.put( + COLUMN_NUMBER_HASH, + key.as_slice(), + raw_txs_len.as_slice(), + ) + .expect("put number_hash"); + + if wb.len() > BATCH { + chain_db.write(&wb).expect("write db batch"); + wb.clear().unwrap(); + } + pbi.inc(1); + } + } + } + } + + fn version(&self) -> &str { + VERSION + } +} From 30f9bcec5cc507071b7f17e0d912aff24fb04c23 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 10 Nov 2023 07:54:13 +0800 Subject: [PATCH 02/14] cleanup --- util/migrate/src/migrations/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/migrate/src/migrations/mod.rs b/util/migrate/src/migrations/mod.rs index e78bac582c..bc6b1e4338 100644 --- a/util/migrate/src/migrations/mod.rs +++ b/util/migrate/src/migrations/mod.rs @@ -5,8 +5,8 @@ mod add_chain_root_mmr; mod add_extra_data_hash; mod add_number_hash_mapping; mod cell; +mod set_2019_block_cycle_zero; mod table_to_struct; -mod test_migrate; pub use add_block_extension_cf::AddBlockExtensionColumnFamily; pub use add_block_filter::AddBlockFilterColumnFamily; @@ -15,5 +15,5 @@ pub use add_chain_root_mmr::AddChainRootMMR; pub use add_extra_data_hash::AddExtraDataHash; pub use add_number_hash_mapping::AddNumberHashMapping; pub use cell::CellMigration; +pub use set_2019_block_cycle_zero::BlockExt2019ToZero; pub use table_to_struct::ChangeMoleculeTableToStruct; -pub use test_migrate::TestMigration; From 0cb3600ecabe6419ec223d3474cb1e894cbe546f Mon Sep 17 00:00:00 2001 From: driftluo Date: Thu, 2 Nov 2023 11:30:42 +0800 Subject: [PATCH 03/14] fix: fix vm version select --- ckb-bin/src/subcommand/export.rs | 3 +- ckb-bin/src/subcommand/import.rs | 3 +- ckb-bin/src/subcommand/migrate.rs | 2 +- ckb-bin/src/subcommand/replay.rs | 8 ++--- ckb-bin/src/subcommand/stats.rs | 3 +- shared/src/shared_builder.rs | 32 ++++++++++++++----- spec/src/hardfork.rs | 6 +--- util/constant/src/hardfork/mainnet.rs | 3 +- util/constant/src/hardfork/testnet.rs | 3 +- util/launcher/src/lib.rs | 2 +- util/migrate/src/migrate.rs | 3 +- .../migrations/set_2019_block_cycle_zero.rs | 5 +-- util/migrate/src/tests.rs | 8 +++-- 13 files changed, 48 insertions(+), 33 deletions(-) diff --git a/ckb-bin/src/subcommand/export.rs b/ckb-bin/src/subcommand/export.rs index 04459d3f67..17f2d05acf 100644 --- a/ckb-bin/src/subcommand/export.rs +++ b/ckb-bin/src/subcommand/export.rs @@ -10,8 +10,9 @@ pub fn export(args: ExportArgs, async_handle: Handle) -> Result<(), ExitCode> { &args.config.db, None, async_handle, + args.consensus, )?; - let (shared, _) = builder.consensus(args.consensus).build()?; + let (shared, _) = builder.build()?; Export::new(shared, args.target).execute().map_err(|err| { eprintln!("Export error: {err:?}"); ExitCode::Failure diff --git a/ckb-bin/src/subcommand/import.rs b/ckb-bin/src/subcommand/import.rs index 8c03cfd7f9..d6fba348c3 100644 --- a/ckb-bin/src/subcommand/import.rs +++ b/ckb-bin/src/subcommand/import.rs @@ -11,8 +11,9 @@ pub fn import(args: ImportArgs, async_handle: Handle) -> Result<(), ExitCode> { &args.config.db, None, async_handle, + args.consensus, )?; - let (shared, mut pack) = builder.consensus(args.consensus).build()?; + let (shared, mut pack) = builder.build()?; let chain_service = ChainService::new(shared, pack.take_proposal_table()); let chain_controller = chain_service.start::<&str>(Some("ImportChainService")); diff --git a/ckb-bin/src/subcommand/migrate.rs b/ckb-bin/src/subcommand/migrate.rs index c62b4dd47a..6dc55dd39e 100644 --- a/ckb-bin/src/subcommand/migrate.rs +++ b/ckb-bin/src/subcommand/migrate.rs @@ -6,7 +6,7 @@ use std::cmp::Ordering; use crate::helper::prompt; pub fn migrate(args: MigrateArgs) -> Result<(), ExitCode> { - let migrate = Migrate::new(&args.config.db.path); + let migrate = Migrate::new(&args.config.db.path, args.consensus.hardfork_switch); { let read_only_db = migrate.open_read_only_db().map_err(|e| { diff --git a/ckb-bin/src/subcommand/replay.rs b/ckb-bin/src/subcommand/replay.rs index 4f524f47db..ac7da08fb2 100644 --- a/ckb-bin/src/subcommand/replay.rs +++ b/ckb-bin/src/subcommand/replay.rs @@ -17,9 +17,9 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> { &args.config.db, None, async_handle.clone(), + args.consensus.clone(), )?; let (shared, _) = shared_builder - .consensus(args.consensus.clone()) .tx_pool_config(args.config.tx_pool.clone()) .build()?; @@ -44,11 +44,9 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> { &tmp_db_config, None, async_handle, + args.consensus, )?; - let (tmp_shared, mut pack) = shared_builder - .consensus(args.consensus) - .tx_pool_config(args.config.tx_pool) - .build()?; + let (tmp_shared, mut pack) = shared_builder.tx_pool_config(args.config.tx_pool).build()?; let chain = ChainService::new(tmp_shared, pack.take_proposal_table()); if let Some((from, to)) = args.profile { diff --git a/ckb-bin/src/subcommand/stats.rs b/ckb-bin/src/subcommand/stats.rs index e0edc73fc1..ace9e3084b 100644 --- a/ckb-bin/src/subcommand/stats.rs +++ b/ckb-bin/src/subcommand/stats.rs @@ -32,8 +32,9 @@ impl Statics { &args.config.db, None, async_handle, + args.consensus, )?; - let (shared, _) = shared_builder.consensus(args.consensus).build()?; + let (shared, _) = shared_builder.build()?; let tip_number = shared.snapshot().tip_number(); diff --git a/shared/src/shared_builder.rs b/shared/src/shared_builder.rs index f3563295a5..161cc8a14d 100644 --- a/shared/src/shared_builder.rs +++ b/shared/src/shared_builder.rs @@ -21,10 +21,19 @@ use ckb_db_schema::COLUMNS; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::{error, info}; use ckb_migrate::migrate::Migrate; -use ckb_notify::{NotifyController, NotifyService}; +use ckb_notify::{NotifyController, NotifyService, PoolTransactionEntry}; +use ckb_shared::Shared; use ckb_store::{ChainDB, ChainStore, Freezer}; use ckb_types::core::service::PoolTransactionEntry; use ckb_types::core::tx_pool::Reject; + +use ckb_store::ChainDB; +use ckb_store::ChainStore; +use ckb_tx_pool::{ + error::Reject, service::TxVerificationResult, TokioRwLock, TxEntry, TxPool, + TxPoolServiceBuilder, +}; +use ckb_types::core::hardfork::HardForks; use ckb_types::core::EpochExt; use ckb_types::core::HeaderView; use ckb_verification::cache::init_cache; @@ -38,7 +47,7 @@ use tempfile::TempDir; pub struct SharedBuilder { db: RocksDB, ancient_path: Option, - consensus: Option, + consensus: Consensus, tx_pool_config: Option, store_config: Option, block_assembler_config: Option, @@ -51,8 +60,9 @@ pub fn open_or_create_db( bin_name: &str, root_dir: &Path, config: &DBConfig, + hardforks: HardForks, ) -> Result { - let migrate = Migrate::new(&config.path); + let migrate = Migrate::new(&config.path, hardforks); let read_only_db = migrate.open_read_only_db().map_err(|e| { eprintln!("Migration error {e}"); @@ -128,13 +138,19 @@ impl SharedBuilder { db_config: &DBConfig, ancient: Option, async_handle: Handle, + consensus: Consensus, ) -> Result { - let db = open_or_create_db(bin_name, root_dir, db_config)?; + let db = open_or_create_db( + bin_name, + root_dir, + db_config, + consensus.hardfork_switch.clone(), + )?; Ok(SharedBuilder { db, ancient_path: ancient, - consensus: None, + consensus, tx_pool_config: None, notify_config: None, store_config: None, @@ -179,7 +195,7 @@ impl SharedBuilder { RUNTIME_HANDLE.with(|runtime| SharedBuilder { db, ancient_path: None, - consensus: None, + consensus: Consensus::default(), tx_pool_config: None, notify_config: None, store_config: None, @@ -192,7 +208,7 @@ impl SharedBuilder { impl SharedBuilder { /// TODO(doc): @quake pub fn consensus(mut self, value: Consensus) -> Self { - self.consensus = Some(value); + self.consensus = value; self } @@ -325,7 +341,7 @@ impl SharedBuilder { let tx_pool_config = tx_pool_config.unwrap_or_default(); let notify_config = notify_config.unwrap_or_default(); let store_config = store_config.unwrap_or_default(); - let consensus = Arc::new(consensus.unwrap_or_default()); + let consensus = Arc::new(consensus); let notify_controller = start_notify_service(notify_config, async_handle.clone()); diff --git a/spec/src/hardfork.rs b/spec/src/hardfork.rs index 6d304025b1..653f5ebf80 100644 --- a/spec/src/hardfork.rs +++ b/spec/src/hardfork.rs @@ -20,11 +20,7 @@ impl HardForkConfig { /// sets all `None` to default values, otherwise, return an `Err`. pub fn complete_mainnet(&self) -> Result { let mut ckb2021 = CKB2021::new_builder(); - ckb2021 = self.update_2021( - ckb2021, - mainnet::CKB2021_START_EPOCH, - mainnet::RFC0028_RFC0032_RFC0033_RFC0034_START_EPOCH, - )?; + ckb2021 = self.update_2021(ckb2021, mainnet::CKB2021_START_EPOCH)?; Ok(HardForks { ckb2021: ckb2021.build()?, diff --git a/util/constant/src/hardfork/mainnet.rs b/util/constant/src/hardfork/mainnet.rs index a3f028a452..0d7572bd15 100644 --- a/util/constant/src/hardfork/mainnet.rs +++ b/util/constant/src/hardfork/mainnet.rs @@ -4,8 +4,7 @@ pub const CHAIN_SPEC_NAME: &str = "ckb"; /// hardcode rfc0028/rfc0032/rfc0033/rfc0034 epoch pub const RFC0028_RFC0032_RFC0033_RFC0034_START_EPOCH: u64 = 5414; /// First epoch number for CKB v2021, at about 2022/05/10 1:00 UTC -// pub const CKB2021_START_EPOCH: u64 = 5414; -pub const CKB2021_START_EPOCH: u64 = 0; +pub const CKB2021_START_EPOCH: u64 = 5414; /// hardcode ckb2023 epoch pub const CKB2023_START_EPOCH: u64 = u64::MAX; diff --git a/util/constant/src/hardfork/testnet.rs b/util/constant/src/hardfork/testnet.rs index 6f2b061368..16d77f610c 100644 --- a/util/constant/src/hardfork/testnet.rs +++ b/util/constant/src/hardfork/testnet.rs @@ -4,8 +4,7 @@ pub const CHAIN_SPEC_NAME: &str = "ckb_testnet"; /// hardcode rfc0028/rfc0032/rfc0033/rfc0034 epoch pub const RFC0028_RFC0032_RFC0033_RFC0034_START_EPOCH: u64 = 3113; /// First epoch number for CKB v2021, at about 2021/10/24 3:15 UTC. -// pub const CKB2021_START_EPOCH: u64 = 3113; -pub const CKB2021_START_EPOCH: u64 = 0; +pub const CKB2021_START_EPOCH: u64 = 3113; /// hardcode ckb2023 epoch pub const CKB2023_START_EPOCH: u64 = u64::MAX; diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index f00dd801aa..db3af82673 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -195,10 +195,10 @@ impl Launcher { &self.args.config.db, Some(self.args.config.ancient.clone()), self.async_handle.clone(), + self.args.consensus.clone(), )?; let (shared, pack) = shared_builder - .consensus(self.args.consensus.clone()) .tx_pool_config(self.args.config.tx_pool.clone()) .notify_config(self.args.config.notify.clone()) .store_config(self.args.config.store) diff --git a/util/migrate/src/migrate.rs b/util/migrate/src/migrate.rs index 578d5801a0..2dd64bdd0e 100644 --- a/util/migrate/src/migrate.rs +++ b/util/migrate/src/migrate.rs @@ -5,6 +5,7 @@ use ckb_db::{ReadOnlyDB, RocksDB}; use ckb_db_migration::{DefaultMigration, Migrations}; use ckb_db_schema::{COLUMNS, COLUMN_META}; use ckb_error::Error; +use ckb_types::core::hardfork::HardForks; use std::cmp::Ordering; use std::path::PathBuf; use std::sync::Arc; @@ -19,7 +20,7 @@ pub struct Migrate { impl Migrate { /// Construct new migrate - pub fn new>(path: P) -> Self { + pub fn new>(path: P, hardforks: HardForks) -> Self { let mut migrations = Migrations::default(); migrations.add_migration(Arc::new(DefaultMigration::new(INIT_DB_VERSION))); migrations.add_migration(Arc::new(migrations::ChangeMoleculeTableToStruct)); // since v0.35.0 diff --git a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs index 1b25e4a3c9..63a337b9cf 100644 --- a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs +++ b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs @@ -1,5 +1,8 @@ use ckb_app_config::StoreConfig; +<<<<<<< HEAD //use ckb_db_migration::SHUTDOWN_BACKGROUND_MIGRATION; +======= +>>>>>>> bd4c28d58 (fix: fix vm version select) use ckb_db_migration::{Migration, ProgressBar, ProgressStyle}; use ckb_db_schema::COLUMN_EPOCH; use ckb_store::{ChainDB, ChainStore}; @@ -38,9 +41,7 @@ impl Migration for BlockExt2019ToZero { return Ok(chain_db.into_inner()); } - eprintln!("now limit epoch is {}", limit_epoch); let epoch_number: packed::Uint64 = limit_epoch.pack(); - if let Some(epoch_hash) = chain_db.get(COLUMN_EPOCH, epoch_number.as_slice()) { let epoch_ext = chain_db .get_epoch_ext( diff --git a/util/migrate/src/tests.rs b/util/migrate/src/tests.rs index ba68c542d6..73ec9da409 100644 --- a/util/migrate/src/tests.rs +++ b/util/migrate/src/tests.rs @@ -9,7 +9,9 @@ use ckb_db_schema::{ }; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ - core::{capacity_bytes, BlockBuilder, BlockExt, Capacity, TransactionBuilder}, + core::{ + capacity_bytes, hardfork::HardForks, BlockBuilder, BlockExt, Capacity, TransactionBuilder, + }, packed::{self, Bytes}, prelude::*, utilities::DIFF_TWO, @@ -149,13 +151,13 @@ fn test_mock_migration() { drop(db_txn); drop(db); - let mg = Migrate::new(tmp_dir.as_ref().to_path_buf()); + let mg = Migrate::new(tmp_dir.as_ref().to_path_buf(), HardForks::new_mirana()); let db = mg.open_bulk_load_db().unwrap().unwrap(); mg.migrate(db).unwrap(); - let mg2 = Migrate::new(tmp_dir.as_ref().to_path_buf()); + let mg2 = Migrate::new(tmp_dir.as_ref().to_path_buf(), HardForks::new_mirana()); let rdb = mg2.open_read_only_db().unwrap().unwrap(); From 32e0413d7d94a4e8d4e28fb26a0a0138f1805483 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 10 Nov 2023 07:58:04 +0800 Subject: [PATCH 04/14] fix conflicts --- spec/src/hardfork.rs | 6 +++++- util/migrate/src/migrations/set_2019_block_cycle_zero.rs | 4 ---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/src/hardfork.rs b/spec/src/hardfork.rs index 653f5ebf80..6d304025b1 100644 --- a/spec/src/hardfork.rs +++ b/spec/src/hardfork.rs @@ -20,7 +20,11 @@ impl HardForkConfig { /// sets all `None` to default values, otherwise, return an `Err`. pub fn complete_mainnet(&self) -> Result { let mut ckb2021 = CKB2021::new_builder(); - ckb2021 = self.update_2021(ckb2021, mainnet::CKB2021_START_EPOCH)?; + ckb2021 = self.update_2021( + ckb2021, + mainnet::CKB2021_START_EPOCH, + mainnet::RFC0028_RFC0032_RFC0033_RFC0034_START_EPOCH, + )?; Ok(HardForks { ckb2021: ckb2021.build()?, diff --git a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs index 63a337b9cf..1b733578d4 100644 --- a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs +++ b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs @@ -1,8 +1,4 @@ use ckb_app_config::StoreConfig; -<<<<<<< HEAD -//use ckb_db_migration::SHUTDOWN_BACKGROUND_MIGRATION; -======= ->>>>>>> bd4c28d58 (fix: fix vm version select) use ckb_db_migration::{Migration, ProgressBar, ProgressStyle}; use ckb_db_schema::COLUMN_EPOCH; use ckb_store::{ChainDB, ChainStore}; From 8b1a4a93ca4c44d0dd7e79bb2a1e303902b9dcc7 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 15 Nov 2023 12:17:18 +0800 Subject: [PATCH 05/14] code refactor and cleanup --- ckb-bin/src/subcommand/migrate.rs | 2 +- db-migration/src/lib.rs | 87 +++++++++---------------------- db-migration/src/tests.rs | 24 +++++---- shared/src/shared_builder.rs | 4 +- util/migrate/src/migrate.rs | 8 +-- 5 files changed, 45 insertions(+), 80 deletions(-) diff --git a/ckb-bin/src/subcommand/migrate.rs b/ckb-bin/src/subcommand/migrate.rs index 6dc55dd39e..e39e27a188 100644 --- a/ckb-bin/src/subcommand/migrate.rs +++ b/ckb-bin/src/subcommand/migrate.rs @@ -67,7 +67,7 @@ pub fn migrate(args: MigrateArgs) -> Result<(), ExitCode> { })?; if let Some(db) = bulk_load_db_db { - migrate.migrate(db).map_err(|err| { + migrate.migrate(db, false).map_err(|err| { eprintln!("Run error: {err:?}"); ExitCode::Failure })?; diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index 585e4e2c2a..e82995ab9a 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -10,8 +10,6 @@ pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarge use std::cmp::Ordering; use std::collections::BTreeMap; use std::collections::VecDeque; -use std::fs::OpenOptions; -use std::io::Write; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex; @@ -70,7 +68,7 @@ impl MigrationWorker { let db = self.db.clone(); let pb = move |_count: u64| -> ProgressBar { ProgressBar::new(0) }; if let Some((name, task)) = self.tasks.lock().unwrap().pop_front() { - eprintln!("start to run migrate: {}", name); + eprintln!("start to run migrate in background: {}", name); let db = task.migrate(db, Arc::new(pb)).unwrap(); db.put_default(MIGRATION_VERSION_KEY, task.version()) .map_err(|err| { @@ -241,7 +239,7 @@ impl Migrations { thread::spawn(move || { let _ = exit_signal.recv(); SHUTDOWN_BACKGROUND_MIGRATION.store(true, std::sync::atomic::Ordering::SeqCst); - eprintln!("set shutdown flat to true"); + eprintln!("set shutdown flag to true"); }); let handler = worker.start(); @@ -280,57 +278,18 @@ impl Migrations { } /// TODO(doc): @quake - pub fn migrate(&self, db: RocksDB) -> Result { + pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result { let db_version = self.get_migration_version(&db)?; match db_version { Some(ref v) => { info!("Current database version {}", v); - if let Some(m) = self.migrations.values().last() { - if m.version() < v.as_str() { - error!( - "Database downgrade detected. \ - The database schema version is more recent than the client schema version.\ - Please upgrade to the latest client version." - ); - return Err(internal_error( - "Database downgrade is not supported".to_string(), - )); - } - } - - let db = self.run_migrate(db, v.as_str())?; - Ok(db) - } - None => { - // if version is none, but db is not empty - // patch 220464f - if self.is_non_empty_db(&db) { - return self.patch_220464f(db); - } - Ok(db) - } - } - } - - /// TODO(doc): @quake - pub fn migrate_async(&self, db: RocksDB) -> Result { - let db_version = self.get_migration_version(&db)?; - match db_version { - Some(ref v) => { - info!("Current database version {}", v); - if let Some(m) = self.migrations.values().last() { - if m.version() < v.as_str() { - error!( - "Database downgrade detected. \ - The database schema version is newer than client schema version,\ - please upgrade to the newer version" - ); - return Err(internal_error( - "Database downgrade is not supported".to_string(), - )); - } - } - self.run_migrate_async(db.clone(), v.as_str()); + self.check_migration_downgrade(v)?; + let db = if !run_in_background { + self.run_migrate(db, v.as_str())? + } else { + self.run_migrate_async(db.clone(), v.as_str()); + db + }; Ok(db) } None => { @@ -348,6 +307,22 @@ impl Migrations { const V: &str = "20210609195048"; // AddExtraDataHash - 1 self.run_migrate(db, V) } + + fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> { + if let Some(m) = self.migrations.values().last() { + if m.version() < cur_version { + error!( + "Database downgrade detected. \ + The database schema version is newer than `ckb` schema version,\ + please upgrade `ckb` to the latest version" + ); + return Err(internal_error( + "Database downgrade is not supported".to_string(), + )); + } + } + Ok(()) + } } /// TODO(doc): @quake @@ -424,13 +399,3 @@ impl Migration for DefaultMigration { false } } - -pub fn append_to_file(path: &str, data: &str) -> std::io::Result<()> { - let mut file = OpenOptions::new() - .write(true) - .append(true) - .create(true) - .open(path)?; - - writeln!(file, "{}", data) -} diff --git a/db-migration/src/tests.rs b/db-migration/src/tests.rs index 9d52ea661a..a872ffc819 100644 --- a/db-migration/src/tests.rs +++ b/db-migration/src/tests.rs @@ -19,10 +19,10 @@ fn test_default_migration() { }; { let mut migrations = Migrations::default(); - migrations.add_migration(Box::new(DefaultMigration::new("20191116225943"))); + migrations.add_migration(Arc::new(DefaultMigration::new("20191116225943"))); let db = RocksDB::open(&config, 1); migrations.init_db_version(&db).unwrap(); - let r = migrations.migrate(db).unwrap(); + let r = migrations.migrate(db, false).unwrap(); assert_eq!( b"20191116225943".to_vec(), r.get_pinned_default(MIGRATION_VERSION_KEY) @@ -33,9 +33,11 @@ fn test_default_migration() { } { let mut migrations = Migrations::default(); - migrations.add_migration(Box::new(DefaultMigration::new("20191116225943"))); - migrations.add_migration(Box::new(DefaultMigration::new("20191127101121"))); - let r = migrations.migrate(RocksDB::open(&config, 1)).unwrap(); + migrations.add_migration(Arc::new(DefaultMigration::new("20191116225943"))); + migrations.add_migration(Arc::new(DefaultMigration::new("20191127101121"))); + let r = migrations + .migrate(RocksDB::open(&config, 1), false) + .unwrap(); assert_eq!( b"20191127101121".to_vec(), r.get_pinned_default(MIGRATION_VERSION_KEY) @@ -87,10 +89,10 @@ fn test_customized_migration() { { let mut migrations = Migrations::default(); - migrations.add_migration(Box::new(DefaultMigration::new("20191116225943"))); + migrations.add_migration(Arc::new(DefaultMigration::new("20191116225943"))); let db = RocksDB::open(&config, 1); migrations.init_db_version(&db).unwrap(); - let db = migrations.migrate(db).unwrap(); + let db = migrations.migrate(db, false).unwrap(); let txn = db.transaction(); txn.put(COLUMN, &[1, 1], &[1, 1, 1]).unwrap(); @@ -99,9 +101,11 @@ fn test_customized_migration() { } { let mut migrations = Migrations::default(); - migrations.add_migration(Box::new(DefaultMigration::new("20191116225943"))); - migrations.add_migration(Box::new(CustomizedMigration)); - let db = migrations.migrate(RocksDB::open(&config, 1)).unwrap(); + migrations.add_migration(Arc::new(DefaultMigration::new("20191116225943"))); + migrations.add_migration(Arc::new(CustomizedMigration)); + let db = migrations + .migrate(RocksDB::open(&config, 1), false) + .unwrap(); assert!( vec![1u8, 1, 1, 1].as_slice() == db.get_pinned(COLUMN, &[1, 1]).unwrap().unwrap().as_ref() diff --git a/shared/src/shared_builder.rs b/shared/src/shared_builder.rs index 161cc8a14d..76a8b999b0 100644 --- a/shared/src/shared_builder.rs +++ b/shared/src/shared_builder.rs @@ -96,7 +96,7 @@ pub fn open_or_create_db( } else if can_run_in_background { info!("process migrations in background ..."); let db = RocksDB::open(config, COLUMNS); - migrate.migrate_async(db.clone()).map_err(|err| { + migrate.migrate(db.clone(), true).map_err(|err| { eprintln!("Run error: {err:?}"); ExitCode::Failure })?; @@ -110,7 +110,7 @@ pub fn open_or_create_db( })?; if let Some(db) = bulk_load_db_db { - migrate.migrate(db).map_err(|err| { + migrate.migrate(db, false).map_err(|err| { eprintln!("Run error: {err:?}"); ExitCode::Failure })?; diff --git a/util/migrate/src/migrate.rs b/util/migrate/src/migrate.rs index 2dd64bdd0e..03ba319db6 100644 --- a/util/migrate/src/migrate.rs +++ b/util/migrate/src/migrate.rs @@ -72,12 +72,8 @@ impl Migrate { } /// Perform migrate. - pub fn migrate(self, db: RocksDB) -> Result { - self.migrations.migrate(db) - } - - pub fn migrate_async(self, db: RocksDB) -> Result { - self.migrations.migrate_async(db) + pub fn migrate(self, db: RocksDB, run_in_background: bool) -> Result { + self.migrations.migrate(db, run_in_background) } /// Perform init_db_version. From d93a94fe570e264f894cf1e51f5a6b84f6aec92a Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 15 Nov 2023 17:10:48 +0800 Subject: [PATCH 06/14] Fix clippy --- db-migration/src/lib.rs | 14 +++++++------- util/migrate/src/migrate.rs | 3 ++- util/migrate/src/tests.rs | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index e82995ab9a..efe0fbfce4 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -16,6 +16,7 @@ use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; +/// Shutdown flag for background migration. pub static SHUTDOWN_BACKGROUND_MIGRATION: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| AtomicBool::new(false)); @@ -39,18 +40,15 @@ enum Command { //Stop, } +type MigrationTasks = VecDeque<(String, Arc)>; struct MigrationWorker { - tasks: Arc)>>>, + tasks: Arc>, db: RocksDB, inbox: Receiver, } impl MigrationWorker { - pub fn new( - tasks: Arc)>>>, - db: RocksDB, - inbox: Receiver, - ) -> Self { + pub fn new(tasks: Arc>, db: RocksDB, inbox: Receiver) -> Self { Self { tasks, db, inbox } } @@ -159,7 +157,8 @@ impl Migrations { .any(|m| m.expensive()) } - pub fn run_in_background(&self, db: &ReadOnlyDB) -> bool { + /// Check if all the pending migrations will be executed in background. + pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool { let db_version = match db .get_pinned_default(MIGRATION_VERSION_KEY) .expect("get the version of database") @@ -344,6 +343,7 @@ pub trait Migration: Send + Sync { true } + /// Will this migration be executed in background. fn run_in_background(&self) -> bool { false } diff --git a/util/migrate/src/migrate.rs b/util/migrate/src/migrate.rs index 03ba319db6..35d3c3a880 100644 --- a/util/migrate/src/migrate.rs +++ b/util/migrate/src/migrate.rs @@ -62,8 +62,9 @@ impl Migrate { self.migrations.expensive(db) } + /// Check whether the pending migrations are all background migrations. pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool { - self.migrations.run_in_background(db) + self.migrations.can_run_in_background(db) } /// Open bulk load db. diff --git a/util/migrate/src/tests.rs b/util/migrate/src/tests.rs index 73ec9da409..1d48487a01 100644 --- a/util/migrate/src/tests.rs +++ b/util/migrate/src/tests.rs @@ -155,7 +155,7 @@ fn test_mock_migration() { let db = mg.open_bulk_load_db().unwrap().unwrap(); - mg.migrate(db).unwrap(); + mg.migrate(db, false).unwrap(); let mg2 = Migrate::new(tmp_dir.as_ref().to_path_buf(), HardForks::new_mirana()); From 5446bc79d0a275071241eeee0521c46373815b8b Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 16 Nov 2023 12:11:56 +0800 Subject: [PATCH 07/14] add UI test for background migrations --- db-migration/src/tests.rs | 115 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/db-migration/src/tests.rs b/db-migration/src/tests.rs index a872ffc819..d81d79a0ea 100644 --- a/db-migration/src/tests.rs +++ b/db-migration/src/tests.rs @@ -1,4 +1,5 @@ use ckb_app_config::DBConfig; +use ckb_db::ReadOnlyDB; use ckb_db::RocksDB; use ckb_db_schema::MIGRATION_VERSION_KEY; use ckb_error::Error; @@ -124,3 +125,117 @@ fn test_customized_migration() { ); } } + +#[test] +fn test_background_migration() { + pub struct BackgroundMigration { + version: String, + } + + impl BackgroundMigration { + pub fn new(version: &str) -> Self { + BackgroundMigration { + version: version.to_string(), + } + } + } + + impl Migration for BackgroundMigration { + fn run_in_background(&self) -> bool { + true + } + + fn migrate( + &self, + db: RocksDB, + _pb: Arc ProgressBar + Send + Sync>, + ) -> Result { + let db_tx = db.transaction(); + let v = self.version.as_bytes(); + db_tx.put("1", v, &[1])?; + db_tx.commit()?; + Ok(db) + } + + fn version(&self) -> &str { + self.version.as_str() + } + } + + let tmp_dir = tempfile::Builder::new() + .prefix("test_default_migration") + .tempdir() + .unwrap(); + let config = DBConfig { + path: tmp_dir.as_ref().to_path_buf(), + ..Default::default() + }; + { + let mut migrations = Migrations::default(); + migrations.add_migration(Arc::new(DefaultMigration::new("20191116225943"))); + let db = RocksDB::open(&config, 12); + migrations.init_db_version(&db).unwrap(); + let r = migrations.migrate(db, false).unwrap(); + assert_eq!( + b"20191116225943".to_vec(), + r.get_pinned_default(MIGRATION_VERSION_KEY) + .unwrap() + .unwrap() + .to_vec() + ); + } + { + let mut migrations = Migrations::default(); + migrations.add_migration(Arc::new(DefaultMigration::new("20191116225943"))); + migrations.add_migration(Arc::new(BackgroundMigration::new("20231127101121"))); + + let db = ReadOnlyDB::open_cf(&config.path, vec!["4"]) + .unwrap() + .unwrap(); + + assert!(migrations.can_run_in_background(&db)); + migrations.add_migration(Arc::new(DefaultMigration::new("20191127101121"))); + assert!(!migrations.can_run_in_background(&db)); + } + + { + let mut migrations = Migrations::default(); + migrations.add_migration(Arc::new(DefaultMigration::new("20191116225943"))); + migrations.add_migration(Arc::new(BackgroundMigration::new("20231127101121"))); + migrations.add_migration(Arc::new(BackgroundMigration::new("20241127101122"))); + + let db = ReadOnlyDB::open_cf(&config.path, vec!["4"]) + .unwrap() + .unwrap(); + + assert!(migrations.can_run_in_background(&db)); + let db = migrations + .migrate(RocksDB::open(&config, 12), true) + .unwrap(); + // sleep 1 seconds + std::thread::sleep(std::time::Duration::from_secs(1)); + assert_eq!( + b"20241127101122".to_vec(), + db.get_pinned_default(MIGRATION_VERSION_KEY) + .unwrap() + .unwrap() + .to_vec() + ); + + // confirm the background migration is executed + let db_tx = db.transaction(); + let v = db_tx + .get_pinned("1", "20231127101121".as_bytes()) + .unwrap() + .unwrap() + .to_vec(); + assert_eq!(v, vec![1]); + + let v = db_tx + .get_pinned("1", "20241127101122".as_bytes()) + .unwrap() + .unwrap() + .to_vec(); + assert_eq!(v, vec![1]); + } +} From 43834d59ca6c9bfcbddceb85725245748a39f83e Mon Sep 17 00:00:00 2001 From: yukang Date: Mon, 20 Nov 2023 15:21:00 +0800 Subject: [PATCH 08/14] fix BlockExt2019ToZero to check tip header --- db-migration/src/lib.rs | 36 ++++++++++--------- .../migrations/set_2019_block_cycle_zero.rs | 25 ++++++++++--- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index efe0fbfce4..fd8604731d 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -60,22 +60,26 @@ impl MigrationWorker { }; if let Some(Command::Start) = msg { - // Progress Bar is no need in background, but here we fake one to keep the trait API - // consistent with the foreground migration. - loop { - let db = self.db.clone(); - let pb = move |_count: u64| -> ProgressBar { ProgressBar::new(0) }; - if let Some((name, task)) = self.tasks.lock().unwrap().pop_front() { - eprintln!("start to run migrate in background: {}", name); - let db = task.migrate(db, Arc::new(pb)).unwrap(); - db.put_default(MIGRATION_VERSION_KEY, task.version()) - .map_err(|err| { - internal_error(format!("failed to migrate the database: {err}")) - }) - .unwrap(); - } else { - break; - } + let mut idx = 0; + let migrations_count = self.tasks.lock().unwrap().len() as u64; + let mpb = Arc::new(MultiProgress::new()); + + while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() { + eprintln!("start to run migrate in background: {}", name); + let mpbc = Arc::clone(&mpb); + idx += 1; + let pb = move |count: u64| -> ProgressBar { + let pb = mpbc.add(ProgressBar::new(count)); + pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None)); + pb.set_prefix(format!("[{}/{}]", idx, migrations_count)); + pb + }; + let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap(); + db.put_default(MIGRATION_VERSION_KEY, task.version()) + .map_err(|err| { + internal_error(format!("failed to migrate the database: {err}")) + }) + .unwrap(); } } }) diff --git a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs index 1b733578d4..9db9c3e5cd 100644 --- a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs +++ b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs @@ -32,22 +32,38 @@ impl Migration for BlockExt2019ToZero { let chain_db = ChainDB::new(db, StoreConfig::default()); let limit_epoch = self.hardforks.ckb2021.rfc_0032(); - eprintln!("begin to run block_ext 2019 to zero migrate..."); + eprintln!( + "begin to run block_ext 2019 to zero migrate...: {}", + limit_epoch + ); + if limit_epoch == 0 { return Ok(chain_db.into_inner()); } - let epoch_number: packed::Uint64 = limit_epoch.pack(); - if let Some(epoch_hash) = chain_db.get(COLUMN_EPOCH, epoch_number.as_slice()) { + let hard_fork_epoch_number: packed::Uint64 = limit_epoch.pack(); + let tip_header = chain_db.get_tip_header().expect("db must have tip header"); + let tip_epoch_number = tip_header.epoch().pack(); + + let header = if tip_epoch_number < hard_fork_epoch_number { + Some(tip_header) + } else if let Some(epoch_hash) = + chain_db.get(COLUMN_EPOCH, hard_fork_epoch_number.as_slice()) + { let epoch_ext = chain_db .get_epoch_ext( &packed::Byte32Reader::from_slice_should_be_ok(epoch_hash.as_ref()).to_entity(), ) .expect("db must have epoch ext"); - let mut header = chain_db + let header = chain_db .get_block_header(&epoch_ext.last_block_hash_in_previous_epoch()) .expect("db must have header"); + Some(header) + } else { + None + }; + if let Some(mut header) = header { let pb = ::std::sync::Arc::clone(&pb); let pbi = pb(header.number() + 1); pbi.set_style( @@ -64,7 +80,6 @@ impl Migration for BlockExt2019ToZero { let db_txn = chain_db.begin_transaction(); for _ in 0..10000 { let hash = header.hash(); - let mut old_block_ext = db_txn.get_block_ext(&hash).unwrap(); old_block_ext.cycles = None; db_txn.insert_block_ext(&hash, &old_block_ext)?; From 5e74a71847fc5fcfd75450eeff934441501b8b99 Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 21 Nov 2023 17:35:05 +0800 Subject: [PATCH 09/14] fix conflicts --- shared/src/shared_builder.rs | 12 +--- util/migrate/src/migrations/test_migrate.rs | 62 --------------------- 2 files changed, 2 insertions(+), 72 deletions(-) delete mode 100644 util/migrate/src/migrations/test_migrate.rs diff --git a/shared/src/shared_builder.rs b/shared/src/shared_builder.rs index 76a8b999b0..680653ec5b 100644 --- a/shared/src/shared_builder.rs +++ b/shared/src/shared_builder.rs @@ -21,19 +21,11 @@ use ckb_db_schema::COLUMNS; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::{error, info}; use ckb_migrate::migrate::Migrate; -use ckb_notify::{NotifyController, NotifyService, PoolTransactionEntry}; -use ckb_shared::Shared; +use ckb_notify::{NotifyController, NotifyService}; use ckb_store::{ChainDB, ChainStore, Freezer}; +use ckb_types::core::hardfork::HardForks; use ckb_types::core::service::PoolTransactionEntry; use ckb_types::core::tx_pool::Reject; - -use ckb_store::ChainDB; -use ckb_store::ChainStore; -use ckb_tx_pool::{ - error::Reject, service::TxVerificationResult, TokioRwLock, TxEntry, TxPool, - TxPoolServiceBuilder, -}; -use ckb_types::core::hardfork::HardForks; use ckb_types::core::EpochExt; use ckb_types::core::HeaderView; use ckb_verification::cache::init_cache; diff --git a/util/migrate/src/migrations/test_migrate.rs b/util/migrate/src/migrations/test_migrate.rs deleted file mode 100644 index df56f6bee2..0000000000 --- a/util/migrate/src/migrations/test_migrate.rs +++ /dev/null @@ -1,62 +0,0 @@ -use ckb_app_config::StoreConfig; -use ckb_db::{Direction, IteratorMode, Result, RocksDB}; -use ckb_db_migration::{Migration, ProgressBar, ProgressStyle}; -use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_INDEX, COLUMN_NUMBER_HASH}; -use ckb_migration_template::multi_thread_migration; -use ckb_store::{ChainDB, ChainStore}; -use ckb_types::{molecule::io::Write, packed, prelude::*}; -use std::sync::Arc; -pub struct TestMigration; - -const VERSION: &str = "20231104000000"; - -impl Migration for TestMigration { - fn run_in_background(&self) -> bool { - true - } - - fn migrate( - &self, - db: RocksDB, - pb: Arc ProgressBar + Send + Sync>, - ) -> Result { - multi_thread_migration! { - { - for number in i * chunk_size..end { - let block_number: packed::Uint64 = number.pack(); - let raw_hash = chain_db.get(COLUMN_INDEX, block_number.as_slice()).expect("DB data integrity"); - let txs_len = chain_db.get_iter( - COLUMN_BLOCK_BODY, - IteratorMode::From(&raw_hash, Direction::Forward), - ) - .take_while(|(key, _)| key.starts_with(&raw_hash)) - .count(); - - let raw_txs_len: packed::Uint32 = (txs_len as u32).pack(); - - let mut raw_key = Vec::with_capacity(40); - raw_key.write_all(block_number.as_slice()).expect("write_all block_number"); - raw_key.write_all(&raw_hash).expect("write_all hash"); - let key = packed::NumberHash::new_unchecked(raw_key.into()); - - wb.put( - COLUMN_NUMBER_HASH, - key.as_slice(), - raw_txs_len.as_slice(), - ) - .expect("put number_hash"); - - if wb.len() > BATCH { - chain_db.write(&wb).expect("write db batch"); - wb.clear().unwrap(); - } - pbi.inc(1); - } - } - } - } - - fn version(&self) -> &str { - VERSION - } -} From 65e99f8c5557b41441d567c4bf1d839a7f7b2280 Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 21 Nov 2023 23:51:38 +0800 Subject: [PATCH 10/14] use OnceCell for flag --- db-migration/src/lib.rs | 69 ++++++++++++++++++++++----------------- db-migration/src/tests.rs | 65 ++++++++++++++++++++++++++++++++++-- 2 files changed, 102 insertions(+), 32 deletions(-) diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index fd8604731d..4710cf5009 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -1,5 +1,7 @@ //! TODO(doc): @quake -use ckb_channel::{unbounded, Receiver}; +use ckb_channel::select; +use ckb_channel::unbounded; +use ckb_channel::Receiver; use ckb_db::{ReadOnlyDB, RocksDB}; use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY}; use ckb_error::{Error, InternalErrorKind}; @@ -7,18 +9,17 @@ use ckb_logger::{debug, error, info}; use ckb_stop_handler::register_thread; use console::Term; pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle}; +use once_cell::sync::OnceCell; use std::cmp::Ordering; use std::collections::BTreeMap; use std::collections::VecDeque; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; /// Shutdown flag for background migration. -pub static SHUTDOWN_BACKGROUND_MIGRATION: once_cell::sync::Lazy = - once_cell::sync::Lazy::new(|| AtomicBool::new(false)); +pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceCell = OnceCell::new(); #[cfg(test)] mod tests; @@ -34,10 +35,10 @@ pub struct Migrations { } /// Commands -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Debug)] enum Command { Start, - //Stop, + Stop, } type MigrationTasks = VecDeque<(String, Arc)>; @@ -54,32 +55,37 @@ impl MigrationWorker { pub fn start(self) -> JoinHandle<()> { thread::spawn(move || { - let msg = match self.inbox.recv() { - Ok(msg) => Some(msg), - Err(_err) => return, - }; - - if let Some(Command::Start) = msg { + if let Ok(Command::Start) = self.inbox.recv() { let mut idx = 0; let migrations_count = self.tasks.lock().unwrap().len() as u64; let mpb = Arc::new(MultiProgress::new()); while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() { - eprintln!("start to run migrate in background: {}", name); - let mpbc = Arc::clone(&mpb); - idx += 1; - let pb = move |count: u64| -> ProgressBar { - let pb = mpbc.add(ProgressBar::new(count)); - pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None)); - pb.set_prefix(format!("[{}/{}]", idx, migrations_count)); - pb - }; - let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap(); - db.put_default(MIGRATION_VERSION_KEY, task.version()) - .map_err(|err| { - internal_error(format!("failed to migrate the database: {err}")) - }) - .unwrap(); + select! { + recv(self.inbox) -> msg => { + if let Ok(Command::Stop) = msg { + eprintln!("stop to run migrate in background: {}", name); + break; + } + } + default => { + eprintln!("start to run migrate in background: {}", name); + let mpbc = Arc::clone(&mpb); + idx += 1; + let pb = move |count: u64| -> ProgressBar { + let pb = mpbc.add(ProgressBar::new(count)); + pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None)); + pb.set_prefix(format!("[{}/{}]", idx, migrations_count)); + pb + }; + let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap(); + db.put_default(MIGRATION_VERSION_KEY, task.version()) + .map_err(|err| { + internal_error(format!("failed to migrate the database: {err}")) + }) + .unwrap(); + } + } } } }) @@ -239,10 +245,13 @@ impl Migrations { let worker = MigrationWorker::new(tasks, db.clone(), rx); let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx(); + let clone = v.to_string(); + let tx_clone = tx.clone(); thread::spawn(move || { let _ = exit_signal.recv(); - SHUTDOWN_BACKGROUND_MIGRATION.store(true, std::sync::atomic::Ordering::SeqCst); - eprintln!("set shutdown flag to true"); + let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true); + let _ = tx_clone.send(Command::Stop); + eprintln!("set shutdown flag to true: {:?} version: {}", res, clone); }); let handler = worker.start(); @@ -356,7 +365,7 @@ pub trait Migration: Send + Sync { /// If a migration need to implement the recovery logic, it should check this flag periodically, /// store the migration progress when exiting and recover from the current progress when restarting. fn stop_background(&self) -> bool { - SHUTDOWN_BACKGROUND_MIGRATION.load(std::sync::atomic::Ordering::SeqCst) + *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false) } /// Check if the background migration can be resumed. diff --git a/db-migration/src/tests.rs b/db-migration/src/tests.rs index d81d79a0ea..7c784e9708 100644 --- a/db-migration/src/tests.rs +++ b/db-migration/src/tests.rs @@ -128,6 +128,8 @@ fn test_customized_migration() { #[test] fn test_background_migration() { + use ckb_stop_handler::broadcast_exit_signals; + pub struct BackgroundMigration { version: String, } @@ -162,6 +164,35 @@ fn test_background_migration() { } } + pub struct RunStopMigration { + version: String, + } + impl Migration for RunStopMigration { + fn run_in_background(&self) -> bool { + true + } + + fn migrate( + &self, + db: RocksDB, + _pb: Arc ProgressBar + Send + Sync>, + ) -> Result { + let db_tx = db.transaction(); + loop { + if self.stop_background() { + let v = self.version.as_bytes(); + db_tx.put("1", v, &[2])?; + db_tx.commit()?; + return Ok(db); + } + } + } + + fn version(&self) -> &str { + &self.version.as_str() + } + } + let tmp_dir = tempfile::Builder::new() .prefix("test_default_migration") .tempdir() @@ -212,8 +243,9 @@ fn test_background_migration() { let db = migrations .migrate(RocksDB::open(&config, 12), true) .unwrap(); - // sleep 1 seconds - std::thread::sleep(std::time::Duration::from_secs(1)); + + // wait for background migration to finish + std::thread::sleep(std::time::Duration::from_millis(1000)); assert_eq!( b"20241127101122".to_vec(), db.get_pinned_default(MIGRATION_VERSION_KEY) @@ -238,4 +270,33 @@ fn test_background_migration() { .to_vec(); assert_eq!(v, vec![1]); } + + { + let mut migrations = Migrations::default(); + migrations.add_migration(Arc::new(RunStopMigration { + version: "20251116225943".to_string(), + })); + + let db = ReadOnlyDB::open_cf(&config.path, vec!["4"]) + .unwrap() + .unwrap(); + + assert!(migrations.can_run_in_background(&db)); + let db = migrations + .migrate(RocksDB::open(&config, 12), true) + .unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(100)); + //send stop signal + broadcast_exit_signals(); + std::thread::sleep(std::time::Duration::from_millis(200)); + + let db_tx = db.transaction(); + let v = db_tx + .get_pinned("1", "20251116225943".as_bytes()) + .unwrap() + .unwrap() + .to_vec(); + assert_eq!(v, vec![2]); + } } From f1f77c5963ac7df89e55eadcc8a9e4db13f7e6df Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 22 Nov 2023 00:35:37 +0800 Subject: [PATCH 11/14] fix issues from exit signal --- db-migration/src/lib.rs | 10 ++++++---- db-migration/src/tests.rs | 2 +- .../src/migrations/set_2019_block_cycle_zero.rs | 4 ++++ util/stop-handler/src/stop_register.rs | 2 +- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index 4710cf5009..fbd120ed38 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -78,12 +78,13 @@ impl MigrationWorker { pb.set_prefix(format!("[{}/{}]", idx, migrations_count)); pb }; - let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap(); - db.put_default(MIGRATION_VERSION_KEY, task.version()) + if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) { + db.put_default(MIGRATION_VERSION_KEY, task.version()) .map_err(|err| { internal_error(format!("failed to migrate the database: {err}")) }) .unwrap(); + } } } } @@ -247,19 +248,20 @@ impl Migrations { let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx(); let clone = v.to_string(); let tx_clone = tx.clone(); - thread::spawn(move || { + let notifier = thread::spawn(move || { let _ = exit_signal.recv(); let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true); let _ = tx_clone.send(Command::Stop); eprintln!("set shutdown flag to true: {:?} version: {}", res, clone); }); + register_thread("migration-notifier", notifier); let handler = worker.start(); - tx.send(Command::Start).expect("send start command"); if all_can_resume { eprintln!("register thread: migration ...."); register_thread("migration", handler); } + tx.send(Command::Start).expect("send start command"); } fn get_migration_version(&self, db: &RocksDB) -> Result, Error> { diff --git a/db-migration/src/tests.rs b/db-migration/src/tests.rs index 7c784e9708..b3a6fa18c3 100644 --- a/db-migration/src/tests.rs +++ b/db-migration/src/tests.rs @@ -189,7 +189,7 @@ fn test_background_migration() { } fn version(&self) -> &str { - &self.version.as_str() + self.version.as_str() } } diff --git a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs index 9db9c3e5cd..4c95044b1e 100644 --- a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs +++ b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs @@ -1,6 +1,7 @@ use ckb_app_config::StoreConfig; use ckb_db_migration::{Migration, ProgressBar, ProgressStyle}; use ckb_db_schema::COLUMN_EPOCH; +use ckb_error::InternalErrorKind; use ckb_store::{ChainDB, ChainStore}; use ckb_types::{ core::hardfork::HardForks, @@ -78,6 +79,9 @@ impl Migration for BlockExt2019ToZero { loop { let db_txn = chain_db.begin_transaction(); + if self.stop_background() { + return Err(InternalErrorKind::Database.other("intrupted").into()); + } for _ in 0..10000 { let hash = header.hash(); let mut old_block_ext = db_txn.get_block_ext(&hash).unwrap(); diff --git a/util/stop-handler/src/stop_register.rs b/util/stop-handler/src/stop_register.rs index f508a75dcf..ccc3d9a6f4 100644 --- a/util/stop-handler/src/stop_register.rs +++ b/util/stop-handler/src/stop_register.rs @@ -68,7 +68,7 @@ pub fn broadcast_exit_signals() { }); } -/// Register a thread `JoinHandle` to `CKB_HANDLES` +/// Register a thread `JoinHandle` to `CKB_HANDLES` pub fn register_thread(name: &str, thread_handle: std::thread::JoinHandle<()>) { trace!("Registering thread {}", name); CKB_HANDLES From b38bc7a65b16345d2cf40f690fd15a4bd3630da7 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 24 Nov 2023 00:38:50 +0800 Subject: [PATCH 12/14] fix words in migration --- ckb-bin/src/subcommand/migrate.rs | 3 ++- db-migration/src/lib.rs | 4 ++-- shared/src/shared_builder.rs | 3 ++- util/migrate/src/migrations/set_2019_block_cycle_zero.rs | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ckb-bin/src/subcommand/migrate.rs b/ckb-bin/src/subcommand/migrate.rs index e39e27a188..1bc0db2f43 100644 --- a/ckb-bin/src/subcommand/migrate.rs +++ b/ckb-bin/src/subcommand/migrate.rs @@ -42,10 +42,11 @@ pub fn migrate(args: MigrateArgs) -> Result<(), ExitCode> { let input = prompt("\ \n\ Doing migration will take quite a long time before CKB could work again.\n\ - Another choice is to delete all data, then synchronize them again.\n\ \n\ Once the migration started, the data will be no longer compatible with all older versions CKB,\n\ so we strongly recommended you to backup the old data before migrating.\n\ + \n\ + If the migration failed, try to delete all data and sync from scratch.\n\ \nIf you want to migrate the data, please input YES, otherwise, the current process will exit.\n\ > ", ); diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index fbd120ed38..038bbf1d00 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -252,13 +252,13 @@ impl Migrations { let _ = exit_signal.recv(); let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true); let _ = tx_clone.send(Command::Stop); - eprintln!("set shutdown flag to true: {:?} version: {}", res, clone); + info!("set shutdown flag to true: {:?} version: {}", res, clone); }); register_thread("migration-notifier", notifier); let handler = worker.start(); if all_can_resume { - eprintln!("register thread: migration ...."); + info!("register thread: migration ...."); register_thread("migration", handler); } tx.send(Command::Start).expect("send start command"); diff --git a/shared/src/shared_builder.rs b/shared/src/shared_builder.rs index 680653ec5b..a0d3476ce0 100644 --- a/shared/src/shared_builder.rs +++ b/shared/src/shared_builder.rs @@ -74,12 +74,13 @@ pub fn open_or_create_db( Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)), Ordering::Less => { let can_run_in_background = migrate.can_run_in_background(&db); + eprintln!("can_run_in_background: {}", can_run_in_background); if migrate.require_expensive(&db) && !can_run_in_background { eprintln!( "For optimal performance, CKB recommends migrating your data into a new format.\n\ If you prefer to stick with the older version, \n\ it's important to note that they may have unfixed vulnerabilities.\n\ - Before migrating, we strongly recommend backuping your data directory. + Before migrating, we strongly recommend backuping your data directory.\n\ To migrate, run `\"{}\" migrate -C \"{}\"` and confirm by typing \"YES\".", bin_name, root_dir.display() diff --git a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs index 4c95044b1e..4e4d8eb951 100644 --- a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs +++ b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs @@ -92,11 +92,11 @@ impl Migration for BlockExt2019ToZero { .get_block_header(&header.parent_hash()) .expect("db must have header"); - pbi.inc(1); - if header.is_genesis() { break; } + + pbi.inc(1); } db_txn.commit()?; From afd32991184d90c7222c509df8a004c0bd1489c9 Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 26 Dec 2023 10:38:54 +0800 Subject: [PATCH 13/14] fix tests --- util/migrate/src/migrations/set_2019_block_cycle_zero.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs index 4e4d8eb951..e45f6fd253 100644 --- a/util/migrate/src/migrations/set_2019_block_cycle_zero.rs +++ b/util/migrate/src/migrations/set_2019_block_cycle_zero.rs @@ -88,14 +88,14 @@ impl Migration for BlockExt2019ToZero { old_block_ext.cycles = None; db_txn.insert_block_ext(&hash, &old_block_ext)?; - header = db_txn - .get_block_header(&header.parent_hash()) - .expect("db must have header"); - if header.is_genesis() { break; } + header = db_txn + .get_block_header(&header.parent_hash()) + .expect("db must have header"); + pbi.inc(1); } db_txn.commit()?; From 9d2b2014f14e366c25fd0c54e2d467e40cefe66b Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 26 Dec 2023 13:11:31 +0800 Subject: [PATCH 14/14] rollback spec --- util/constant/src/hardfork/mainnet.rs | 3 ++- util/constant/src/hardfork/testnet.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/util/constant/src/hardfork/mainnet.rs b/util/constant/src/hardfork/mainnet.rs index 0d7572bd15..a3f028a452 100644 --- a/util/constant/src/hardfork/mainnet.rs +++ b/util/constant/src/hardfork/mainnet.rs @@ -4,7 +4,8 @@ pub const CHAIN_SPEC_NAME: &str = "ckb"; /// hardcode rfc0028/rfc0032/rfc0033/rfc0034 epoch pub const RFC0028_RFC0032_RFC0033_RFC0034_START_EPOCH: u64 = 5414; /// First epoch number for CKB v2021, at about 2022/05/10 1:00 UTC -pub const CKB2021_START_EPOCH: u64 = 5414; +// pub const CKB2021_START_EPOCH: u64 = 5414; +pub const CKB2021_START_EPOCH: u64 = 0; /// hardcode ckb2023 epoch pub const CKB2023_START_EPOCH: u64 = u64::MAX; diff --git a/util/constant/src/hardfork/testnet.rs b/util/constant/src/hardfork/testnet.rs index 16d77f610c..6f2b061368 100644 --- a/util/constant/src/hardfork/testnet.rs +++ b/util/constant/src/hardfork/testnet.rs @@ -4,7 +4,8 @@ pub const CHAIN_SPEC_NAME: &str = "ckb_testnet"; /// hardcode rfc0028/rfc0032/rfc0033/rfc0034 epoch pub const RFC0028_RFC0032_RFC0033_RFC0034_START_EPOCH: u64 = 3113; /// First epoch number for CKB v2021, at about 2021/10/24 3:15 UTC. -pub const CKB2021_START_EPOCH: u64 = 3113; +// pub const CKB2021_START_EPOCH: u64 = 3113; +pub const CKB2021_START_EPOCH: u64 = 0; /// hardcode ckb2023 epoch pub const CKB2023_START_EPOCH: u64 = u64::MAX;