diff --git a/viewer/squonk_job_file_transfer.py b/viewer/squonk_job_file_transfer.py
index e6600350..1560b43f 100644
--- a/viewer/squonk_job_file_transfer.py
+++ b/viewer/squonk_job_file_transfer.py
@@ -1,6 +1,7 @@
 """Functions (for the celery tasks) that transfer (download) files from
 Fragalysis to Squonk.
 """
+import urllib.parse
 import os
 import shutil
@@ -9,13 +10,6 @@ from squonk2.dm_api import DmApi
 
 from celery.utils.log import get_task_logger
 
-from viewer.utils import (
-    clean_filename,
-    add_prop_to_sdf,
-    add_prop_to_mol,
-    create_media_sub_directory,
-    delete_media_sub_directory
-)
 from viewer.models import (
     SiteObservation,
     ComputedMolecule,
@@ -24,240 +18,20 @@ logger = get_task_logger(__name__)
 
-# Squonk Specification File Paths
-# squonk-file-paths:
-#   lhs-fragments:
-#     molecule: {pdb_code}.mol
-#     template: /fragalysis-files/{target_name}/{pdb_code}.mol
-#   lhs-protein-apo-desolv:
-#     molecule: {pdb_code}_apo_desolv.pdb
-#     template: /fragalysis-files/{target_name}/{pdb_code}_apo_desolv.pdb
-
-# Fragalysis Upload Mapping (as well as I can decipher it:
-#
-# Type         | From Aligned File | E.g.                            | Stored In
-# .apo_desolve | _apo-desolv.pdb   | CD44MMA-x0017_0A_apo-desolv.pdb | Protein.apo_desolve_info
-# .mol         | .mol              | CD44MMA-x0017_0A.mol            | Molecule.sdf_info
-# .sdf         | .sdf              | CD44MMA-x0017_0A.sdf            | Molecule.sdf_file
-#
-
-SQUONK_PROT_MAPPING = {
-    'apo_desolve': {'func': 'prot_file', 'field': 'apo_desolve_info'},
-    'mol': {'func': 'mol_file', 'field': 'sdf_info'},
-    'sdf': {'func': 'sdf_file', 'field': 'sdf_file'}
-}
-
-SQUONK_COMP_MAPPING = {
-    'mol': {'func': 'comp_mol_file', 'field': 'sdf_info'},
-}
-
-REF_PROP = 'ref_mols'
-
-
-def ref_mol_from_protein_code(protein_code, target_title):
-    """Returns the xchem ref_mol given a protein code and target name
-    If it can't find a target prefix it just returns the code as it is.
-
-    Args:
-        protein_code (e.g. Mpro-x11513_0A:ALP-POS-f13221e1-4 )
-    Returns:
-        xchem ref mol (e.g. x11513_0A)
-    """
-    target_prefix = target_title + '-'
-    if target_prefix in protein_code:
-        return protein_code.strip().split(":")[0].replace(target_prefix, '')
-    else:
-        return protein_code
-
-
-def mol_file(field, trans_dir, protein_code, target):
-    """Copy the requested file from the molecule table to the transfer directory, standardising
-    the filename. Extract the contents of the mol_field to a file in the transfer directory.
-
-    Args:
-        field : source field in Molecule table
-        trans_dir : temporary directory used for transfer
-        protein
-        target
-    Returns:
-        filepath
-    """
-    logger.info('Generating filepath from ield=%s trans_dir=%s protein_code=%s target=%s',
-                field, trans_dir, protein_code, target)
-
-    site_obvs = SiteObservation.objects.get(code=protein_code)
-
-    # TODO: I don't think this is working, the mol is stored in
-    # database field. I suppose the path can be inferred from name and
-    # standard location, but not sure.
-    mol_block = getattr(site_obvs, field)
-    filepath = os.path.join(trans_dir, site_obvs.code.strip().split(":")[0] + '.mol')
-    code = ref_mol_from_protein_code(protein_code, target.title)
-    add_prop_to_mol(mol_block, filepath, code)
-
-    logger.info('Generated filepath "%s"', filepath)
-
-    return filepath
-
-
-def sdf_file(field, trans_dir, protein_code, target):
-    """Copy the requested file from the molecule table to the transfer directory, standardising
-    the filename.
-
-    Args:
-        field : source field in Molecule table
-        trans_dir : temporary directory used for transfer
-        protein
-        target
-    Returns:
-        filepath
-    """
-
-    logger.info('Generated filepath from field=%s trans_dir=%s protein_code=%s target=%s',
-                field, trans_dir, protein_code, target)
-
-    site_obvs = SiteObservation.objects.get(code=protein_code)
-    file = getattr(site_obvs, field)
-    if not file:
-        logger.error(
-            'No file (field=%s trans_dir=%s protein_code=%s target=%s)',
-            field, trans_dir, protein_code, target)
-        return None
-
-    inpath = os.path.join(settings.MEDIA_ROOT, file.name)
-    filepath = os.path.join(trans_dir, clean_filename(inpath))
-    code = ref_mol_from_protein_code(protein_code, target.title)
-    add_prop_to_sdf(inpath, filepath, {REF_PROP: code})
-
-    logger.info('Generated filepath "%s"', filepath)
-
-    return filepath
-
-
-def prot_file(field, trans_dir, protein_code, target):
-    """Copy the requested file from the protein table to the transfer directory, standardising the
-    name.
+def process_file_transfer(auth_token, job_transfer_id):
+    """Check/create the file transfer
 
     Args:
-        field : source field in Protein table
-        trans_dir : temporary directory used for transfer
-        protein
-        target
-    Returns:
-        filepath
-    """
-
-    logger.info('Generating filepath from field=%s'
-                ' trans_dir=%s protein_code=%s target.title=%s...',
-                field, trans_dir, protein_code, target.title)
-
-    site_obvs = SiteObservation.objects.get(code=protein_code)
-
-    # The source file (if found)
-    in_path = None
-
-    # Inspect the DB record...
-    # file = getattr(protein, field)
-    file = getattr(site_obvs, field)
-    if file:
-        logger.info('%s has a value (%s)', field, file.name)
-        in_path = os.path.join(settings.MEDIA_ROOT, file.name)
-    else:
-        # The chosen field has no value.
-        # For example 'Project.apo_desolve_info' for protein code 'CD44MMA-x0017_0A'.
-        # In this case it indicates there's no file link for the corresponding
-        # '_apo-desolv.pdb' file.
-        #
-        # For issue 954 (m2ms/fragalysis-frontend) we hack our way out of this
-        # (specifically for '_apo-desolv.pdb' files - i.e. we inspect the
-        # directory we suspect the file might be in, i.e. in the directory
-        # 'targets/CD44MMA/aligned/CD44MMA-x0017_0A' where 'CD44MMA' is the
-        # transfer target and 'CD44MMA-x0017_0A' is the protein code.
-        # We do this to cater for DB migration discrepancies that have evolved.
-        #
-        # We handle some protein code differently (Mpro for example has code like
-        # "Mpro-P0045_0A:TRY-UNI-2eddb1ff-7" but the files are located in
-        # directories named "Mpro-P0045_0A".
-        # So, split the name and take the left-hand portion...
-        lean_protein_code: str = protein_code.split(':')[0]
-        anticipated_path = os.path.join(
-            settings.MEDIA_ROOT,
-            f"targets/{target.title}/aligned",
-            f"{lean_protein_code}/{lean_protein_code}_apo-desolv.pdb"
-        )
-        if os.path.isfile(anticipated_path):
-            logger.warning('%s has no value but found %s', field, anticipated_path)
-            in_path = anticipated_path
-        else:
-            logger.error('%s has no value and %s does not exist',
-                         field, anticipated_path)
-
-    # Have we got an input file (via DB or 'guessing')?
-    if in_path:
-        out_path = os.path.join(trans_dir, clean_filename(in_path))
-        shutil.copyfile(in_path, out_path)
-        logger.info('Generated transfer file "%s"', out_path)
-        return out_path
-
-    # Tried our best - but no input file found!
-    return None
-
-
-def comp_mol_file(field, trans_dir, name, target):
-    """Extract the contents of the mol_field to a file in the transfer directory
-
-    Args:
-        field : source field in Molecule table
-        trans_dir : temporary directory used for transfer
-        protein
-        target
-    Returns:
-        filepath
-    """
-    del target
-
-    logger.info('Generated filepath from field=%s trans_dir=%s name=%s',
-                field, trans_dir, name)
-
-    comp = ComputedMolecule.objects.get(name=name)
-    mol_block = getattr(comp, field)
-    filepath = os.path.join(trans_dir, name + '.mol')
-    # In the case of a computed molecule the whole name is used.
-    add_prop_to_mol(mol_block, filepath, name)
-
-    logger.info('Generated filepath "%s"', filepath)
-
-    return filepath
-
-
-def process_file_transfer(auth_token,
-                          job_transfer_id):
-    """Check/create the file transfer for list of proteins
-
-    Args:
-        request,
+        auth_token from the request
         job_transfer_id
-
     """
     logger.info('+ process_file_transfer(job_transfer_id=%s)', job_transfer_id)
-    logger.debug(auth_token)
 
     job_transfer = JobFileTransfer.objects.get(id=job_transfer_id)
     logger.info('+ job_transfer.squonk_project=%s', job_transfer.squonk_project)
-    trans_sub_dir = os.path.join('squonk_transfer', str(job_transfer.id))
-    trans_dir = create_media_sub_directory(trans_sub_dir)
-
-    # location in squonk project where files will reside
-    # e.g. "/fragalysis-files/hjyx/Mpro" for new transfers,
-    # "/fragalysis-files/Mpro" for existing transfers
-    target = job_transfer.target
-    if job_transfer.sub_path:
-        squonk_directory = os.path.join('/', settings.SQUONK2_MEDIA_DIRECTORY, job_transfer.sub_path, target.title)
-    else:
-        squonk_directory = os.path.join('/', settings.SQUONK2_MEDIA_DIRECTORY, target.title)
-    logger.info('+ Destination squonk_directory=%s', squonk_directory)
-
     # This to pick up NULL values from the changeover to using compounds.
     if not job_transfer.compounds:
         job_transfer.compounds = []
@@ -269,104 +43,81 @@ def process_file_transfer(auth_token,
     logger.info('+ num_to_transfer=%s (%s + %s)',
                 num_to_transfer, num_proteins_to_transfer, num_compounds_to_transfer)
 
-    # Proteins
-    for protein_code in job_transfer.proteins:
-        # For each protein transfer the list of files to squonk
-        # Then update the progress in the job_transfer record
-        logger.info('+ Collecting files for protein_code=%s (trans_dir=%s target=%s)',
-                    protein_code, trans_dir, target)
-        file_list = []
-        for file_type in SQUONK_PROT_MAPPING.values():
-            filepath = globals()[file_type['func']](file_type['field'], trans_dir,
-                                                    protein_code, target)
-            if filepath:
-                file_list.append(filepath)
-
-        if not file_list:
-            logger.warning('No files found for protein_code=%s', protein_code)
-            continue
+    # The base directory for the source of the files we are transferring.
+    # We expect files to include a path relative to TARGET_LOADER_MEDIA_DIRECTORY
+    FILE_ROOT = os.path.join(settings.MEDIA_ROOT, settings.TARGET_LOADER_MEDIA_DIRECTORY)
+    logger.info('+ FILE_ROOT=%s', FILE_ROOT)
 
-        logger.info('+ Found % Protein files', len(file_list))
-        logger.info('+ Protein file_list=%s', file_list)
-        logger.info('+ Calling DmApi.put_unmanaged_project_files() [proteins]...')
-        result = DmApi.put_unmanaged_project_files(auth_token,
-                                                   project_id=job_transfer.squonk_project,
-                                                   project_files=file_list,
-                                                   project_path=squonk_directory,
-                                                   force=True)
-        logger.debug(result)
-
-        if result.success:
-            idx += 1
-            job_transfer.transfer_progress = (idx*100/num_to_transfer)
-            job_transfer.save()
-            logger.info('+ Transferred Protein files')
-        else:
-            msg = f'File Transfer Failed (msg={result.msg})'
-            logger.error(msg)
-            raise RuntimeError(msg)
+    # Build the Squonk2 Project directory where files will be placed,
+    # e.g. "/fragalysis-files/hjyx".
+    target = job_transfer.target
+    squonk_directory = os.path.join('/', settings.SQUONK2_MEDIA_DIRECTORY, job_transfer.sub_path)
+    logger.info('+ Destination squonk_directory=%s', squonk_directory)
 
-    # Computed Molecules
-    for name in job_transfer.compounds:
-        # For each protein transfer the list of files to squonk
-        # Then update the progress in the job_transfer record
-        logger.info('+ Collecting files for compound=%s (trans_dir=%s target=%s)',
-                    name, trans_dir, target)
+    # All the files (proteins or compounds) are provided using relative
+    # paths from the media directory, so we can join the two lists
+    # and treat them the same.
+    all_filename_refs = job_transfer.proteins + job_transfer.compounds
+    if all_filename_refs:
+        logger.info('+ Collecting files...')
         file_list = []
-        for file_type in SQUONK_COMP_MAPPING.values():
-            filepath = globals()[file_type['func']](file_type['field'], trans_dir,
-                                                    name, target)
-            if filepath:
-                file_list.append(filepath)
-
-        if not file_list:
-            logger.warning('No files found for compound=%s', name)
-            continue
-
-        logger.info('+ Found % Compound files', len(file_list))
-        logger.info('+ Compound file_list=%s', file_list)
-        logger.info('+ Calling DmApi.put_unmanaged_project_files() [compounds]...')
-        result = DmApi.put_unmanaged_project_files(auth_token,
-                                                   project_id=job_transfer.squonk_project,
-                                                   project_files=file_list,
-                                                   project_path=squonk_directory,
-                                                   force=True)
+        for filename_ref in all_filename_refs:
+            # We need to decode the file reference
+            # as it is likely to be URL-encoded.
+            filename = urllib.parse.unquote(filename_ref)
+            logger.info('+ Collecting filename=%s (target=%s)', filename, target)
+            # File is expected to exist in the media directory
+            file_path = os.path.join(FILE_ROOT, filename)
+            if not os.path.isfile(file_path):
+                msg = f'No such file ({file_path})'
+                logger.error(msg)
+                raise RuntimeError(msg)
+            file_list.append(file_path)
+
+        logger.info('+ Found %s files', len(file_list))
+        logger.info('+ Calling DmApi.put_unmanaged_project_files()...')
+        result = DmApi.put_unmanaged_project_files(
+            auth_token,
+            project_id=job_transfer.squonk_project,
+            project_files=file_list,
+            project_path=squonk_directory,
+            force=True,
+        )
         logger.debug(result)
 
         if result.success:
             idx += 1
-            job_transfer.transfer_progress = (idx*100/num_to_transfer)
+            job_transfer.transfer_progress = idx * 100 / num_to_transfer
             job_transfer.save()
-            logger.info('+ Transferred Compound files')
+            logger.info('+ Transferred files')
         else:
             msg = f'File Transfer Failed (msg={result.msg})'
             logger.error(msg)
             raise RuntimeError(msg)
 
-    # Tidy up the transfer directory.
-    delete_media_sub_directory(trans_sub_dir)
-
 
-def check_file_transfer(request):
-    """Check the request and return a list of protein codes and/or computed molecule names
+def validate_file_transfer_files(request):
+    """Check the request and return a list of proteins and/or computed molecule objects
 
     Args:
         request
     Returns
         error dict
-        list of validated protein codes
-        list of validated computed molecule names
+        list of validated proteins (SiteObservation)
+        list of validated computed molecules (ComputedMolecule)
     """
     error = {}
     proteins = []
     compounds = []
     if request.data['proteins']:
+        # Get first part of protein code
         proteins_list = [p.strip().split(":")[0] for p in request.data['proteins'].split(',')]
+        logger.info('+ Given proteins=%s', proteins_list)
+
         proteins = []
-        # Filter by protein codes
         for code_first_part in proteins_list:
             site_obvs = SiteObservation.objects.filter(code__contains=code_first_part).values()
             if site_obvs.exists():
@@ -383,10 +134,12 @@
         return error, proteins, compounds
 
     if request.data['compounds']:
-        # Get first part of protein code
+
+        # Get compounds
         compounds_list = [c.strip() for c in request.data['compounds'].split(',')]
+        logger.info('+ Given compounds=%s', compounds_list)
+
         compounds = []
-        # Filter by protein codes
         for compound in compounds_list:
             comp = ComputedMolecule.objects.filter(name=compound).values()
             if comp.exists():
@@ -405,7 +158,7 @@
     if proteins or compounds:
         return error, proteins, compounds
     else:
-        error['message'] = 'A valid protein codes and/or a list of valid compound names must ' \
-                           'be entered'
+        error['message'] = 'A valid set of protein codes and/or a list of valid' \
+                           ' compound names must be provided'
         error['status'] = status.HTTP_404_NOT_FOUND
     return error, proteins, compounds
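
The transfer loop above treats each entry in job_transfer.proteins and job_transfer.compounds as a URL-encoded path relative to the target-loader media directory. As an illustrative sketch of that resolution step (the settings values and the file reference below are hypothetical; the real values come from Django settings and the JobFileTransfer record):

    import os
    import urllib.parse

    # Hypothetical stand-ins for settings.MEDIA_ROOT and
    # settings.TARGET_LOADER_MEDIA_DIRECTORY.
    MEDIA_ROOT = '/code/media'
    TARGET_LOADER_MEDIA_DIRECTORY = 'target_loader_data'
    file_root = os.path.join(MEDIA_ROOT, TARGET_LOADER_MEDIA_DIRECTORY)

    # A URL-encoded, media-relative reference as it might appear in
    # job_transfer.proteins (hypothetical value).
    filename_ref = 'Mpro/aligned_files/Mpro-x0107%2B1%2B0A/Mpro-x0107%2B1%2B0A_apo-desolv.pdb'

    # Decode the reference and anchor it under the media directory,
    # mirroring the checks made before DmApi.put_unmanaged_project_files() is called.
    filename = urllib.parse.unquote(filename_ref)  # '%2B' decodes to '+'
    file_path = os.path.join(file_root, filename)
    if not os.path.isfile(file_path):
        raise RuntimeError(f'No such file ({file_path})')  # mirrors the task's behaviour
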
diff --git a/viewer/tasks.py b/viewer/tasks.py
index eeb0bb66..81841074 100644
--- a/viewer/tasks.py
+++ b/viewer/tasks.py
@@ -518,12 +518,13 @@ def task_load_target(self, data_bundle=None, proposal_ref=None, contact_email=No
 
 @shared_task
 def process_job_file_transfer(auth_token, jt_id):
-    """ Celery task to take a list of proteins and specification and transfer the files to Squonk2
+    """Celery task to take a list of proteins and a specification
+    and transfer the files to Squonk2
 
     Parameters
     ----------
-    task_
-    jt_id of job_file_transfer record
+    auth_token for the executing user
+    jt_id is the job_file_transfer record ID
 
     Returns
     -------
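
Both the task above and the view change below derive their paths from SQUONK2_MEDIA_DIRECTORY and the transfer's sub_path. A small sketch of how the two path forms relate, assuming the setting value 'fragalysis-files' and the example sub_path 'hjyx' quoted in the comments:

    import os

    # Assumed values, taken from the examples quoted in the diff's comments.
    SQUONK2_MEDIA_DIRECTORY = 'fragalysis-files'
    sub_path = 'hjyx'  # random 4-character sequence assigned to the transfer

    # Destination inside the Squonk2 project, as built by process_file_transfer().
    squonk_directory = os.path.join('/', SQUONK2_MEDIA_DIRECTORY, sub_path)
    assert squonk_directory == '/fragalysis-files/hjyx'

    # Root computed in the view's create() for the Job (see the views.py change below).
    transfer_root = os.path.join(SQUONK2_MEDIA_DIRECTORY, sub_path)
    assert transfer_root == 'fragalysis-files/hjyx'
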
diff --git a/viewer/views.py b/viewer/views.py
index c19202de..ebea93b4 100644
--- a/viewer/views.py
+++ b/viewer/views.py
@@ -74,7 +74,7 @@
 )
 
 from .squonk_job_file_transfer import (
-    check_file_transfer
+    validate_file_transfer_files
 )
 
 from .squonk_job_request import (
@@ -1616,9 +1616,6 @@ def create(self, request):
         logger.info('+ snapshot_id=%s', snapshot_id)
         logger.info('+ session_project_id=%s', session_project_id)
 
-        target = models.Target.objects.get(id=target_id)
-        assert target
-
         # Check the user can use this Squonk2 facility.
         # To do this we need to setup a couple of API parameter objects.
         sq2a_common_params: CommonParams = CommonParams(user_id=user.id,
@@ -1632,8 +1629,8 @@ def create(self, request):
             content = {f'You cannot do this ({sq2a_rv.msg})'}
             return Response(content, status=status.HTTP_403_FORBIDDEN)
 
-        # Check the presense of the files expected to be transferred
-        error, proteins, compounds = check_file_transfer(request)
+        # Check the existence of the files that are expected to be transferred
+        error, proteins, compounds = validate_file_transfer_files(request)
         if error:
             return Response(error['message'], status=error['status'])
 
@@ -1682,7 +1679,7 @@ def create(self, request):
         assert job_transfer.target
         assert job_transfer.target.title
         transfer_target = job_transfer.target.title
-        logger.info('+ transfer_target=%s', transfer_target)
+        logger.info('+ job_transfer.target=%s', transfer_target)
 
         job_transfer.transfer_status = 'PENDING'
         job_transfer.transfer_datetime = None
@@ -1690,19 +1687,12 @@ def create(self, request):
         job_transfer.save()
 
         # The root (in the Squonk project) where files will be written for this Job.
-        # Something like "fragalysis-files/hjyx" for new transfers,
-        # "fragalysis-files" for existing transfers
-        if job_transfer.sub_path:
-            transfer_root = os.path.join(settings.SQUONK2_MEDIA_DIRECTORY, job_transfer.sub_path)
-        else:
-            transfer_root = settings.SQUONK2_MEDIA_DIRECTORY
-        logger.info('+ transfer_root=%s', transfer_root)
-
-        logger.info('oidc_access_token')
-        logger.info(request.session['oidc_access_token'])
-
-        logger.info('+ Starting transfer (celery) (job_transfer.id=%s)...',
-                    job_transfer.id)
+        # This will be something like "${SQUONK2_MEDIA_DIRECTORY}/hjyx", where "hjyx" is
+        # a randomly-generated 4-character sequence assigned to the transfer
+        # when the JobFileTransfer record is created.
+        transfer_root = os.path.join(settings.SQUONK2_MEDIA_DIRECTORY, job_transfer.sub_path)
+        logger.info('+ Starting transfer (celery) (job_transfer.id=%s transfer_root=%s)...',
+                    job_transfer.id, transfer_root)
         job_transfer_task = process_job_file_transfer.delay(request.session['oidc_access_token'],
                                                             job_transfer.id)
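
For reference, a sketch of how validate_file_transfer_files() splits the incoming request fields before looking the entries up in the database. The payload values are hypothetical, although 'Mpro-x11513_0A:ALP-POS-f13221e1-4' follows the protein-code form quoted in the removed ref_mol_from_protein_code() docstring:

    # Hypothetical request payload.
    proteins_field = 'Mpro-x11513_0A:ALP-POS-f13221e1-4, Mpro-x0107_0A'
    compounds_field = 'CHEMSPACE-BB-1, CHEMSPACE-BB-2'

    # Protein codes keep only the text before the first ':'...
    proteins_list = [p.strip().split(":")[0] for p in proteins_field.split(',')]
    assert proteins_list == ['Mpro-x11513_0A', 'Mpro-x0107_0A']

    # ...while compound names are used as given.
    compounds_list = [c.strip() for c in compounds_field.split(',')]
    assert compounds_list == ['CHEMSPACE-BB-1', 'CHEMSPACE-BB-2']

    # Each entry is then checked with SiteObservation.objects.filter(code__contains=...)
    # or ComputedMolecule.objects.filter(name=...); if nothing matches, the caller
    # receives an error with status.HTTP_404_NOT_FOUND.
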