Skip to content

Commit

Permalink
Refactor TenXRunSample class logic to add vdj handling using multi pi…
Browse files Browse the repository at this point in the history
…peline
  • Loading branch information
glrs committed Oct 15, 2024
1 parent 1041ab9 commit 6256fd3
Showing 1 changed file with 121 additions and 60 deletions.
181 changes: 121 additions & 60 deletions lib/realms/tenx/run_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def __init__(
"""
self.run_sample_id: str = sample_id
self.lab_samples: List[Any] = lab_samples
self.project_info: Dict[str, Any] = project_info
self.config: Mapping[str, Any] = config
self.project_info: Dict[str, Any] = project_info or {}
self.config: Mapping[str, Any] = config or {}
self.ydm: Any = yggdrasil_db_manager

# self.decision_table = TenXUtils.load_decision_table("10x_decision_table.json")
Expand All @@ -50,9 +50,9 @@ def __init__(

self.features: List[str] = self._collect_features()
self.pipeline_info: Optional[Dict[str, Any]] = self._get_pipeline_info()
self.reference_genomes: Optional[Dict[str, str]] = (
self.reference_genomes: Dict[str, str] = (
self.collect_reference_genomes()
)
) or {}

if DEBUG:
# Use a mock SlurmJobManager for debugging purposes
Expand Down Expand Up @@ -97,7 +97,9 @@ def collect_reference_genomes(self) -> Optional[Dict[str, str]]:
or None if an error occurs.
"""
ref_genomes: Dict[str, str] = {}
feature_to_ref_key = self.config.get("feature_to_ref_key", {})
feature_to_ref_key = (
self.config.get("feature_to_ref_key", {}) if self.config else {}
)

for lab_sample in self.lab_samples:
if lab_sample.reference_genome:
Expand Down Expand Up @@ -178,21 +180,18 @@ async def process(self):
self.status = "failed"
return

pipeline = self.pipeline_info.get("pipeline", "")
pipeline_exec = self.pipeline_info.get("pipeline_exec", "")

logging.debug(f"[{self.run_sample_id}] Pipeline: {pipeline}")
logging.debug(f"[{self.run_sample_id}] Pipeline executable: {pipeline_exec}")

logging.info(f"[{self.run_sample_id}] Generating required files...")
# Step 3: Generate necessary files based on pipeline requirements

# Step 3: Generate required files based on configuration
# TODO: Register generated files in the file handler
if self.pipeline_info.get("libraries_csv"):
self.generate_libraries_csv()
if self.pipeline_info.get("feature_ref"):
self.generate_feature_reference_csv()
if self.pipeline_info.get("multi_csv"):
self.generate_multi_sample_csv()
files_to_generate = self.pipeline_info.get("files_to_generate", [])
for file_type in files_to_generate:
if file_type == "libraries_csv":
self.generate_libraries_csv()
elif file_type == "feature_ref_csv":
self.generate_feature_reference_csv()
elif file_type == "multi_csv":
self.generate_multi_sample_csv()

cellranger_command = self.assemble_cellranger_command()

Expand Down Expand Up @@ -251,20 +250,23 @@ def assemble_cellranger_command(self) -> str:
if self.reference_genomes is None:
raise ValueError("Reference genomes information is missing.")

command_parts = [
f"{self.pipeline_info['pipeline_exec']} {self.pipeline_info['pipeline']}",
]

pipeline = self.pipeline_info.get("pipeline", "")
pipeline_exec = self.pipeline_info.get("pipeline_exec", "")
required_args = self.pipeline_info.get("required_arguments", [])
additional_args = self.pipeline_info.get("fixed_arguments", [])
additional_args = self.pipeline_info.get("command_arguments", [])

# Add output directory argument
additional_args.append(f"--output-dir={str(self.file_handler.sample_dir)}")
command_parts = [f"{pipeline_exec} {pipeline}"]

logging.debug(f"[{self.run_sample_id}] Pipeline: {pipeline}")
logging.debug(f"[{self.run_sample_id}] Pipeline executable: {pipeline_exec}")

# Mapping of argument names to their values
arg_values: Dict[str, Any] = {
"--id": self.run_sample_id,
# '--transcriptome': self.config.get('gene_expression_reference'),
"--csv": str(
self.file_handler.base_dir / f"{self.run_sample_id}_multi.csv"
),
"--transcriptome": self.reference_genomes["gex"],
"--fastqs": ",".join(
[",".join(paths) for paths in self.lab_samples[0].fastq_dirs.values()]
),
Expand All @@ -276,67 +278,87 @@ def assemble_cellranger_command(self) -> str:
self.file_handler.base_dir
/ f"{self.run_sample_id}_feature_reference.csv"
),
"--csv": str(
self.file_handler.base_dir / f"{self.run_sample_id}_multi.csv"
),
}

# Add references based on the pipeline
if self.pipeline_info.get("pipeline") == "count":
if "gex" in self.reference_genomes:
arg_values["--transcriptome"] = self.reference_genomes["gex"]
elif self.pipeline_info.get("pipeline") == "vdj":
if "vdj" in self.reference_genomes:
arg_values["--reference"] = self.reference_genomes["vdj"]
elif self.pipeline_info.get("pipeline") == "atac":
if "atac" in self.reference_genomes:
arg_values["--reference"] = self.reference_genomes["atac"]
elif self.pipeline_info.get("pipeline") == "multi":
# references are specified in the multi-sample CSV file
pass
# if self.pipeline_info.get("pipeline") == "count":
# if "gex" in self.reference_genomes:
# arg_values["--transcriptome"] = self.reference_genomes["gex"]
# elif self.pipeline_info.get("pipeline") == "vdj":
# if "vdj" in self.reference_genomes:
# arg_values["--reference"] = self.reference_genomes["vdj"]
# elif self.pipeline_info.get("pipeline") == "atac":
# if "atac" in self.reference_genomes:
# arg_values["--reference"] = self.reference_genomes["atac"]
# elif self.pipeline_info.get("pipeline") == "multi":
# # references are specified in the multi-sample CSV file
# pass

for arg in required_args:
value = arg_values.get(arg)
if value:
command_parts.append(f"{arg}={value}")
else:
logging.error(
f"[{self.run_sample_id}] Missing value for required argument {arg}"
)

# Include additional arguments
command_parts.extend(additional_args)

# Add output directory argument
command_parts.append(f"--output-dir={str(self.file_handler.sample_dir)}")

# Join all parts into a single command string
command = " \\\n ".join(command_parts)
return command

def collect_libraries_data(self) -> List[Dict[str, str]]:
"""Generate the data for the libraries."""
libraries_data = []
for lab_sample in self.lab_samples:
feature_type = self.feature_to_library_type.get(lab_sample.feature)
if not feature_type:
logging.error(
f"[{self.run_sample_id}] Feature type not found for feature "
f"'{lab_sample.feature}' in sample '{lab_sample.sample_id}'"
)
continue
# Collect FASTQ paths
for paths in lab_sample.fastq_dirs.values():
for path in paths:
libraries_data.append(
{
"fastqs": str(path),
"sample": lab_sample.lab_sample_id,
"library_type": feature_type,
}
)
return libraries_data

def generate_libraries_csv(self) -> None:
"""Generate the libraries CSV file required for processing."""
logging.info(f"[{self.run_sample_id}] Generating library CSV")
library_csv_path = (
self.file_handler.base_dir / f"{self.run_sample_id}_libraries.csv"
)

# Ensure the directory exists
library_csv_path.parent.mkdir(parents=True, exist_ok=True)

libraries_data = self.collect_libraries_data()

with open(library_csv_path, "w", newline="") as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=["fastqs", "sample", "library_type"]
)
writer.writeheader()
for lab_sample in self.lab_samples:
feature_type = self.feature_to_library_type.get(lab_sample.feature)
if not feature_type:
logging.error(
f"[{self.run_sample_id}] Feature type not found for feature "
f"'{lab_sample.feature}' in sample '{lab_sample.sample_id}'"
)
continue
# Write one row per FASTQ directory
for paths in lab_sample.fastq_dirs.values():
for path in paths:
writer.writerow(
{
"fastqs": str(path),
"sample": lab_sample.lab_sample_id,
"library_type": feature_type,
}
)
for lib in libraries_data:
writer.writerow(lib)

logging.info(
f"[{self.run_sample_id}] Libraries CSV generated at {library_csv_path}"
)

def generate_feature_reference_csv(self) -> None:
"""Generate the feature reference CSV file required for processing."""
Expand All @@ -346,7 +368,46 @@ def generate_feature_reference_csv(self) -> None:
def generate_multi_sample_csv(self) -> None:
"""Generate the multi-sample CSV file required for processing."""
logging.info(f"[{self.run_sample_id}] Generating multi-sample CSV")
pass
multi_csv_path = self.file_handler.base_dir / f"{self.run_sample_id}_multi.csv"

# Ensure the directory exists
multi_csv_path.parent.mkdir(parents=True, exist_ok=True)

with open(multi_csv_path, "w") as multi_file:
# Get multi CSV sections and arguments from the configuration
if self.pipeline_info:
multi_sections = self.pipeline_info.get("multi_csv_sections", [])
multi_arguments = self.pipeline_info.get("multi_csv_arguments", {})

# Write sections based on multi_arguments
for section in multi_sections:
multi_file.write(f"[{section}]\n")
# Add reference path if available
ref_key = self.config.get("feature_to_ref_key", {}).get(section)
if ref_key and ref_key in self.reference_genomes:
ref_path = self.reference_genomes.get(ref_key, "")
multi_file.write(f"reference,{ref_path}\n")
else:
logging.warning(
f"No reference genome found for section '{section}'"
)
# Add additional arguments
for arg in multi_arguments.get(section, []):
multi_file.write(f"{arg}\n")
multi_file.write("\n")

# Write the [libraries] section
multi_file.write("[libraries]\n")
multi_file.write("fastqs,sample,library_type\n")
libraries_data = self.collect_libraries_data()
for lib in libraries_data:
multi_file.write(
f"{lib['fastqs']},{lib['sample']},{lib['library_type']}\n"
)

logging.info(
f"[{self.run_sample_id}] Multi-sample CSV generated at {multi_csv_path}"
)

def post_process(self) -> None:
"""Perform post-processing steps after job completion."""
Expand Down

0 comments on commit 6256fd3

Please sign in to comment.