Skip to content

Commit

Permalink
Overhaul of collator.py:_copy_files to handle datasets with ligands b…
Browse files Browse the repository at this point in the history
…ut without event maps
  • Loading branch information
ConorFWild committed Nov 23, 2023
1 parent 4d17343 commit 75aded8
Showing 1 changed file with 128 additions and 65 deletions.
193 changes: 128 additions & 65 deletions src/xchemalign/collator.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ def _copy_files(self, meta):
num_event_maps = 0

event_tables = self._find_event_tables()

forbidden_unattested_ligand_events = {}
for xtal_name, xtal in meta[Constants.META_XTALS].items():
dir = cryst_path / xtal_name

Expand Down Expand Up @@ -610,33 +610,73 @@ def _copy_files(self, meta):
elif type == Constants.CONFIG_TYPE_MODEL_BUILDING:
self.logger.warn("CIF entry missing for {}".format(xtal_name))

# handle panddas event maps

# Handle histroical ligand binding events (in particular pull up their event map SHA256s for comparing)
hist_event_maps = {}
for event_map_data in historical_xtal_data.get(Constants.META_BINDING_EVENT, []):
model = event_map_data.get(Constants.META_PROT_MODEL)
chain = event_map_data.get(Constants.META_PROT_CHAIN)
res = event_map_data.get(Constants.META_PROT_RES)
for ligand_binding_data in historical_xtal_data.get(Constants.META_BINDING_EVENT, []):
model = ligand_binding_data.get(Constants.META_PROT_MODEL)
chain = ligand_binding_data.get(Constants.META_PROT_CHAIN)
res = ligand_binding_data.get(Constants.META_PROT_RES)
if model is not None and chain and res:
hist_event_maps[(model, chain, res)] = event_map_data
hist_event_maps[(model, chain, res)] = ligand_binding_data

best_event_map_paths = self.get_dataset_event_maps(xtal_name, pdb_input, event_tables)
num_identical_historical_event_maps = 0
# Determine the ligands present and their coordinates
dataset_ligands = self.get_dataset_ligands(pdb)

# Match ligand to panddas event maps if possible and determine if those maps are new
best_event_map_paths = self.get_dataset_event_maps(xtal_name, dataset_ligands, event_tables)
identical_historical_event_maps = {}
unattested_ligand_events = {}
attested_ligand_events = {}
event_maps_to_copy = {}
if best_event_map_paths:
for k, tup in best_event_map_paths.items():
path = tup[0]

for ligand_key in dataset_ligands:
if ligand_key in best_event_map_paths:
ligand_event_map_data = best_event_map_paths[ligand_key]
path = ligand_event_map_data[0]
if path:
digest = utils.gen_sha256(path)
ccp4_output = (
cryst_path / xtal_name / "{}_{}_{}_{}.ccp4".format(xtal_name, k[0], k[1], k[2])
cryst_path / xtal_name / "{}_{}_{}_{}.ccp4".format(
xtal_name,
ligand_key[0],
ligand_key[1],
ligand_key[2])
)
event_maps_to_copy[k] = (path, ccp4_output, digest, k, tup[1], tup[2])
hist_data = hist_event_maps.get(k)
attested_ligand_events[ligand_key] = (
path,
ccp4_output,
digest,
ligand_key,
ligand_event_map_data[1],
ligand_event_map_data[2],
)
hist_data = hist_event_maps.get(ligand_key)
# Track whether the event map actually is new by data
if hist_data:
if digest == hist_data.get(Constants.META_SHA256):
num_identical_historical_event_maps += 1
identical_historical_event_maps[ligand_key] = True
else:
event_maps_to_copy[ligand_key] = True
# Handle ligands that cannot be matched
else:
# Add those permitted ligands
if xtal_name in self.panddas_missing_ok:
self.logger.warn(
"no PanDDA event map found for",
xtal_name,
"but this is OK as it's been added to the panddas_missing_ok",
"list in the config file",
)
unattested_ligand_events[ligand_key] = True
# Track forbidden ligands for informative error messages at the end of this function
else:
event_maps_to_copy[k] = (None, None, None, k, tup[1], tup[2])
self.logger.error(
"no PanDDA event map found. If you want to allow this then add",
xtal_name,
"to the panddas_missing_ok list in the config file",
)
forbidden_unattested_ligand_events[xtal_name] = ligand_key


else:
Expand Down Expand Up @@ -691,35 +731,60 @@ def _copy_files(self, meta):
except:
self.logger.warn('failed to generate SMILES for ligand {}'.format(xtal_name))

# copy event maps
if event_maps_to_copy:
if num_identical_historical_event_maps == len(event_maps_to_copy):
historical_data = historical_xtal_data.get(Constants.META_BINDING_EVENT)
data_to_add[Constants.META_BINDING_EVENT] = historical_data
else:
new_data = []
for key, to_copy in event_maps_to_copy.items():
# print('copying {} to {}'.format(to_copy[0], self.output_path / to_copy[1]))
f = shutil.copy2(to_copy[0], self.output_path / to_copy[1], follow_symlinks=True)
if not f:
self.logger.error(
"Failed to copy Panddas file {} to {}".format(
to_copy[0], self.output_path / to_copy[1]
)
# copy event maps that do not differ in SHA from previously known ones
unsucessfully_copied_event_maps = {}
if len(event_maps_to_copy) != 0:
for ligand_key in event_maps_to_copy:
source = attested_ligand_events[ligand_key][0]
destination = attested_ligand_events[ligand_key][1]
f = shutil.copy2(source[0], self.output_path / destination[1], follow_symlinks=True)
if not f:
self.logger.error(
"Failed to copy Panddas file {} to {}".format(
source[0], self.output_path / destination[1]
)
else:
d = {
Constants.META_FILE: str(to_copy[1]),
Constants.META_SHA256: to_copy[2],
Constants.META_PROT_MODEL: int(to_copy[3][0]),
Constants.META_PROT_CHAIN: to_copy[3][1],
Constants.META_PROT_RES: to_copy[3][2],
Constants.META_PROT_INDEX: to_copy[4],
Constants.META_PROT_BDC: to_copy[5],
}
new_data.append(d)

data_to_add[Constants.META_BINDING_EVENT] = new_data
)
# Mark that copying failed
unsucessfully_copied_event_maps[ligand_key] = True


# Create ligand binding events for the dataset
ligand_binding_events = []
for ligand_key in dataset_ligands:
# Add binding events for ligands that can be matched to PanDDA event maps
if ligand_key in attested_ligand_events:
# Skip if failed to copy pandda event map
if ligand_key in unsucessfully_copied_event_maps:
continue
attested_ligand_event_data = attested_ligand_events[ligand_key]
data = {
Constants.META_FILE: attested_ligand_event_data[0],
Constants.META_SHA256: attested_ligand_event_data[2],
Constants.META_PROT_MODEL: ligand_key[0],
Constants.META_PROT_CHAIN: ligand_key[1],
Constants.META_PROT_RES: ligand_key[2],
Constants.META_PROT_INDEX: attested_ligand_event_data[4],
Constants.META_PROT_BDC: attested_ligand_event_data[5],
}
# Add binding events for permitted ligands without an event map
elif ligand_key in unattested_ligand_events:
data = {
Constants.META_FILE: None,
Constants.META_SHA256: None,
Constants.META_PROT_MODEL: ligand_key[0],
Constants.META_PROT_CHAIN: ligand_key[1],
Constants.META_PROT_RES: ligand_key[2],
Constants.META_PROT_INDEX: None,
Constants.META_PROT_BDC: None,
}
# Skip if ligand key is not associated with a legal ligand
else:
continue
ligand_binding_events.append(data)

# Add data on the ligand binding events to the new dataset to add
data_to_add[Constants.META_BINDING_EVENT] = ligand_binding_events


new_xtal_data = {}
for k, v in historical_xtal_data.items():
Expand All @@ -728,6 +793,17 @@ def _copy_files(self, meta):
new_xtal_data[k] = v
xtal[Constants.META_XTAL_FILES] = new_xtal_data

# Handle the presence of ligand without event maps that have not been permitted
if len(forbidden_unattested_ligand_events) != 0:
exception = (
"No PanDDA event map found that correspond to the following ligands. If you want to allow these then "
"add the corresponding crystal names to the panddas_missing_ok list in the config file:\n"
)
for dtag, ligand_key in forbidden_unattested_ligand_events.items():
lk = ligand_key
exception = exception + f"{dtag} : Model: {lk[0]}; Chain: {lk[1]}; Residue: {lk[2]}"
raise Exception(exception)

return meta

def _find_event_tables(self):
Expand Down Expand Up @@ -867,10 +943,12 @@ def _copy_config(self):
return False
return True

def get_ligand_coords(
def get_dataset_ligands(
self,
structure: gemmi.Structure,
structure_path: Path,
) -> dict[tuple[str, str, str], np.array]:
structure = gemmi.read_structure(str(structure_path))

ligand_coords = {}
for model in structure:
for chain in model:
Expand Down Expand Up @@ -922,37 +1000,22 @@ def get_closest_event_map(
else:
return None, None


def get_dataset_event_maps(
self, xtal_name: str, pdb_file: Path, event_tables: dict[Path, pd.DataFrame]
self, xtal_name: str, ligand_coords: dict[(str,str,int), np.array], event_tables: dict[Path, pd.DataFrame]
) -> dict[tuple[str, str, str], Path]:
# Get the relevant structure
structure = gemmi.read_structure(str(pdb_file))

# Get the coordinates of ligands
ligand_coords = self.get_ligand_coords(structure)
# ligand_coords = self.get_ligand_coords(structure)

# Get the closest events within some reasonable radius
closest_event_maps = {}
for ligand_key, ligand_coord in ligand_coords.items():
closest_event_map, data = self.get_closest_event_map(xtal_name, ligand_coord, event_tables)
if closest_event_map:
closest_event_maps[ligand_key] = (closest_event_map, data[0], data[1])
else:
if xtal_name in self.panddas_missing_ok:
self.logger.warn(
"no PanDDA event map found for",
xtal_name,
"but this is OK as it's been added to the panddas_missing_ok",
"list in the config file",
)
else:
self.logger.error(
"no PanDDA event map found. If you want to allow this then add",
xtal_name,
"to the panddas_missing_ok list in the config file",
)

closest_event_maps[ligand_key] = (None, None, None)

return closest_event_maps

Expand Down

0 comments on commit 75aded8

Please sign in to comment.