diff --git a/src/xchemalign/collator.py b/src/xchemalign/collator.py index 24d6e58..232ef1a 100644 --- a/src/xchemalign/collator.py +++ b/src/xchemalign/collator.py @@ -556,7 +556,7 @@ def _copy_files(self, meta): num_event_maps = 0 event_tables = self._find_event_tables() - + forbidden_unattested_ligand_events = {} for xtal_name, xtal in meta[Constants.META_XTALS].items(): dir = cryst_path / xtal_name @@ -610,33 +610,73 @@ def _copy_files(self, meta): elif type == Constants.CONFIG_TYPE_MODEL_BUILDING: self.logger.warn("CIF entry missing for {}".format(xtal_name)) - # handle panddas event maps + + # Handle histroical ligand binding events (in particular pull up their event map SHA256s for comparing) hist_event_maps = {} - for event_map_data in historical_xtal_data.get(Constants.META_BINDING_EVENT, []): - model = event_map_data.get(Constants.META_PROT_MODEL) - chain = event_map_data.get(Constants.META_PROT_CHAIN) - res = event_map_data.get(Constants.META_PROT_RES) + for ligand_binding_data in historical_xtal_data.get(Constants.META_BINDING_EVENT, []): + model = ligand_binding_data.get(Constants.META_PROT_MODEL) + chain = ligand_binding_data.get(Constants.META_PROT_CHAIN) + res = ligand_binding_data.get(Constants.META_PROT_RES) if model is not None and chain and res: - hist_event_maps[(model, chain, res)] = event_map_data + hist_event_maps[(model, chain, res)] = ligand_binding_data - best_event_map_paths = self.get_dataset_event_maps(xtal_name, pdb_input, event_tables) - num_identical_historical_event_maps = 0 + # Determine the ligands present and their coordinates + dataset_ligands = self.get_dataset_ligands(pdb) + + # Match ligand to panddas event maps if possible and determine if those maps are new + best_event_map_paths = self.get_dataset_event_maps(xtal_name, dataset_ligands, event_tables) + identical_historical_event_maps = {} + unattested_ligand_events = {} + attested_ligand_events = {} event_maps_to_copy = {} - if best_event_map_paths: - for k, tup in best_event_map_paths.items(): - path = tup[0] + + for ligand_key in dataset_ligands: + if ligand_key in best_event_map_paths: + ligand_event_map_data = best_event_map_paths[ligand_key] + path = ligand_event_map_data[0] if path: digest = utils.gen_sha256(path) ccp4_output = ( - cryst_path / xtal_name / "{}_{}_{}_{}.ccp4".format(xtal_name, k[0], k[1], k[2]) + cryst_path / xtal_name / "{}_{}_{}_{}.ccp4".format( + xtal_name, + ligand_key[0], + ligand_key[1], + ligand_key[2]) ) - event_maps_to_copy[k] = (path, ccp4_output, digest, k, tup[1], tup[2]) - hist_data = hist_event_maps.get(k) + attested_ligand_events[ligand_key] = ( + path, + ccp4_output, + digest, + ligand_key, + ligand_event_map_data[1], + ligand_event_map_data[2], + ) + hist_data = hist_event_maps.get(ligand_key) + # Track whether the event map actually is new by data if hist_data: if digest == hist_data.get(Constants.META_SHA256): - num_identical_historical_event_maps += 1 + identical_historical_event_maps[ligand_key] = True + else: + event_maps_to_copy[ligand_key] = True + # Handle ligands that cannot be matched + else: + # Add those permitted ligands + if xtal_name in self.panddas_missing_ok: + self.logger.warn( + "no PanDDA event map found for", + xtal_name, + "but this is OK as it's been added to the panddas_missing_ok", + "list in the config file", + ) + unattested_ligand_events[ligand_key] = True + # Track forbidden ligands for informative error messages at the end of this function else: - event_maps_to_copy[k] = (None, None, None, k, tup[1], tup[2]) + self.logger.error( + "no PanDDA event map found. If you want to allow this then add", + xtal_name, + "to the panddas_missing_ok list in the config file", + ) + forbidden_unattested_ligand_events[xtal_name] = ligand_key else: @@ -691,35 +731,60 @@ def _copy_files(self, meta): except: self.logger.warn('failed to generate SMILES for ligand {}'.format(xtal_name)) - # copy event maps - if event_maps_to_copy: - if num_identical_historical_event_maps == len(event_maps_to_copy): - historical_data = historical_xtal_data.get(Constants.META_BINDING_EVENT) - data_to_add[Constants.META_BINDING_EVENT] = historical_data - else: - new_data = [] - for key, to_copy in event_maps_to_copy.items(): - # print('copying {} to {}'.format(to_copy[0], self.output_path / to_copy[1])) - f = shutil.copy2(to_copy[0], self.output_path / to_copy[1], follow_symlinks=True) - if not f: - self.logger.error( - "Failed to copy Panddas file {} to {}".format( - to_copy[0], self.output_path / to_copy[1] - ) + # copy event maps that do not differ in SHA from previously known ones + unsucessfully_copied_event_maps = {} + if len(event_maps_to_copy) != 0: + for ligand_key in event_maps_to_copy: + source = attested_ligand_events[ligand_key][0] + destination = attested_ligand_events[ligand_key][1] + f = shutil.copy2(source[0], self.output_path / destination[1], follow_symlinks=True) + if not f: + self.logger.error( + "Failed to copy Panddas file {} to {}".format( + source[0], self.output_path / destination[1] ) - else: - d = { - Constants.META_FILE: str(to_copy[1]), - Constants.META_SHA256: to_copy[2], - Constants.META_PROT_MODEL: int(to_copy[3][0]), - Constants.META_PROT_CHAIN: to_copy[3][1], - Constants.META_PROT_RES: to_copy[3][2], - Constants.META_PROT_INDEX: to_copy[4], - Constants.META_PROT_BDC: to_copy[5], - } - new_data.append(d) - - data_to_add[Constants.META_BINDING_EVENT] = new_data + ) + # Mark that copying failed + unsucessfully_copied_event_maps[ligand_key] = True + + + # Create ligand binding events for the dataset + ligand_binding_events = [] + for ligand_key in dataset_ligands: + # Add binding events for ligands that can be matched to PanDDA event maps + if ligand_key in attested_ligand_events: + # Skip if failed to copy pandda event map + if ligand_key in unsucessfully_copied_event_maps: + continue + attested_ligand_event_data = attested_ligand_events[ligand_key] + data = { + Constants.META_FILE: attested_ligand_event_data[0], + Constants.META_SHA256: attested_ligand_event_data[2], + Constants.META_PROT_MODEL: ligand_key[0], + Constants.META_PROT_CHAIN: ligand_key[1], + Constants.META_PROT_RES: ligand_key[2], + Constants.META_PROT_INDEX: attested_ligand_event_data[4], + Constants.META_PROT_BDC: attested_ligand_event_data[5], + } + # Add binding events for permitted ligands without an event map + elif ligand_key in unattested_ligand_events: + data = { + Constants.META_FILE: None, + Constants.META_SHA256: None, + Constants.META_PROT_MODEL: ligand_key[0], + Constants.META_PROT_CHAIN: ligand_key[1], + Constants.META_PROT_RES: ligand_key[2], + Constants.META_PROT_INDEX: None, + Constants.META_PROT_BDC: None, + } + # Skip if ligand key is not associated with a legal ligand + else: + continue + ligand_binding_events.append(data) + + # Add data on the ligand binding events to the new dataset to add + data_to_add[Constants.META_BINDING_EVENT] = ligand_binding_events + new_xtal_data = {} for k, v in historical_xtal_data.items(): @@ -728,6 +793,17 @@ def _copy_files(self, meta): new_xtal_data[k] = v xtal[Constants.META_XTAL_FILES] = new_xtal_data + # Handle the presence of ligand without event maps that have not been permitted + if len(forbidden_unattested_ligand_events) != 0: + exception = ( + "No PanDDA event map found that correspond to the following ligands. If you want to allow these then " + "add the corresponding crystal names to the panddas_missing_ok list in the config file:\n" + ) + for dtag, ligand_key in forbidden_unattested_ligand_events.items(): + lk = ligand_key + exception = exception + f"{dtag} : Model: {lk[0]}; Chain: {lk[1]}; Residue: {lk[2]}" + raise Exception(exception) + return meta def _find_event_tables(self): @@ -867,10 +943,12 @@ def _copy_config(self): return False return True - def get_ligand_coords( + def get_dataset_ligands( self, - structure: gemmi.Structure, + structure_path: Path, ) -> dict[tuple[str, str, str], np.array]: + structure = gemmi.read_structure(str(structure_path)) + ligand_coords = {} for model in structure: for chain in model: @@ -922,14 +1000,14 @@ def get_closest_event_map( else: return None, None + def get_dataset_event_maps( - self, xtal_name: str, pdb_file: Path, event_tables: dict[Path, pd.DataFrame] + self, xtal_name: str, ligand_coords: dict[(str,str,int), np.array], event_tables: dict[Path, pd.DataFrame] ) -> dict[tuple[str, str, str], Path]: # Get the relevant structure - structure = gemmi.read_structure(str(pdb_file)) # Get the coordinates of ligands - ligand_coords = self.get_ligand_coords(structure) + # ligand_coords = self.get_ligand_coords(structure) # Get the closest events within some reasonable radius closest_event_maps = {} @@ -937,22 +1015,7 @@ def get_dataset_event_maps( closest_event_map, data = self.get_closest_event_map(xtal_name, ligand_coord, event_tables) if closest_event_map: closest_event_maps[ligand_key] = (closest_event_map, data[0], data[1]) - else: - if xtal_name in self.panddas_missing_ok: - self.logger.warn( - "no PanDDA event map found for", - xtal_name, - "but this is OK as it's been added to the panddas_missing_ok", - "list in the config file", - ) - else: - self.logger.error( - "no PanDDA event map found. If you want to allow this then add", - xtal_name, - "to the panddas_missing_ok list in the config file", - ) - closest_event_maps[ligand_key] = (None, None, None) return closest_event_maps