Pyfunc code #261

Merged: 24 commits from pyfunc_code into master, Aug 15, 2024.

Changes shown below are from a single commit (7405e7c).

Commits (24):
23af3a8  Allow defining func_code as a plain string (awicenec, May 29, 2024)
a44a19c  Backward compatibility for dask (awicenec, May 29, 2024)
9915b0b  Final updates, all tests passing (awicenec, May 29, 2024)
7b8ca98  Added pyext as a dependency (awicenec, May 29, 2024)
8a73b5c  added pyext to doc build (awicenec, May 29, 2024)
cbf3b86  Moved pyext to mock modules (awicenec, May 29, 2024)
b747c43  Merge branch 'master' into pyfunc_code (awicenec, Jul 18, 2024)
bcb246b  LIU-395: Resolve missing AppDrop tracebacks from log files. (myxie, Aug 8, 2024)
d86bb27  LIU-395: Runtime needs to be imported before other dlg modules. (myxie, Aug 8, 2024)
4af4f31  LIU-395: Change DLG_ROOT to current test environment. (myxie, Aug 8, 2024)
7405e7c  LIU-395: Move out log test to separate file. (myxie, Aug 8, 2024)
f9a013f  LIU-396: Add additional port options to CLI for NodeManager. (myxie, Aug 12, 2024)
fdc78bb  Merge branch 'master' into pyfunc_code (awicenec, Aug 14, 2024)
ba5483f  Better error reporting and pyext installation. (awicenec, Aug 14, 2024)
442b0ca  Trial to fix CI failing tests. (awicenec, Aug 14, 2024)
2ac2a0d  Switch to latest 3.8 version (awicenec, Aug 14, 2024)
2f81c27  Maybe timeout too low? (awicenec, Aug 14, 2024)
c72daaa  More trial (awicenec, Aug 14, 2024)
ded2e41  Maybe skipping 3.8 works? (awicenec, Aug 14, 2024)
a3e9a43  Initialize array, else it is kept across sessions. (awicenec, Aug 14, 2024)
6fe06e7  Changed back to 5s timeout (awicenec, Aug 15, 2024)
dea31d2  Merge remote-tracking branch 'origin/LIU-396' into pyfunc_code (awicenec, Aug 15, 2024)
6a34fb4  Merge remote-tracking branch 'origin/LIU-395' into pyfunc_code (awicenec, Aug 15, 2024)
d8c11dc  Fixed lint error (awicenec, Aug 15, 2024)
LIU-395: Move out log test to separate file.
- Also added the `conftest.py` so we properly load the runtime when starting pytest as a suite (as opposed to running tests individually).
myxie committed Aug 8, 2024

Verified: This commit was created on GitHub.com and signed with GitHub's verified signature.
commit 7405e7c38dcb5fd228253dfb44b1915df8f9b526
8 changes: 8 additions & 0 deletions daliuge-engine/conftest.py
@@ -0,0 +1,8 @@
"""
This file ensures dlg.runtime is loaded up when running pytest on a suite of tests.

This sets up the correct logging runtime and drop-tracking.
"""

import dlg.runtime
import pytest
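
For readers unfamiliar with conftest.py semantics: pytest imports this file automatically before collecting the tests in the directory, so the bare `import dlg.runtime` is enough to trigger whatever module-level setup dlg/runtime/__init__.py performs. As a rough, hypothetical sketch of what that kind of import-time setup can look like (the class and attribute names below are illustrative only, not the actual dlg internals):

import logging

class DropTrackingFilter(logging.Filter):
    """Tag records with the attributes a per-session log filter later matches on."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Give every record a default session id so downstream filters can
        # rely on the attribute being present.
        if not hasattr(record, "session_id"):
            record.session_id = ""
        return True

# Module-level statements run on first import, so merely importing the package
# (as conftest.py does with dlg.runtime) installs the filter as a side effect.
logging.getLogger().addFilter(DropTrackingFilter())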
1 change: 0 additions & 1 deletion daliuge-engine/dlg/manager/session.py
@@ -289,7 +289,6 @@ def addGraphSpec(self, graphSpec):
 
         # This will check the consistency of each dropSpec
         logger.debug("Trying to add graphSpec:")
-        logger.exception("Trying to test exception")
         for x in graphSpec:
             logger.debug("%s: %s", x, x.keys())
         try:
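
The deleted line was a leftover debugging statement. Note that `logger.exception` is only meaningful inside an `except` block, where it appends the active traceback to the message; called outside one, as here, it just logs `NoneType: None`. A small standalone illustration:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

try:
    1 / 0
except ZeroDivisionError:
    # Inside an except block: logs the message plus the full traceback.
    logger.exception("division failed")

# Outside any except block: logs the message plus "NoneType: None" instead of
# a traceback, which is why the stray call above was removed.
logger.exception("no active exception")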
104 changes: 104 additions & 0 deletions daliuge-engine/test/test_logs.py
@@ -0,0 +1,104 @@
import logging
import time
import pytest

from pathlib import Path


from dlg.runtime import version  # Imported to setup DlgLogger
from dlg.apps.app_base import BarrierAppDROP
from dlg.droputils import DROPWaiterCtx
from dlg.manager.session import Session, generateLogFileName

default_repro = {
    "rmode": "1",
    "RERUN": {
        "lg_blockhash": "x",
        "pgt_blockhash": "y",
        "pg_blockhash": "z",
    },
}
default_graph_repro = {
    "rmode": "1",
    "meta_data": {"repro_protocol": 0.1, "hashing_alg": "_sha3.sha3_256"},
    "merkleroot": "a",
    "RERUN": {
        "signature": "b",
    },
}

class MockThrowingDrop(BarrierAppDROP):
    def run(self):
        raise RuntimeError("App drop thrown")

def add_test_reprodata(graph: list):
    for drop in graph:
        drop["reprodata"] = default_repro.copy()
    graph.append(default_graph_repro.copy())
    return graph

def test_logs(caplog):
    """
    Confirm that when we run a session in which the AppDrop experiences a runtime error,
    we produce the traceback for that application in the log file.

    This acts as a regression test to make sure future changes don't lead to
    app/data drop tracking no longer adding appropriate attributes to the LogRecords such
    that they pass the filter set up in the Session constructor. For further information,
    review the runtime/__init__.py file.

    The test uses the pytest.caplog fixture to confirm that:
    1. An exception is logged in the DlgLogger, and
    2. The exception passes through the filters set up in the Session constructor and is
    properly described in the session log file.

    This aims to detect regressions when re-organising class structures or logging in the
    future.
    """

    with caplog.at_level(logging.INFO):
        with Session("log-session") as s:
            s.addGraphSpec(
                add_test_reprodata(
                    [
                        {
                            "oid": "A",
                            "categoryType": "Data",
                            "dropclass": "dlg.data.drops.memory.InMemoryDROP",
                            "consumers": ["B"],
                        },
                        {
                            "oid": "B",
                            "categoryType": "Application",
                            "dropclass": "test.test_session.MockThrowingDrop",
                            "sleep_time": 2,
                        },
                        {
                            "oid": "C",
                            "categoryType": "Data",
                            "dropclass": "dlg.data.drops.memory.InMemoryDROP",
                            "producers": ["B"],
                        },
                    ]
                )
            )

            s.deploy()
            with DROPWaiterCtx(None, s.drops["C"], 300):
                s.drops["A"].write(b"x")
                s.drops["A"].setCompleted()

        # Logger needs time to get messages.
        time.sleep(5)
        logfile = Path(generateLogFileName(s._sessionDir, s.sessionId))
        exception_logged = False
        for record in caplog.records:
            if record.name == 'dlg.apps.app_base' and record.levelname == 'ERROR':
                exception_logged = True
                with logfile.open('r') as f:
                    buffer = f.read()
                assert record.name in buffer
                assert 'Traceback' in buffer
                assert 'App drop thrown' in buffer
        assert exception_logged
        logfile.unlink(missing_ok=True)
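
The test leans on pytest's built-in caplog fixture, which captures the LogRecords emitted during a test so they can be asserted on afterwards. A minimal standalone example of the pattern, independent of the dlg code above:

import logging

def test_caplog_pattern(caplog):
    # caplog.at_level temporarily lowers the capture threshold for the block.
    with caplog.at_level(logging.INFO):
        logging.getLogger("demo").error("boom")
    # Each captured record exposes its logger name and level for inspection.
    assert any(
        r.name == "demo" and r.levelname == "ERROR" for r in caplog.records
    )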
73 changes: 2 additions & 71 deletions daliuge-engine/test/test_session.py
@@ -29,8 +29,9 @@
 
 from pathlib import Path
 
-from dlg.runtime import version # Imported to setup DlgLogger
+# from dlg.runtime import version # Imported to setup DlgLogger
 
+from dlg.utils import getDlgWorkDir
 from dlg.apps.app_base import BarrierAppDROP
 from dlg.ddap_protocol import DROPLinkType, DROPStates, AppDROPStates
 from dlg.droputils import DROPWaiterCtx
@@ -67,76 +68,6 @@ def add_test_reprodata(graph: list):
     return graph
 
 
-def test_logs(caplog):
-    """
-    Confirm that when we run a session in which the AppDrop experiences a runtime error,
-    we produce the trackback for that application in the file.
-
-    This acts as a regression test to make sure changes in the future don't lead to
-    app/data drop tracking no longer adding appropriate attributes to the LogRecords such
-    that they pass the filter setup in the Session constructor. For further information,
-    review the runtime/__init__.py file.
-
-    The test uses the pytest.caplog fixture to first confirm that:
-    1. An exception is logged in the DlgLogger, and
-    2. That the exception passes through the filters setup in the Session constructor, and
-    properly described in the session log file.
-
-    This aims to detect regressions when re-organising class structures or logging in the
-    future.
-    """
-    tmp_root = os.environ["DLG_ROOT"]
-    os.environ["DLG_ROOT"] = str(Path(__file__).cwd())
-    with caplog.at_level(logging.INFO):
-        with Session("1") as s:
-            # caplog.handler.addFilter(SessionFilter("1"))
-            s.addGraphSpec(
-                add_test_reprodata(
-                    [
-                        {
-                            "oid": "A",
-                            "categoryType": "Data",
-                            "dropclass": "dlg.data.drops.memory.InMemoryDROP",
-                            "consumers": ["B"],
-                        },
-                        {
-                            "oid": "B",
-                            "categoryType": "Application",
-                            "dropclass": "test.test_session.MockThrowingDrop",
-                            "sleep_time": 2,
-                        },
-                        {
-                            "oid": "C",
-                            "categoryType": "Data",
-                            "dropclass": "dlg.data.drops.memory.InMemoryDROP",
-                            "producers": ["B"],
-                        },
-                    ]
-                )
-            )
-
-            s.deploy()
-            with DROPWaiterCtx(None, s.drops["C"], 1):
-                s.drops["A"].write(b"x")
-                s.drops["A"].setCompleted()
-
-        # Logger needs time to get messages.
-        time.sleep(5)
-        logfile = Path(generateLogFileName(s._sessionDir, s.sessionId))
-        exception_logged = False
-        for record in caplog.records:
-            if record.name == 'dlg.apps.app_base' and record.levelname == 'ERROR':
-                exception_logged = True
-                with logfile.open('r') as f:
-                    buffer = f.read()
-                assert record.name in buffer
-                assert 'Traceback' in buffer
-                assert 'App drop thrown' in buffer
-        assert exception_logged
-        logfile.unlink(missing_ok=True)
-    os.environ["DLG_ROOT"] = tmp_root
 
 
 class TestSession(unittest.TestCase):
     def test_sessionStates(self):
         with Session("1") as s: