Skip to content

Commit

Permalink
Merge pull request #4226 from nervosnetwork/yukang-add-background-mig…
Browse files Browse the repository at this point in the history
…ration

Add migration run in background mode
  • Loading branch information
chenyukang authored Dec 26, 2023
2 parents 5c3e0bb + 9d2b201 commit dbb055d
Show file tree
Hide file tree
Showing 16 changed files with 557 additions and 67 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion ckb-bin/src/subcommand/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ pub fn export(args: ExportArgs, async_handle: Handle) -> Result<(), ExitCode> {
&args.config.db,
None,
async_handle,
args.consensus,
)?;
let (shared, _) = builder.consensus(args.consensus).build()?;
let (shared, _) = builder.build()?;
Export::new(shared, args.target).execute().map_err(|err| {
eprintln!("Export error: {err:?}");
ExitCode::Failure
Expand Down
3 changes: 2 additions & 1 deletion ckb-bin/src/subcommand/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ pub fn import(args: ImportArgs, async_handle: Handle) -> Result<(), ExitCode> {
&args.config.db,
None,
async_handle,
args.consensus,
)?;
let (shared, mut pack) = builder.consensus(args.consensus).build()?;
let (shared, mut pack) = builder.build()?;

let chain_service = ChainService::new(shared, pack.take_proposal_table());
let chain_controller = chain_service.start::<&str>(Some("ImportChainService"));
Expand Down
7 changes: 4 additions & 3 deletions ckb-bin/src/subcommand/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::cmp::Ordering;
use crate::helper::prompt;

pub fn migrate(args: MigrateArgs) -> Result<(), ExitCode> {
let migrate = Migrate::new(&args.config.db.path);
let migrate = Migrate::new(&args.config.db.path, args.consensus.hardfork_switch);

{
let read_only_db = migrate.open_read_only_db().map_err(|e| {
Expand Down Expand Up @@ -42,10 +42,11 @@ pub fn migrate(args: MigrateArgs) -> Result<(), ExitCode> {
let input = prompt("\
\n\
Doing migration will take quite a long time before CKB could work again.\n\
Another choice is to delete all data, then synchronize them again.\n\
\n\
Once the migration started, the data will be no longer compatible with all older versions CKB,\n\
so we strongly recommended you to backup the old data before migrating.\n\
\n\
If the migration failed, try to delete all data and sync from scratch.\n\
\nIf you want to migrate the data, please input YES, otherwise, the current process will exit.\n\
> ",
);
Expand All @@ -67,7 +68,7 @@ pub fn migrate(args: MigrateArgs) -> Result<(), ExitCode> {
})?;

if let Some(db) = bulk_load_db_db {
migrate.migrate(db).map_err(|err| {
migrate.migrate(db, false).map_err(|err| {
eprintln!("Run error: {err:?}");
ExitCode::Failure
})?;
Expand Down
8 changes: 3 additions & 5 deletions ckb-bin/src/subcommand/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> {
&args.config.db,
None,
async_handle.clone(),
args.consensus.clone(),
)?;
let (shared, _) = shared_builder
.consensus(args.consensus.clone())
.tx_pool_config(args.config.tx_pool.clone())
.build()?;

Expand All @@ -44,11 +44,9 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> {
&tmp_db_config,
None,
async_handle,
args.consensus,
)?;
let (tmp_shared, mut pack) = shared_builder
.consensus(args.consensus)
.tx_pool_config(args.config.tx_pool)
.build()?;
let (tmp_shared, mut pack) = shared_builder.tx_pool_config(args.config.tx_pool).build()?;
let chain = ChainService::new(tmp_shared, pack.take_proposal_table());

if let Some((from, to)) = args.profile {
Expand Down
3 changes: 2 additions & 1 deletion ckb-bin/src/subcommand/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ impl Statics {
&args.config.db,
None,
async_handle,
args.consensus,
)?;
let (shared, _) = shared_builder.consensus(args.consensus).build()?;
let (shared, _) = shared_builder.build()?;

let tip_number = shared.snapshot().tip_number();

Expand Down
3 changes: 3 additions & 0 deletions db-migration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ ckb-db = { path = "../db", version = "= 0.114.0-pre" }
ckb-logger = { path = "../util/logger", version = "= 0.114.0-pre" }
ckb-error = { path = "../error", version = "= 0.114.0-pre" }
ckb-db-schema = { path = "../db-schema", version = "= 0.114.0-pre" }
ckb-channel = { path = "../util/channel", version = "= 0.114.0-pre" }
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.114.0-pre" }
once_cell = "1.8.0"
indicatif = "0.16"
console = ">=0.9.1, <1.0.0"

Expand Down
194 changes: 176 additions & 18 deletions db-migration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
//! TODO(doc): @quake
use ckb_channel::select;
use ckb_channel::unbounded;
use ckb_channel::Receiver;
use ckb_db::{ReadOnlyDB, RocksDB};
use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{debug, error, info};
use ckb_stop_handler::register_thread;
use console::Term;
pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use once_cell::sync::OnceCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;

/// Shutdown flag for background migration.
///
/// Set (once) to `true` by the exit-signal notifier thread spawned in
/// `Migrations::run_migrate_async`; background migrations poll it through
/// `Migration::stop_background` so they can persist progress and exit.
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceCell<bool> = OnceCell::new();

#[cfg(test)]
mod tests;
Expand All @@ -19,7 +31,66 @@ fn internal_error(reason: String) -> Error {
/// TODO(doc): @quake
#[derive(Default)]
pub struct Migrations {
migrations: BTreeMap<String, Box<dyn Migration>>,
migrations: BTreeMap<String, Arc<dyn Migration>>,
}

/// Commands sent to the background `MigrationWorker` over its channel.
#[derive(PartialEq, Eq, Debug)]
enum Command {
    /// Begin draining the task queue and running migrations.
    Start,
    /// Ask the worker to stop between tasks (graceful shutdown).
    Stop,
}

/// FIFO queue of pending migrations paired with their version strings.
type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;
/// Runs queued migrations on a dedicated background thread,
/// driven by `Command` messages received on `inbox`.
struct MigrationWorker {
    /// Pending migrations; shared with the spawning side via `Arc<Mutex<..>>`.
    tasks: Arc<Mutex<MigrationTasks>>,
    /// Database handle the migrations operate on (cloned per task).
    db: RocksDB,
    /// Receives `Start`/`Stop` commands from the controller.
    inbox: Receiver<Command>,
}

impl MigrationWorker {
    /// Create a worker over a shared task queue, a database handle and the
    /// receiving end of the command channel.
    pub fn new(tasks: Arc<Mutex<MigrationTasks>>, db: RocksDB, inbox: Receiver<Command>) -> Self {
        Self { tasks, db, inbox }
    }

    /// Spawn the worker thread and return its `JoinHandle`.
    ///
    /// The thread blocks until a `Start` command arrives (any other recv
    /// result makes it exit without doing work), then drains the task queue
    /// in order. Before running each task it polls `inbox`: a pending `Stop`
    /// aborts the loop — note that the task already popped at that point is
    /// dropped without running.
    pub fn start(self) -> JoinHandle<()> {
        thread::spawn(move || {
            if let Ok(Command::Start) = self.inbox.recv() {
                let mut idx = 0;
                let migrations_count = self.tasks.lock().unwrap().len() as u64;
                // All tasks draw into one MultiProgress so bars don't clobber
                // each other on the terminal.
                let mpb = Arc::new(MultiProgress::new());

                while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
                    select! {
                        // A pending Stop takes priority over starting the next task.
                        recv(self.inbox) -> msg => {
                            if let Ok(Command::Stop) = msg {
                                eprintln!("stop to run migrate in background: {}", name);
                                break;
                            }
                        }
                        // No command waiting: run the migration now.
                        default => {
                            eprintln!("start to run migrate in background: {}", name);
                            let mpbc = Arc::clone(&mpb);
                            idx += 1;
                            // Progress-bar factory handed to the migration; the
                            // prefix shows "[current/total]".
                            let pb = move |count: u64| -> ProgressBar {
                                let pb = mpbc.add(ProgressBar::new(count));
                                pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None));
                                pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
                                pb
                            };
                            // On success, persist the new schema version; the
                            // unwrap turns a failed version write into a panic.
                            // NOTE(review): an Err from `task.migrate` is silently
                            // skipped and the loop continues with the next task —
                            // confirm this is intended rather than a missed abort.
                            if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
                                db.put_default(MIGRATION_VERSION_KEY, task.version())
                                    .map_err(|err| {
                                        internal_error(format!("failed to migrate the database: {err}"))
                                    })
                                    .unwrap();
                            }
                        }
                    }
                }
            }
        })
    }
}

impl Migrations {
Expand All @@ -31,7 +102,7 @@ impl Migrations {
}

/// TODO(doc): @quake
pub fn add_migration(&mut self, migration: Box<dyn Migration>) {
pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
self.migrations
.insert(migration.version().to_string(), migration);
}
Expand Down Expand Up @@ -97,6 +168,28 @@ impl Migrations {
.any(|m| m.expensive())
}

/// Check if all the pending migrations will be executed in background.
///
/// Reads the version recorded in the database and checks that every
/// migration newer than it opts into background execution.
pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
    let stored = db
        .get_pinned_default(MIGRATION_VERSION_KEY)
        .expect("get the version of database");

    let db_version = if let Some(version_bytes) = stored {
        String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
    } else {
        // if version is none, but db is not empty
        // patch 220464f
        return self.is_non_empty_rdb(db);
    };

    self.migrations
        .values()
        .skip_while(|m| m.version() <= db_version.as_str())
        .all(|m| m.run_in_background())
}

fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
if v.is_some() {
Expand Down Expand Up @@ -139,6 +232,38 @@ impl Migrations {
Ok(db)
}

/// Launch all migrations newer than version `v` on a background worker thread.
///
/// Does not block: the caller regains control immediately while the worker
/// applies the pending migrations against `db`.
fn run_migrate_async(&self, db: RocksDB, v: &str) {
    // Pending migrations: versions strictly greater than the database's
    // current one, in ascending order (BTreeMap iteration order).
    let migrations: VecDeque<(String, Arc<dyn Migration>)> = self
        .migrations
        .iter()
        .filter(|(mv, _)| mv.as_str() > v)
        .map(|(mv, m)| (mv.to_string(), Arc::clone(m)))
        .collect::<VecDeque<_>>();

    let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume());
    let tasks = Arc::new(Mutex::new(migrations));
    let (tx, rx) = unbounded();
    let worker = MigrationWorker::new(tasks, db.clone(), rx);

    // Notifier thread: on process exit, raise the global shutdown flag
    // (polled by migrations via `stop_background`) and tell the worker to
    // stop between tasks.
    let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
    let clone = v.to_string();
    let tx_clone = tx.clone();
    let notifier = thread::spawn(move || {
        let _ = exit_signal.recv();
        let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
        let _ = tx_clone.send(Command::Stop);
        info!("set shutdown flag to true: {:?} version: {}", res, clone);
    });
    register_thread("migration-notifier", notifier);

    let handler = worker.start();
    // Only register (and thus wait for at shutdown) the worker thread when
    // every pending task can resume; non-resumable tasks restart from
    // scratch anyway, so there is no progress worth waiting for.
    if all_can_resume {
        info!("register thread: migration ....");
        register_thread("migration", handler);
    }
    tx.send(Command::Start).expect("send start command");
}

fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
let raw = db
.get_pinned_default(MIGRATION_VERSION_KEY)
Expand Down Expand Up @@ -167,25 +292,18 @@ impl Migrations {
}

/// TODO(doc): @quake
pub fn migrate(&self, db: RocksDB) -> Result<RocksDB, Error> {
pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result<RocksDB, Error> {
let db_version = self.get_migration_version(&db)?;
match db_version {
Some(ref v) => {
info!("Current database version {}", v);
if let Some(m) = self.migrations.values().last() {
if m.version() < v.as_str() {
error!(
"Database downgrade detected. \
The database schema version is more recent than the client schema version.\
Please upgrade to the latest client version."
);
return Err(internal_error(
"Database downgrade is not supported".to_string(),
));
}
}

let db = self.run_migrate(db, v.as_str())?;
self.check_migration_downgrade(v)?;
let db = if !run_in_background {
self.run_migrate(db, v.as_str())?
} else {
self.run_migrate_async(db.clone(), v.as_str());
db
};
Ok(db)
}
None => {
Expand All @@ -203,10 +321,26 @@ impl Migrations {
const V: &str = "20210609195048"; // AddExtraDataHash - 1
self.run_migrate(db, V)
}

/// Reject running against a database written by a newer `ckb` release.
///
/// Errors when the newest migration known to this binary is older than
/// `cur_version`, i.e. the on-disk schema is ahead of the client schema.
fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> {
    match self.migrations.values().last() {
        Some(newest) if newest.version() < cur_version => {
            error!(
                "Database downgrade detected. \
                The database schema version is newer than `ckb` schema version,\
                please upgrade `ckb` to the latest version"
            );
            Err(internal_error(
                "Database downgrade is not supported".to_string(),
            ))
        }
        _ => Ok(()),
    }
}
}

/// TODO(doc): @quake
pub trait Migration {
pub trait Migration: Send + Sync {
/// TODO(doc): @quake
fn migrate(
&self,
Expand All @@ -223,6 +357,30 @@ pub trait Migration {
fn expensive(&self) -> bool {
true
}

/// Will this migration be executed in background.
/// Defaults to `false`: a migration must explicitly opt in to run without
/// blocking node startup.
fn run_in_background(&self) -> bool {
    false
}

/// Check whether the background migration has been asked to shut down.
///
/// A migration implementing recovery logic should poll this flag
/// periodically, persist its progress before exiting, and resume from that
/// progress when restarted.
fn stop_background(&self) -> bool {
    SHUTDOWN_BACKGROUND_MIGRATION
        .get()
        .copied()
        .unwrap_or(false)
}

/// Check if the background migration can be resumed.
///
/// If a migration can be resumed, it should implement the recovery logic in the `migrate`
/// function, and the `MigrationWorker` will register the migration's thread handle with
/// `register_thread`, so that the main thread can wait for the background migration to
/// store its progress and exit.
///
/// Otherwise, the migration will be restarted from the beginning.
///
fn can_resume(&self) -> bool {
    false
}
}

/// TODO(doc): @quake
Expand Down
Loading

0 comments on commit dbb055d

Please sign in to comment.