fixes issue encountered during testing. (#96)
* fixes issue encountered during testing.

* Updates based on testing

* Paste integrated count change
charles-cowart authored Jan 10, 2025
1 parent 2d66c98 commit babe8ae
Showing 5 changed files with 115 additions and 32 deletions.
40 changes: 22 additions & 18 deletions qp_klp/Protocol.py
@@ -118,8 +118,10 @@ def convert_raw_to_fastq(self):
         if 'TellReadJob' not in self.skip_steps:
             job.run(callback=self.job_callback)
 
-        self.pipeline.get_sample_ids()
-        failed_samples = []
+        # audit the results to determine which samples failed to convert
+        # properly. Append these to the failed-samples report and also
+        # return the list directly to the caller.
+        failed_samples = job.audit()
         if hasattr(self, 'fsr'):
             # NB 16S does not require a failed samples report and
             # it is not performed by SPP.
@@ -130,32 +132,35 @@ def convert_raw_to_fastq(self):
     def generate_sequence_counts(self):
         config = self.pipeline.get_software_configuration('tell-seq')
 
+        files_to_count_path = join(self.pipeline.output_path,
+                                   'files_to_count.txt')
+
+        with open(files_to_count_path, 'w') as f:
+            for root, _, files in walk(self.raw_fastq_files_path):
+                for _file in files:
+                    if self._determine_orientation(_file) in ['R1', 'R2']:
+                        print(join(root, _file), file=f)
+
         job = SeqCountsJob(self.pipeline.run_dir,
                            self.pipeline.output_path,
                            self.pipeline.input_file_path,
                            config['queue'],
                            config['nodes'],
                            config['wallclock_time_in_minutes'],
                            config['normcount_mem_limit'],
                            config['modules_to_load'],
                            self.master_qiita_job_id,
-                           '',
-                           config['integrate_script_path'],
-                           self.pipeline.qiita_job_id)
+                           config['job_max_array_length'],
+                           files_to_count_path,
+                           self.pipeline.get_sample_sheet_path(),
+                           cores_per_task=config['tellread_cores'])
 
         if 'SeqCountsJob' not in self.skip_steps:
             job.run(callback=self.job_callback)
 
-        # audit the results to determine which samples failed to convert
-        # properly. Append these to the failed-samples report and also
-        # return the list directly to the caller.
-        failed_samples = job.audit_me(self.pipeline.get_sample_ids())
-        if hasattr(self, 'fsr'):
-            # NB 16S does not require a failed samples report and
-            # it is not performed by SPP.
-            self.fsr.write(failed_samples, job.__class__.__name__)
-
-        return failed_samples
+        # Do not add an entry to fsr because w/respect to counting, either
+        # all jobs are going to fail or none are going to fail. It's not
+        # likely that we're going to fail to count sequences for only some
+        # of the samples.
 
     def integrate_results(self):
         config = self.pipeline.get_software_configuration('tell-seq')
@@ -173,7 +178,6 @@ def integrate_results(self):
                            config['integrate_mem_limit'],
                            config['modules_to_load'],
                            self.master_qiita_job_id,
-                           "foo",
                            config['integrate_script_path'],
                            # NB: sample_index_list used may vary
                            # from project to project in the future.
@@ -224,7 +228,7 @@ def integrate_results(self):
         # audit the results to determine which samples failed to convert
         # properly. Append these to the failed-samples report and also
         # return the list directly to the caller.
-        failed_samples = job.audit_me(self.pipeline.get_sample_ids())
+        failed_samples = job.audit()
 
         if hasattr(self, 'fsr'):
             # NB 16S does not require a failed samples report and
4 changes: 2 additions & 2 deletions qp_klp/TellseqMetagenomicWorkflow.py
@@ -101,10 +101,10 @@ def execute_pipeline(self):
         # This means fsr reports will be accurate even on restarts.
         self.convert_raw_to_fastq()
 
-        self.generate_sequence_counts()
-
         self.integrate_results()
 
+        self.generate_sequence_counts()
+
         self.update_status("Performing quality control", 2, 9)
         self.quality_control()

42 changes: 39 additions & 3 deletions qp_klp/Workflows.py
@@ -584,6 +584,40 @@ def get_samples_in_qiita(cls, qclient, qiita_id):

         return (samples, tids)
 
+    @classmethod
+    def _determine_orientation(cls, file_name):
+        # aka forward, reverse, and indexed reads
+        orientations = ['R1', 'R2', 'I1', 'I2']
+
+        results = []
+
+        # assume orientation is always present in the file's name.
+        # assume that it is of one of the four forms above.
+        # assume that it is always the right-most occurrence of the four
+        # orientations above.
+        # assume that orientation is encapsulated with either '_' or '.'
+        # e.g.: '_R1_', '.I2.'.
+        # assume users can and will include any or all of the four
+        # orientations as part of their filenames as well. e.g.:
+        # ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz
+        for o in orientations:
+            variations = [f"_{o}_", f".{o}."]
+            for v in variations:
+                # rfind searches from the end of the string, rather than
+                # its beginning. It returns the position in the string
+                # where the substring begins.
+                results.append((file_name.rfind(v), o))
+
+        # the orientation will be the candidate whose match begins at
+        # the right-most position in the file name; that is, the one
+        # with the maximum value of pos.
+        results.sort(reverse=True)
+
+        pos, orientation = results[0]
+
+        # if no orientations were found, then return None.
+        return None if pos == -1 else orientation
+
     def _get_postqc_fastq_files(self, out_dir, project):
         af = None
         sub_folders = ['amplicon', 'filtered_sequences', 'trimmed_sequences']
@@ -599,11 +633,13 @@ def _get_postqc_fastq_files(self, out_dir, project):
                 'raw_reverse_seqs': []}
 
         for fastq_file in af:
-            if '_I1_' in fastq_file or '_I2_' in fastq_file:
+            _, file_name = split(fastq_file)
+            orientation = self._determine_orientation(file_name)
+            if orientation in ['I1', 'I2']:
                 files['raw_barcodes'].append(fastq_file)
-            elif '_R1_' in fastq_file:
+            elif orientation == 'R1':
                 files['raw_forward_seqs'].append(fastq_file)
-            elif '_R2_' in fastq_file:
+            elif orientation == 'R2':
                 files['raw_reverse_seqs'].append(fastq_file)
             else:
                 raise ValueError(f"Unrecognized file: {fastq_file}")
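
A quick usage sketch (not part of the diff) showing the rule the new helper implements: str.rfind() locates every candidate token and the right-most hit wins, so an orientation-like token embedded earlier in a sample name cannot mislead it. This mirrors the test cases added in test_workflows.py below.

    from qp_klp.Workflows import Workflow

    # the right-most token wins: '_R2_' occurs after '_R1_' here.
    name = "ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz"
    assert Workflow._determine_orientation(name) == "R2"

    # None is returned when no '_Xn_' or '.Xn.' style token is present.
    assert Workflow._determine_orientation("undetermined.fastq.gz") is None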
18 changes: 9 additions & 9 deletions qp_klp/klp.py
@@ -106,6 +106,14 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     user_input_file = parameters.pop('sample_sheet')
     lane_number = parameters.pop('lane_number')
 
+    if {'body', 'content_type', 'filename'} != set(user_input_file):
+        return False, None, ("This doesn't appear to be a valid sample "
+                             "sheet or mapping file; please review.")
+    uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
+    # save raw data to file
+    with open(uif_path, 'w') as f:
+        f.write(user_input_file['body'])
+
     # the run_identifier must be saved because it is not always preserved
     # in a dependable location downstream. The user input file must be
     # saved because it is always a unique name and it cannot be guaranteed
@@ -114,15 +122,7 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     # the user_input file on the first run.
     restart_file_path = out_path('restart_me')
     with open(restart_file_path, 'w') as f:
-        f.write(f"{run_identifier}\n{user_input_file}")
-
-    if {'body', 'content_type', 'filename'} != set(user_input_file):
-        return False, None, ("This doesn't appear to be a valid sample "
-                             "sheet or mapping file; please review.")
-    uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
-    # save raw data to file
-    with open(uif_path, 'w') as f:
-        f.write(user_input_file['body'])
+        f.write(f"{run_identifier}\n{uif_path}")
 
     final_results_path = out_path('final_results')
     makedirs(final_results_path, exist_ok=True)
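
Note the ordering fix above: the sample-sheet payload is now validated and written to uif_path before the restart file is created, and restart_me records that path rather than the repr of the user_input_file dict. A minimal sketch of the read side on restart, assuming a hypothetical parse_restart_file() helper (the reader is not part of this commit):

    from os.path import join

    def parse_restart_file(out_dir):
        # restart_me holds exactly two lines: the run identifier and the
        # path of the sample sheet / mapping file saved on the first run.
        with open(join(out_dir, 'restart_me')) as f:
            run_identifier, uif_path = f.read().splitlines()
        return run_identifier, uif_path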
43 changes: 43 additions & 0 deletions qp_klp/tests/test_workflows.py
@@ -12,6 +12,7 @@
 from os import environ, remove, getcwd
 import re
 from qp_klp.WorkflowFactory import WorkflowFactory
+from qp_klp.Workflows import Workflow
 from metapool import load_sample_sheet
 from collections import defaultdict
 from random import randint
@@ -890,3 +891,45 @@ def open_job_script(script_path):
         exp = open_job_script("qp_klp/tests/data/tellread_test.sbatch")
 
         self.assertEqual(obs, exp)
+
+    def test_foo(self):
+        test_names = [
+            # single additional occurrence: R1
+            ("ABC_7_04_1776_R1_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
+            ("ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
+            ("ABC_7_04_1776_R1_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
+            ("ABC_7_04_1776_R1_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),
+
+            # test w/dots.
+            ("ABC_7_04_1776.R1.SRE_S3_L007.R1.001.trimmed.fastq.gz", "R1"),
+            ("ABC_7_04_1776.R1.SRE_S3_L007.R2.001.trimmed.fastq.gz", "R2"),
+            ("ABC_7_04_1776.R1.SRE_S3_L007.I1.001.trimmed.fastq.gz", "I1"),
+            ("ABC_7_04_1776.R1.SRE_S3_L007.I2.001.trimmed.fastq.gz", "I2"),
+
+            # single additional occurrence: R2
+            ("ABC_7_04_1776_R2_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
+            ("ABC_7_04_1776_R2_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
+            ("ABC_7_04_1776_R2_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
+            ("ABC_7_04_1776_R2_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),
+
+            # single additional occurrence: In
+            ("ABC_7_04_1776_I2_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
+            ("ABC_7_04_1776_I1_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
+            ("ABC_7_04_1776_I2_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
+            ("ABC_7_04_1776_I1_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),
+
+            # no additional occurrences
+            ("ABC_7_04_1776_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
+            ("ABC_7_04_1776_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
+            ("ABC_7_04_1776_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
+            ("ABC_7_04_1776_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),
+
+            # two additional occurrences
+            ("ABC_7_04_1776_I2_SRE.R1.S3_L007_R1_001.trimmed.fastq.gz", "R1"),
+            ("ABC_7_04_1776_I1_SRE.R1.S3_L007_R2_001.trimmed.fastq.gz", "R2"),
+            ("ABC_7_04_1776_I2_SRE.R1.S3_L007_I1_001.trimmed.fastq.gz", "I1"),
+            ("ABC_7_04_1776_I1_SRE.R1.S3_L007_I2_001.trimmed.fastq.gz", "I2"),
+        ]
+
+        for file_name, exp in test_names:
+            self.assertEqual(Workflow._determine_orientation(file_name), exp)
