Skip to content

Commit

Permalink
Merge pull request #64 from simon-mazenoux/feat-implement-taskqueuedb
Browse files Browse the repository at this point in the history
Implement kill, delete and remove endpoints
  • Loading branch information
chrisburr authored Oct 26, 2023
2 parents 0cddd49 + b333ae4 commit ba6966b
Show file tree
Hide file tree
Showing 22 changed files with 1,792 additions and 223 deletions.
1 change: 1 addition & 0 deletions run_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export DIRACX_DB_URL_AUTHDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_JOBDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_JOBLOGGINGDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_SANDBOXMETADATADB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_TASKQUEUEDB="sqlite+aiosqlite:///:memory:"
export DIRACX_SERVICE_AUTH_TOKEN_KEY="file://${signing_key}"
export DIRACX_SERVICE_AUTH_ALLOWED_REDIRECTS='["http://'$(hostname| tr -s '[:upper:]' '[:lower:]')':8000/docs/oauth2-redirect"]'
export DIRACX_SANDBOX_STORE_BUCKET_NAME=sandboxes
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ diracx.db.sql =
JobDB = diracx.db.sql:JobDB
JobLoggingDB = diracx.db.sql:JobLoggingDB
SandboxMetadataDB = diracx.db.sql:SandboxMetadataDB
#DummyDB = diracx.db:DummyDB
TaskQueueDB = diracx.db.sql:TaskQueueDB
#DummyDB = diracx.db.sql:DummyDB
diracx.db.os =
JobParametersDB = diracx.db.os:JobParametersDB
diracx.services =
Expand Down
4 changes: 1 addition & 3 deletions src/diracx/cli/internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def generate_cs(
DefaultGroup=user_group,
Users={},
Groups={
user_group: GroupConfig(
JobShare=None, Properties={"NormalUser"}, Quota=None, Users=set()
)
user_group: GroupConfig(Properties={"NormalUser"}, Quota=None, Users=set())
},
)
config = Config(
Expand Down
228 changes: 228 additions & 0 deletions src/diracx/client/aio/operations/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
build_auth_userinfo_request,
build_config_serve_config_request,
build_jobs_delete_bulk_jobs_request,
build_jobs_delete_single_job_request,
build_jobs_get_job_status_bulk_request,
build_jobs_get_job_status_history_bulk_request,
build_jobs_get_sandbox_file_request,
Expand All @@ -43,6 +44,9 @@
build_jobs_get_single_job_status_request,
build_jobs_initiate_sandbox_upload_request,
build_jobs_kill_bulk_jobs_request,
build_jobs_kill_single_job_request,
build_jobs_remove_bulk_jobs_request,
build_jobs_remove_single_job_request,
build_jobs_reschedule_bulk_jobs_request,
build_jobs_reschedule_single_job_request,
build_jobs_search_request,
Expand Down Expand Up @@ -1283,6 +1287,64 @@ async def kill_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any:

return deserialized

@distributed_trace_async
async def remove_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any:
"""Remove Bulk Jobs.
Fully remove a list of jobs from the WMS databases.
WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS
and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should
be removed, and the delete endpoint should be used instead for any other purpose.
:keyword job_ids: Required.
:paramtype job_ids: list[int]
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_remove_bulk_jobs_request(
job_ids=job_ids,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def get_job_status_bulk(
self, *, job_ids: List[int], **kwargs: Any
Expand Down Expand Up @@ -1948,6 +2010,172 @@ async def get_single_job(self, job_id: int, **kwargs: Any) -> Any:

return deserialized

@distributed_trace_async
async def delete_single_job(self, job_id: int, **kwargs: Any) -> Any:
"""Delete Single Job.
Delete a job by killing and setting the job status to DELETED.
:param job_id: Required.
:type job_id: int
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_delete_single_job_request(
job_id=job_id,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def kill_single_job(self, job_id: int, **kwargs: Any) -> Any:
"""Kill Single Job.
Kill a job.
:param job_id: Required.
:type job_id: int
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_kill_single_job_request(
job_id=job_id,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def remove_single_job(self, job_id: int, **kwargs: Any) -> Any:
"""Remove Single Job.
Fully remove a job from the WMS databases.
WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS
and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should
be removed, and the delete endpoint should be used instead.
:param job_id: Required.
:type job_id: int
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_remove_single_job_request(
job_id=job_id,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def get_single_job_status(
self, job_id: int, **kwargs: Any
Expand Down
4 changes: 4 additions & 0 deletions src/diracx/client/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ._models import SandboxInfo
from ._models import SandboxUploadResponse
from ._models import ScalarSearchSpec
from ._models import ScalarSearchSpecValue
from ._models import SetJobStatusReturn
from ._models import SortSpec
from ._models import SortSpecDirection
Expand All @@ -32,6 +33,7 @@
from ._models import ValidationError
from ._models import ValidationErrorLocItem
from ._models import VectorSearchSpec
from ._models import VectorSearchSpecValues

from ._enums import ChecksumAlgorithm
from ._enums import Enum0
Expand Down Expand Up @@ -68,6 +70,7 @@
"SandboxInfo",
"SandboxUploadResponse",
"ScalarSearchSpec",
"ScalarSearchSpecValue",
"SetJobStatusReturn",
"SortSpec",
"SortSpecDirection",
Expand All @@ -78,6 +81,7 @@
"ValidationError",
"ValidationErrorLocItem",
"VectorSearchSpec",
"VectorSearchSpecValues",
"ChecksumAlgorithm",
"Enum0",
"Enum1",
Expand Down
Loading

0 comments on commit ba6966b

Please sign in to comment.