Check if we can determine extension as soon as possible
Ideally we'd use session.begin_nested() and session.rollback() to undo
staged changes, but I couldn't get that to work without changing the
entire commit strategy.
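
For reference, a minimal sketch (not Galaxy code; the model and engine are illustrative) of the SAVEPOINT approach the message describes: session.begin_nested() opens a savepoint, and rolling it back discards changes staged after that point without ending the outer transaction.

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Widget(Base):  # hypothetical model standing in for a staged dataset
    __tablename__ = "widget"
    id = Column(Integer, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Widget(name="keep"))
    session.flush()                      # persist "keep" in the outer transaction
    savepoint = session.begin_nested()   # emits SAVEPOINT
    session.add(Widget(name="discard"))
    session.flush()                      # staged after the savepoint
    savepoint.rollback()                 # undoes only work done after begin_nested()
    session.commit()
    assert session.scalars(select(Widget.name)).all() == ["keep"]
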
mvdbeek committed Dec 20, 2024
1 parent f7ce1a5 commit eeacd6c
Showing 1 changed file with 101 additions and 81 deletions.
lib/galaxy/tools/actions/__init__.py: 101 additions & 81 deletions
@@ -496,23 +496,24 @@ def execute(
 
         async_tool = tool.tool_type == "data_source_async"
 
-        def handle_output(name, output, hidden=None):
+        def handle_output(name, output, hidden=None, ext: Optional[str] = None):
             if async_tool and name in incoming:
                 # HACK: output data has already been created as a result of the async controller
                 dataid = incoming[name]
                 data = trans.sa_session.get(HistoryDatasetAssociation, dataid)
                 assert data is not None
                 out_data[name] = data
             else:
-                ext = determine_output_format(
-                    output,
-                    wrapped_params.params,
-                    inp_data,
-                    inp_dataset_collections,
-                    input_ext,
-                    python_template_version=tool.python_template_version,
-                    execution_cache=execution_cache,
-                )
+                if ext is None:
+                    ext = determine_output_format(
+                        output,
+                        wrapped_params.params,
+                        inp_data,
+                        inp_dataset_collections,
+                        input_ext,
+                        python_template_version=tool.python_template_version,
+                        execution_cache=execution_cache,
+                    )
                 create_datasets = True
                 dataset = None
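
The reworked handle_output above follows a compute-unless-supplied idiom: callers may pass a precomputed extension, and the expensive determination runs only as a fallback. A minimal, hypothetical sketch of the pattern (names below are illustrative, not Galaxy's):

from typing import Optional

def expensive_determine_format(name: str) -> str:
    # Stand-in for determine_output_format(); pretend this is costly.
    return "tabular" if name.endswith(".tsv") else "data"

def resolve_format(name: str, ext: Optional[str] = None) -> str:
    """Use the caller-supplied extension, computing one only as a fallback."""
    if ext is None:
        ext = expensive_determine_format(name)
    return ext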

@@ -586,82 +587,99 @@ def handle_output(name, output, hidden=None):
             return data
 
         child_dataset_names = set()
 
-        for name, output in tool.outputs.items():
-            if not filter_output(tool, output, incoming):
-                handle_output_timer = ExecutionTimer()
-                if output.collection:
-                    if completed_job and dataset_collection_elements and name in dataset_collection_elements:
-                        # Output collection is mapped over and has already been copied from original job
-                        continue
-                    collections_manager = app.dataset_collection_manager
-                    element_identifiers: List[Dict[str, Union[str, List[Dict[str, Union[str, List[Any]]]]]]] = []
-                    # mypy doesn't yet support recursive type definitions
-                    known_outputs = output.known_outputs(input_collections, collections_manager.type_registry)
-                    # Just to echo TODO elsewhere - this should be restructured to allow
-                    # nested collections.
-                    for output_part_def in known_outputs:
-                        # Add elements to top-level collection, unless nested...
-                        current_element_identifiers = element_identifiers
-                        current_collection_type = output.structure.collection_type
+        outputs = {name: output for name, output in tool.outputs.items() if not filter_output(tool, output, incoming)}
+        name_output_extension = [
+            (
+                name,
+                output,
+                determine_output_format(
+                    output,
+                    wrapped_params.params,
+                    inp_data,
+                    inp_dataset_collections,
+                    input_ext,
+                    python_template_version=tool.python_template_version,
+                    execution_cache=execution_cache,
+                ),
+            )
+            for name, output in outputs.items()
+        ]
+
+        for name, output, ext in name_output_extension:
+            handle_output_timer = ExecutionTimer()
+            if output.collection:
+                if completed_job and dataset_collection_elements and name in dataset_collection_elements:
+                    # Output collection is mapped over and has already been copied from original job
+                    continue
+                collections_manager = app.dataset_collection_manager
+                element_identifiers: List[Dict[str, Union[str, List[Dict[str, Union[str, List[Any]]]]]]] = []
+                # mypy doesn't yet support recursive type definitions
+                known_outputs = output.known_outputs(input_collections, collections_manager.type_registry)
+                # Just to echo TODO elsewhere - this should be restructured to allow
+                # nested collections.
+                for output_part_def in known_outputs:
+                    # Add elements to top-level collection, unless nested...
+                    current_element_identifiers = element_identifiers
+                    current_collection_type = output.structure.collection_type
 
-                        for parent_id in output_part_def.parent_ids or []:
-                            # TODO: replace following line with formal abstractions for doing this.
-                            current_collection_type = ":".join(current_collection_type.split(":")[1:])
-                            name_to_index = {
-                                value["name"]: index for (index, value) in enumerate(current_element_identifiers)
-                            }
-                            if parent_id not in name_to_index:
-                                index = len(current_element_identifiers)
-                                current_element_identifiers.append(
-                                    dict(
-                                        name=parent_id,
-                                        collection_type=current_collection_type,
-                                        src="new_collection",
-                                        element_identifiers=[],
-                                    )
-                                )
-                            else:
-                                index = name_to_index[parent_id]
-                            current_element_identifiers = cast(
-                                List[
-                                    Dict[
-                                        str,
-                                        Union[str, List[Dict[str, Union[str, List[Any]]]]],
-                                    ]
-                                ],
-                                current_element_identifiers[index]["element_identifiers"],
-                            )
+                    for parent_id in output_part_def.parent_ids or []:
+                        # TODO: replace following line with formal abstractions for doing this.
+                        current_collection_type = ":".join(current_collection_type.split(":")[1:])
+                        name_to_index = {
+                            value["name"]: index for (index, value) in enumerate(current_element_identifiers)
+                        }
+                        if parent_id not in name_to_index:
+                            index = len(current_element_identifiers)
+                            current_element_identifiers.append(
+                                dict(
+                                    name=parent_id,
+                                    collection_type=current_collection_type,
+                                    src="new_collection",
+                                    element_identifiers=[],
+                                )
+                            )
+                        else:
+                            index = name_to_index[parent_id]
+                        current_element_identifiers = cast(
+                            List[
+                                Dict[
+                                    str,
+                                    Union[str, List[Dict[str, Union[str, List[Any]]]]],
+                                ]
+                            ],
+                            current_element_identifiers[index]["element_identifiers"],
+                        )
 
-                        effective_output_name = output_part_def.effective_output_name
-                        child_dataset_names.add(effective_output_name)
-                        element = handle_output(effective_output_name, output_part_def.output_def, hidden=True)
-                        history.stage_addition(element)
-                        # TODO: this shouldn't exist in the top-level of the history at all
-                        # but for now we are still working around that by hiding the contents
-                        # there.
-                        # Following hack causes dataset to no be added to history...
-                        trans.sa_session.add(element)
-                        current_element_identifiers.append(
-                            {
-                                "__object__": element,
-                                "name": output_part_def.element_identifier,
-                            }
-                        )
+                    effective_output_name = output_part_def.effective_output_name
+                    child_dataset_names.add(effective_output_name)
+                    element = handle_output(effective_output_name, output_part_def.output_def, hidden=True)
+                    history.stage_addition(element)
+                    # TODO: this shouldn't exist in the top-level of the history at all
+                    # but for now we are still working around that by hiding the contents
+                    # there.
+                    # Following hack causes dataset to no be added to history...
+                    trans.sa_session.add(element)
+                    current_element_identifiers.append(
+                        {
+                            "__object__": element,
+                            "name": output_part_def.element_identifier,
+                        }
+                    )
 
-                    if output.dynamic_structure:
-                        assert not element_identifiers  # known_outputs must have been empty
-                        element_kwds = dict(elements=collections_manager.ELEMENTS_UNINITIALIZED)
-                    else:
-                        element_kwds = dict(element_identifiers=element_identifiers)
-                    output_collections.create_collection(
-                        output=output, name=name, completed_job=completed_job, **element_kwds
-                    )
-                    log.info(f"Handled collection output named {name} for tool {tool.id} {handle_output_timer}")
-                else:
-                    handle_output(name, output)
-                    log.info(f"Handled output named {name} for tool {tool.id} {handle_output_timer}")
+                if output.dynamic_structure:
+                    assert not element_identifiers  # known_outputs must have been empty
+                    element_kwds = dict(elements=collections_manager.ELEMENTS_UNINITIALIZED)
+                else:
+                    element_kwds = dict(element_identifiers=element_identifiers)
+                output_collections.create_collection(
+                    output=output, name=name, completed_job=completed_job, **element_kwds
+                )
+                log.info(f"Handled collection output named {name} for tool {tool.id} {handle_output_timer}")
+            else:
+                handle_output(name, output, ext=ext)
+                log.info(f"Handled output named {name} for tool {tool.id} {handle_output_timer}")
 
         add_datasets_timer = tool.app.execution_timer_factory.get_timer(
             "internals.galaxy.tools.actions.add_datasets",
@@ -1240,6 +1258,8 @@ def determine_output_format(
                     break
                 input_dataset = input_element.element_object
             ext = get_ext_or_implicit_ext(input_dataset)
+        except ToolInputsNotReadyException:
+            raise
         except Exception as e:
            log.debug("Exception while trying to determine format_source: %s", e)
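
Taken together, the change moves format resolution ahead of dataset creation: every output's extension is computed, and can fail, before any dataset is staged, so nothing needs to be undone on error. A minimal, hypothetical sketch of that two-phase, fail-fast shape (helper names are illustrative, not Galaxy's):

from typing import Callable, Dict, Iterable, Tuple

def create_outputs(
    output_defs: Iterable[Tuple[str, object]],
    resolve_ext: Callable[[object], str],
    create_dataset: Callable[[str, str], object],
) -> Dict[str, object]:
    # Phase 1: resolve every extension first. Any failure raises here,
    # before a single dataset exists, so there is nothing to roll back.
    resolved = [(name, resolve_ext(output)) for name, output in output_defs]
    # Phase 2: perform side effects only after phase 1 has succeeded.
    return {name: create_dataset(name, ext) for name, ext in resolved}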

