diff --git a/packages/dsw-command-queue/CHANGELOG.md b/packages/dsw-command-queue/CHANGELOG.md index 190ef5ce..d882c906 100644 --- a/packages/dsw-command-queue/CHANGELOG.md +++ b/packages/dsw-command-queue/CHANGELOG.md @@ -8,6 +8,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +### Fixed + +- Fixed command queue job timeout (moved support from workers to queue, reliable timeout approach) + ## [4.10.4] Released for version consistency with other DSW tools. @@ -256,3 +262,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-command-queue/dsw/command_queue/command_queue.py b/packages/dsw-command-queue/dsw/command_queue/command_queue.py index 2743eb76..56911f46 100644 --- a/packages/dsw-command-queue/dsw/command_queue/command_queue.py +++ b/packages/dsw-command-queue/dsw/command_queue/command_queue.py @@ -1,5 +1,6 @@ import abc import datetime +import func_timeout import logging import os import platform @@ -45,21 +46,26 @@ class CommandWorker: def work(self, payload: PersistentCommand): pass - def process_exception(self, e: Exception): + def process_timeout(self, e: BaseException): + pass + + def process_exception(self, e: BaseException): pass class CommandQueue: def __init__(self, worker: CommandWorker, db: Database, - channel: str, component: str, timeout: float): + channel: str, component: str, wait_timeout: float, + work_timeout: int | None = None): self.worker = worker self.db = db self.queries = CommandQueries( channel=channel, component=component ) - self.timeout = timeout + self.wait_timeout = wait_timeout + self.work_timeout = work_timeout @tenacity.retry( reraise=True, @@ -84,7 +90,7 @@ def run(self): self._fetch_and_process_queued() LOG.debug('Waiting for notifications') - w = select.select(fds, [], [], self.timeout) + w = select.select(fds, [], [], self.wait_timeout) if INTERRUPTED: LOG.debug('Interrupt signal received, ending...') @@ -92,7 +98,7 @@ def run(self): if w == ([], [], []): LOG.debug(f'Nothing received in this cycle ' - f'(timeouted after {self.timeout} seconds)') + f'(timeouted after {self.wait_timeout} seconds)') else: notifications = 0 for n in psycopg.generators.notifies(queue_conn.connection.pgconn): @@ -141,13 +147,46 @@ def fetch_and_process(self) -> bool: LOG.debug(f'Previous state: {command.state}') LOG.debug(f'Attempts: {command.attempts} / {command.max_attempts}') LOG.debug(f'Last error: {command.last_error_message}') + attempt_number = command.attempts + 1 try: - self.worker.work(command) + self.db.execute_query( + query=self.queries.query_command_start(), + attempts=attempt_number, + updated_at=datetime.datetime.now(tz=datetime.UTC), + uuid=command.uuid, + ) + self.db.conn_query.connection.commit() + + def work(): + self.worker.work(command) + + if self.work_timeout is None: + LOG.info('Processing (without any timeout set)') + work() + else: + LOG.info(f'Processing (with timeout set to {self.work_timeout} seconds)') + func_timeout.func_timeout( + timeout=self.work_timeout, + func=work, + args=(), + kwargs=None, + ) self.db.execute_query( query=self.queries.query_command_done(), - attempts=command.attempts + 1, + attempts=attempt_number, + updated_at=datetime.datetime.now(tz=datetime.UTC), + uuid=command.uuid, + ) + except func_timeout.exceptions.FunctionTimedOut as e: + msg = f'Processing exceeded time limit ({self.work_timeout} seconds)' + LOG.warning(msg) + self.worker.process_timeout(e) + self.db.execute_query( + query=self.queries.query_command_error(), + attempts=attempt_number, + error_message=msg, updated_at=datetime.datetime.now(tz=datetime.UTC), uuid=command.uuid, ) @@ -157,7 +196,7 @@ def fetch_and_process(self) -> bool: self.worker.process_exception(e) self.db.execute_query( query=self.queries.query_command_error(), - attempts=command.attempts + 1, + attempts=attempt_number, error_message=msg, updated_at=datetime.datetime.now(tz=datetime.UTC), uuid=command.uuid, diff --git a/packages/dsw-command-queue/dsw/command_queue/query.py b/packages/dsw-command-queue/dsw/command_queue/query.py index 25c61eab..50e2cda6 100644 --- a/packages/dsw-command-queue/dsw/command_queue/query.py +++ b/packages/dsw-command-queue/dsw/command_queue/query.py @@ -47,3 +47,12 @@ def query_command_done() -> str: updated_at = %(updated_at)s WHERE uuid = %(uuid)s; """ + + @staticmethod + def query_command_start() -> str: + return """ + UPDATE persistent_command + SET attempts = %(attempts)s, + updated_at = %(updated_at)s + WHERE uuid = %(uuid)s; + """ diff --git a/packages/dsw-command-queue/pyproject.toml b/packages/dsw-command-queue/pyproject.toml index 148aa1ef..1ed6c522 100644 --- a/packages/dsw-command-queue/pyproject.toml +++ b/packages/dsw-command-queue/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-command-queue' -version = "4.10.4" +version = "4.10.5" description = 'Library for working with command queue and persistent commands' readme = 'README.md' keywords = ['dsw', 'subscriber', 'publisher', 'database', 'queue', 'processing'] @@ -24,8 +24,9 @@ classifiers = [ ] requires-python = '>=3.10, <4' dependencies = [ + 'func-timeout', # DSW - "dsw-database==4.10.4", + "dsw-database==4.10.5", ] [project.urls] diff --git a/packages/dsw-command-queue/requirements.txt b/packages/dsw-command-queue/requirements.txt index 44fff63e..6ad4fceb 100644 --- a/packages/dsw-command-queue/requirements.txt +++ b/packages/dsw-command-queue/requirements.txt @@ -1,3 +1,4 @@ +func_timeout==4.3.5 psycopg==3.2.1 psycopg-binary==3.2.1 PyYAML==6.0.2 diff --git a/packages/dsw-config/CHANGELOG.md b/packages/dsw-config/CHANGELOG.md index 5ec0acd4..9d9ec650 100644 --- a/packages/dsw-config/CHANGELOG.md +++ b/packages/dsw-config/CHANGELOG.md @@ -8,6 +8,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +Released for version consistency with other DSW tools. + ## [4.10.4] Released for version consistency with other DSW tools. @@ -268,3 +272,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-config/dsw/config/model.py b/packages/dsw-config/dsw/config/model.py index e5e12095..5ff3226c 100644 --- a/packages/dsw-config/dsw/config/model.py +++ b/packages/dsw-config/dsw/config/model.py @@ -39,10 +39,11 @@ def __init__(self, enabled: bool, workers_dsn: Optional[str], class DatabaseConfig(ConfigModel): - def __init__(self, connection_string: str, connection_timeout: int, queue_timout: int): + def __init__(self, connection_string: str, connection_timeout: int, + queue_timeout: int): self.connection_string = connection_string self.connection_timeout = connection_timeout - self.queue_timout = queue_timout + self.queue_timeout = queue_timeout class S3Config(ConfigModel): diff --git a/packages/dsw-config/dsw/config/parser.py b/packages/dsw-config/dsw/config/parser.py index a3382422..b0a042a0 100644 --- a/packages/dsw-config/dsw/config/parser.py +++ b/packages/dsw-config/dsw/config/parser.py @@ -83,7 +83,7 @@ def db(self) -> DatabaseConfig: return DatabaseConfig( connection_string=self.get(self.keys.database.connection_string), connection_timeout=self.get(self.keys.database.connection_timeout), - queue_timout=self.get(self.keys.database.queue_timeout), + queue_timeout=self.get(self.keys.database.queue_timeout), ) @property diff --git a/packages/dsw-config/pyproject.toml b/packages/dsw-config/pyproject.toml index c9abe3c3..aba4eac9 100644 --- a/packages/dsw-config/pyproject.toml +++ b/packages/dsw-config/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-config' -version = "4.10.4" +version = "4.10.5" description = 'Library for DSW config manipulation' readme = 'README.md' keywords = ['dsw', 'config', 'yaml', 'parser'] diff --git a/packages/dsw-data-seeder/CHANGELOG.md b/packages/dsw-data-seeder/CHANGELOG.md index acf0d432..3e910042 100644 --- a/packages/dsw-data-seeder/CHANGELOG.md +++ b/packages/dsw-data-seeder/CHANGELOG.md @@ -8,6 +8,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +### Fixed + +- Fixed command queue job timeout (moved support from workers to queue, reliable timeout approach) + ## [4.10.4] Released for version consistency with other DSW tools. @@ -314,3 +320,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-data-seeder/dsw/data_seeder/config.py b/packages/dsw-data-seeder/dsw/data_seeder/config.py index 73a2961f..6a016f2d 100644 --- a/packages/dsw-data-seeder/dsw/data_seeder/config.py +++ b/packages/dsw-data-seeder/dsw/data_seeder/config.py @@ -1,14 +1,34 @@ from dsw.config import DSWConfigParser -from dsw.config.keys import ConfigKey, cast_str, cast_int -from dsw.config.model import DatabaseConfig, S3Config, \ +from dsw.config.keys import ConfigKey, ConfigKeys, ConfigKeysContainer, \ + cast_str, cast_int, cast_optional_int +from dsw.config.model import ConfigModel, DatabaseConfig, S3Config, \ LoggingConfig, SentryConfig, CloudConfig, GeneralConfig +class _ExperimentalKeys(ConfigKeysContainer): + job_timeout = ConfigKey( + yaml_path=['experimental', 'jobTimeout'], + var_names=['EXPERIMENTAL_JOB_TIMEOUT'], + default=None, + cast=cast_optional_int, + ) + + +class MailerConfigKeys(ConfigKeys): + experimental = _ExperimentalKeys + + +class ExperimentalConfig(ConfigModel): + + def __init__(self, job_timeout: int | None): + self.job_timeout = job_timeout + + class SeederConfig: def __init__(self, db: DatabaseConfig, s3: S3Config, log: LoggingConfig, sentry: SentryConfig, cloud: CloudConfig, general: GeneralConfig, - extra_dbs: dict[str, DatabaseConfig]): + extra_dbs: dict[str, DatabaseConfig], experimental: ExperimentalConfig): self.general = general self.db = db self.s3 = s3 @@ -16,6 +36,7 @@ def __init__(self, db: DatabaseConfig, s3: S3Config, log: LoggingConfig, self.sentry = sentry self.cloud = cloud self.extra_dbs = extra_dbs + self.experimental = experimental def __str__(self): return f'SeederConfig\n' \ @@ -26,11 +47,16 @@ def __str__(self): f'{self.log}' \ f'{self.sentry}' \ f'{self.cloud}' \ + f'{self.experimental}' \ f'====================\n' class SeederConfigParser(DSWConfigParser): + def __init__(self): + super().__init__(keys=MailerConfigKeys) + self.keys = MailerConfigKeys # type: type[MailerConfigKeys] + @property def extra_dbs(self) -> dict[str, DatabaseConfig]: result = {} @@ -57,6 +83,12 @@ def extra_dbs(self) -> dict[str, DatabaseConfig]: return result + @property + def experimental(self) -> ExperimentalConfig: + return ExperimentalConfig( + job_timeout=self.get(self.keys.experimental.job_timeout), + ) + @property def config(self) -> SeederConfig: return SeederConfig( @@ -67,4 +99,5 @@ def config(self) -> SeederConfig: cloud=self.cloud, general=self.general, extra_dbs=self.extra_dbs, + experimental=self.experimental, ) diff --git a/packages/dsw-data-seeder/dsw/data_seeder/consts.py b/packages/dsw-data-seeder/dsw/data_seeder/consts.py index 98bad67b..ba47b963 100644 --- a/packages/dsw-data-seeder/dsw/data_seeder/consts.py +++ b/packages/dsw-data-seeder/dsw/data_seeder/consts.py @@ -6,7 +6,7 @@ DEFAULT_PLACEHOLDER = '<<|TENANT-ID|>>' NULL_UUID = '00000000-0000-0000-0000-000000000000' PROG_NAME = 'dsw-data-seeder' -VERSION = '4.10.4' +VERSION = '4.10.5' VAR_APP_CONFIG_PATH = 'APPLICATION_CONFIG_PATH' VAR_WORKDIR_PATH = 'WORKDIR_PATH' diff --git a/packages/dsw-data-seeder/dsw/data_seeder/seeder.py b/packages/dsw-data-seeder/dsw/data_seeder/seeder.py index 60c526c9..06422d64 100644 --- a/packages/dsw-data-seeder/dsw/data_seeder/seeder.py +++ b/packages/dsw-data-seeder/dsw/data_seeder/seeder.py @@ -260,7 +260,8 @@ def _run_preparation(self, recipe_name: str) -> CommandQueue: db=Context.get().app.db, channel=CMD_CHANNEL, component=CMD_COMPONENT, - timeout=Context.get().app.cfg.db.queue_timout, + wait_timeout=Context.get().app.cfg.db.queue_timeout, + work_timeout=Context.get().app.cfg.experimental.job_timeout, ) return queue @@ -288,6 +289,14 @@ def work(self, cmd: PersistentCommand): Context.get().update_trace_id('-') SentryReporter.set_context('cmd_uuid', '-') + def process_timeout(self, e: BaseException): + LOG.info('Failed with timeout') + SentryReporter.capture_exception(e) + + def process_exception(self, e: BaseException): + LOG.info('Failed with unexpected error', exc_info=e) + SentryReporter.capture_exception(e) + @staticmethod def _update_component_info(): built_at = dateutil.parser.parse(BUILD_INFO.built_at) diff --git a/packages/dsw-data-seeder/pyproject.toml b/packages/dsw-data-seeder/pyproject.toml index 96f3ec56..872961ea 100644 --- a/packages/dsw-data-seeder/pyproject.toml +++ b/packages/dsw-data-seeder/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-data-seeder' -version = "4.10.4" +version = "4.10.5" description = 'Worker for seeding DSW data' readme = 'README.md' keywords = ['data', 'database', 'seed', 'storage'] @@ -29,10 +29,10 @@ dependencies = [ 'sentry-sdk', 'tenacity', # DSW - "dsw-command-queue==4.10.4", - "dsw-config==4.10.4", - "dsw-database==4.10.4", - "dsw-storage==4.10.4", + "dsw-command-queue==4.10.5", + "dsw-config==4.10.5", + "dsw-database==4.10.5", + "dsw-storage==4.10.5", ] [project.urls] diff --git a/packages/dsw-data-seeder/requirements.txt b/packages/dsw-data-seeder/requirements.txt index d19364e9..e59b7804 100644 --- a/packages/dsw-data-seeder/requirements.txt +++ b/packages/dsw-data-seeder/requirements.txt @@ -1,5 +1,6 @@ certifi==2024.7.4 click==8.1.7 +func_timeout==4.3.5 minio==7.2.8 psycopg==3.2.1 psycopg-binary==3.2.1 diff --git a/packages/dsw-database/CHANGELOG.md b/packages/dsw-database/CHANGELOG.md index c86a9680..a1c0424d 100644 --- a/packages/dsw-database/CHANGELOG.md +++ b/packages/dsw-database/CHANGELOG.md @@ -8,6 +8,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +Released for version consistency with other DSW tools. + ## [4.10.4] Released for version consistency with other DSW tools. @@ -279,3 +283,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-database/pyproject.toml b/packages/dsw-database/pyproject.toml index 9ef7515f..4979d4ce 100644 --- a/packages/dsw-database/pyproject.toml +++ b/packages/dsw-database/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-database' -version = "4.10.4" +version = "4.10.5" description = 'Library for managing DSW database' readme = 'README.md' keywords = ['dsw', 'database'] @@ -26,7 +26,7 @@ dependencies = [ 'psycopg[binary]', 'tenacity', # DSW - "dsw-config==4.10.4", + "dsw-config==4.10.5", ] [project.urls] diff --git a/packages/dsw-document-worker/CHANGELOG.md b/packages/dsw-document-worker/CHANGELOG.md index 47e63a78..b5874047 100644 --- a/packages/dsw-document-worker/CHANGELOG.md +++ b/packages/dsw-document-worker/CHANGELOG.md @@ -8,6 +8,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +### Fixed + +- Fixed command queue job timeout (moved support from workers to queue, reliable timeout approach) + ## [4.10.4] Released for version consistency with other DSW tools. @@ -336,3 +342,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-document-worker/dsw/document_worker/config.py b/packages/dsw-document-worker/dsw/document_worker/config.py index 1fcc3bb0..d124c794 100644 --- a/packages/dsw-document-worker/dsw/document_worker/config.py +++ b/packages/dsw-document-worker/dsw/document_worker/config.py @@ -1,10 +1,10 @@ import shlex -from typing import List, Optional, Type +from typing import List, Optional from dsw.config import DSWConfigParser -from dsw.config.keys import ConfigKey, ConfigKeys, ConfigKeysContainer,\ +from dsw.config.keys import ConfigKey, ConfigKeys, ConfigKeysContainer, \ cast_str, cast_optional_int -from dsw.config.model import GeneralConfig, SentryConfig, DatabaseConfig,\ +from dsw.config.model import GeneralConfig, SentryConfig, DatabaseConfig, \ S3Config, LoggingConfig, CloudConfig, ConfigModel from .consts import DocumentNamingStrategy @@ -175,7 +175,7 @@ class DocumentWorkerConfigParser(DSWConfigParser): def __init__(self): super().__init__(keys=DocWorkerConfigKeys) - self.keys = DocWorkerConfigKeys # type: Type[DocWorkerConfigKeys] + self.keys = DocWorkerConfigKeys # type: type[DocWorkerConfigKeys] @property def documents(self) -> DocumentsConfig: diff --git a/packages/dsw-document-worker/dsw/document_worker/consts.py b/packages/dsw-document-worker/dsw/document_worker/consts.py index 918ed8e6..4b0f6018 100644 --- a/packages/dsw-document-worker/dsw/document_worker/consts.py +++ b/packages/dsw-document-worker/dsw/document_worker/consts.py @@ -6,7 +6,7 @@ EXIT_SUCCESS = 0 NULL_UUID = '00000000-0000-0000-0000-000000000000' PROG_NAME = 'docworker' -VERSION = '4.10.4' +VERSION = '4.10.5' VAR_APP_CONFIG_PATH = 'APPLICATION_CONFIG_PATH' VAR_WORKDIR_PATH = 'WORKDIR_PATH' diff --git a/packages/dsw-document-worker/dsw/document_worker/limits.py b/packages/dsw-document-worker/dsw/document_worker/limits.py index e4324a38..797a3ee5 100644 --- a/packages/dsw-document-worker/dsw/document_worker/limits.py +++ b/packages/dsw-document-worker/dsw/document_worker/limits.py @@ -30,14 +30,3 @@ def check_size_usage(job_id: str, doc_size: int, f'required {byte_size_format(doc_size)} but ' f'only {byte_size_format(remains)} remains.' ) - - @staticmethod - def timeout_exceeded(job_id: str): - job_timeout = Context.get().app.cfg.experimental.job_timeout - if job_timeout is None: - return - raise JobException( - job_id=job_id, - msg=f'Document generation exceeded time limit ' - f'({job_timeout} seconds).' - ) diff --git a/packages/dsw-document-worker/dsw/document_worker/utils.py b/packages/dsw-document-worker/dsw/document_worker/utils.py index 78ec211b..4498d57d 100644 --- a/packages/dsw-document-worker/dsw/document_worker/utils.py +++ b/packages/dsw-document-worker/dsw/document_worker/utils.py @@ -1,8 +1,3 @@ -import contextlib -import signal - -from typing import Optional - _BYTE_SIZES = ["B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB"] @@ -16,27 +11,3 @@ def byte_size_format(num: float): return f'{_round_size(num)} {unit}' num /= 1000.0 return f'{_round_size(num)} YB' - - -class JobTimeoutError(TimeoutError): - pass - - -def _raise_timeout(signum, frame): - raise JobTimeoutError - - -@contextlib.contextmanager -def timeout(t: Optional[int]): - if t is not None: - signal.signal(signal.SIGALRM, _raise_timeout) - signal.alarm(t) - reached_timeout = False - try: - yield - except JobTimeoutError: - reached_timeout = True - finally: - signal.signal(signal.SIGALRM, signal.SIG_IGN) - if reached_timeout: - raise TimeoutError diff --git a/packages/dsw-document-worker/dsw/document_worker/worker.py b/packages/dsw-document-worker/dsw/document_worker/worker.py index d9294559..da15f31f 100644 --- a/packages/dsw-document-worker/dsw/document_worker/worker.py +++ b/packages/dsw-document-worker/dsw/document_worker/worker.py @@ -22,7 +22,7 @@ from .exceptions import create_job_exception, JobException from .limits import LimitsEnforcer from .templates import TemplateRegistry, Template, Format -from .utils import timeout, JobTimeoutError, byte_size_format +from .utils import byte_size_format LOG = logging.getLogger(__name__) @@ -34,8 +34,6 @@ def decorator(func): def handled_step(job, *args, **kwargs): try: return func(job, *args, **kwargs) - except JobTimeoutError as e: - raise e # re-raise (need to be cached by context manager) except Exception as e: LOG.debug('Handling exception', exc_info=True) raise create_job_exception( @@ -234,15 +232,11 @@ def try_set_job_state(self, state: str, message: str) -> bool: def _run(self): self.get_document() - try: - with timeout(Context.get().app.cfg.experimental.job_timeout): - self.prepare_template() - self.build_document() - self.store_document() - except TimeoutError: - LimitsEnforcer.timeout_exceeded( - job_id=self.doc_uuid, - ) + + self.prepare_template() + self.build_document() + self.store_document() + self.finalize() def _sentry_job_exception(self) -> bool: @@ -287,6 +281,7 @@ class DocumentWorker(CommandWorker): def __init__(self, config: DocumentWorkerConfig, workdir: pathlib.Path): self.config = config self._init_context(workdir=workdir) + self.current_job = None # type: Job | None def _init_context(self, workdir: pathlib.Path): Context.initialize( @@ -332,7 +327,8 @@ def _run_preparation(self) -> CommandQueue: db=Context.get().app.db, channel=CMD_CHANNEL, component=CMD_COMPONENT, - timeout=Context.get().app.cfg.db.queue_timout, + wait_timeout=Context.get().app.cfg.db.queue_timeout, + work_timeout=Context.get().app.cfg.experimental.job_timeout, ) return queue @@ -358,10 +354,23 @@ def work(self, cmd: PersistentCommand): Context.get().update_document_id(document_uuid) SentryReporter.set_context('cmd_uuid', cmd.uuid) LOG.info(f'Running job #{cmd.uuid}') - job = Job(command=cmd, document_uuid=document_uuid) - job.run() + self.current_job = Job(command=cmd, document_uuid=document_uuid) + self.current_job.run() + self.current_job = None Context.get().update_trace_id('-') Context.get().update_document_id('-') - def process_exception(self, e: Exception): + def process_exception(self, e: BaseException): + LOG.info('Failed with exception') SentryReporter.capture_exception(e) + + def process_timeout(self, e: BaseException): + LOG.info('Failed with timeout') + SentryReporter.capture_exception(e) + + if self.current_job is not None: + self.current_job.try_set_job_state( + DocumentState.FAILED, + 'Generating document exceeded the time limit', + ) + self.current_job = None diff --git a/packages/dsw-document-worker/pyproject.toml b/packages/dsw-document-worker/pyproject.toml index 05123f84..2ac4c701 100644 --- a/packages/dsw-document-worker/pyproject.toml +++ b/packages/dsw-document-worker/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-document-worker' -version = "4.10.4" +version = "4.10.5" description = 'Worker for assembling and transforming documents' readme = 'README.md' keywords = ['documents', 'generation', 'jinja2', 'pandoc', 'worker'] @@ -38,10 +38,10 @@ dependencies = [ 'weasyprint', 'XlsxWriter', # DSW - "dsw-command-queue==4.10.4", - "dsw-config==4.10.4", - "dsw-database==4.10.4", - "dsw-storage==4.10.4", + "dsw-command-queue==4.10.5", + "dsw-config==4.10.5", + "dsw-database==4.10.5", + "dsw-storage==4.10.5", ] [project.urls] diff --git a/packages/dsw-document-worker/requirements.txt b/packages/dsw-document-worker/requirements.txt index fd7b6983..493d9e06 100644 --- a/packages/dsw-document-worker/requirements.txt +++ b/packages/dsw-document-worker/requirements.txt @@ -5,6 +5,7 @@ charset-normalizer==3.3.2 click==8.1.7 cssselect2==0.7.0 fonttools==4.53.1 +func_timeout==4.3.5 html5lib==1.1 idna==3.8 isodate==0.6.1 diff --git a/packages/dsw-mailer/CHANGELOG.md b/packages/dsw-mailer/CHANGELOG.md index ca91252b..43a52aeb 100644 --- a/packages/dsw-mailer/CHANGELOG.md +++ b/packages/dsw-mailer/CHANGELOG.md @@ -8,6 +8,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +### Fixed + +- Fixed command queue job timeout (moved support from workers to queue, reliable timeout approach) + ## [4.10.4] ### Fixed @@ -292,3 +298,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-mailer/dsw/mailer/config.py b/packages/dsw-mailer/dsw/mailer/config.py index d0575ce0..50045690 100644 --- a/packages/dsw-mailer/dsw/mailer/config.py +++ b/packages/dsw-mailer/dsw/mailer/config.py @@ -9,7 +9,20 @@ DatabaseConfig, LoggingConfig, ConfigModel, AWSConfig from dsw.database.model import DBInstanceConfigMail -from typing import Type + +class _ExperimentalKeys(ConfigKeysContainer): + job_timeout = ConfigKey( + yaml_path=['experimental', 'jobTimeout'], + var_names=['EXPERIMENTAL_JOB_TIMEOUT'], + default=None, + cast=cast_optional_int, + ) + + +class ExperimentalConfig(ConfigModel): + + def __init__(self, job_timeout: int | None): + self.job_timeout = job_timeout class _MailKeys(ConfigKeysContainer): @@ -166,6 +179,7 @@ class MailerConfigKeys(ConfigKeys): mail_legacy_smtp = _MailLegacySMTPKeys mail_smtp = _MailSMTPKeys mail_amazon_ses = _MailAmazonSESKeys + experimental = _ExperimentalKeys class SMTPSecurityMode(enum.Enum): @@ -310,13 +324,15 @@ class MailerConfig: def __init__(self, db: DatabaseConfig, log: LoggingConfig, mail: MailConfig, sentry: SentryConfig, - general: GeneralConfig, aws: AWSConfig): + general: GeneralConfig, aws: AWSConfig, + experimental: ExperimentalConfig): self.db = db self.log = log self.mail = mail self.sentry = sentry self.general = general self.aws = aws + self.experimental = experimental # Use AWS credentials for Amazon SES if not provided self.mail.update_aws(aws) @@ -329,6 +345,7 @@ def __str__(self): f'{self.mail}' \ f'{self.sentry}' \ f'{self.general}' \ + f'{self.experimental}' \ f'====================\n' @@ -336,7 +353,7 @@ class MailerConfigParser(DSWConfigParser): def __init__(self): super().__init__(keys=MailerConfigKeys) - self.keys = MailerConfigKeys # type: Type[MailerConfigKeys] + self.keys = MailerConfigKeys # type: type[MailerConfigKeys] @property def mail(self): @@ -379,6 +396,12 @@ def mail(self): dkim_privkey_file=self.get(self.keys.mail.dkim_privkey_file), ) + @property + def experimental(self) -> ExperimentalConfig: + return ExperimentalConfig( + job_timeout=self.get(self.keys.experimental.job_timeout), + ) + @property def config(self) -> MailerConfig: cfg = MailerConfig( @@ -388,6 +411,7 @@ def config(self) -> MailerConfig: sentry=self.sentry, general=self.general, aws=self.aws, + experimental=self.experimental, ) cfg.mail.load_dkim_privkey() return cfg diff --git a/packages/dsw-mailer/dsw/mailer/consts.py b/packages/dsw-mailer/dsw/mailer/consts.py index fb21a013..6a0a5544 100644 --- a/packages/dsw-mailer/dsw/mailer/consts.py +++ b/packages/dsw-mailer/dsw/mailer/consts.py @@ -5,7 +5,7 @@ DEFAULT_ENCODING = 'utf-8' NULL_UUID = '00000000-0000-0000-0000-000000000000' PROG_NAME = 'dsw-mailer' -VERSION = '4.10.4' +VERSION = '4.10.5' VAR_APP_CONFIG_PATH = 'APPLICATION_CONFIG_PATH' VAR_WORKDIR_PATH = 'WORKDIR_PATH' diff --git a/packages/dsw-mailer/dsw/mailer/mailer.py b/packages/dsw-mailer/dsw/mailer/mailer.py index db43ae30..ca8b35fe 100644 --- a/packages/dsw-mailer/dsw/mailer/mailer.py +++ b/packages/dsw-mailer/dsw/mailer/mailer.py @@ -74,7 +74,8 @@ def _run_preparation(self) -> CommandQueue: db=Context.get().app.db, channel=CMD_CHANNEL, component=CMD_COMPONENT, - timeout=Context.get().app.cfg.db.queue_timout, + wait_timeout=Context.get().app.cfg.db.queue_timeout, + work_timeout=Context.get().app.cfg.experimental.job_timeout, ) return queue @@ -121,7 +122,11 @@ def work(self, cmd: PersistentCommand): SentryReporter.set_context('cmd_uuid', '-') Context.get().update_trace_id('-') - def process_exception(self, e: Exception): + def process_timeout(self, e: BaseException): + LOG.info('Failed with timeout') + SentryReporter.capture_exception(e) + + def process_exception(self, e: BaseException): LOG.info('Failed with unexpected error', exc_info=e) SentryReporter.capture_exception(e) diff --git a/packages/dsw-mailer/pyproject.toml b/packages/dsw-mailer/pyproject.toml index b59c78df..a10f680f 100644 --- a/packages/dsw-mailer/pyproject.toml +++ b/packages/dsw-mailer/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-mailer' -version = "4.10.4" +version = "4.10.5" description = 'Worker for sending email notifications' readme = 'README.md' keywords = ['email', 'jinja2', 'notification', 'template'] @@ -33,9 +33,9 @@ dependencies = [ 'sentry-sdk', 'tenacity', # DSW - "dsw-command-queue==4.10.4", - "dsw-config==4.10.4", - "dsw-database==4.10.4", + "dsw-command-queue==4.10.5", + "dsw-config==4.10.5", + "dsw-database==4.10.5", ] [project.urls] diff --git a/packages/dsw-mailer/requirements.txt b/packages/dsw-mailer/requirements.txt index 294bcf1b..60d3031d 100644 --- a/packages/dsw-mailer/requirements.txt +++ b/packages/dsw-mailer/requirements.txt @@ -4,6 +4,7 @@ certifi==2024.7.4 click==8.1.7 dkimpy==1.1.8 dnspython==2.6.1 +func_timeout==4.3.5 Jinja2==3.1.4 Markdown==3.7 jmespath==1.0.1 diff --git a/packages/dsw-models/CHANGELOG.md b/packages/dsw-models/CHANGELOG.md index 105f5b13..326aa65b 100644 --- a/packages/dsw-models/CHANGELOG.md +++ b/packages/dsw-models/CHANGELOG.md @@ -8,6 +8,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +Released for version consistency with other DSW tools. + ## [4.10.4] Released for version consistency with other DSW tools. @@ -185,3 +189,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-models/pyproject.toml b/packages/dsw-models/pyproject.toml index f9296112..6a9d8726 100644 --- a/packages/dsw-models/pyproject.toml +++ b/packages/dsw-models/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-models' -version = "4.10.4" +version = "4.10.5" description = 'Library with DSW models and basic IO operations' readme = 'README.md' keywords = ['dsw', 'config', 'yaml', 'parser'] diff --git a/packages/dsw-storage/CHANGELOG.md b/packages/dsw-storage/CHANGELOG.md index e0844af6..53a21813 100644 --- a/packages/dsw-storage/CHANGELOG.md +++ b/packages/dsw-storage/CHANGELOG.md @@ -8,6 +8,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [4.10.5] + +Released for version consistency with other DSW tools. + ## [4.10.4] Released for version consistency with other DSW tools. @@ -252,3 +256,4 @@ Released for version consistency with other DSW tools. [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-storage/pyproject.toml b/packages/dsw-storage/pyproject.toml index a4b7e9fd..95432cac 100644 --- a/packages/dsw-storage/pyproject.toml +++ b/packages/dsw-storage/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-storage' -version = "4.10.4" +version = "4.10.5" description = 'Library for managing DSW S3 storage' readme = 'README.md' keywords = ['dsw', 's3', 'bucket', 'storage'] @@ -26,7 +26,7 @@ dependencies = [ 'minio', 'tenacity', # DSW - "dsw-config==4.10.4", + "dsw-config==4.10.5", ] [project.urls] diff --git a/packages/dsw-tdk/CHANGELOG.md b/packages/dsw-tdk/CHANGELOG.md index 9d96559b..b181d978 100644 --- a/packages/dsw-tdk/CHANGELOG.md +++ b/packages/dsw-tdk/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [4.10.5] + +Released for version consistency with other DSW tools. + ## [4.10.4] Released for version consistency with other DSW tools. @@ -480,3 +484,4 @@ Initial DSW Template Development Kit (versioned as part of the [DSW platform](ht [4.10.2]: /../../tree/v4.10.2 [4.10.3]: /../../tree/v4.10.3 [4.10.4]: /../../tree/v4.10.4 +[4.10.5]: /../../tree/v4.10.5 diff --git a/packages/dsw-tdk/dsw/tdk/consts.py b/packages/dsw-tdk/dsw/tdk/consts.py index 04f871b2..c914b4ab 100644 --- a/packages/dsw-tdk/dsw/tdk/consts.py +++ b/packages/dsw-tdk/dsw/tdk/consts.py @@ -3,7 +3,7 @@ import re APP = 'dsw-tdk' -VERSION = '4.10.4' +VERSION = '4.10.5' METAMODEL_VERSION = 14 REGEX_SEMVER = re.compile(r'^[0-9]+\.[0-9]+\.[0-9]+$') diff --git a/packages/dsw-tdk/pyproject.toml b/packages/dsw-tdk/pyproject.toml index 89dec1cd..1faa3917 100644 --- a/packages/dsw-tdk/pyproject.toml +++ b/packages/dsw-tdk/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'dsw-tdk' -version = "4.10.4" +version = "4.10.5" description = 'Data Stewardship Wizard Template Development Toolkit' readme = 'README.md' keywords = ['documents', 'dsw', 'jinja2', 'template', 'toolkit']