From 63faccbe2f7b0fd922eef0be19f7bae4656c12c2 Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Thu, 12 Dec 2024 22:35:45 -0800 Subject: [PATCH 1/3] fixes issue encountered during testing. --- qp_klp/Workflows.py | 42 ++++++++++++++++++++++++++++++--- qp_klp/tests/test_workflows.py | 43 ++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/qp_klp/Workflows.py b/qp_klp/Workflows.py index 0b8a608..e2b1fd4 100644 --- a/qp_klp/Workflows.py +++ b/qp_klp/Workflows.py @@ -584,6 +584,40 @@ def get_samples_in_qiita(cls, qclient, qiita_id): return (samples, tids) + @classmethod + def _determine_orientation(cls, file_name): + # aka forward, reverse, and indexed reads + orientations = ['R1', 'R2', 'I1', 'I2'] + + results = [] + + # assume orientation is always present in the file's name. + # assume that it is of one of the four forms above. + # assume that it is always the right-most occurance of the four + # orientations above. + # assume that orientation is encapsulated with either '_' or '.' + # e.g.: '_R1_', '.I2.'. + # assume users can and will include any or all of the four + # orientation as part of their filenames as well. e.g.: + # ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz + for o in orientations: + variations = [f"_{o}_", f".{o}."] + for v in variations: + # rfind searches from the end of the string, rather than + # its beginning. It returns the position in the string + # where the substring begins. + results.append((file_name.rfind(v), o)) + + # the orientation will be the substring found with the maximum + # found value for pos. That is, it will be the substring that + # begins at the rightest most position in the file name. + results.sort(reverse=True) + + pos, orientation = results[0] + + # if no orientations were found, then return None. + return None if pos == -1 else orientation + def _get_postqc_fastq_files(self, out_dir, project): af = None sub_folders = ['amplicon', 'filtered_sequences', 'trimmed_sequences'] @@ -599,11 +633,13 @@ def _get_postqc_fastq_files(self, out_dir, project): 'raw_reverse_seqs': []} for fastq_file in af: - if '_I1_' in fastq_file or '_I2_' in fastq_file: + _, file_name = split(fastq_file) + orientation = self._determine_orientation(file_name) + if orientation in ['I1', 'I2']: files['raw_barcodes'].append(fastq_file) - elif '_R1_' in fastq_file: + elif orientation == 'R1': files['raw_forward_seqs'].append(fastq_file) - elif '_R2_' in fastq_file: + elif orientation == 'R2': files['raw_reverse_seqs'].append(fastq_file) else: raise ValueError(f"Unrecognized file: {fastq_file}") diff --git a/qp_klp/tests/test_workflows.py b/qp_klp/tests/test_workflows.py index b2cd39d..37f1fcf 100644 --- a/qp_klp/tests/test_workflows.py +++ b/qp_klp/tests/test_workflows.py @@ -12,6 +12,7 @@ from os import environ, remove, getcwd import re from qp_klp.WorkflowFactory import WorkflowFactory +from qp_klp.Workflows import Workflow from metapool import load_sample_sheet from collections import defaultdict from random import randint @@ -890,3 +891,45 @@ def open_job_script(script_path): exp = open_job_script("qp_klp/tests/data/tellread_test.sbatch") self.assertEqual(obs, exp) + + def test_foo(self): + test_names = [ + # single additional occurance: R1 + ("ABC_7_04_1776_R1_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"), + ("ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"), + ("ABC_7_04_1776_R1_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"), + ("ABC_7_04_1776_R1_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"), + + # test w/dots. + ("ABC_7_04_1776.R1.SRE_S3_L007.R1.001.trimmed.fastq.gz", "R1"), + ("ABC_7_04_1776.R1.SRE_S3_L007.R2.001.trimmed.fastq.gz", "R2"), + ("ABC_7_04_1776.R1.SRE_S3_L007.I1.001.trimmed.fastq.gz", "I1"), + ("ABC_7_04_1776.R1.SRE_S3_L007.I2.001.trimmed.fastq.gz", "I2"), + + # single additional occurance: R2 + ("ABC_7_04_1776_R2_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"), + ("ABC_7_04_1776_R2_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"), + ("ABC_7_04_1776_R2_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"), + ("ABC_7_04_1776_R2_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"), + + # single additional occurance: In + ("ABC_7_04_1776_I2_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"), + ("ABC_7_04_1776_I1_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"), + ("ABC_7_04_1776_I2_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"), + ("ABC_7_04_1776_I1_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"), + + # no additional occurances + ("ABC_7_04_1776_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"), + ("ABC_7_04_1776_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"), + ("ABC_7_04_1776_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"), + ("ABC_7_04_1776_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"), + + # two additional occurances + ("ABC_7_04_1776_I2_SRE.R1.S3_L007_R1_001.trimmed.fastq.gz", "R1"), + ("ABC_7_04_1776_I1_SRE.R1.S3_L007_R2_001.trimmed.fastq.gz", "R2"), + ("ABC_7_04_1776_I2_SRE.R1.S3_L007_I1_001.trimmed.fastq.gz", "I1"), + ("ABC_7_04_1776_I1_SRE.R1.S3_L007_I2_001.trimmed.fastq.gz", "I2"), + ] + + for file_name, exp in test_names: + self.assertEqual(Workflow._determine_orientation(file_name), exp) From 3cbc89e535e2edc051ced84db2fb5bead9d1c899 Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Wed, 8 Jan 2025 20:43:03 -0800 Subject: [PATCH 2/3] Updates based on testing --- qp_klp/Protocol.py | 43 +++++++++++++++++++++++++------------------ qp_klp/klp.py | 18 +++++++++--------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/qp_klp/Protocol.py b/qp_klp/Protocol.py index ae6422b..160ca86 100644 --- a/qp_klp/Protocol.py +++ b/qp_klp/Protocol.py @@ -118,8 +118,10 @@ def convert_raw_to_fastq(self): if 'TellReadJob' not in self.skip_steps: job.run(callback=self.job_callback) - self.pipeline.get_sample_ids() - failed_samples = [] + # audit the results to determine which samples failed to convert + # properly. Append these to the failed-samples report and also + # return the list directly to the caller. + failed_samples = job.audit() if hasattr(self, 'fsr'): # NB 16S does not require a failed samples report and # it is not performed by SPP. @@ -129,33 +131,39 @@ def convert_raw_to_fastq(self): def generate_sequence_counts(self): config = self.pipeline.get_software_configuration('tell-seq') + # filter on corrected.err_barcode_removed + + files_to_count_path = join(self.pipeline.output_path, + 'files_to_count.txt') + + with open(files_to_count_path, 'w') as f: + # for raw_counts_r1r2, count corrected.err_barcode_removed files + # (TellReadJob final output). + for root, dirs, files in walk(self.raw_fastq_files_path): + for _file in files: + if 'corrected.err_barcode_removed' in _file: + print(join(root, _file), file=f) job = SeqCountsJob(self.pipeline.run_dir, self.pipeline.output_path, - self.pipeline.input_file_path, config['queue'], config['nodes'], config['wallclock_time_in_minutes'], config['normcount_mem_limit'], config['modules_to_load'], self.master_qiita_job_id, - '', - config['integrate_script_path'], - self.pipeline.qiita_job_id) + config['job_max_array_length'], + files_to_count_path, + self.pipeline.get_sample_sheet_path(), + cores_per_task=config['tellread_cores']) if 'SeqCountsJob' not in self.skip_steps: job.run(callback=self.job_callback) - # audit the results to determine which samples failed to convert - # properly. Append these to the failed-samples report and also - # return the list directly to the caller. - failed_samples = job.audit_me(self.pipeline.get_sample_ids()) - if hasattr(self, 'fsr'): - # NB 16S does not require a failed samples report and - # it is not performed by SPP. - self.fsr.write(failed_samples, job.__class__.__name__) - - return failed_samples + # Do not add an entry to fsr because w/respect to counting, either + # all jobs are going to fail or none are going to fail. It's not + # likely that we're going to fail to count sequences for only some + # of the samples. def integrate_results(self): config = self.pipeline.get_software_configuration('tell-seq') @@ -173,7 +181,6 @@ def integrate_results(self): config['integrate_mem_limit'], config['modules_to_load'], self.master_qiita_job_id, - "foo", config['integrate_script_path'], # NB: sample_index_list used may vary # from project to project in the future. @@ -224,7 +231,7 @@ def integrate_results(self): # audit the results to determine which samples failed to convert # properly. Append these to the failed-samples report and also # return the list directly to the caller. - failed_samples = job.audit_me(self.pipeline.get_sample_ids()) + failed_samples = job.audit() if hasattr(self, 'fsr'): # NB 16S does not require a failed samples report and diff --git a/qp_klp/klp.py b/qp_klp/klp.py index ca6c015..6168b4e 100644 --- a/qp_klp/klp.py +++ b/qp_klp/klp.py @@ -106,6 +106,14 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir): user_input_file = parameters.pop('sample_sheet') lane_number = parameters.pop('lane_number') + if {'body', 'content_type', 'filename'} != set(user_input_file): + return False, None, ("This doesn't appear to be a valid sample " + "sheet or mapping file; please review.") + uif_path = out_path(user_input_file['filename'].replace(' ', '_')) + # save raw data to file + with open(uif_path, 'w') as f: + f.write(user_input_file['body']) + # the run_identifier must be saved because it is not always preserved # in a dependable location downstream. The user input file must be # saved because it is always a unique name and it cannot be guaranteed @@ -114,15 +122,7 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir): # the user_input file on the first run. restart_file_path = out_path('restart_me') with open(restart_file_path, 'w') as f: - f.write(f"{run_identifier}\n{user_input_file}") - - if {'body', 'content_type', 'filename'} != set(user_input_file): - return False, None, ("This doesn't appear to be a valid sample " - "sheet or mapping file; please review.") - uif_path = out_path(user_input_file['filename'].replace(' ', '_')) - # save raw data to file - with open(uif_path, 'w') as f: - f.write(user_input_file['body']) + f.write(f"{run_identifier}\n{uif_path}") final_results_path = out_path('final_results') makedirs(final_results_path, exist_ok=True) From 859b7be63ee0e95d0f941ebc133e1b29b13745d3 Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Thu, 9 Jan 2025 11:47:47 -0800 Subject: [PATCH 3/3] Paste integrated count change --- qp_klp/Protocol.py | 7 ++----- qp_klp/TellseqMetagenomicWorkflow.py | 4 ++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/qp_klp/Protocol.py b/qp_klp/Protocol.py index 160ca86..195577d 100644 --- a/qp_klp/Protocol.py +++ b/qp_klp/Protocol.py @@ -131,17 +131,14 @@ def convert_raw_to_fastq(self): def generate_sequence_counts(self): config = self.pipeline.get_software_configuration('tell-seq') - # filter on corrected.err_barcode_removed files_to_count_path = join(self.pipeline.output_path, 'files_to_count.txt') with open(files_to_count_path, 'w') as f: - # for raw_counts_r1r2, count corrected.err_barcode_removed files - # (TellReadJob final output). - for root, dirs, files in walk(self.raw_fastq_files_path): + for root, _, files in walk(self.raw_fastq_files_path): for _file in files: - if 'corrected.err_barcode_removed' in _file: + if self._determine_orientation(_file) in ['R1', 'R2']: print(join(root, _file), file=f) job = SeqCountsJob(self.pipeline.run_dir, diff --git a/qp_klp/TellseqMetagenomicWorkflow.py b/qp_klp/TellseqMetagenomicWorkflow.py index 72bc100..be353f2 100644 --- a/qp_klp/TellseqMetagenomicWorkflow.py +++ b/qp_klp/TellseqMetagenomicWorkflow.py @@ -101,10 +101,10 @@ def execute_pipeline(self): # This means fsr reports will be accurate even on restarts. self.convert_raw_to_fastq() - self.generate_sequence_counts() - self.integrate_results() + self.generate_sequence_counts() + self.update_status("Performing quality control", 2, 9) self.quality_control()