Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Working Branch #108

214 changes: 200 additions & 14 deletions microsetta_admin/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import csv
import tempfile
import jwt
from flask import render_template, Flask, request, session, send_file, url_for
import secrets
Expand Down Expand Up @@ -512,35 +514,100 @@ def new_kits():
**build_login_variables())

elif request.method == 'POST':
num_kits = int(request.form['num_kits'])
num_samples = int(request.form['num_samples'])
prefix = request.form['prefix']
selected_project_ids = request.form.getlist('project_ids')
payload = {'number_of_kits': num_kits,
'number_of_samples': num_samples,
'project_ids': selected_project_ids}
num_kits = int(request.form['number_of_kits'])
num_samples = int(request.form['number_of_samples'])

generate_barcodes = []
i = 1
while i <= num_samples:
barcode = request.form.get(f'generate_barcodes_sample_{i}')
if barcode:
generate_barcodes.append([barcode] * num_kits)
else:
generate_barcodes.append([])
i += 1

user_barcodes = []
total_provided_samples = 0

for i in range(0, 25):
barcode_file = request.files.get(f'upload_csv_{i}')
if barcode_file:
barcode_file = barcode_file.read().decode('utf-8')
new_barcodes = [line.split(',')[0].strip('\ufeff\r')
for line in barcode_file.split('\n')
if line.strip()]
if len(new_barcodes) != num_kits:
if len(new_barcodes) > num_kits:
error_message = (f'Too many barcodes provided in '
f'file {barcode_file}.')
else:
error_message = (f'Not enough barcodes provided in '
f'file {barcode_file}.')
return render_template('create_kits.html',
error_message=error_message,
projects=projects,
**build_login_variables())

user_barcodes.append(new_barcodes)
total_provided_samples += len(new_barcodes)

# Calculate remaining samples to be generated
total_needed_samples = num_kits * num_samples
remaining_samples_to_generate = total_needed_samples - \
total_provided_samples

if remaining_samples_to_generate < 0:
error_message = (f'Too many samples provided '
f'({total_provided_samples}) '
f'for the specified number of kits and samples '
f'({total_needed_samples}).')
return render_template('create_kits.html',
error_message=error_message,
projects=projects,
**build_login_variables())

payload = {
'action': 'create' if not user_barcodes else 'insert',
'project_ids': selected_project_ids,
'number_of_kits': num_kits,
'number_of_samples': num_samples,
'generate_barcodes': generate_barcodes,
'user_barcodes': user_barcodes
}

if prefix:
payload['kit_id_prefix'] = prefix

status, result = APIRequest.post(
'/api/admin/create/kits',
json=payload)
status, result = APIRequest.post('/api/admin/create/kits',
json=payload)

if status != 201:
start_index = result.find("Key")
if start_index != -1:
error_message = result[start_index:]
error_message = error_message[:44]
else:
error_message = "Error message not found in the results."

return render_template('create_kits.html',
error_message='Failed to create kits',
error_message=error_message,
projects=projects,
**build_login_variables())

# StringIO/BytesIO based off https://stackoverflow.com/a/45111660
buf = io.StringIO()
payload = io.BytesIO()

# explicitly expand out the barcode detail
kits = pd.DataFrame(result['created'])
for i in range(num_samples):
kits['barcode_%d' % (i+1)] = [r['sample_barcodes'][i]
for _, r in kits.iterrows()]

for kit_index, row in kits.iterrows():
sample_barcodes = row['sample_barcodes']
for sample_index in range(len(sample_barcodes)):
kits.at[kit_index, f'barcode_{sample_index + 1}'] = \
sample_barcodes[sample_index]

kits.drop(columns='sample_barcodes', inplace=True)

kits.to_csv(buf, sep=',', index=False, header=True)
Expand All @@ -556,6 +623,125 @@ def new_kits():
mimetype='text/csv')


def _read_csv_file(file):
content = file.read().decode('utf-8-sig')
return [row[0] for row in csv.reader(io.StringIO(content),
skipinitialspace=True)
if row]


def _handle_add_barcodes_api_request(payload):
status, result = APIRequest.post('/api/admin/add_barcodes',
json=payload)
if status != 200 and status != 204:
return render_template('add_barcode_to_kit.html',
error_message=result,
**build_login_variables()), status
return result


def _save_and_send_csv(kit_ids, barcodes):
with tempfile.NamedTemporaryFile(mode='w',
delete=False,
newline='') as file:
writer = csv.writer(file)
writer.writerow(['Kit IDs', 'Barcode'])
writer.writerows(zip(kit_ids, barcodes))
filename = file.name

stamp = datetime.now().strftime('%d%b%Y-%H%M')
fname = f'kits-{stamp}.csv'
return send_file(filename,
as_attachment=True,
download_name=fname)


@app.route('/add_barcode_to_kit', methods=['GET', 'POST'])
def new_barcode_kit():
if request.method == 'GET':
return render_template('add_barcode_to_kit.html',
**build_login_variables())

kit_ids = []
barcodes = []

if 'kit_ids' in request.form:
kit_ids = request.form['kit_ids']

if 'user_barcode' in request.form:
user_barcode = request.form['user_barcode']
barcodes.append(user_barcode)
else:
user_barcode = None

generate_barcode_single = \
request.form.get('generate_barcode_single')
generate_barcodes_multiple = \
request.form.get('generate_barcodes_multiple')

if 'kit_ids' in request.files:
kit_ids_file = request.files['kit_ids']
kit_ids = _read_csv_file(kit_ids_file)

if 'barcodes_file' in request.files:
barcodes_file = request.files['barcodes_file']
barcodes = _read_csv_file(barcodes_file)

if generate_barcode_single or user_barcode:
if not user_barcode:
payload = {
'action': 'create',
'kit_ids': kit_ids,
'generate_barcode_single': generate_barcode_single
}
barcodes = _handle_add_barcodes_api_request(payload)
if isinstance(barcodes, tuple):
return barcodes

payload = {
"action": "insert",
"barcodes": barcodes,
"kit_ids": kit_ids
}
result = _handle_add_barcodes_api_request(payload)
if isinstance(result, tuple):
return result

return render_template('add_barcode_to_kit.html',
barcodes=barcodes,
kit_id=kit_ids,
**build_login_variables())

if generate_barcodes_multiple:
payload = {
'action': 'create',
'kit_ids': kit_ids,
'generate_barcodes_multiple': generate_barcodes_multiple
}
barcodes = _handle_add_barcodes_api_request(payload)
if isinstance(barcodes, tuple):
return barcodes

if len(kit_ids) != len(barcodes):
error_message = f'The number of kit IDs ({len(kit_ids)}) '\
f'does not match the number of barcodes '\
f'({len(barcodes)}).'
return render_template('add_barcode_to_kit.html',
error_message=error_message,
**build_login_variables()), 400

payload = {
"action": "insert",
"barcodes": barcodes,
"kit_ids": kit_ids
}
result = _handle_add_barcodes_api_request(payload)
if isinstance(result, tuple):
return result

return _save_and_send_csv(kit_ids, barcodes)


def _check_sample_status(extended_barcode_info):
warning = None
in_microsetta_project = any(
Expand Down
Loading
Loading