diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fe872d65..bab99b78 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -37,6 +37,7 @@ jobs: run: | python -m pytest --cov-report xml --cov=cumulus_library tests - name: Generate coverage report + if: github.ref != 'refs/heads/main' uses: orgoro/coverage@v3.1 with: coverageFile: coverage.xml diff --git a/MAINTAINER.md b/MAINTAINER.md new file mode 100644 index 00000000..1da6a43c --- /dev/null +++ b/MAINTAINER.md @@ -0,0 +1,39 @@ +# Maintainer notes + +This document is intended for users contributing/manitaining this repository. +It is not comprehensive, but aims to capture items relevant to architecture +that are not covered in another document. + +## Intended usage and database schemas + +Since these terms are used somewhat haphazardly in different database implementations, +we'll quickly define them for the purposes of this document: + +- database - a single instance of a database product, local or in the cloud. it can +contain serveral schemas. +- schema - a namespaced collection of tables inside a database + +Cumulus, as a holistic system, is designed to allow querying against the entire history +of a medical institution. You do not need to preselect a cohort - that can be done +by the author of a given study. We generally recommend using this approach, and it +is the one that we are trying to use in house. + +However, for technical and philosophical reasons, users may wish instead to select +a cohort at their EHR, and upload that data to a specific named schema in their +database, and work against that. It's important that we remember this use case +as we roll out new features. + +From the perspective of this repository, and studies which run on top of it, it's +important to remember these dual use cases - we should never make assumptions +about which database schema will be used, and it may change from one run to the next. +But all data associated with a single schema (source data and Cumulus studies) should +exist inside that schema. + +As of this writing, the sole exception to this is for third party vocabulary systems. +For these, the CLI will automatically create these in a unique schema, basically +(but not enforced) as read only tables that can be referenced by other studies +via cross-database joining. Additional tables should not be created by users in these +schemas. + +A user could elect to use these vocabulary builders and skip the entire rest of the +Cumulus ecosystem, if they wanted to. \ No newline at end of file diff --git a/cumulus_library/actions/builder.py b/cumulus_library/actions/builder.py index f3d2b8a5..0469e804 100644 --- a/cumulus_library/actions/builder.py +++ b/cumulus_library/actions/builder.py @@ -34,7 +34,7 @@ def _temporary_sys_path(add: pathlib.Path) -> None: def _load_and_execute_builder( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, filename: str, cursor: databases.DatabaseCursor, schema: str, @@ -48,7 +48,7 @@ def _load_and_execute_builder( ) -> None: """Loads a table builder from a file. - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param filename: filename of a module implementing a TableBuilder :param cursor: a database cursor for query execution :param schema: the database schema to write to @@ -64,14 +64,14 @@ def _load_and_execute_builder( # jump through some importlib hoops to import the module directly from # a source file defined in the manifest. spec = importlib.util.spec_from_file_location( - "table_builder", f"{manifest_parser._study_path}/{filename}" + "table_builder", f"{manifest._study_path}/{filename}" ) table_builder_module = importlib.util.module_from_spec(spec) sys.modules["table_builder"] = table_builder_module # Inject the study dir into sys.path so that builders can import # from surrounding utility code, even if the study isn't installed. # (i.e. you're working from a git checkout and do something like `-s .`) - with _temporary_sys_path(manifest_parser._study_path.parent): + with _temporary_sys_path(manifest._study_path.parent): spec.loader.exec_module(table_builder_module) # We're going to find all subclasses of BaseTableBuild in this file. @@ -88,7 +88,7 @@ def _load_and_execute_builder( if len(table_builder_subclasses) == 0: raise errors.StudyManifestParsingError( - f"Error loading {manifest_parser._study_path}{filename}\n" + f"Error loading {manifest._study_path}{filename}\n" "Custom builders must extend the BaseTableBuilder class." ) @@ -107,9 +107,7 @@ def _load_and_execute_builder( table_builder.comment_queries(doc_str=doc_str) new_filename = pathlib.Path(f"{filename}").stem + ".sql" table_builder.write_queries( - path=pathlib.Path( - f"{manifest_parser._study_path}/reference_sql/" + new_filename - ) + path=pathlib.Path(f"{manifest._study_path}/reference_sql/" + new_filename) ) else: table_builder.execute_queries( @@ -119,6 +117,7 @@ def _load_and_execute_builder( drop_table=drop_table, parser=db_parser, config=config, + manifest=manifest, ) # After running the executor code, we'll remove @@ -129,7 +128,7 @@ def _load_and_execute_builder( def run_protected_table_builder( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, cursor: databases.DatabaseCursor, schema: str, *, @@ -138,7 +137,7 @@ def run_protected_table_builder( ) -> None: """Creates protected tables for persisting selected data across runs - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param cursor: A DatabaseCursor object :param schema: The name of the schema to write tables to :keyword config: a StudyConfig object @@ -149,14 +148,15 @@ def run_protected_table_builder( cursor, schema, verbose, - study_name=manifest_parser._study_config.get("study_prefix"), - study_stats=manifest_parser._study_config.get("statistics_config"), + study_name=manifest._study_config.get("study_prefix"), + study_stats=manifest._study_config.get("statistics_config"), config=config, + manifest=manifest, ) def run_table_builder( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, cursor: databases.DatabaseCursor, schema: str, *, @@ -166,16 +166,16 @@ def run_table_builder( ) -> None: """Loads modules from a manifest and executes code via BaseTableBuilder - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param cursor: A DatabaseCursor object :param schema: The name of the schema to write tables to :keyword config: a StudyConfig object :keyword verbose: if true, will replace progress bars with sql query output :keyword db_parser: an object implementing DatabaseParser for the target database """ - for file in manifest_parser.get_table_builder_file_list(): + for file in manifest.get_table_builder_file_list(): _load_and_execute_builder( - manifest_parser, + manifest, file, cursor, schema, @@ -186,7 +186,7 @@ def run_table_builder( def run_counts_builders( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, cursor: databases.DatabaseCursor, schema: str, *, @@ -200,20 +200,25 @@ def run_counts_builders( given dataset, where other statistical methods may use sampling techniques or adjustable input parameters that may need to be preserved for later review. - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param cursor: A DatabaseCursor object :param schema: The name of the schema to write tables to :keyword config: a StudyConfig object :keyword verbose: if true, will replace progress bars with sql query output """ - for file in manifest_parser.get_counts_builder_file_list(): + for file in manifest.get_counts_builder_file_list(): _load_and_execute_builder( - manifest_parser, file, cursor, schema, verbose=verbose, config=config + manifest, + file, + cursor, + schema, + verbose=verbose, + config=config, ) def run_statistics_builders( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, cursor: databases.DatabaseCursor, schema: str, *, @@ -222,7 +227,7 @@ def run_statistics_builders( ) -> None: """Loads statistics modules from toml definitions and executes - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param cursor: A DatabaseCursor object :param schema: The name of the schema to write tables to :keyword config: a StudyConfig object @@ -230,12 +235,12 @@ def run_statistics_builders( """ if not config.stats_build: return - for file in manifest_parser.get_statistics_file_list(): + for file in manifest.get_statistics_file_list(): # This open is a bit redundant with the open inside of the PSM builder, # but we're letting it slide so that builders function similarly # across the board safe_timestamp = base_utils.get_tablename_safe_iso_timestamp() - toml_path = pathlib.Path(f"{manifest_parser._study_path}/{file}") + toml_path = pathlib.Path(f"{manifest._study_path}/{file}") with open(toml_path, encoding="UTF-8") as file: config = toml.load(file) config_type = config["config_type"] @@ -243,7 +248,7 @@ def run_statistics_builders( if config_type == "psm": builder = psm.PsmBuilder( toml_path, - manifest_parser.data_path / f"{manifest_parser.get_study_prefix()}/psm", + manifest.data_path / f"{manifest.get_study_prefix()}/psm", config=config, ) else: @@ -251,11 +256,16 @@ def run_statistics_builders( f"{toml_path} references an invalid statistics type {config_type}." ) builder.execute_queries( - cursor, schema, verbose, table_suffix=safe_timestamp, config=config + cursor, + schema, + verbose, + table_suffix=safe_timestamp, + config=config, + manifest=manifest, ) insert_query = base_templates.get_insert_into_query( - f"{manifest_parser.get_study_prefix()}__{enums.ProtectedTables.STATISTICS.value}", + f"{manifest.get_study_prefix()}__{enums.ProtectedTables.STATISTICS.value}", [ "study_name", "library_version", @@ -266,7 +276,7 @@ def run_statistics_builders( ], [ [ - manifest_parser.get_study_prefix(), + manifest.get_study_prefix(), __version__, config_type, f"{target_table}_{safe_timestamp}", @@ -279,7 +289,7 @@ def run_statistics_builders( def run_matching_table_builder( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, cursor: databases.DatabaseCursor, schema: str, builder: str, @@ -290,19 +300,19 @@ def run_matching_table_builder( ): """targets all table builders matching a target string for running - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param cursor: A DatabaseCursor object :param schema: The name of the schema to write tables to :param builder: filename of a module implementing a TableBuilder :keyword config: a StudyConfig object :keyword verbose: if true, will replace progress bars with sql query output :keyword db_parser: an object implementing DatabaseParser for the target database""" - all_generators = manifest_parser.get_all_generators() + all_generators = manifest.get_all_generators() for file in all_generators: if builder and file.find(builder) == -1: continue _load_and_execute_builder( - manifest_parser, + manifest, file, cursor, schema, @@ -314,7 +324,7 @@ def run_matching_table_builder( def build_study( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, cursor: databases.DatabaseCursor, *, config: base_utils.StudyConfig, @@ -323,7 +333,7 @@ def build_study( ) -> list: """Creates tables in the schema by iterating through the sql_config.file_names - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param cursor: A DatabaseCursor object :keyword config: a StudyConfig object :keyword verbose: if true, will replace progress bars with sql query output @@ -331,22 +341,28 @@ def build_study( :returns: loaded queries (for unit testing only) """ queries = [] - for file in manifest_parser.get_sql_file_list(continue_from): + for file in manifest.get_sql_file_list(continue_from): for query in base_utils.parse_sql( - base_utils.load_text(f"{manifest_parser._study_path}/{file}") + base_utils.load_text(f"{manifest._study_path}/{file}") ): queries.append([query, file]) if len(queries) == 0: return [] + for query in queries: + query[0] = base_utils.update_query_if_schema_specified(query[0], manifest) + query[0] = query[0].replace( + f"`{manifest.get_study_prefix()}__", + "`", + ) # We want to only show a progress bar if we are :not: printing SQL lines with base_utils.get_progress_bar(disable=verbose) as progress: task = progress.add_task( - f"Creating {manifest_parser.get_study_prefix()} study in db...", + f"Creating {manifest.get_study_prefix()} study in db...", total=len(queries), visible=not verbose, ) _execute_build_queries( - manifest_parser, + manifest, cursor, verbose, queries, @@ -370,7 +386,7 @@ def _query_error(query_and_filename: list, exit_message: str) -> None: def _execute_build_queries( - manifest_parser: study_parser.StudyManifestParser, + manifest: study_parser.StudyManifestParser, cursor: databases.DatabaseCursor, verbose: bool, queries: list, @@ -380,7 +396,7 @@ def _execute_build_queries( ) -> None: """Handler for executing create table queries and displaying console output. - :param manifest_parser: a StudyManifestParser object + :param manifest: a StudyManifestParser object :param cursor: A DatabaseCursor object :param verbose: toggle from progress bar to query output :param queries: a list of queries read from files in sql_config.file_names @@ -390,15 +406,18 @@ def _execute_build_queries( """ for query in queries: create_line = query[0].split("\n")[0] - if f" {manifest_parser.get_study_prefix()}__" not in create_line: + if ( + f" {manifest.get_study_prefix()}__" not in create_line + and not manifest.get_dedicated_schema() + ): _query_error( query, "This query does not contain the study prefix. All tables should " - f"start with a string like `{manifest_parser.get_study_prefix()}__`, " + f"start with a string like `{manifest.get_study_prefix()}__`, " "and it should be in the first line of the query.", ) if any( - f" {manifest_parser.get_study_prefix()}__{word.value}_" in create_line + f" {manifest.get_study_prefix()}__{word.value}_" in create_line for word in enums.ProtectedTableKeywords ): _query_error( @@ -415,9 +434,12 @@ def _execute_build_queries( "This query contains a table name with more than one '__' in it. " "Double underscores are reserved for special use cases. Please " "rename this table so the only double undercore is after the " - f"study prefix, e.g. `{manifest_parser.get_study_prefix()}__`", + f"study prefix, e.g. `{manifest.get_study_prefix()}__`", ) - if f"{manifest_parser.get_study_prefix()}__" not in query[0].split("\n")[0]: + if ( + f"{manifest.get_study_prefix()}__" not in query[0].split("\n")[0] + and not manifest.get_dedicated_schema() + ): _query_error( query, "This query does not contain the study prefix. All tables should " diff --git a/cumulus_library/actions/cleaner.py b/cumulus_library/actions/cleaner.py index 95ed342c..f54a93a7 100644 --- a/cumulus_library/actions/cleaner.py +++ b/cumulus_library/actions/cleaner.py @@ -100,7 +100,10 @@ def clean_study( raise errors.CumulusLibraryError( "Either a manifest parser or a filter prefix must be provided" ) - if not prefix: + if manifest_parser and manifest_parser.get_dedicated_schema(): + drop_prefix = "" + display_prefix = "" + elif not prefix: drop_prefix = f"{manifest_parser.get_study_prefix()}__" display_prefix = manifest_parser.get_study_prefix() else: @@ -109,7 +112,7 @@ def clean_study( if stats_clean: confirm = input( - "This will remove all historical stats tables beginning in the " + "This will remove all historical stats tables in the " f"{display_prefix} study - are you sure? (y/N)" ) if confirm is None or confirm.lower() not in ("y", "yes"): @@ -136,6 +139,7 @@ def clean_study( ( f"__{word.value}_" in view_table[0] or view_table[0].endswith(f"__{word.value}") + or view_table[0].startswith(f"{word.value}_") ) for word in enums.ProtectedTableKeywords ): @@ -148,6 +152,14 @@ def clean_study( confirm = input("Remove these tables? (y/N)") if confirm.lower() not in ("y", "yes"): sys.exit("Table cleaning aborted") + if dedicated := manifest_parser.get_dedicated_schema(): + view_table_list = [ + ( + f"`{dedicated}`.`{x[0]}`", + x[1], + ) + for x in view_table_list + ] # We want to only show a progress bar if we are :not: printing SQL lines with base_utils.get_progress_bar(disable=verbose) as progress: task = progress.add_task( diff --git a/cumulus_library/actions/exporter.py b/cumulus_library/actions/exporter.py index 9e854574..a7a68abd 100644 --- a/cumulus_library/actions/exporter.py +++ b/cumulus_library/actions/exporter.py @@ -54,10 +54,12 @@ def export_study( :returns: a list of queries, (only for unit tests) """ reset_counts_exports(manifest_parser) + if manifest_parser.get_dedicated_schema(): + prefix = f"{manifest_parser.get_dedicated_schema()}." + else: + prefix = f"{manifest_parser.get_study_prefix()}__" if archive: - table_query = base_templates.get_show_tables( - schema_name, f"{manifest_parser.get_study_prefix()}__" - ) + table_query = base_templates.get_show_tables(schema_name, prefix) result = db.cursor().execute(table_query).fetchall() table_list = [row[0] for row in result] else: @@ -70,6 +72,7 @@ def export_study( description=f"Exporting {manifest_parser.get_study_prefix()} data...", ): query = f"SELECT * FROM {table}" + query = base_utils.update_query_if_schema_specified(query, manifest_parser) dataframe_chunks, db_schema = db.execute_as_pandas(query, chunksize=chunksize) path.mkdir(parents=True, exist_ok=True) schema = pyarrow.schema(db.col_pyarrow_types_from_sql(db_schema)) @@ -86,7 +89,7 @@ def export_study( for chunk in dataframe_chunks: _write_chunk(p_writer, chunk, schema) # pragma: no cover _write_chunk(c_writer, chunk, schema) # pragma: no cover - queries.append(queries) + queries.append(query) if archive: base_utils.zip_dir(path, data_path, manifest_parser.get_study_prefix()) return queries diff --git a/cumulus_library/base_table_builder.py b/cumulus_library/base_table_builder.py index 4981c439..0a6bbae9 100644 --- a/cumulus_library/base_table_builder.py +++ b/cumulus_library/base_table_builder.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from typing import final -from cumulus_library import base_utils +from cumulus_library import base_utils, study_parser from cumulus_library.databases import DatabaseCursor @@ -42,6 +42,7 @@ def execute_queries( verbose: bool, *args, drop_table: bool = False, + manifest: study_parser.StudyManifestParser = None, **kwargs, ): """Executes queries set up by a prepare_queries call @@ -51,7 +52,7 @@ def execute_queries( :param verbose: toggle for verbose output mode :param drop_table: drops any tables found in prepared_queries results """ - self.prepare_queries(cursor, schema, *args, **kwargs) + self.prepare_queries(cursor, schema, *args, manifest=manifest, **kwargs) if drop_table: table_names = [] for query in self.queries: @@ -69,6 +70,7 @@ def execute_queries( ) table_name = table_name[0] + # TODO: this may not be required? reinvestigate # if it contains a schema, remove it (usually it won't, but some # CTAS forms may) if "." in table_name: @@ -84,6 +86,7 @@ def execute_queries( ) for query in self.queries: try: + query = base_utils.update_query_if_schema_specified(query, manifest) with base_utils.query_console_output( verbose, query, progress, task ): diff --git a/cumulus_library/base_utils.py b/cumulus_library/base_utils.py index 8e5c6a46..8451a36b 100644 --- a/cumulus_library/base_utils.py +++ b/cumulus_library/base_utils.py @@ -10,7 +10,7 @@ from rich import progress -from cumulus_library import databases +from cumulus_library import databases, study_parser @dataclasses.dataclass @@ -116,6 +116,17 @@ def zip_dir(read_path, write_path, archive_name): shutil.rmtree(read_path) +def update_query_if_schema_specified( + query: str, manifest: study_parser.StudyManifestParser +): + if manifest and manifest.get_dedicated_schema(): + query = query.replace( + f"{manifest.get_study_prefix()}__", + f"{manifest.get_dedicated_schema()}.", + ) + return query + + def unzip_file(file_path: pathlib.Path, write_path: pathlib.Path): """Expands a zip archive""" with zipfile.ZipFile(file_path, mode="r") as z: diff --git a/cumulus_library/cli.py b/cumulus_library/cli.py index c34dc5d1..09571fcd 100755 --- a/cumulus_library/cli.py +++ b/cumulus_library/cli.py @@ -43,15 +43,33 @@ def __init__(self, db: databases.DatabaseBackend, data_path: str): self.cursor = db.cursor() self.schema_name = db.schema_name - def update_transactions(self, prefix: str, status: str): + def get_schema(self, manifest: study_parser.StudyManifestParser): + if dedicated := manifest.get_dedicated_schema(): + self.db.create_schema(dedicated) + return dedicated + return self.schema_name + + def update_transactions( + self, manifest: study_parser.StudyManifestParser, status: str + ): """Adds a record to a study's transactions table""" + if manifest.get_dedicated_schema(): + transactions = ( + f"{manifest.get_dedicated_schema()}." + f"{enums.ProtectedTables.TRANSACTIONS.value}" + ) + else: + transactions = ( + f"{manifest.get_study_prefix()}__" + f"{enums.ProtectedTables.TRANSACTIONS.value}" + ) self.cursor.execute( base_templates.get_insert_into_query( - f"{prefix}__{enums.ProtectedTables.TRANSACTIONS.value}", + transactions, protected_table_builder.TRANSACTIONS_COLS, [ [ - prefix, + manifest.get_study_prefix(), __version__, status, base_utils.get_utc_datetime(), @@ -90,21 +108,23 @@ def clean_study( for target in targets: if prefix: - manifest_parser = study_parser.StudyManifestParser() + manifest = study_parser.StudyManifestParser() + schema = self.get_schema(manifest) cleaner.clean_study( - manifest_parser=manifest_parser, + manifest_parser=manifest, cursor=self.cursor, - schema_name=self.schema_name, + schema_name=schema, verbose=self.verbose, stats_clean=stats_clean, prefix=target, ) else: - manifest_parser = study_parser.StudyManifestParser(study_dict[target]) + manifest = study_parser.StudyManifestParser(study_dict[target]) + schema = self.get_schema(manifest) cleaner.clean_study( - manifest_parser=manifest_parser, + manifest_parser=manifest, cursor=self.cursor, - schema_name=self.schema_name, + schema_name=schema, verbose=self.verbose, stats_clean=stats_clean, ) @@ -122,21 +142,22 @@ def clean_and_build_study( :param config: A StudyConfig object containing optional params :keyword continue_from: Restart a run from a specific sql file (for dev only) """ - manifest_parser = study_parser.StudyManifestParser(target, self.data_path) + manifest = study_parser.StudyManifestParser(target, self.data_path) + schema = self.get_schema(manifest) try: + builder.run_protected_table_builder( + manifest, + self.cursor, + schema, + verbose=self.verbose, + config=config, + ) if not continue_from: - builder.run_protected_table_builder( - manifest_parser, - self.cursor, - self.schema_name, - verbose=self.verbose, - config=config, - ) - self.update_transactions(manifest_parser.get_study_prefix(), "started") + self.update_transactions(manifest, "started") cleaned_tables = cleaner.clean_study( - manifest_parser=manifest_parser, + manifest_parser=manifest, cursor=self.cursor, - schema_name=self.schema_name, + schema_name=schema, verbose=self.verbose, stats_clean=False, ) @@ -144,45 +165,44 @@ def clean_and_build_study( if len(cleaned_tables) == 0: config.stats_build = True builder.run_table_builder( - manifest_parser, + manifest, self.cursor, - self.schema_name, + schema, verbose=self.verbose, db_parser=self.db.parser(), config=config, ) else: - self.update_transactions(manifest_parser.get_study_prefix(), "resumed") - + self.update_transactions(manifest, "resumed") builder.build_study( - manifest_parser, + manifest, self.cursor, verbose=self.verbose, continue_from=continue_from, config=config, ) builder.run_counts_builders( - manifest_parser, + manifest, self.cursor, - self.schema_name, + schema, verbose=self.verbose, config=config, ) builder.run_statistics_builders( - manifest_parser, + manifest, self.cursor, - self.schema_name, + schema, verbose=self.verbose, config=config, ) - self.update_transactions(manifest_parser.get_study_prefix(), "finished") + self.update_transactions(manifest, "finished") except errors.StudyManifestFilesystemError as e: # This should be thrown prior to any database connections, so # skipping logging raise e except Exception as e: - self.update_transactions(manifest_parser.get_study_prefix(), "error") + self.update_transactions(manifest, "error") raise e def run_matching_table_builder( @@ -197,11 +217,12 @@ def run_matching_table_builder( :param table_builder_name: a builder file referenced in the study's manifest :param config: A StudyConfig object containing optional params """ - manifest_parser = study_parser.StudyManifestParser(target) + manifest = study_parser.StudyManifestParser(target) + schema = self.get_schema(manifest) builder.run_matching_table_builder( - manifest_parser, + manifest, self.cursor, - self.schema_name, + schema, table_builder_name, verbose=self.verbose, db_parser=self.db.parser(), @@ -218,10 +239,8 @@ def export_study( """ if data_path is None: sys.exit("Missing destination - please provide a path argument.") - manifest_parser = study_parser.StudyManifestParser(target, data_path) - exporter.export_study( - manifest_parser, self.db, self.schema_name, data_path, archive - ) + manifest = study_parser.StudyManifestParser(target, data_path) + exporter.export_study(manifest, self.db, self.schema_name, data_path, archive) def generate_study_sql( self, @@ -236,11 +255,12 @@ def generate_study_sql( :param config: A StudyConfig object containing optional params :param builder: Specify a single builder to generate sql from """ - manifest_parser = study_parser.StudyManifestParser(target) + manifest = study_parser.StudyManifestParser(target) + schema = self.get_schema(manifest) file_generator.run_generate_sql( - manifest_parser=manifest_parser, + manifest_parser=manifest, cursor=self.cursor, - schema=self.schema_name, + schema=schema, table_builder=builder, verbose=self.verbose, db_parser=self.db.parser(), @@ -255,11 +275,12 @@ def generate_study_markdown( :param target: A path to the study directory """ - manifest_parser = study_parser.StudyManifestParser(target) + manifest = study_parser.StudyManifestParser(target) + schema = self.get_schema(manifest) file_generator.run_generate_markdown( - manifest_parser=manifest_parser, + manifest_parser=manifest, cursor=self.cursor, - schema=self.schema_name, + schema=schema, verbose=self.verbose, db_parser=self.db.parser(), ) @@ -321,7 +342,6 @@ def get_studies_by_manifest_path(path: pathlib.Path) -> dict[str, pathlib.Path]: def run_cli(args: dict): """Controls which library tasks are run based on CLI arguments""" console = rich.console.Console() - if args["action"] == "upload": try: uploader.upload_files(args) @@ -372,6 +392,7 @@ def run_cli(args: dict): runner.clean_and_build_study( study_dict[target], config=config, + continue_from=args["continue_from"], ) elif args["action"] == "export": diff --git a/cumulus_library/databases.py b/cumulus_library/databases.py index a6e48db7..4e74b8f6 100644 --- a/cumulus_library/databases.py +++ b/cumulus_library/databases.py @@ -20,6 +20,7 @@ from typing import Any, Protocol import boto3 +import botocore import cumulus_fhir_support import duckdb import numpy @@ -219,6 +220,10 @@ def upload_file( have an API for file upload (i.e. cloud databases)""" return None + @abc.abstractmethod + def create_schema(self, schema_name): + """Creates a new schema object inside the catalog""" + @abc.abstractmethod def close(self) -> None: """Clean up any resources necessary""" @@ -369,6 +374,14 @@ def upload_file( ) return f"s3://{bucket}/{s3_key}" + def create_schema(self, schema_name) -> None: + """Creates a new schema object inside the database""" + glue_client = boto3.client("glue") + try: + glue_client.get_database(Name=schema_name) + except botocore.exceptions.ClientError: + glue_client.create_database(DatabaseInput={"Name": schema_name}) + def close(self) -> None: return self.connection.close() # pragma: no cover @@ -600,6 +613,10 @@ def parser(self) -> DatabaseParser: def operational_errors(self) -> tuple[Exception]: return (duckdb.OperationalError,) # pragma: no cover + def create_schema(self, schema_name): + """Creates a new schema object inside the database""" + self.connection.sql(f"CREATE SCHEMA {schema_name}") + def close(self) -> None: self.connection.close() diff --git a/cumulus_library/protected_table_builder.py b/cumulus_library/protected_table_builder.py index 80832405..0ab3227c 100644 --- a/cumulus_library/protected_table_builder.py +++ b/cumulus_library/protected_table_builder.py @@ -1,6 +1,6 @@ """Builder for creating tables for tracking state/logging changes""" -from cumulus_library import base_table_builder, enums +from cumulus_library import base_table_builder, enums, study_parser from cumulus_library.template_sql import base_templates TRANSACTIONS_COLS = ["study_name", "library_version", "status", "event_time"] @@ -37,12 +37,21 @@ def prepare_queries( study_name: str, study_stats: dict, *args, + manifest: study_parser.StudyManifestParser | None = None, **kwargs, ): + if manifest and manifest.get_dedicated_schema(): + db_schema = manifest.get_dedicated_schema() + transactions = enums.ProtectedTables.TRANSACTIONS.value + statistics = enums.ProtectedTables.STATISTICS.value + else: + db_schema = schema + transactions = f"{study_name}__{enums.ProtectedTables.TRANSACTIONS.value}" + statistics = f"{study_name}__{enums.ProtectedTables.STATISTICS.value}" self.queries.append( base_templates.get_ctas_empty_query( - schema, - f"{study_name}__{enums.ProtectedTables.TRANSACTIONS.value}", + db_schema, + transactions, TRANSACTIONS_COLS, TRANSACTION_COLS_TYPES, ) @@ -50,8 +59,8 @@ def prepare_queries( if study_stats: self.queries.append( base_templates.get_ctas_empty_query( - schema, - f"{study_name}__{enums.ProtectedTables.STATISTICS.value}", + db_schema, + statistics, STATISTICS_COLS, STATISTICS_COLS_TYPES, ) diff --git a/cumulus_library/study_parser.py b/cumulus_library/study_parser.py index 29d6b8fa..403e48eb 100644 --- a/cumulus_library/study_parser.py +++ b/cumulus_library/study_parser.py @@ -61,6 +61,14 @@ def get_study_prefix(self) -> str | None: """ return self._study_config.get("study_prefix") + def get_dedicated_schema(self) -> str | None: + """Reads the contents of the dedicated schema in the options dict + + :returns: A dictionary of objects, or None if not found + """ + options = self._study_config.get("advanced_options", {}) + return options.get("dedicated_schema") + def get_sql_file_list(self, continue_from: str | None = None) -> list[str] | None: """Reads the contents of the sql_config array from the manifest diff --git a/docs/creating-studies.md b/docs/creating-studies.md index b0d31bd5..bf2bbf95 100644 --- a/docs/creating-studies.md +++ b/docs/creating-studies.md @@ -47,6 +47,7 @@ study_prefix = "my_study" # build tables, you can provide a list of files implementing BaseTableBuilder. # See the core study for examples of this pattern. These run before # any SQL execution + # [table_builder_config] # file_names = [ # "my_table_builder.py", @@ -54,11 +55,14 @@ study_prefix = "my_study" # The following section describes all tables that should be generated directly # from SQL files. + [sql_config] + # 'file_names' defines a list of sql files to execute, in order, in this folder. # Recommended order: Any ancillary config (like a list of condition codes), # tables/view selecting subsets of data from FHIR data, tables/views creating # summary statistics. + file_names = [ "setup.sql", "lab_observations.sql", @@ -69,9 +73,12 @@ file_names = [ # The following section defines parameters related to exporting study data from # your athena database + [export_config] + # The following tables will be output to disk when an export is run. In most cases, # only count tables should be output in this way. + export_list = [ "my_study__count_influenza_test_month", ] @@ -81,6 +88,7 @@ export_list = [ # queries for you. We use this pattern for generating the core tables, as well # other studies authored inside BCH. These will always be run after any other # SQL queries have been generated + # [counts_builder_config] # file_names = [ # "count.py" @@ -92,12 +100,31 @@ export_list = [ # supported approaches. # These will run last, so all the data in your study will exist by the time these # are invoked. + # [statistics_config] # file_names = # [ # "psm_config.toml" # ] +# The following section is for advanced/unusual study use cases + +# [options] + +# If you want to override the default schema name to an explicit one, you can define +# the name of this schema here. 99% of the time, this is not the behavior you want - +# you want library data to be in the same schema as your data source, since this allows +# you to keep track of where your source data for a given study run came from. +# +# The intended use case for this is for static/slow moving data sets that are external +# to your EHR data - this is typically things like coding systems. +# +# These should be read only use cases - if you want to do additional iterations with +# the contents of one of these reference sets, do it in the study, not in the reference +# itself. + +# use_dedicated_schema="alternate_schema_name" + ``` There are other hooks you can use in the manifest for more advanced control over diff --git a/tests/test_athena.py b/tests/test_athena.py index f795a4e8..3171bd1a 100644 --- a/tests/test_athena.py +++ b/tests/test_athena.py @@ -1,10 +1,12 @@ -"""Tests for Athena database support""" +"""Edge case testing for Athena database support""" import json import os import pathlib from unittest import mock +import botocore + from cumulus_library import databases @@ -49,7 +51,7 @@ def test_schema_parsing(): clear=True, ) @mock.patch("botocore.session.Session") -def test_upload_parquet(s3_session_mock): +def test_upload_parquet_response_handling(mock_session): path = pathlib.Path(__file__).resolve().parent db = databases.AthenaDatabaseBackend( region="us-east-1", @@ -64,7 +66,7 @@ def test_upload_parquet(s3_session_mock): s3_client = mock.MagicMock() with open(path / "test_data/aws/boto3.client.s3.list_objects_v2.json") as f: s3_client.list_objects_v2.return_value = json.load(f) - s3_session_mock.return_value.create_client.return_value = s3_client + mock_session.return_value.create_client.return_value = s3_client resp = db.upload_file( file=path / "test_data/count_synthea_patient.parquet", study="test_study", @@ -74,3 +76,24 @@ def test_upload_parquet(s3_session_mock): assert resp == ( "s3://cumulus-athena-123456789012-us-east-1/results/cumulus_user_uploads/db_schema/test_study/count_patient" ) + + +@mock.patch("botocore.client") +def test_create_schema(mock_client): + mock_clientobj = mock_client.ClientCreator.return_value.create_client.return_value + mock_clientobj.get_database.side_effect = [ + None, + botocore.exceptions.ClientError({}, {}), + ] + db = databases.AthenaDatabaseBackend( + region="test", + work_group="test", + profile="test", + schema_name="test", + ) + db.create_schema("test_exists") + assert mock_clientobj.get_database.called + assert not mock_clientobj.create_database.called + + db.create_schema("test_new") + assert mock_clientobj.create_database.called diff --git a/tests/test_cli.py b/tests/test_cli.py index a03a2f43..f8b336ac 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -14,6 +14,7 @@ from pathlib import Path from unittest import mock +import duckdb import pandas import pytest import responses @@ -82,7 +83,7 @@ def test_cli_early_exit(args): "study_python_valid__table", ), ( - ["build", "-t", "study_python_valid", "--continue", "module2"], + ["build", "-t", "study_python_valid", "--builder", "module2"], does_not_raise(), "study_python_valid__table_2", ), @@ -311,7 +312,7 @@ def test_clean(mock_path, tmp_path, args, expected, raises): # pylint: disable= "tests/test_data/", ], ["export", "-t", "study_valid", "-s", "tests/test_data/"], - 2, + 3, does_not_raise(), ), ( @@ -325,7 +326,7 @@ def test_clean(mock_path, tmp_path, args, expected, raises): # pylint: disable= "tests/test_data/", ], ["export", "-t", "study_valid", "-s", "tests/test_data/"], - 2, + 3, does_not_raise(), ), (["build", "-t", "vocab"], None, 3, does_not_raise()), @@ -341,7 +342,7 @@ def test_clean(mock_path, tmp_path, args, expected, raises): # pylint: disable= "--statistics", ], ["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"], - 2, + 3, does_not_raise(), ), ( @@ -354,7 +355,53 @@ def test_clean(mock_path, tmp_path, args, expected, raises): # pylint: disable= "--statistics", ], ["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"], + 3, + does_not_raise(), + ), + ( + [ + "build", + "-t", + "study_valid", + "-s", + "tests/test_data/study_valid/", + "--continue", + "test2", + ], + ["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"], 2, + pytest.raises(duckdb.duckdb.CatalogException), + ), + ( + [ + "build", + "-t", + "study_valid", + "-s", + "tests/test_data/study_valid/", + "--continue", + "test3", + ], + ["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"], + 2, + pytest.raises(errors.StudyManifestParsingError), + ), + ( + [ + "build", + "-t", + "study_dedicated_schema", + "-s", + "tests/test_data/study_dedicated_schema/", + ], + [ + "export", + "-t", + "study_dedicated_schema", + "-s", + "tests/test_data/study_dedicated_schema/", + ], + 4, does_not_raise(), ), ], @@ -370,7 +417,11 @@ def test_cli_executes_queries( cli.main(cli_args=export_args) db = DuckDatabaseBackend(f"{tmp_path}/duck.db") - found_tables = db.cursor().execute("show tables").fetchall() + found_tables = ( + db.cursor() + .execute("SELECT table_schema,table_name FROM information_schema.tables") + .fetchall() + ) assert len(found_tables) == expected_tables for table in found_tables: # If a table was created by this run, check it has the study prefix @@ -389,7 +440,8 @@ def test_cli_executes_queries( with open(f"{manifest_dir}/manifest.toml", encoding="UTF-8") as file: config = toml.load(file) csv_files = glob.glob(f"{tmp_path}/export/{build_args[2]}/*.csv") - for export_table in config["export_config"]["export_list"]: + export_tables = config["export_config"]["export_list"] + for export_table in export_tables: assert any(export_table in x for x in csv_files) @@ -688,3 +740,40 @@ def test_cli_import_study(tmp_path): tmp_path, ) ) + + +@mock.patch.dict(os.environ, clear=True) +def test_dedicated_schema(tmp_path): + core_build_args = duckdb_args( + [ + "build", + "-t", + "core", + ], + tmp_path, + ) + build_args = duckdb_args( + [ + "build", + "-t", + "study_dedicated_schema", + "-s", + "tests/test_data/study_dedicated_schema/", + ], + tmp_path, + ) + cli.main(cli_args=core_build_args) + cli.main(cli_args=build_args) + db = DuckDatabaseBackend(f"{tmp_path}/duck.db") + tables = ( + db.cursor() + .execute("SELECT table_schema,table_name FROM information_schema.tables") + .fetchall() + ) + for table in [ + ("dedicated", "table_1"), + ("dedicated", "table_2"), + ("dedicated", "table_raw_sql"), + ("main", "core__condition"), + ]: + assert table in tables diff --git a/tests/test_data/study_dedicated_schema/manifest.toml b/tests/test_data/study_dedicated_schema/manifest.toml new file mode 100644 index 00000000..10315815 --- /dev/null +++ b/tests/test_data/study_dedicated_schema/manifest.toml @@ -0,0 +1,13 @@ +study_prefix = "study_dedicated_schema" + +[table_builder_config] +file_names = ["module1.py", "module1.py", "module2.py"] + +[sql_config] +file_names = ["test.sql"] + +[export_config] +export_list = ["study_dedicated_schema__table_raw_sql"] + +[advanced_options] +dedicated_schema="dedicated" diff --git a/tests/test_data/study_dedicated_schema/module1.py b/tests/test_data/study_dedicated_schema/module1.py new file mode 100644 index 00000000..38c6b64b --- /dev/null +++ b/tests/test_data/study_dedicated_schema/module1.py @@ -0,0 +1,10 @@ +from cumulus_library.base_table_builder import BaseTableBuilder + + +class ModuleOneRunner(BaseTableBuilder): + display_text = "module1" + + def prepare_queries(self, cursor: object, schema: str, *args, **kwargs): + self.queries.append( + "CREATE TABLE IF NOT EXISTS study_dedicated_schema__table_1 (test int);" + ) diff --git a/tests/test_data/study_dedicated_schema/module2.py b/tests/test_data/study_dedicated_schema/module2.py new file mode 100644 index 00000000..7e72c388 --- /dev/null +++ b/tests/test_data/study_dedicated_schema/module2.py @@ -0,0 +1,10 @@ +from cumulus_library.base_table_builder import BaseTableBuilder + + +class ModuleTwoRunner(BaseTableBuilder): + display_text = "module2" + + def prepare_queries(self, cursor: object, schema: str, *args, **kwargs): + self.queries.append( + "CREATE TABLE IF NOT EXISTS study_dedicated_schema__table_2 (test int);" + ) diff --git a/tests/test_data/study_dedicated_schema/test.sql b/tests/test_data/study_dedicated_schema/test.sql new file mode 100644 index 00000000..fb3cf7f2 --- /dev/null +++ b/tests/test_data/study_dedicated_schema/test.sql @@ -0,0 +1 @@ +CREATE TABLE study_dedicated_schema__table_raw_sql (test int); diff --git a/tests/test_data/study_valid/manifest.toml b/tests/test_data/study_valid/manifest.toml index 81ba3008..295ae4fc 100644 --- a/tests/test_data/study_valid/manifest.toml +++ b/tests/test_data/study_valid/manifest.toml @@ -1,7 +1,13 @@ study_prefix = "study_valid" [sql_config] -file_names = ["test.sql"] +file_names = [ + "test.sql", + "test2.sql" +] [export_config] -export_list = ["study_valid__table"] +export_list = [ + "study_valid__table", + "study_valid__table2" +] diff --git a/tests/test_data/study_valid/test2.sql b/tests/test_data/study_valid/test2.sql new file mode 100644 index 00000000..bb4ac1d3 --- /dev/null +++ b/tests/test_data/study_valid/test2.sql @@ -0,0 +1 @@ +CREATE TABLE study_valid__table2 (test int); diff --git a/tests/test_databases.py b/tests/test_databases.py index 836c12d7..5d23a15b 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -227,7 +227,7 @@ def test_pyarrow_types_from_sql(db, data, expected, raises): "args,expected_type, raises", [ ( - {**{"db_type": "duckdb", "schema_name": "test"}, **DUCKDB_KWARGS}, + {**{"db_type": "duckdb", "schema_name": ":memory:"}, **DUCKDB_KWARGS}, databases.DuckDatabaseBackend, does_not_raise(), ), diff --git a/tests/test_study_parser.py b/tests/test_study_parser.py index c3de3ef5..327f142c 100644 --- a/tests/test_study_parser.py +++ b/tests/test_study_parser.py @@ -11,26 +11,40 @@ @pytest.mark.parametrize( - "manifest_path,raises", + "manifest_path,expected,raises", [ - ("test_data/study_valid", does_not_raise()), - (None, does_not_raise()), + ( + "test_data/study_valid", + ( + "{'study_prefix': 'study_valid', 'sql_config': {'file_names': " + "['test.sql', 'test2.sql']}, 'export_config': {'export_list': " + "['study_valid__table', 'study_valid__table2']}}" + ), + does_not_raise(), + ), + (None, "{}", does_not_raise()), ( "test_data/study_missing_prefix", + "{}", + pytest.raises(errors.StudyManifestParsingError), + ), + ( + "test_data/study_wrong_type", + "{}", pytest.raises(errors.StudyManifestParsingError), ), - ("test_data/study_wrong_type", pytest.raises(errors.StudyManifestParsingError)), - ("", pytest.raises(errors.StudyManifestFilesystemError)), - (".", pytest.raises(errors.StudyManifestFilesystemError)), + ("", "{}", pytest.raises(errors.StudyManifestFilesystemError)), + (".", "{}", pytest.raises(errors.StudyManifestFilesystemError)), ], ) -def test_load_manifest(manifest_path, raises): +def test_load_manifest(manifest_path, expected, raises): with raises: if manifest_path is not None: path = f"{pathlib.Path(__file__).resolve().parents[0]}/{manifest_path}" else: path = None - study_parser.StudyManifestParser(path) + manifest = study_parser.StudyManifestParser(path) + assert str(manifest) == expected @pytest.mark.parametrize(