From 1fe99c2ec383736496cc89c125536edad682d926 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Fri, 22 Sep 2023 13:03:29 -0400 Subject: [PATCH 1/3] support using an existing table schema if it's the same as the configured schema --- crates/corro-agent/src/api/public/mod.rs | 169 ++++++++++++++++++----- crates/corro-types/src/pubsub.rs | 10 +- crates/corro-types/src/schema.rs | 148 +++++++++++++++----- 3 files changed, 249 insertions(+), 78 deletions(-) diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index 953bb255..9d20c82a 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -616,7 +616,7 @@ async fn execute_schema(agent: &Agent, statements: Vec) -> eyre::Result< let mut schema_write = agent.schema().write(); // clone the previous schema and apply - let new_schema = { + let mut new_schema = { let mut schema = schema_write.clone(); for (name, def) in partial_schema.tables.iter() { // overwrite table because users are expected to return a full table def @@ -628,7 +628,7 @@ async fn execute_schema(agent: &Agent, statements: Vec) -> eyre::Result< block_in_place(|| { let tx = conn.transaction()?; - make_schema_inner(&tx, &schema_write, &new_schema)?; + make_schema_inner(&tx, &schema_write, &mut new_schema)?; for tbl_name in partial_schema.tables.keys() { tx.execute("DELETE FROM __corro_schema WHERE tbl_name = ?", [tbl_name])?; @@ -950,8 +950,9 @@ mod tests { let (tripwire, _tripwire_worker, _tripwire_tx) = Tripwire::new_simple(); let dir = tempfile::tempdir()?; + let db_path = dir.path().join("./test.sqlite"); - let pool = SplitPool::create(dir.path().join("./test.sqlite"), tripwire.clone()).await?; + let pool = SplitPool::create(&db_path, tripwire.clone()).await?; { let mut conn = pool.write_priority().await?; @@ -1024,40 +1025,134 @@ mod tests { assert_eq!(status_code, StatusCode::OK); - let schema = agent.schema().read(); - let tests = schema - .tables - .get("tests") - .expect("no tests table in schema"); - - let id_col = tests.columns.get("id").unwrap(); - assert_eq!(id_col.name, "id"); - assert_eq!(id_col.sql_type, SqliteType::Integer); - assert!(id_col.nullable); - assert!(id_col.primary_key); - - let foo_col = tests.columns.get("foo").unwrap(); - assert_eq!(foo_col.name, "foo"); - assert_eq!(foo_col.sql_type, SqliteType::Text); - assert!(foo_col.nullable); - assert!(!foo_col.primary_key); - - let tests = schema - .tables - .get("tests2") - .expect("no tests2 table in schema"); - - let id_col = tests.columns.get("id").unwrap(); - assert_eq!(id_col.name, "id"); - assert_eq!(id_col.sql_type, SqliteType::Integer); - assert!(id_col.nullable); - assert!(id_col.primary_key); - - let foo_col = tests.columns.get("foo").unwrap(); - assert_eq!(foo_col.name, "foo"); - assert_eq!(foo_col.sql_type, SqliteType::Text); - assert!(foo_col.nullable); - assert!(!foo_col.primary_key); + { + let schema = agent.schema().read(); + let tests = schema + .tables + .get("tests") + .expect("no tests table in schema"); + + let id_col = tests.columns.get("id").unwrap(); + assert_eq!(id_col.name, "id"); + assert_eq!(id_col.sql_type, SqliteType::Integer); + assert!(id_col.nullable); + assert!(id_col.primary_key); + + let foo_col = tests.columns.get("foo").unwrap(); + assert_eq!(foo_col.name, "foo"); + assert_eq!(foo_col.sql_type, SqliteType::Text); + assert!(foo_col.nullable); + assert!(!foo_col.primary_key); + + let tests = schema + .tables + .get("tests2") + .expect("no tests2 table in schema"); + + let id_col = tests.columns.get("id").unwrap(); + assert_eq!(id_col.name, "id"); + assert_eq!(id_col.sql_type, SqliteType::Integer); + assert!(id_col.nullable); + assert!(id_col.primary_key); + + let foo_col = tests.columns.get("foo").unwrap(); + assert_eq!(foo_col.name, "foo"); + assert_eq!(foo_col.sql_type, SqliteType::Text); + assert!(foo_col.nullable); + assert!(!foo_col.primary_key); + } + + // w/ existing table! + + let create_stmt = "CREATE TABLE tests3 (id BIGINT PRIMARY KEY, foo TEXT, updated_at INTEGER NOT NULL DEFAULT 0);"; + + { + // adding the table and an index + let conn = agent.pool().write_priority().await?; + conn.execute_batch(create_stmt)?; + conn.execute_batch("CREATE INDEX tests3_updated_at ON tests3 (updated_at);")?; + assert_eq!( + conn.execute( + "INSERT INTO tests3 VALUES (123, 'some foo text', 123456789);", + () + )?, + 1 + ); + assert_eq!( + conn.execute( + "INSERT INTO tests3 VALUES (1234, 'some foo text 2', 1234567890);", + () + )?, + 1 + ); + } + + let (status_code, _body) = api_v1_db_schema( + Extension(agent.clone()), + axum::Json(vec![create_stmt.into()]), + ) + .await; + + assert_eq!(status_code, StatusCode::OK); + + { + let schema = agent.schema().read(); + + // check that the tests table is still there! + let tests = schema + .tables + .get("tests") + .expect("no tests table in schema"); + + let id_col = tests.columns.get("id").unwrap(); + assert_eq!(id_col.name, "id"); + assert_eq!(id_col.sql_type, SqliteType::Integer); + assert!(id_col.nullable); + assert!(id_col.primary_key); + + let foo_col = tests.columns.get("foo").unwrap(); + assert_eq!(foo_col.name, "foo"); + assert_eq!(foo_col.sql_type, SqliteType::Text); + assert!(foo_col.nullable); + assert!(!foo_col.primary_key); + + let tests = schema + .tables + .get("tests3") + .expect("no tests3 table in schema"); + + let id_col = tests.columns.get("id").unwrap(); + assert_eq!(id_col.name, "id"); + assert_eq!(id_col.sql_type, SqliteType::Integer); + assert!(id_col.nullable); + assert!(id_col.primary_key); + + let foo_col = tests.columns.get("foo").unwrap(); + assert_eq!(foo_col.name, "foo"); + assert_eq!(foo_col.sql_type, SqliteType::Text); + assert!(foo_col.nullable); + assert!(!foo_col.primary_key); + + let updated_at_col = tests.columns.get("updated_at").unwrap(); + assert_eq!(updated_at_col.name, "updated_at"); + assert_eq!(updated_at_col.sql_type, SqliteType::Integer); + assert!(!updated_at_col.nullable); + assert!(!updated_at_col.primary_key); + + let updated_at_idx = tests.indexes.get("tests3_updated_at").unwrap(); + assert_eq!(updated_at_idx.name, "tests3_updated_at"); + assert_eq!(updated_at_idx.tbl_name, "tests3"); + assert_eq!(updated_at_idx.columns.len(), 1); + assert!(updated_at_idx.where_clause.is_none()); + } + + let conn = agent.pool().read().await?; + let count: usize = + conn.query_row("SELECT COUNT(*) FROM tests3__crsql_clock;", (), |row| { + row.get(0) + })?; + // should've created a specific qty of clock table rows, just a sanity check! + assert_eq!(count, 4); Ok(()) } diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs index 41b1ed81..959ba1b8 100644 --- a/crates/corro-types/src/pubsub.rs +++ b/crates/corro-types/src/pubsub.rs @@ -1161,7 +1161,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_matcher() -> Result<(), Box> { let schema_sql = "CREATE TABLE sw (pk TEXT primary key, sandwich TEXT);"; - let schema = parse_sql(schema_sql)?; + let mut schema = parse_sql(schema_sql)?; let sql = "SELECT sandwich FROM sw WHERE pk=\"mad\""; @@ -1188,7 +1188,7 @@ mod tests { { let tx = conn.transaction()?; - make_schema_inner(&tx, &NormalizedSchema::default(), &schema)?; + make_schema_inner(&tx, &NormalizedSchema::default(), &mut schema)?; tx.commit()?; } @@ -1291,7 +1291,7 @@ mod tests { ); "; - let schema = parse_sql(schema_sql).unwrap(); + let mut schema = parse_sql(schema_sql).unwrap(); let tmpdir = tempfile::tempdir().unwrap(); let db_path = tmpdir.path().join("test.db"); @@ -1317,7 +1317,7 @@ mod tests { { let tx = conn.transaction().unwrap(); - make_schema_inner(&tx, &NormalizedSchema::default(), &schema).unwrap(); + make_schema_inner(&tx, &NormalizedSchema::default(), &mut schema).unwrap(); tx.commit().unwrap(); } @@ -1368,7 +1368,7 @@ mod tests { { let tx = conn2.transaction().unwrap(); - make_schema_inner(&tx, &NormalizedSchema::default(), &schema).unwrap(); + make_schema_inner(&tx, &NormalizedSchema::default(), &mut schema).unwrap(); tx.commit().unwrap(); } diff --git a/crates/corro-types/src/schema.rs b/crates/corro-types/src/schema.rs index 77505c7e..c2affb69 100644 --- a/crates/corro-types/src/schema.rs +++ b/crates/corro-types/src/schema.rs @@ -127,8 +127,26 @@ pub enum SchemaError { AddPrimaryKey(String, String), #[error("can't modify primary keys (table: '{0}')")] ModifyPrimaryKeys(String), + + #[error("tried importing an existing schema for table '{0}' due to a failed CREATE TABLE but didn't find anything (this should never happen)")] + ImportedSchemaNotFound(String), + + #[error("existing schema for table '{tbl_name}' primary keys mismatched, expected: {expected:?}, got: {got:?}")] + ImportedSchemaPkMismatch { + tbl_name: String, + expected: IndexSet, + got: IndexSet, + }, + + #[error("existing schema for table '{tbl_name}' columns mismatched, expected: {expected:?}, got: {got:?}")] + ImportedSchemaColumnsMismatch { + tbl_name: String, + expected: IndexMap, + got: IndexMap, + }, } +#[allow(clippy::result_large_err)] pub fn init_schema(conn: &Connection) -> Result { let mut dump = String::new(); @@ -159,10 +177,11 @@ pub fn init_schema(conn: &Connection) -> Result { parse_sql(dump.as_str()) } +#[allow(clippy::result_large_err)] pub fn make_schema_inner( tx: &Transaction, schema: &NormalizedSchema, - new_schema: &NormalizedSchema, + new_schema: &mut NormalizedSchema, ) -> Result<(), SchemaError> { if let Some(name) = schema .tables @@ -177,51 +196,104 @@ pub fn make_schema_inner( )); } - let new_table_names = new_schema - .tables - .keys() - .collect::>() - .difference(&schema.tables.keys().collect::>()) - .cloned() - .collect::>(); + let mut schema_to_merge = NormalizedSchema::default(); - debug!("new table names: {new_table_names:?}"); + { + let new_table_names = new_schema + .tables + .keys() + .collect::>() + .difference(&schema.tables.keys().collect::>()) + .cloned() + .collect::>(); - let new_tables_iter = new_schema - .tables - .iter() - .filter(|(table, _)| new_table_names.contains(table)); + debug!("new table names: {new_table_names:?}"); - for (name, table) in new_tables_iter { - info!("creating table '{name}'"); - tx.execute_batch( - &Cmd::Stmt(Stmt::CreateTable { - temporary: false, - if_not_exists: false, - tbl_name: QualifiedName::single(Name(name.clone())), - body: table.raw.clone(), - }) - .to_string(), - )?; - - tx.execute_batch(&format!("SELECT crsql_as_crr('{name}');"))?; + let new_tables_iter = new_schema + .tables + .iter() + .filter(|(table, _)| new_table_names.contains(table)); - for (idx_name, index) in table.indexes.iter() { - info!("creating index '{idx_name}'"); - tx.execute_batch( - &Cmd::Stmt(Stmt::CreateIndex { - unique: false, + for (name, table) in new_tables_iter { + info!("creating table '{name}'"); + let create_table_res = tx.execute_batch( + &Cmd::Stmt(Stmt::CreateTable { + temporary: false, if_not_exists: false, - idx_name: QualifiedName::single(Name(idx_name.clone())), - tbl_name: Name(index.tbl_name.clone()), - columns: index.columns.clone(), - where_clause: index.where_clause.clone(), + tbl_name: QualifiedName::single(Name(name.clone())), + body: table.raw.clone(), }) .to_string(), - )?; + ); + + if let Err(e) = create_table_res { + debug!("could not create table '{name}', trying to reconcile schema if table already exists"); + let sql: Vec = tx + .prepare( + "SELECT sql FROM sqlite_schema WHERE tbl_name = ? AND type IN ('table', 'index') AND name IS NOT NULL AND sql IS NOT NULL")?.query_map( + [name], + |row| row.get(0), + )?.collect::>>()?; + + if sql.is_empty() { + return Err(e.into()); + } + + let sql = sql.join(";"); + info!("found existing schema for '{name}'"); + + let parsed_table = parse_sql(&sql)? + .tables + .remove(name) + .ok_or_else(|| SchemaError::ImportedSchemaNotFound(name.clone()))?; + + if parsed_table.pk != table.pk { + return Err(SchemaError::ImportedSchemaPkMismatch { + tbl_name: name.clone(), + expected: table.pk.clone(), + got: parsed_table.pk, + }); + } + + if parsed_table.columns != table.columns { + return Err(SchemaError::ImportedSchemaColumnsMismatch { + tbl_name: name.clone(), + expected: table.columns.clone(), + got: parsed_table.columns, + }); + } + + schema_to_merge.tables.insert(name.clone(), parsed_table); + } + + tx.execute_batch(&format!("SELECT crsql_as_crr('{name}');"))?; + + if schema_to_merge.tables.contains_key(name) { + // just merged! + continue; + } + + for (idx_name, index) in table.indexes.iter() { + info!("creating index '{idx_name}'"); + tx.execute_batch( + &Cmd::Stmt(Stmt::CreateIndex { + unique: false, + if_not_exists: false, + idx_name: QualifiedName::single(Name(idx_name.clone())), + tbl_name: Name(index.tbl_name.clone()), + columns: index.columns.clone(), + where_clause: index.where_clause.clone(), + }) + .to_string(), + )?; + } } } + for (name, table) in schema_to_merge.tables { + new_schema.tables.insert(name, table); + } + // iterate intersecting tables for name in new_schema .tables @@ -464,6 +536,7 @@ pub fn make_schema_inner( Ok(()) } +#[allow(clippy::result_large_err)] pub fn parse_sql_to_schema(schema: &mut NormalizedSchema, sql: &str) -> Result<(), SchemaError> { debug!("parsing {sql}"); let mut parser = sqlite3_parser::lexer::sql::Parser::new(sql.as_bytes()); @@ -536,6 +609,7 @@ pub fn parse_sql_to_schema(schema: &mut NormalizedSchema, sql: &str) -> Result<( Ok(()) } +#[allow(clippy::result_large_err)] pub fn parse_sql(sql: &str) -> Result { let mut schema = NormalizedSchema::default(); @@ -544,6 +618,7 @@ pub fn parse_sql(sql: &str) -> Result { Ok(schema) } +#[allow(clippy::result_large_err)] fn prepare_index( name: &QualifiedName, tbl_name: &Name, @@ -566,6 +641,7 @@ fn prepare_index( })) } +#[allow(clippy::result_large_err)] fn prepare_table( tbl_name: &QualifiedName, columns: &[ColumnDefinition], From f890b29254fc7308841e33ffe14b6428644626aa Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Fri, 22 Sep 2023 13:06:03 -0400 Subject: [PATCH 2/3] add a changelog --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..f8890d38 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +# Changelog + +## Unreleased + +- Support existing tables being added to the schema [#64] + +## v0.1.0 + +Initial release! \ No newline at end of file From d691ea47345b961c15e47ce8de65a3c13fa8e4a7 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Fri, 22 Sep 2023 13:07:07 -0400 Subject: [PATCH 3/3] link to PR --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8890d38..1f6d8484 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## Unreleased -- Support existing tables being added to the schema [#64] +- Support existing tables being added to the schema [#64](https://github.com/superfly/corrosion/pull/64) ## v0.1.0