Skip to content

Commit

Permalink
fix #351: enable query of job logging DB via search endpoint. Handle …
Browse files Browse the repository at this point in the history
…killing jobs properly
  • Loading branch information
ryuwd committed Dec 19, 2024
1 parent 8b93c95 commit 492d77b
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 58 deletions.
36 changes: 20 additions & 16 deletions diracx-db/src/diracx/db/sql/job_logging/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,9 @@ def get_epoc(date):

seqnum = {jid: seqnum for jid, seqnum in (await self.conn.execute(seqnum_stmt))}
# IF a seqnum is not found, then assume it does not exist and the first sequence number is 1.

# https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#orm-bulk-insert-statements
await self.conn.execute(
insert(LoggingInfo),
LoggingInfo.__table__.insert(),
[
{
"JobID": record.job_id,
Expand All @@ -118,38 +117,43 @@ def get_epoc(date):
"ApplicationStatus": record.application_status[:255],
"StatusTime": record.date,
"StatusTimeOrder": get_epoc(record.date),
"Source": record.source[:32],
"StatusSource": record.source[:32],
}
for record in records
],
)

async def get_records(self, job_id: int) -> list[JobStatusReturn]:
async def get_records(self, job_ids: list[int]) -> dict[int, JobStatusReturn]:
"""Returns a Status,MinorStatus,ApplicationStatus,StatusTime,Source tuple
for each record found for job specified by its jobID in historical order.
"""
# We could potentially use a group_by here, but we need to post-process the
# results later.
stmt = (
select(
LoggingInfo.JobID,
LoggingInfo.Status,
LoggingInfo.MinorStatus,
LoggingInfo.ApplicationStatus,
LoggingInfo.StatusTime,
LoggingInfo.Source,
)
.where(LoggingInfo.JobID == int(job_id))
.where(LoggingInfo.JobID.in_(job_ids))
.order_by(LoggingInfo.StatusTimeOrder, LoggingInfo.StatusTime)
)
rows = await self.conn.execute(stmt)

values = []
values = defaultdict(list)
for (
job_id,
status,
minor_status,
application_status,
status_time,
status_source,
) in rows:
values.append(

values[job_id].append(
[
status,
minor_status,
Expand All @@ -161,16 +165,16 @@ async def get_records(self, job_id: int) -> list[JobStatusReturn]:

# If no value has been set for the application status in the first place,
# We put this status to unknown
res = []
if values:
if values[0][2] == "idem":
values[0][2] = "Unknown"
res: dict = defaultdict(list)
for job_id, history in values.items():
if history[0][2] == "idem":
history[0][2] = "Unknown"

Check warning on line 171 in diracx-db/src/diracx/db/sql/job_logging/db.py

View check run for this annotation

Codecov / codecov/patch

diracx-db/src/diracx/db/sql/job_logging/db.py#L171

Added line #L171 was not covered by tests

# We replace "idem" values by the value previously stated
for i in range(1, len(values)):
for i in range(1, len(history)):
for j in range(3):
if values[i][j] == "idem":
values[i][j] = values[i - 1][j]
if history[i][j] == "idem":
history[i][j] = history[i - 1][j]

Check warning on line 177 in diracx-db/src/diracx/db/sql/job_logging/db.py

View check run for this annotation

Codecov / codecov/patch

diracx-db/src/diracx/db/sql/job_logging/db.py#L177

Added line #L177 was not covered by tests

# And we replace arrays with tuples
for (
Expand All @@ -179,8 +183,8 @@ async def get_records(self, job_id: int) -> list[JobStatusReturn]:
application_status,
status_time,
status_source,
) in values:
res.append(
) in history:
res[job_id].append(
JobStatusReturn(
Status=status,
MinorStatus=minor_status,
Expand Down
1 change: 1 addition & 0 deletions diracx-db/src/diracx/db/sql/utils/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ async def set_job_status_bulk(
)

job_data: dict[str, str] = {}
new_status: str | None = None

Check warning on line 438 in diracx-db/src/diracx/db/sql/utils/job.py

View check run for this annotation

Codecov / codecov/patch

diracx-db/src/diracx/db/sql/utils/job.py#L438

Added line #L438 was not covered by tests
if updateTimes[-1] >= lastTime:
new_status, new_minor, new_application = (
returnValueOrRaise( # TODO: Catch this
Expand Down
73 changes: 56 additions & 17 deletions diracx-db/tests/jobs/test_jobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,66 @@ async def job_logging_db():
yield job_logging_db


async def test_insert_record(job_logging_db: JobLoggingDB):
async def test_insert_records(job_logging_db: JobLoggingDB):
async with job_logging_db as job_logging_db:
# Arrange
date = datetime.now(timezone.utc)

# Act
await job_logging_db.insert_record(
1,
status=JobStatus.RECEIVED,
minor_status="minor_status",
application_status="application_status",
date=date,
source="pytest",
)
for i in range(50):
await job_logging_db.insert_record(
i,
status=JobStatus.RECEIVED,
minor_status="received_minor_status",
application_status="application_status",
date=date,
source="pytest",
)
await job_logging_db.insert_record(
i,
status=JobStatus.SUBMITTING,
minor_status="submitted_minor_status",
application_status="application_status",
date=date,
source="pytest",
)
await job_logging_db.insert_record(
i,
status=JobStatus.RUNNING,
minor_status="running_minor_status",
application_status="application_status",
date=date,
source="pytest",
)

# Assert
res = await job_logging_db.get_records(1)

assert len(res) == 1
assert res[0].Status == JobStatus.RECEIVED.value
assert res[0].MinorStatus == "minor_status"
assert res[0].ApplicationStatus == "application_status"
assert res[0].StatusTime == date
assert res[0].Source == "pytest"
res = await job_logging_db.get_records([i for i in range(50)])

assert len(res) == 50
# Check the first job - first record
assert res[0][0].Status == JobStatus.RECEIVED.value
assert res[0][0].MinorStatus == "received_minor_status"
assert res[0][0].ApplicationStatus == "application_status"
assert res[0][0].StatusTime == date
assert res[0][0].Source == "pytest"

# Check the first job - second record
assert res[0][1].Status == JobStatus.SUBMITTING.value
assert res[0][1].MinorStatus == "submitted_minor_status"
assert res[0][1].ApplicationStatus == "application_status"
assert res[0][1].StatusTime == date
assert res[0][1].Source == "pytest"

# Check the first job - third record
assert res[0][2].Status == JobStatus.RUNNING.value
assert res[0][2].MinorStatus == "running_minor_status"
assert res[0][2].ApplicationStatus == "application_status"
assert res[0][2].StatusTime == date
assert res[0][2].Source == "pytest"

# Check the last job - third record
assert res[49][2].Status == JobStatus.RUNNING.value
assert res[49][2].MinorStatus == "running_minor_status"
assert res[49][2].ApplicationStatus == "application_status"
assert res[49][2].StatusTime == date
assert res[49][2].Source == "pytest"
16 changes: 16 additions & 0 deletions diracx-routers/src/diracx/routers/jobs/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ..dependencies import (
Config,
JobDB,
JobLoggingDB,
JobParametersDB,
)
from ..fastapi_classes import DiracxRouter
Expand Down Expand Up @@ -143,6 +144,7 @@ async def search(
config: Config,
job_db: JobDB,
job_parameters_db: JobParametersDB,
job_logging_db: JobLoggingDB,
user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
check_permissions: CheckWMSPolicyCallable,
response: Response,
Expand All @@ -164,6 +166,12 @@ async def search(

if body is None:
body = JobSearchParams()

if query_logging_info := ("LoggingInfo" in (body.parameters or [])):
if body.parameters:
body.parameters.remove("LoggingInfo")
body.parameters = ["JobID"] + (body.parameters or [])

# TODO: Apply all the job policy stuff properly using user_info
if not config.Operations["Defaults"].Services.JobMonitoring.GlobalJobsInfo:
body.search.append(
Expand All @@ -182,6 +190,14 @@ async def search(
page=page,
per_page=per_page,
)

if query_logging_info:

Check warning on line 194 in diracx-routers/src/diracx/routers/jobs/query.py

View check run for this annotation

Codecov / codecov/patch

diracx-routers/src/diracx/routers/jobs/query.py#L194

Added line #L194 was not covered by tests
job_logging_info = await job_logging_db.get_records(
[job["JobID"] for job in jobs]
)
for job in jobs:
job.update({"LoggingInfo": job_logging_info[job["JobID"]]})

Check warning on line 199 in diracx-routers/src/diracx/routers/jobs/query.py

View check run for this annotation

Codecov / codecov/patch

diracx-routers/src/diracx/routers/jobs/query.py#L198-L199

Added lines #L198 - L199 were not covered by tests

# Set the Content-Range header if needed
# https://datatracker.ietf.org/doc/html/rfc7233#section-4

Expand Down
67 changes: 42 additions & 25 deletions diracx-routers/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,56 +469,73 @@ def test_get_job_status_in_bulk(normal_user_client: TestClient, valid_job_ids: l
async def test_get_job_status_history(
normal_user_client: TestClient, valid_job_id: int
):
pytest.skip("TODO: decide whether to keep this")
# Arrange
r = normal_user_client.get(f"/api/jobs/{valid_job_id}/status")
r = normal_user_client.post(
"/api/jobs/search",
json={
"parameters": ["JobID", "Status", "MinorStatus", "ApplicationStatus"],
"search": [{"parameter": "JobID", "operator": "eq", "value": valid_job_id}],
},
)
assert r.status_code == 200, r.json()
assert r.json()[str(valid_job_id)]["Status"] == JobStatus.RECEIVED.value
assert r.json()[str(valid_job_id)]["MinorStatus"] == "Job accepted"
assert r.json()[str(valid_job_id)]["ApplicationStatus"] == "Unknown"
assert r.json()[0]["Status"] == JobStatus.RECEIVED.value
assert r.json()[0]["MinorStatus"] == "Job accepted"
assert r.json()[0]["ApplicationStatus"] == "Unknown"

NEW_STATUS = JobStatus.CHECKING.value
NEW_MINOR_STATUS = "JobPath"
before = datetime.now(timezone.utc)

r = normal_user_client.patch(
f"/api/jobs/{valid_job_id}/status",
"/api/jobs/status",
json={
datetime.now(tz=timezone.utc).isoformat(): {
"Status": NEW_STATUS,
"MinorStatus": NEW_MINOR_STATUS,
valid_job_id: {
datetime.now(tz=timezone.utc).isoformat(): {
"Status": NEW_STATUS,
"MinorStatus": NEW_MINOR_STATUS,
}
}
},
)

after = datetime.now(timezone.utc)

assert r.status_code == 200, r.json()
assert r.json()[str(valid_job_id)]["Status"] == NEW_STATUS
assert r.json()[str(valid_job_id)]["MinorStatus"] == NEW_MINOR_STATUS
assert r.json()["success"][str(valid_job_id)]["Status"] == NEW_STATUS
assert r.json()["success"][str(valid_job_id)]["MinorStatus"] == NEW_MINOR_STATUS

# Act
r = normal_user_client.get(
f"/api/jobs/{valid_job_id}/status/history",
r = normal_user_client.post(
"/api/jobs/search",
json={
"parameters": [
"JobID",
"Status",
"MinorStatus",
"ApplicationStatus",
"LoggingInfo",
],
"search": [{"parameter": "JobID", "operator": "eq", "value": valid_job_id}],
},
)

# Assert
assert r.status_code == 200, r.json()
assert len(r.json()) == 1
assert len(r.json()[str(valid_job_id)]) == 2
assert r.json()[str(valid_job_id)][0]["Status"] == JobStatus.RECEIVED.value
assert r.json()[str(valid_job_id)][0]["MinorStatus"] == "Job accepted"
assert r.json()[str(valid_job_id)][0]["ApplicationStatus"] == "Unknown"
assert r.json()[str(valid_job_id)][0]["Source"] == "JobManager"

assert r.json()[str(valid_job_id)][1]["Status"] == JobStatus.CHECKING.value
assert r.json()[str(valid_job_id)][1]["MinorStatus"] == "JobPath"
assert r.json()[str(valid_job_id)][1]["ApplicationStatus"] == "Unknown"
assert len(r.json()[0]["LoggingInfo"]) == 2
assert r.json()[0]["LoggingInfo"][0]["Status"] == JobStatus.RECEIVED.value
assert r.json()[0]["LoggingInfo"][0]["MinorStatus"] == "Job accepted"
assert r.json()[0]["LoggingInfo"][0]["ApplicationStatus"] == "Unknown"
assert r.json()[0]["LoggingInfo"][0]["Source"] == "JobManager"

assert r.json()[0]["LoggingInfo"][1]["Status"] == JobStatus.CHECKING.value
assert r.json()[0]["LoggingInfo"][1]["MinorStatus"] == "JobPath"
assert r.json()[0]["LoggingInfo"][1]["ApplicationStatus"] == "Unknown"
assert (
before
< datetime.fromisoformat(r.json()[str(valid_job_id)][1]["StatusTime"])
< datetime.fromisoformat(r.json()[0]["LoggingInfo"][1]["StatusTime"])
< after
)
assert r.json()[str(valid_job_id)][1]["Source"] == "Unknown"
assert r.json()[0]["LoggingInfo"][1]["Source"] == "Unknown"


def test_get_job_status_history_in_bulk(
Expand Down

0 comments on commit 492d77b

Please sign in to comment.