Fixes issue encountered during testing. #96

Merged · 3 commits · Jan 10, 2025
40 changes: 22 additions & 18 deletions qp_klp/Protocol.py
@@ -118,8 +118,10 @@ def convert_raw_to_fastq(self):
if 'TellReadJob' not in self.skip_steps:
job.run(callback=self.job_callback)

self.pipeline.get_sample_ids()
failed_samples = []
# audit the results to determine which samples failed to convert
# properly. Append these to the failed-samples report and also
# return the list directly to the caller.
failed_samples = job.audit()
if hasattr(self, 'fsr'):
# NB 16S does not require a failed samples report and
# it is not performed by SPP.
@@ -130,32 +132,35 @@ def generate_sequence_counts(self):
def generate_sequence_counts(self):
config = self.pipeline.get_software_configuration('tell-seq')

files_to_count_path = join(self.pipeline.output_path,
'files_to_count.txt')

with open(files_to_count_path, 'w') as f:
for root, _, files in walk(self.raw_fastq_files_path):
for _file in files:
if self._determine_orientation(_file) in ['R1', 'R2']:
print(join(root, _file), file=f)

job = SeqCountsJob(self.pipeline.run_dir,
self.pipeline.output_path,
self.pipeline.input_file_path,
config['queue'],
config['nodes'],
config['wallclock_time_in_minutes'],
config['normcount_mem_limit'],
config['modules_to_load'],
self.master_qiita_job_id,
'',
config['integrate_script_path'],
self.pipeline.qiita_job_id)
config['job_max_array_length'],
files_to_count_path,
self.pipeline.get_sample_sheet_path(),
cores_per_task=config['tellread_cores'])

if 'SeqCountsJob' not in self.skip_steps:
job.run(callback=self.job_callback)

# audit the results to determine which samples failed to convert
# properly. Append these to the failed-samples report and also
# return the list directly to the caller.
failed_samples = job.audit_me(self.pipeline.get_sample_ids())
if hasattr(self, 'fsr'):
# NB 16S does not require a failed samples report and
# it is not performed by SPP.
self.fsr.write(failed_samples, job.__class__.__name__)

return failed_samples
# Do not add an entry to fsr because, with respect to counting,
# either all jobs will fail or none will; it is unlikely that
# counting will fail for only some of the samples.

def integrate_results(self):
config = self.pipeline.get_software_configuration('tell-seq')
@@ -173,7 +178,6 @@ def integrate_results(self):
config['integrate_mem_limit'],
config['modules_to_load'],
self.master_qiita_job_id,
"foo",
config['integrate_script_path'],
# NB: sample_index_list used may vary
# from project to project in the future.
@@ -224,7 +228,7 @@ def integrate_results(self):
# audit the results to determine which samples failed to convert
# properly. Append these to the failed-samples report and also
# return the list directly to the caller.
failed_samples = job.audit_me(self.pipeline.get_sample_ids())
failed_samples = job.audit()

if hasattr(self, 'fsr'):
# NB 16S does not require a failed samples report and
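Both hunks above converge on the same audit pattern: the job's `audit()` method (replacing the older `audit_me(sample_ids)` call) returns the sample IDs that produced no output, and the failed-samples report is written only when the workflow actually has one. A minimal runnable sketch of that flow — `StubJob`, `StubFSR`, and `StubWorkflow` are hypothetical stand-ins; only the `audit()`/`fsr.write()` shape mirrors the diff:

```python
# Hypothetical stand-ins; only the audit()/fsr.write() flow mirrors
# the Protocol.py changes above.
class StubJob:
    def audit(self):
        # a real Job compares the expected sample IDs against the
        # files it actually produced and returns the IDs w/no output
        return ['sample_3']

class StubFSR:
    def write(self, failed_samples, job_name):
        print(f"{job_name}: {failed_samples}")

class StubWorkflow:
    def __init__(self, fsr=None):
        if fsr is not None:
            self.fsr = fsr  # NB: 16S workflows never set this

    def convert_raw_to_fastq(self):
        job = StubJob()
        failed_samples = job.audit()
        if hasattr(self, 'fsr'):
            self.fsr.write(failed_samples, job.__class__.__name__)
        return failed_samples

StubWorkflow(StubFSR()).convert_raw_to_fastq()  # StubJob: ['sample_3']
```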
4 changes: 2 additions & 2 deletions qp_klp/TellseqMetagenomicWorkflow.py
@@ -101,10 +101,10 @@ def execute_pipeline(self):
# This means fsr reports will be accurate even on restarts.
self.convert_raw_to_fastq()

self.generate_sequence_counts()

self.integrate_results()

self.generate_sequence_counts()

self.update_status("Performing quality control", 2, 9)
self.quality_control()

42 changes: 39 additions & 3 deletions qp_klp/Workflows.py
@@ -584,6 +584,40 @@ def get_samples_in_qiita(cls, qclient, qiita_id):

return (samples, tids)

@classmethod
def _determine_orientation(cls, file_name):
# aka forward, reverse, and indexed reads
orientations = ['R1', 'R2', 'I1', 'I2']

results = []

# assume the orientation is always present in the file's name, in
# one of the four forms above, and that the true orientation is
# always the right-most occurrence of any of them.
# assume the orientation is delimited by either '_' or '.',
# e.g.: '_R1_', '.I2.'.
# assume users can and will also include any or all of the four
# orientations elsewhere in their filenames, e.g.:
# ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz
for o in orientations:
variations = [f"_{o}_", f".{o}."]
for v in variations:
# rfind() returns the highest index at which the substring is
# found, i.e. its right-most occurrence, or -1 if it is not
# present.
results.append((file_name.rfind(v), o))

# the orientation is the candidate with the maximum value for
# pos, i.e. the one that begins at the right-most position in
# the file name.
results.sort(reverse=True)

pos, orientation = results[0]

# if no orientations were found, then return None.
return None if pos == -1 else orientation

def _get_postqc_fastq_files(self, out_dir, project):
af = None
sub_folders = ['amplicon', 'filtered_sequences', 'trimmed_sequences']
@@ -599,11 +633,13 @@ def _get_postqc_fastq_files(self, out_dir, project):
'raw_reverse_seqs': []}

for fastq_file in af:
if '_I1_' in fastq_file or '_I2_' in fastq_file:
_, file_name = split(fastq_file)
orientation = self._determine_orientation(file_name)
if orientation in ['I1', 'I2']:
files['raw_barcodes'].append(fastq_file)
elif '_R1_' in fastq_file:
elif orientation == 'R1':
files['raw_forward_seqs'].append(fastq_file)
elif '_R2_' in fastq_file:
elif orientation == 'R2':
files['raw_reverse_seqs'].append(fastq_file)
else:
raise ValueError(f"Unrecognized file: {fastq_file}")
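With `_get_postqc_fastq_files` now routing every file through the shared classifier, a quick usage check (assuming `qp_klp` is importable, exactly as the new test below imports it):

```python
from qp_klp.Workflows import Workflow

# the right-most delimited token wins, so the stray '_R1_' in the
# sample name no longer masks the true R2 orientation
name = "ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz"
print(Workflow._determine_orientation(name))  # -> R2

# no delimited R1/R2/I1/I2 token at all -> None
print(Workflow._determine_orientation("sample.fastq.gz"))  # -> None
```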
18 changes: 9 additions & 9 deletions qp_klp/klp.py
@@ -106,6 +106,14 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
user_input_file = parameters.pop('sample_sheet')
lane_number = parameters.pop('lane_number')

if {'body', 'content_type', 'filename'} != set(user_input_file):
return False, None, ("This doesn't appear to be a valid sample "
"sheet or mapping file; please review.")
uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
# save raw data to file
with open(uif_path, 'w') as f:
f.write(user_input_file['body'])

# the run_identifier must be saved because it is not always preserved
# in a dependable location downstream. The user input file must be
# saved because it is always a unique name and it cannot be guaranteed
@@ -114,15 +122,7 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
# the user_input file on the first run.
restart_file_path = out_path('restart_me')
with open(restart_file_path, 'w') as f:
f.write(f"{run_identifier}\n{user_input_file}")

if {'body', 'content_type', 'filename'} != set(user_input_file):
return False, None, ("This doesn't appear to be a valid sample "
"sheet or mapping file; please review.")
uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
# save raw data to file
with open(uif_path, 'w') as f:
f.write(user_input_file['body'])
f.write(f"{run_identifier}\n{uif_path}")

final_results_path = out_path('final_results')
makedirs(final_results_path, exist_ok=True)
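Two fixes land here: the sample-sheet validation and write now happen before the restart file is created, and `restart_me` records the path of the saved sheet (`uif_path`) rather than the `repr()` of the raw `{'body', 'content_type', 'filename'}` dict. On a restart both values can then be read back directly; a sketch of such a reader (the restart-side code is not part of this diff, so this is illustrative only):

```python
# sketch: recover the run id and saved sample-sheet path on restart;
# restart_me holds exactly two lines, as written by the code above
with open('restart_me') as f:
    run_identifier, uif_path = f.read().splitlines()
print(run_identifier, uif_path)
```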
43 changes: 43 additions & 0 deletions qp_klp/tests/test_workflows.py
@@ -12,6 +12,7 @@
from os import environ, remove, getcwd
import re
from qp_klp.WorkflowFactory import WorkflowFactory
from qp_klp.Workflows import Workflow
from metapool import load_sample_sheet
from collections import defaultdict
from random import randint
@@ -890,3 +891,45 @@ def open_job_script(script_path):
exp = open_job_script("qp_klp/tests/data/tellread_test.sbatch")

self.assertEqual(obs, exp)

def test_determine_orientation(self):
test_names = [
# single additional occurrence: R1
("ABC_7_04_1776_R1_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
("ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
("ABC_7_04_1776_R1_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
("ABC_7_04_1776_R1_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),

# same cases, delimited with dots
("ABC_7_04_1776.R1.SRE_S3_L007.R1.001.trimmed.fastq.gz", "R1"),
("ABC_7_04_1776.R1.SRE_S3_L007.R2.001.trimmed.fastq.gz", "R2"),
("ABC_7_04_1776.R1.SRE_S3_L007.I1.001.trimmed.fastq.gz", "I1"),
("ABC_7_04_1776.R1.SRE_S3_L007.I2.001.trimmed.fastq.gz", "I2"),

# single additional occurrence: R2
("ABC_7_04_1776_R2_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
("ABC_7_04_1776_R2_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
("ABC_7_04_1776_R2_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
("ABC_7_04_1776_R2_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),

# single additional occurrence: I1/I2
("ABC_7_04_1776_I2_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
("ABC_7_04_1776_I1_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
("ABC_7_04_1776_I2_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
("ABC_7_04_1776_I1_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),

# no additional occurrences
("ABC_7_04_1776_SRE_S3_L007_R1_001.trimmed.fastq.gz", "R1"),
("ABC_7_04_1776_SRE_S3_L007_R2_001.trimmed.fastq.gz", "R2"),
("ABC_7_04_1776_SRE_S3_L007_I1_001.trimmed.fastq.gz", "I1"),
("ABC_7_04_1776_SRE_S3_L007_I2_001.trimmed.fastq.gz", "I2"),

# two additional occurrences
("ABC_7_04_1776_I2_SRE.R1.S3_L007_R1_001.trimmed.fastq.gz", "R1"),
("ABC_7_04_1776_I1_SRE.R1.S3_L007_R2_001.trimmed.fastq.gz", "R2"),
("ABC_7_04_1776_I2_SRE.R1.S3_L007_I1_001.trimmed.fastq.gz", "I1"),
("ABC_7_04_1776_I1_SRE.R1.S3_L007_I2_001.trimmed.fastq.gz", "I2"),
]

for file_name, exp in test_names:
self.assertEqual(Workflow._determine_orientation(file_name), exp)