Skip to content

Commit

Permalink
LIU-392: Output ports now use encoding if available
Browse files Browse the repository at this point in the history
  • Loading branch information
myxie committed Dec 19, 2024
1 parent 3f92e4c commit 77c9f2a
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 104 deletions.
143 changes: 63 additions & 80 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,45 +391,41 @@ def _init_appArgs(self, pargsDict: dict, keyargsDict: dict, posargs: list) -> li
"""
pargs = [] # positional arguments
funcargs = {} # Function arguments
if "applicationArgs" in self.parameters:
appArgs = self.parameters["applicationArgs"] # we'll pop the default ones
_dum = [appArgs.pop(k) for k in self.func_def_keywords if k in appArgs]
logger.debug(
"Default keyword arguments removed: %s",
[i for i in _dum],
)
# update the positional args
if self._applicationArgs:
# update the positional args
pargsDict.update(
{k: self.parameters[k] for k in pargsDict if k in self.parameters}
)
# if defined in both we use AppArgs values
for arg in appArgs:
for arg in self._applicationArgs:
# check value type and interpret
if appArgs[arg]["type"] in ["Json", "Complex"]:
if self._applicationArgs[arg]["type"] in ["Json", "Complex"]:
try:
value = ast.literal_eval(appArgs[arg]["value"])
encoding = appArgs[arg]["encoding"]
value = ast.literal_eval(self._applicationArgs[arg]["value"])
# TODO sanity check the encoding?
encoding = self._applicationArgs[arg]["encoding"]
logger.debug(
f"Evaluated %s to %s",
appArgs[arg]["value"],
self._applicationArgs[arg]["value"],
type(value),
)
appArgs[arg]["value"] = value
self._applicationArgs[arg]["value"] = value
except ValueError:
logger.error("Unable to evaluate %s", appArgs[arg]["value"])
logger.error("Unable to evaluate %s", self._applicationArgs[arg]["value"])
else:
value = appArgs[arg]["value"]
encoding = appArgs[arg]["encoding"]
value = self._applicationArgs[arg]["value"]
encoding = self._applicationArgs[arg]["encoding"]
if arg in pargsDict:
pargsDict.update({arg:{"value":value, "encoding":encoding}})

_ = [appArgs.pop(k) for k in pargsDict if k in appArgs]
_ = [self._applicationArgs.pop(k) for k in pargsDict if k in self._applicationArgs]
logger.debug("Updated posargs dictionary: %s", pargsDict)

# update the keyword arguments
keyargsDict.update(
{
k: {"value": appArgs[k]["value"], "encoding": appArgs[k]["encoding"]}
for k in keyargsDict if k in appArgs
k: {"value": self._applicationArgs[k]["value"], "encoding": self._applicationArgs[k]["encoding"]}
for k in keyargsDict if k in self._applicationArgs
}
)
logger.debug("Updated keyargs dictionary: %s", keyargsDict)
Expand All @@ -438,13 +434,13 @@ def _init_appArgs(self, pargsDict: dict, keyargsDict: dict, posargs: list) -> li
# TODO: This should only be done if the function signature allows it
vparg = []
vkarg = {}
logger.debug(f"Remaining AppArguments {appArgs}")
for arg in appArgs:
if appArgs[arg]["type"] in ["Json", "Complex"]:
value = ast.literal_eval(appArgs[arg]["value"])
logger.debug(f"Remaining AppArguments {self._applicationArgs}")
for arg in self._applicationArgs:
if self._applicationArgs[arg]["type"] in ["Json", "Complex"]:
value = ast.literal_eval(self._applicationArgs[arg]["value"])
else:
value = appArgs[arg]["value"]
if appArgs[arg]["positional"]:
value = self._applicationArgs[arg]["value"]
if self._applicationArgs[arg]["positional"]:
vparg.append(value)
else:
vkarg.update({arg: value})
Expand All @@ -458,21 +454,23 @@ def _init_appArgs(self, pargsDict: dict, keyargsDict: dict, posargs: list) -> li
logger.debug("Adding remaining **kwargs to funcargs: %s", vkarg)
funcargs.update(vkarg)
else:
if self.input_parser:
encoding = self.input_parser
else:
encoding = "dill"
pargsDict.update(
{k: {"value": pargsDict[k], "encoding": "pickle"} for k in pargsDict}
{k: {"value": pargsDict[k], "encoding": encoding} for k in pargsDict}
)

tmpPargs = {port: subdict["value"] for port, subdict in pargsDict.items()}
# Extract arg and values from pargs; we no longer need the encoding
tmpPargs = {arg: subdict["value"] for arg, subdict in pargsDict.items()}
logger.debug(f"Updating funcargs with values from pargsDict {pargsDict}")
funcargs.update(tmpPargs)

# Mixin the values from named ports
portargs = self._ports2args(posargs, pargsDict, keyargsDict)

logger.debug(f"Updating funcargs with values from pargsDict {pargsDict}")
# Extract port and values from pargs; we no longer need the encoding

logger.debug(f"Updating funcargs with values from named ports {portargs}")
tmpPortArgs = {port: subdict["value"] for port, subdict in portargs.items()}

funcargs.update(tmpPortArgs)

return [funcargs, pargs]
Expand Down Expand Up @@ -512,44 +510,6 @@ def _ports2args(self, posargs, pargsDict, keyargsDict) -> dict:
parser=get_port_reader_function(self.input_parser)
)
)
else:
check_len = min(
len(iitems),
self.fn_nargs + self.fn_nkw,
)

iitem_keys = list(iitems.keys())
for i in range(min(len(iitems), self.fn_nargs)):
key = iitem_keys[i]
all_contents = get_port_reader_function(self.input_parser)
portargs.update({self.argnames[i]: all_contents(iitems[key])})

# 4. replace default argument values with named output ports
if "outputs" in self.parameters and check_ports_dict(
self.parameters["outputs"]
):
# TODO remove output processing
check_len = min(len(oitems), self.fn_nargs + self.fn_nkw)
outputs_dict = collections.OrderedDict()
for outport in self.parameters["outputs"]:
key = list(outport.keys())[0]
outputs_dict[key] = {
"name": oitems[key],
"path": oitems[key].path if 'path' in oitems else None,
"drop": oitems[key]
}

portargs.update(
identify_named_ports(
outputs_dict,
posargs,
pargsDict,
keyargsDict,
check_len=check_len,
mode="outputs",
addPositionalToKeyword=True,
)
)
return portargs

def initialize_with_func_code(self):
Expand Down Expand Up @@ -735,28 +695,51 @@ def check_outputs_match_results(self, result: tuple):
f"does not match generated results ({len(self.results)})")
return result

def _match_parser(output_drop):
def _match_parser(self, output_drop):
"""
Match the output parser to the appropriate drop
"""

encoding = None
component_params = self.parameters.get("componentParams")
if not component_params:
return self.output_parser
if "outputs" in self.parameters and check_ports_dict(self.parameters["outputs"]):
for outport in self.parameters["outputs"]:
drop_uid, drop_port = list(outport.items())[0]
if drop_uid == output_drop.uid:
encoding = component_params[drop_port]["encoding"]
if encoding:
return DropParser(encoding)
else:
return self.output_parser

def write_results(self, result):

if not self.outputs:
return

for o in self.outputs:
if self.output_parser is DropParser.PICKLE:
parser = self._match_parser(o)
if parser is DropParser.PICKLE:
logger.debug(f"Writing pickeled result {type(result)} to {o}")
o.write(pickle.dumps(result))
elif self.output_parser is DropParser.EVAL:
o.write(repr(result).encode("utf-8"))
elif self.output_parser is DropParser.NPY:
elif parser is DropParser.EVAL or parser is DropParser.UTF8:
encoded_result = repr(result).encode("utf-8")
o.write(encoded_result)
elif parser is DropParser.NPY:
import numpy as np
if not isinstance(result, np.ndarray):
try:
result = np.array(result)
except Exception as e:
raise(e)
drop_loaders.save_npy(o, result)
elif self.output_parser is DropParser.RAW:
elif parser is DropParser.RAW:
o.write(result)
elif self.output_parser is DropParser.DILL:
o.write(pickle.dumps(result))
elif parser is DropParser.DILL:
o.write(dill.dumps(result))
elif parser is DropParser.BINARY:
drop_loaders.save_binary(o, result)
else:
ValueError(self.output_parser.__repr__())

Expand Down
23 changes: 23 additions & 0 deletions daliuge-engine/dlg/drop_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,26 @@ def load_dill(drop: "DataDROP"):
drop.close(desc)
return dill.loads(buf.getbuffer())

def load_binary(drop: "DataDROP"):
"""
Load binary
"""
buf = io.BytesIO()
desc = drop.open()
while True:
data = drop.read(desc)
if not data:
break
buf.write(data)
drop.close(desc)
return buf.getvalue()

def save_binary(drop: "DataDROP", data: bytes):
"""
Load binary
"""
bytes_data = io.BytesIO(data)
dropio = drop.getIO()
dropio.open(OpenMode.OPEN_WRITE)
dropio.write(bytes_data)
dropio.close()
9 changes: 5 additions & 4 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import collections
import importlib
import logging
from typing import List, Optional

from typing import List
from dlg.common.reproducibility.constants import ReproducibilityFlags

from . import droputils
Expand All @@ -39,6 +39,7 @@
LINKTYPE_NTO1_PROPERTY,
LINKTYPE_1TON_APPEND_METHOD,
)

from .data.drops.data_base import NullDROP
from .data.drops.container import ContainerDROP

Expand Down Expand Up @@ -245,7 +246,9 @@ def loadDropSpecs(dropSpecList):
return dropSpecs, reprodata


def createGraphFromDropSpecList(dropSpecList, session=None, ret_drops=False):
def createGraphFromDropSpecList(dropSpecList: List[dict],
session: Optional["Session"]=None
) -> List[AbstractDROP]:
logger.debug("Found %d DROP definitions", len(dropSpecList))

# Step #1: create the actual DROPs
Expand Down Expand Up @@ -317,8 +320,6 @@ def createGraphFromDropSpecList(dropSpecList, session=None, ret_drops=False):
drop for drop in drops.values() if not droputils.getUpstreamObjects(drop)
]
logger.info("%d graph roots found, bye-bye!", len(roots))
if ret_drops:
return roots, drops
return roots


Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/named_port_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
class DropParser(Enum):
RAW = "raw"
PICKLE = "pickle"
EVAL = "eval"
EVAL = "eval"
NPY = "npy"
DILL = "dill"
# JSON = "json"
PATH = "path" # input only
DATAURL = "dataurl" # input only
BINARY = "binary"
UTF8 = "utf-8"


def serialize_kwargs(keyargs, prefix="--", separator=" "):
Expand Down Expand Up @@ -378,7 +379,6 @@ def _get_args(appArgs, positional=False):
logger.debug("%s arguments: %s", argType, args)
return args


def get_port_reader_function(input_parser: DropParser):
"""
Return the function used to read input from a named port
Expand All @@ -388,7 +388,7 @@ def get_port_reader_function(input_parser: DropParser):
if input_parser is DropParser.PICKLE:
# all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
reader = drop_loaders.load_pickle
elif input_parser is DropParser.EVAL:
elif input_parser is DropParser.EVAL or input_parser is DropParser.UTF8:

def optionalEval(x):
# Null and Empty Drops will return an empty byte string
Expand All @@ -406,7 +406,7 @@ def optionalEval(x):
elif input_parser is DropParser.DILL:
reader = drop_loaders.load_dill
elif input_parser is DropParser.BINARY:
reader = drop_loaders.load_dill
reader = drop_loaders.load_binary
else:
raise ValueError(input_parser.__repr__())
return reader
Loading

0 comments on commit 77c9f2a

Please sign in to comment.