From 6256fd375513b846f22a48a29f73bafdc9a7e55e Mon Sep 17 00:00:00 2001 From: glrs <5999366+glrs@users.noreply.github.com> Date: Tue, 15 Oct 2024 15:41:33 +0200 Subject: [PATCH] Refactor TenXRunSample class logic to add vdj handling using multi pipeline --- lib/realms/tenx/run_sample.py | 181 +++++++++++++++++++++++----------- 1 file changed, 121 insertions(+), 60 deletions(-) diff --git a/lib/realms/tenx/run_sample.py b/lib/realms/tenx/run_sample.py index 2ebdee5..2320f42 100644 --- a/lib/realms/tenx/run_sample.py +++ b/lib/realms/tenx/run_sample.py @@ -37,8 +37,8 @@ def __init__( """ self.run_sample_id: str = sample_id self.lab_samples: List[Any] = lab_samples - self.project_info: Dict[str, Any] = project_info - self.config: Mapping[str, Any] = config + self.project_info: Dict[str, Any] = project_info or {} + self.config: Mapping[str, Any] = config or {} self.ydm: Any = yggdrasil_db_manager # self.decision_table = TenXUtils.load_decision_table("10x_decision_table.json") @@ -50,9 +50,9 @@ def __init__( self.features: List[str] = self._collect_features() self.pipeline_info: Optional[Dict[str, Any]] = self._get_pipeline_info() - self.reference_genomes: Optional[Dict[str, str]] = ( + self.reference_genomes: Dict[str, str] = ( self.collect_reference_genomes() - ) + ) or {} if DEBUG: # Use a mock SlurmJobManager for debugging purposes @@ -97,7 +97,9 @@ def collect_reference_genomes(self) -> Optional[Dict[str, str]]: or None if an error occurs. """ ref_genomes: Dict[str, str] = {} - feature_to_ref_key = self.config.get("feature_to_ref_key", {}) + feature_to_ref_key = ( + self.config.get("feature_to_ref_key", {}) if self.config else {} + ) for lab_sample in self.lab_samples: if lab_sample.reference_genome: @@ -178,21 +180,18 @@ async def process(self): self.status = "failed" return - pipeline = self.pipeline_info.get("pipeline", "") - pipeline_exec = self.pipeline_info.get("pipeline_exec", "") - - logging.debug(f"[{self.run_sample_id}] Pipeline: {pipeline}") - logging.debug(f"[{self.run_sample_id}] Pipeline executable: {pipeline_exec}") - logging.info(f"[{self.run_sample_id}] Generating required files...") - # Step 3: Generate necessary files based on pipeline requirements + + # Step 3: Generate required files based on configuration # TODO: Register generated files in the file handler - if self.pipeline_info.get("libraries_csv"): - self.generate_libraries_csv() - if self.pipeline_info.get("feature_ref"): - self.generate_feature_reference_csv() - if self.pipeline_info.get("multi_csv"): - self.generate_multi_sample_csv() + files_to_generate = self.pipeline_info.get("files_to_generate", []) + for file_type in files_to_generate: + if file_type == "libraries_csv": + self.generate_libraries_csv() + elif file_type == "feature_ref_csv": + self.generate_feature_reference_csv() + elif file_type == "multi_csv": + self.generate_multi_sample_csv() cellranger_command = self.assemble_cellranger_command() @@ -251,20 +250,23 @@ def assemble_cellranger_command(self) -> str: if self.reference_genomes is None: raise ValueError("Reference genomes information is missing.") - command_parts = [ - f"{self.pipeline_info['pipeline_exec']} {self.pipeline_info['pipeline']}", - ] - + pipeline = self.pipeline_info.get("pipeline", "") + pipeline_exec = self.pipeline_info.get("pipeline_exec", "") required_args = self.pipeline_info.get("required_arguments", []) - additional_args = self.pipeline_info.get("fixed_arguments", []) + additional_args = self.pipeline_info.get("command_arguments", []) - # Add output directory argument - additional_args.append(f"--output-dir={str(self.file_handler.sample_dir)}") + command_parts = [f"{pipeline_exec} {pipeline}"] + + logging.debug(f"[{self.run_sample_id}] Pipeline: {pipeline}") + logging.debug(f"[{self.run_sample_id}] Pipeline executable: {pipeline_exec}") # Mapping of argument names to their values arg_values: Dict[str, Any] = { "--id": self.run_sample_id, - # '--transcriptome': self.config.get('gene_expression_reference'), + "--csv": str( + self.file_handler.base_dir / f"{self.run_sample_id}_multi.csv" + ), + "--transcriptome": self.reference_genomes["gex"], "--fastqs": ",".join( [",".join(paths) for paths in self.lab_samples[0].fastq_dirs.values()] ), @@ -276,37 +278,64 @@ def assemble_cellranger_command(self) -> str: self.file_handler.base_dir / f"{self.run_sample_id}_feature_reference.csv" ), - "--csv": str( - self.file_handler.base_dir / f"{self.run_sample_id}_multi.csv" - ), } # Add references based on the pipeline - if self.pipeline_info.get("pipeline") == "count": - if "gex" in self.reference_genomes: - arg_values["--transcriptome"] = self.reference_genomes["gex"] - elif self.pipeline_info.get("pipeline") == "vdj": - if "vdj" in self.reference_genomes: - arg_values["--reference"] = self.reference_genomes["vdj"] - elif self.pipeline_info.get("pipeline") == "atac": - if "atac" in self.reference_genomes: - arg_values["--reference"] = self.reference_genomes["atac"] - elif self.pipeline_info.get("pipeline") == "multi": - # references are specified in the multi-sample CSV file - pass + # if self.pipeline_info.get("pipeline") == "count": + # if "gex" in self.reference_genomes: + # arg_values["--transcriptome"] = self.reference_genomes["gex"] + # elif self.pipeline_info.get("pipeline") == "vdj": + # if "vdj" in self.reference_genomes: + # arg_values["--reference"] = self.reference_genomes["vdj"] + # elif self.pipeline_info.get("pipeline") == "atac": + # if "atac" in self.reference_genomes: + # arg_values["--reference"] = self.reference_genomes["atac"] + # elif self.pipeline_info.get("pipeline") == "multi": + # # references are specified in the multi-sample CSV file + # pass for arg in required_args: value = arg_values.get(arg) if value: command_parts.append(f"{arg}={value}") + else: + logging.error( + f"[{self.run_sample_id}] Missing value for required argument {arg}" + ) # Include additional arguments command_parts.extend(additional_args) + # Add output directory argument + command_parts.append(f"--output-dir={str(self.file_handler.sample_dir)}") + # Join all parts into a single command string command = " \\\n ".join(command_parts) return command + def collect_libraries_data(self) -> List[Dict[str, str]]: + """Generate the data for the libraries.""" + libraries_data = [] + for lab_sample in self.lab_samples: + feature_type = self.feature_to_library_type.get(lab_sample.feature) + if not feature_type: + logging.error( + f"[{self.run_sample_id}] Feature type not found for feature " + f"'{lab_sample.feature}' in sample '{lab_sample.sample_id}'" + ) + continue + # Collect FASTQ paths + for paths in lab_sample.fastq_dirs.values(): + for path in paths: + libraries_data.append( + { + "fastqs": str(path), + "sample": lab_sample.lab_sample_id, + "library_type": feature_type, + } + ) + return libraries_data + def generate_libraries_csv(self) -> None: """Generate the libraries CSV file required for processing.""" logging.info(f"[{self.run_sample_id}] Generating library CSV") @@ -314,29 +343,22 @@ def generate_libraries_csv(self) -> None: self.file_handler.base_dir / f"{self.run_sample_id}_libraries.csv" ) + # Ensure the directory exists + library_csv_path.parent.mkdir(parents=True, exist_ok=True) + + libraries_data = self.collect_libraries_data() + with open(library_csv_path, "w", newline="") as csvfile: writer = csv.DictWriter( csvfile, fieldnames=["fastqs", "sample", "library_type"] ) writer.writeheader() - for lab_sample in self.lab_samples: - feature_type = self.feature_to_library_type.get(lab_sample.feature) - if not feature_type: - logging.error( - f"[{self.run_sample_id}] Feature type not found for feature " - f"'{lab_sample.feature}' in sample '{lab_sample.sample_id}'" - ) - continue - # Write one row per FASTQ directory - for paths in lab_sample.fastq_dirs.values(): - for path in paths: - writer.writerow( - { - "fastqs": str(path), - "sample": lab_sample.lab_sample_id, - "library_type": feature_type, - } - ) + for lib in libraries_data: + writer.writerow(lib) + + logging.info( + f"[{self.run_sample_id}] Libraries CSV generated at {library_csv_path}" + ) def generate_feature_reference_csv(self) -> None: """Generate the feature reference CSV file required for processing.""" @@ -346,7 +368,46 @@ def generate_feature_reference_csv(self) -> None: def generate_multi_sample_csv(self) -> None: """Generate the multi-sample CSV file required for processing.""" logging.info(f"[{self.run_sample_id}] Generating multi-sample CSV") - pass + multi_csv_path = self.file_handler.base_dir / f"{self.run_sample_id}_multi.csv" + + # Ensure the directory exists + multi_csv_path.parent.mkdir(parents=True, exist_ok=True) + + with open(multi_csv_path, "w") as multi_file: + # Get multi CSV sections and arguments from the configuration + if self.pipeline_info: + multi_sections = self.pipeline_info.get("multi_csv_sections", []) + multi_arguments = self.pipeline_info.get("multi_csv_arguments", {}) + + # Write sections based on multi_arguments + for section in multi_sections: + multi_file.write(f"[{section}]\n") + # Add reference path if available + ref_key = self.config.get("feature_to_ref_key", {}).get(section) + if ref_key and ref_key in self.reference_genomes: + ref_path = self.reference_genomes.get(ref_key, "") + multi_file.write(f"reference,{ref_path}\n") + else: + logging.warning( + f"No reference genome found for section '{section}'" + ) + # Add additional arguments + for arg in multi_arguments.get(section, []): + multi_file.write(f"{arg}\n") + multi_file.write("\n") + + # Write the [libraries] section + multi_file.write("[libraries]\n") + multi_file.write("fastqs,sample,library_type\n") + libraries_data = self.collect_libraries_data() + for lib in libraries_data: + multi_file.write( + f"{lib['fastqs']},{lib['sample']},{lib['library_type']}\n" + ) + + logging.info( + f"[{self.run_sample_id}] Multi-sample CSV generated at {multi_csv_path}" + ) def post_process(self) -> None: """Perform post-processing steps after job completion."""