Skip to content

Commit

Permalink
Merge pull request #108 from ayobi/generate_barcodes_for_exisiting_kits
Browse files Browse the repository at this point in the history
Create Working Branch
  • Loading branch information
cassidysymons authored Oct 15, 2024
2 parents b5022c6 + be86573 commit a739b84
Show file tree
Hide file tree
Showing 5 changed files with 514 additions and 52 deletions.
214 changes: 200 additions & 14 deletions microsetta_admin/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import csv
import tempfile
import jwt
from flask import render_template, Flask, request, session, send_file, url_for
import secrets
Expand Down Expand Up @@ -512,35 +514,100 @@ def new_kits():
**build_login_variables())

elif request.method == 'POST':
num_kits = int(request.form['num_kits'])
num_samples = int(request.form['num_samples'])
prefix = request.form['prefix']
selected_project_ids = request.form.getlist('project_ids')
payload = {'number_of_kits': num_kits,
'number_of_samples': num_samples,
'project_ids': selected_project_ids}
num_kits = int(request.form['number_of_kits'])
num_samples = int(request.form['number_of_samples'])

generate_barcodes = []
i = 1
while i <= num_samples:
barcode = request.form.get(f'generate_barcodes_sample_{i}')
if barcode:
generate_barcodes.append([barcode] * num_kits)
else:
generate_barcodes.append([])
i += 1

user_barcodes = []
total_provided_samples = 0

for i in range(0, 25):
barcode_file = request.files.get(f'upload_csv_{i}')
if barcode_file:
barcode_file = barcode_file.read().decode('utf-8')
new_barcodes = [line.split(',')[0].strip('\ufeff\r')
for line in barcode_file.split('\n')
if line.strip()]
if len(new_barcodes) != num_kits:
if len(new_barcodes) > num_kits:
error_message = (f'Too many barcodes provided in '
f'file {barcode_file}.')
else:
error_message = (f'Not enough barcodes provided in '
f'file {barcode_file}.')
return render_template('create_kits.html',
error_message=error_message,
projects=projects,
**build_login_variables())

user_barcodes.append(new_barcodes)
total_provided_samples += len(new_barcodes)

# Calculate remaining samples to be generated
total_needed_samples = num_kits * num_samples
remaining_samples_to_generate = total_needed_samples - \
total_provided_samples

if remaining_samples_to_generate < 0:
error_message = (f'Too many samples provided '
f'({total_provided_samples}) '
f'for the specified number of kits and samples '
f'({total_needed_samples}).')
return render_template('create_kits.html',
error_message=error_message,
projects=projects,
**build_login_variables())

payload = {
'action': 'create' if not user_barcodes else 'insert',
'project_ids': selected_project_ids,
'number_of_kits': num_kits,
'number_of_samples': num_samples,
'generate_barcodes': generate_barcodes,
'user_barcodes': user_barcodes
}

if prefix:
payload['kit_id_prefix'] = prefix

status, result = APIRequest.post(
'/api/admin/create/kits',
json=payload)
status, result = APIRequest.post('/api/admin/create/kits',
json=payload)

if status != 201:
start_index = result.find("Key")
if start_index != -1:
error_message = result[start_index:]
error_message = error_message[:44]
else:
error_message = "Error message not found in the results."

return render_template('create_kits.html',
error_message='Failed to create kits',
error_message=error_message,
projects=projects,
**build_login_variables())

# StringIO/BytesIO based off https://stackoverflow.com/a/45111660
buf = io.StringIO()
payload = io.BytesIO()

# explicitly expand out the barcode detail
kits = pd.DataFrame(result['created'])
for i in range(num_samples):
kits['barcode_%d' % (i+1)] = [r['sample_barcodes'][i]
for _, r in kits.iterrows()]

for kit_index, row in kits.iterrows():
sample_barcodes = row['sample_barcodes']
for sample_index in range(len(sample_barcodes)):
kits.at[kit_index, f'barcode_{sample_index + 1}'] = \
sample_barcodes[sample_index]

kits.drop(columns='sample_barcodes', inplace=True)

kits.to_csv(buf, sep=',', index=False, header=True)
Expand All @@ -556,6 +623,125 @@ def new_kits():
mimetype='text/csv')


def _read_csv_file(file):
content = file.read().decode('utf-8-sig')
return [row[0] for row in csv.reader(io.StringIO(content),
skipinitialspace=True)
if row]


def _handle_add_barcodes_api_request(payload):
status, result = APIRequest.post('/api/admin/add_barcodes',
json=payload)
if status != 200 and status != 204:
return render_template('add_barcode_to_kit.html',
error_message=result,
**build_login_variables()), status
return result


def _save_and_send_csv(kit_ids, barcodes):
with tempfile.NamedTemporaryFile(mode='w',
delete=False,
newline='') as file:
writer = csv.writer(file)
writer.writerow(['Kit IDs', 'Barcode'])
writer.writerows(zip(kit_ids, barcodes))
filename = file.name

stamp = datetime.now().strftime('%d%b%Y-%H%M')
fname = f'kits-{stamp}.csv'
return send_file(filename,
as_attachment=True,
download_name=fname)


@app.route('/add_barcode_to_kit', methods=['GET', 'POST'])
def new_barcode_kit():
if request.method == 'GET':
return render_template('add_barcode_to_kit.html',
**build_login_variables())

kit_ids = []
barcodes = []

if 'kit_ids' in request.form:
kit_ids = request.form['kit_ids']

if 'user_barcode' in request.form:
user_barcode = request.form['user_barcode']
barcodes.append(user_barcode)
else:
user_barcode = None

generate_barcode_single = \
request.form.get('generate_barcode_single')
generate_barcodes_multiple = \
request.form.get('generate_barcodes_multiple')

if 'kit_ids' in request.files:
kit_ids_file = request.files['kit_ids']
kit_ids = _read_csv_file(kit_ids_file)

if 'barcodes_file' in request.files:
barcodes_file = request.files['barcodes_file']
barcodes = _read_csv_file(barcodes_file)

if generate_barcode_single or user_barcode:
if not user_barcode:
payload = {
'action': 'create',
'kit_ids': kit_ids,
'generate_barcode_single': generate_barcode_single
}
barcodes = _handle_add_barcodes_api_request(payload)
if isinstance(barcodes, tuple):
return barcodes

payload = {
"action": "insert",
"barcodes": barcodes,
"kit_ids": kit_ids
}
result = _handle_add_barcodes_api_request(payload)
if isinstance(result, tuple):
return result

return render_template('add_barcode_to_kit.html',
barcodes=barcodes,
kit_id=kit_ids,
**build_login_variables())

if generate_barcodes_multiple:
payload = {
'action': 'create',
'kit_ids': kit_ids,
'generate_barcodes_multiple': generate_barcodes_multiple
}
barcodes = _handle_add_barcodes_api_request(payload)
if isinstance(barcodes, tuple):
return barcodes

if len(kit_ids) != len(barcodes):
error_message = f'The number of kit IDs ({len(kit_ids)}) '\
f'does not match the number of barcodes '\
f'({len(barcodes)}).'
return render_template('add_barcode_to_kit.html',
error_message=error_message,
**build_login_variables()), 400

payload = {
"action": "insert",
"barcodes": barcodes,
"kit_ids": kit_ids
}
result = _handle_add_barcodes_api_request(payload)
if isinstance(result, tuple):
return result

return _save_and_send_csv(kit_ids, barcodes)


def _check_sample_status(extended_barcode_info):
warning = None
in_microsetta_project = any(
Expand Down
Loading

0 comments on commit a739b84

Please sign in to comment.