From aeacc89f553d7fc3e6faea1dfea9516d242b8473 Mon Sep 17 00:00:00 2001
From: Joseph Areeda
Date: Wed, 25 Sep 2024 13:40:33 -0700
Subject: [PATCH] Fix Max lookback issue (#178)

* Do not cross metric day boundaries.
* Merge day boundary (#146)
* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this
* Do not merge files if they overlap "metric days"
* Do not cross metric day boundaries.
Co-authored-by: Joseph Areeda
* Check point merge (#147)
* Do not cross metric day boundaries.
* add log file arg; delete empty directories when done
* Tweak remove empty dir removal
* Tweak remove empty dir removal again
* Merge day boundary (#146)
* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this
* Do not merge files if they overlap "metric days"
* Do not cross metric day boundaries.
Co-authored-by: Joseph Areeda
* rebase against last approved PR
* rebase against last approved PR
* rebase against last approved PR again, fix flake8
* Fix a bug in remove empty directories.
Co-authored-by: Joseph Areeda
* Merge day boundary (#146)
* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this
* Do not merge files if they overlap "metric days"
* Do not cross metric day boundaries.
Co-authored-by: Joseph Areeda
* minor doc changes
* Fix a bug where an xml.gz file could get compressed again in merge-with-gaps
* Fix a double gzip of ligolw files (#151)
* Do not cross metric day boundaries.
* Merge day boundary (#146)
* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this
* Do not merge files if they overlap "metric days"
* Do not cross metric day boundaries.
Co-authored-by: Joseph Areeda
* Check point merge (#147)
* Do not cross metric day boundaries.
* add log file arg; delete empty directories when done
* Tweak remove empty dir removal
* Tweak remove empty dir removal again
* Merge day boundary (#146)
* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this
* Do not merge files if they overlap "metric days"
* Do not cross metric day boundaries.
Co-authored-by: Joseph Areeda
* rebase against last approved PR
* rebase against last approved PR
* rebase against last approved PR again, fix flake8
* Fix a bug in remove empty directories.
Co-authored-by: Joseph Areeda
* Merge day boundary (#146)
* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this
* Do not merge files if they overlap "metric days"
* Do not cross metric day boundaries.
Co-authored-by: Joseph Areeda
* minor doc changes
* Fix a bug where an xml.gz file could get compressed again in merge-with-gaps
* Implement a periodic vacate to address permanent D-state (uninterruptible wait) causing jobs to fail to complete
* Always create a log file. If not specified, put one in the output directory
* Fix a problem with periodic vacate.
* Up the periodic vacate time to 3 hrs
* Found a job-killing typo
* Add time limits to post processing also
* Don't save the segments.txt file if no segments are found, because we don't know whether that's a problem finding them or a valid, not-analyzable state.
* disable periodic vacate to demo the problem.
* Fix reported version in some utilities. Only update segments.txt if omicron is actually run.
* Clarify relative imports.
  and add details to a few log messages
* Resolve flake8 issues

---------

Co-authored-by: Joseph Areeda
* Resolve flake8 issues
* Update log format to use human readable date/time instead of gps; tweak logging to better understand guardian channel usage
* remove old setup.cfg
* Work on pytest failures. The remaining errors are the result of Omicron segfaulting if an environment variable is not set
* missing blank line, from flake8
* Fix a problem with max-online-lookback not working properly in all paths. Add some human readable date/times to gps messages
* Fix logging problems from different gps time objects
* Better logging why online effort did not run
* Up default lookback window to 40 min
* Up default maximum lookback window to 60 min. Better logging of why we did not run.
* Fix flake8 and more logging updates
* Fix flake8 and more logging updates
* More logging updates
* More logging updates; some paths through the code could cause errors
* Trap and print errors from main()
* fix dag submission command
* add smart post script to allow retries before ignoring errors
* test version of scitokens and smart post script
* test version of scitokens and smart post script
* add arg to specify auth type (x509, vault or apissuer)
* memory units in the wrong place
* memory units in the wrong place, condor_run
* flake8 nits picked
* Again try to get periodic_release and periodic_remove correct
* Sort console scripts
* Typo in periodic_remove
* Better error message when programs not available
* implement conda run for all jobs in dag
* conda run complications with cvmfs
* archive.py deals with renamed trigger files
* archive.py deals with renamed trigger files take 2
* condor run needed in all scripts.
* minor logging changes
* working on archive issues
* working on archive issues, keep "temporary" files to help debugging
* more logging
* Default max lookback changed to 30 min. more logging tweaks
* Add omicron_utils to install requirements
* Work on build and test workflow error.
* Still working on build and test workflow error. Remove Python 3.9 from workflows
* Set loglevel for OmicronConfig to Critical so --version command is clean
* Resolve all conversations
* try to deal with github error ``` Error: This request has been automatically failed because it uses a deprecated version of `actions/upload-artifact: v2`. Learn more: https://github.blog/changelog/2024-02-13-deprecation-notice-v1-and-v2-of-the-artifact-actions/ ```
* try to deal with github error ``` Error: This request has been automatically failed because it uses a deprecated version of `actions/upload-artifact: v2`.
Learn more: https://github.blog/changelog/2024-02-13-deprecation-notice-v1-and-v2-of-the-artifact-actions/ ``` --------- Co-authored-by: Joseph Areeda --- .github/workflows/build.yml | 11 +- omicron/cli/archive.py | 24 ++- omicron/cli/omicron_post_script.py | 137 ++++++++++++ omicron/cli/process.py | 331 +++++++++++++++++++++-------- omicron/condor.py | 3 +- omicron/tests/test_parameters.py | 5 +- omicron/utils.py | 29 +++ setup.cfg | 12 +- 8 files changed, 436 insertions(+), 116 deletions(-) create mode 100644 omicron/cli/omicron_post_script.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 54bd5a4..81200d5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -37,7 +37,6 @@ jobs: - macOS - Ubuntu python-version: - - "3.9" - "3.10" - "3.11" runs-on: ${{ matrix.os }}-latest @@ -49,12 +48,12 @@ jobs: steps: - name: Get source code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Cache conda packages - uses: actions/cache@v2 + uses: actions/cache@v4 env: # increment to reset cache CACHE_NUMBER: 0 @@ -64,7 +63,7 @@ jobs: restore-keys: ${{ runner.os }}-conda-${{ matrix.python-version }}- - name: Configure conda - uses: conda-incubator/setup-miniconda@v2 + uses: conda-incubator/setup-miniconda@v3 with: activate-environment: test miniforge-variant: Mambaforge @@ -111,14 +110,14 @@ jobs: run: python -m coverage xml - name: Publish coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: files: coverage.xml flags: Conda,${{ runner.os }},python${{ matrix.python-version }} - name: Upload test results if: always() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: pytest-conda-${{ matrix.os }}-${{ matrix.python-version }} path: pytest.xml diff --git a/omicron/cli/archive.py b/omicron/cli/archive.py index efda155..14e4938 100644 --- a/omicron/cli/archive.py +++ b/omicron/cli/archive.py @@ -90,7 +90,7 @@ def scandir(otrigdir): def process_dir(dir_path, outdir, logger, keep_files): """ Copy all trigget files to appropriate directory - @param logger: program'sclogger + @param logger: program's logger @param Path dir_path: input directory @param Path outdir: top level output directory eg ${HOME}/triggers @param boolean keep_files: Do not delete files after copying to archive @@ -116,14 +116,17 @@ def process_dir(dir_path, outdir, logger, keep_files): tspan = Segment(strt, strt + dur) otrigdir = outdir / ifo / chan / str(int(strt / 1e5)) + + logger.debug(f'Trigger file:\n' + f' {tfile_path.name}\n' + f' ifo: [{ifo}], chan: [{chan}], strt: {strt}, duration: {dur} ext: [{ext}]\n' + f' outdir: {str(otrigdir.absolute())}') + if str(otrigdir.absolute()) not in dest_segs.keys(): dest_segs[str(otrigdir.absolute())] = scandir(otrigdir) - logger.debug( - f'ifo: [{ifo}], chan: [{chan}], strt: {strt}, ext: [{ext}] -> {str(otrigdir.absolute())}') - if dest_segs[str(otrigdir.absolute())].intersects_segment(tspan): - logger.warn(f'{tfile_path.name} ignored because it would overlap') + logger.warning(f'{tfile_path.name} ignored because it would overlap') else: otrigdir.mkdir(mode=0o755, parents=True, exist_ok=True) shutil.copy(tfile, str(otrigdir.absolute())) @@ -134,11 +137,14 @@ def process_dir(dir_path, outdir, logger, keep_files): def main(): - logging.basicConfig() + # global logger + log_file_format = "%(asctime)s - %(levelname)s - %(funcName)s %(lineno)d: %(message)s" + log_file_date_format = '%m-%d %H:%M:%S' + logging.basicConfig(format=log_file_format, 
datefmt=log_file_date_format) logger = logging.getLogger(__process_name__) logger.setLevel(logging.DEBUG) - home = os.getenv('HOME') + home = Path.home() outdir_default = os.getenv('OMICRON_HOME', f'{home}/triggers') parser = argparse.ArgumentParser(description=textwrap.dedent(__doc__), formatter_class=argparse.RawDescriptionHelpFormatter, @@ -169,6 +175,10 @@ def main(): else: logger.setLevel(logging.DEBUG) + logger.debug("Command line args:") + for arg in vars(args): + logger.debug(f' {arg} = {str(getattr(args, arg))}') + indir = Path(args.indir) outdir = Path(args.outdir) if not outdir.exists(): diff --git a/omicron/cli/omicron_post_script.py b/omicron/cli/omicron_post_script.py new file mode 100644 index 0000000..94ac75d --- /dev/null +++ b/omicron/cli/omicron_post_script.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim: nu:ai:ts=4:sw=4 + +# +# Copyright (C) 2024 Joseph Areeda +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +The situation is that we run DAGs with many omicron jobs, some of which fail for data dependent reasons that +are valid and permanent but others are transient like network issues that could be resolved with a retry. + +This program isun as a post script to allow us to retry the job but return a success code even if it fails +repeatedly so that the DAG completes. +""" +import textwrap +import time + +start_time = time.time() + +import argparse +import logging +from pathlib import Path +import sys +import traceback + +try: + from ._version import __version__ +except ImportError: + __version__ = '0.0.0' + +__author__ = 'joseph areeda' +__email__ = 'joseph.areeda@ligo.org' +__process_name__ = Path(__file__).name + +logger = None + + +def parser_add_args(parser): + """ + Set up command parser + :param argparse.ArgumentParser parser: + :return: None but parser object is updated + """ + parser.add_argument('-v', '--verbose', action='count', default=1, + help='increase verbose output') + parser.add_argument('-V', '--version', action='version', + version=__version__) + parser.add_argument('-q', '--quiet', default=False, action='store_true', + help='show only fatal errors') + parser.add_argument('--return-code', help='Program return code') + parser.add_argument('--max-retry', help='condor max retry value') + parser.add_argument('--retry', help='current try starting at 0') + parser.add_argument('--log', help='Path for a copy of our logger output') + + +def main(): + global logger + + log_file_format = "%(asctime)s - %(levelname)s, %(pathname)s:%(lineno)d: %(message)s" + log_file_date_format = '%m-%d %H:%M:%S' + logging.basicConfig(format=log_file_format, datefmt=log_file_date_format) + logger = logging.getLogger(__process_name__) + logger.setLevel(logging.DEBUG) + + epilog = textwrap.dedent(""" + This progam is designed to be run as a post script in a Condor DAG. 
For available arguments see: + https://htcondor.readthedocs.io/en/latest/automated-workflows/dagman-scripts.html#special-script-argument-macros + A typical lne in the DAG might look like: + python omicron_post_script.py -vvv --return $(RETURN) --retry $(RETRY) --max-retry $(MAX_RETRIES) --log + + """) + + parser = argparse.ArgumentParser(description=__doc__, prog=__process_name__, epilog=epilog, + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser_add_args(parser) + args = parser.parse_args() + verbosity = 0 if args.quiet else args.verbose + + if verbosity < 1: + logger.setLevel(logging.CRITICAL) + elif verbosity < 2: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + if args.log: + log = Path(args.log) + log.parent.mkdir(0o775, exist_ok=True, parents=True) + file_handler = logging.FileHandler(log, mode='a') + log_formatter = logging.Formatter(log_file_format, datefmt=log_file_date_format) + file_handler.setFormatter(log_formatter) + logger.addHandler(file_handler) + + me = Path(__file__).name + logger.info(f'--------- Running {str(me)}') + # debugging? + logger.debug(f'{__process_name__} version: {__version__} called with arguments:') + for k, v in args.__dict__.items(): + logger.debug(' {} = {}'.format(k, v)) + + ret = int(args.return_code) + retry = int(args.retry) + max_retry = int(args.max_retry) + ret = ret if retry < max_retry or ret == 0 else 0 + logger.info(f'returning {ret}') + return ret + + +if __name__ == "__main__": + try: + ret = main() + except (ValueError, TypeError, OSError, NameError, ArithmeticError, RuntimeError) as ex: + print(ex, file=sys.stderr) + traceback.print_exc(file=sys.stderr) + ret = 21 + + if logger is None: + logging.basicConfig() + logger = logging.getLogger(__process_name__) + logger.setLevel(logging.DEBUG) + # report our run time + logger.info(f'Elapsed time: {time.time() - start_time:.1f}s') + sys.exit(ret) diff --git a/omicron/cli/process.py b/omicron/cli/process.py index 3c186f8..83bcffe 100644 --- a/omicron/cli/process.py +++ b/omicron/cli/process.py @@ -58,7 +58,15 @@ *automatically* submitted to condor for processing. 
""" +import logging import time +import traceback + +from omicron_utils.conda_fns import get_conda_run +from omicron_utils.omicron_config import OmicronConfig + +from omicron.utils import gps_to_hr, deltat_to_hr + prog_start = time.time() from gwpy.segments import SegmentList, Segment @@ -75,8 +83,6 @@ from subprocess import check_call from tempfile import gettempdir from time import sleep - -import gwpy.time from glue import pipeline from gwpy.io.cache import read_cache @@ -93,7 +99,13 @@ DAG_TAG = "omicron" + logger = log.Logger('omicron-process') +old_level = logger.getEffectiveLevel() +logger.setLevel(logging.CRITICAL) +omicron_config = OmicronConfig(logger=logger) +config = omicron_config.get_config() +logger.setLevel(old_level) def clean_exit(exitcode, tempfiles=None): @@ -168,11 +180,14 @@ def clean_tempfiles(tempfiles): logger.debug("Deleted path '{}'".format(f)) -def create_parser(): +def create_parser(config): """Create a command-line parser for this entry point + @param configparser.ConfigParser config: config for our pipelines not Omicron itself + """ - epilog = """This source code for this project is available here: + epilog = """ +This source code for this project is available here: https://github.com/gwpy/pyomicron/ @@ -181,7 +196,8 @@ def create_parser(): Documentation is available here: -https://pyomicron.readthedocs.io/en/latest/""" +https://pyomicron.readthedocs.io/en/latest/ +""" parser = argparse.ArgumentParser( description=__doc__, @@ -198,9 +214,18 @@ def create_parser(): action='version', version=__version__, ) + ifo = const.IFO + if ifo: + grp_file = Path.home() / 'omicron' / 'online' / f'{ifo.lower()}-groups.txt' + with grp_file.open('r') as grp_fp: + all_groups = re.sub('\n(? max_lookback: start = end - max_lookback + else: + logger.debug(f"Online segment record recovered: {gps_to_hr(start)} - {gps_to_hr(end)}") + else: start, end = args.gps start = int(start) @@ -764,11 +833,8 @@ def main(args=None): dataend = end + padding dataduration = dataend - datastart - start_dt = gwpy.time.tconvert(datastart).strftime('%x %X') - end_dt = gwpy.time.tconvert(dataend).strftime('%x %X') - logger.info(f'Processing segment determined as: {datastart:d} - {dataend:d} : {start_dt} - {end_dt}') - dur_str = '{} {}'.format(int(dataduration / 86400) if dataduration > 86400 else '', - time.strftime('%H:%M:%S', time.gmtime(dataduration))) + logger.info(f'Processing segment determined as: {gps_to_hr(datastart)} - {gps_to_hr(dataend)}') + dur_str = deltat_to_hr(dataduration) logger.info(f"Duration = {dataduration} - {dur_str}") span = (start, end) @@ -780,27 +846,28 @@ def main(args=None): # validate span is long enough if dataduration < minduration and online: - logger.info("Segment is too short (%d < %d), please try again later" - % (duration, minduration)) + if dataduration < 0: + logger.info(f'Frame data is not available for interval {gps_to_hr(start)} to {gps_to_hr(end)}') + else: + logger.info(f"Segment is too short ({duration} < {minduration}), please try again later") clean_dirs(run_dir_list) clean_exit(0, tempfiles) elif dataduration < minduration: - raise ValueError( - "Segment [%d, %d) is too short (%d < %d), please " - "extend the segment, or shorten the timing parameters." - % (start, end, duration, chunkdur - padding * 2), - ) + ermsg = f'Segment [{start}, {end}) is too short ({duration} < {minduration}), ' \ + f'please; extend the segment, or shorten the timing parameters.' 
+ logger.critical(ermsg) + raise ValueError(ermsg) # -- find run segments # get segments from state vector - if (online and statechannel) or (statechannel and not stateflag) or ( - statechannel and args.no_segdb): + if (online and statechannel) or (statechannel and not stateflag) or (statechannel and args.no_segdb): logger.info(f'Finding segments for relevant state... from:{datastart} length: {dataduration}s') logger.debug(f'For segment finding: online: {online}, statechannel: {statechannel}, ' f'stateflag: {stateflag} args.no_segdb: {args.no_segdb}') seg_qry_strt = time.time() if statebits == "guardian": # use guardian - logger.debug(f'Using guardian for {statechannel}: {datastart}-{dataend} ') + logger.debug(f'Using guardian for {statechannel}: {gps_to_hr(datastart)}-{gps_to_hr(dataend)}:' + f' {(dataend - datastart)} seconds') segs = segments.get_guardian_segments( statechannel, stateft, @@ -822,7 +889,8 @@ def main(args=None): # get segments from segment database elif stateflag: - logger.info(f'Querying segments for relevant state: {stateflag} from:{datastart} length: {dataduration}s') + logger.info(f'Querying segment database for relevant state: {stateflag} from:{datastart} length:' + f' {dataduration}s {deltat_to_hr(dataduration)}') seg_qry_strt = time.time() segs = segments.query_state_segments(stateflag, datastart, dataend, pad=statepad) @@ -830,6 +898,7 @@ def main(args=None): # Get segments from frame cache elif args.cache_file: + logger.info('Get segments from cache') cache = read_cache(str(args.cache_file)) cache_segs = segments.cache_segments(cache) srch_span = SegmentList([Segment(datastart, dataend)]) @@ -837,13 +906,16 @@ def main(args=None): # get segments from frame availability else: + logger.info('Get segments from frame availability') + fa_qry_strt = time.time() segs = segments.get_frame_segments(ifo, frametype, datastart, dataend) + logger.info(f'Frame availability query took {time.time() - fa_qry_strt}s') # print frame segments recovered if len(segs): logger.info("State/frame segments recovered as") for seg in segs: - logger.info(" %d %d [%d]" % (seg[0], seg[1], abs(seg))) + logger.info(f" {gps_to_hr(seg[0])} {gps_to_hr(seg[1])} [{abs(seg)}]") logger.info("Duration = %d seconds" % abs(segs)) # if running online, we want to avoid processing up to the extent of @@ -891,7 +963,7 @@ def main(args=None): step = chunkdur - overlap segs[-1] = type(segs[-1])(lastseg[0], t) dataend = segs[-1][1] - logger.info("This analysis will now run to %d" % dataend) + logger.info(f"This analysis will now run to {gps_to_hr(dataend)}") # recalculate the processing segment dataspan = type(segs)([segments.Segment(datastart, dataend)]) @@ -977,17 +1049,21 @@ def main(args=None): trigsegs = type(segs)(type(s)(*s) for s in segs).contract(padding) # display segments - logger.info("Final data segments selected as") - for seg in segs: - logger.info(f" {seg[0]:d} {seg[1]:d} {abs(seg):d}") - logger.info(f"Duration = {abs(segs):d} seconds") + if len(segs) == 0: + logger.info('No analyzable segments found. 
Exiting.') + exit(0) + else: + logger.info("Final data segments selected as") + for seg in segs: + logger.info(f" {gps_to_hr(seg[0])} {gps_to_hr(seg[1])} {abs(seg)}") + logger.info(f"Duration = {abs(segs)} seconds") span = type(trigsegs)([trigsegs.extent()]) logger.info("This will output triggers for") for seg in trigsegs: - logger.info(f" {seg[0]:d} {seg[1]:d} {abs(seg):d}") - logger.info(f"Duration = {abs(trigsegs):d} seconds") + logger.info(f" {gps_to_hr(seg[0])} {gps_to_hr(seg[1])} {abs(seg)}") + logger.info(f"Duration = {abs(trigsegs)} seconds") # -- config omicron config directory -------------------------------------- @@ -1011,55 +1087,92 @@ def main(args=None): dag.set_dag_file(str(dagpath.with_suffix(""))) # set up condor commands for all jobs - condorcmds = { + base_condorcmds = { "accounting_group": args.condor_accounting_group, "accounting_group_user": args.condor_accounting_group_user, "request_disk": args.condor_request_disk, - "request_memory": 1024, # MB + "request_memory": '1024', # units are MB but cannot be specified here } + condor_igwn_auth = { + # scitokens needed for dqsegdb + 'use_oauth_services': 'igwn', + 'igwn_oauth_options_dqsegdb': "--role $ENV('TOKEN_ROLE') --credkey $ENV('TOKEN_CREDKEY')", + 'igwn_oauth_resource_dqsegdb': 'https: // segments.ligo.org', + 'igwn_oauth_permissions_dqsegdb': 'dqsegdb.read', + 'environment': '"BEARER_TOKEN_FILE=$$(_CONDOR_SCRATCH_DIR)/.condor_creds/igwn_dqsegdb.use"' + } + condor_apissuer_auth = { + 'use_oauth_services': 'scitokens', + } + condor_x509_auth = { + 'getenv': 'X509_USER_PROXY, KRB5CNAME' + } + if args.auth_type == 'x509': + condorcmds = dict(base_condorcmds | condor_x509_auth) + elif args.auth_type == 'igwn': + condorcmds = dict(base_condorcmds | condor_igwn_auth) + elif args.auth_type == 'scitokens': + condorcmds = dict(base_condorcmds | condor_apissuer_auth) + else: + condorcmds = base_condorcmds.copy() + logger.warning('We do not know how to authenticate to dqsegdb or cvmfs') + + condorcmds: dict[str, str] + for cmd_ in args.condor_command: key, value = cmd_.split('=', 1) condorcmds[key.rstrip().lower()] = value.strip() + conda_exe, conda_args = get_conda_run(config, args.conda_env, logger) + conda_arg_list = conda_args.split() + conda_run_prefix = conda_exe + ' ' + conda_args # create omicron job ojob = condor.OmicronProcessJob( args.universe, - args.executable, + conda_exe, + tag='omicron', subdir=condir, logdir=logdir, **condorcmds, ) + for job_arg in conda_arg_list: + ojob.add_arg(job_arg) + + ojob.add_arg('omicron') # This allows us to start with a memory request that works maybe 80%, but bumps it if we go over # we also limit individual jobs to a max runtime to cause them to be vacates to deal with NFS hanging reqmem = condorcmds.pop('request_memory', 1024) - ojob.add_condor_cmd('+InitialRequestMemory', f'{reqmem}') + ojob.add_condor_cmd('my.InitialRequestMemory', f'{reqmem}') ojob.add_condor_cmd('request_memory', f'ifthenelse(isUndefined(MemoryUsage), {reqmem}, int(3*MemoryUsage))') - ojob.add_condor_cmd('periodic_release', '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34 ' - '|| HoldReasonCode =?= 46) && (JobStatus == 5)') + ojob.add_condor_cmd('periodic_release', '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34) ' + ' && (JobStatus == 5) && (time() - EnteredCurrentStatus > 10)') + ojob.add_condor_cmd('allowed_job_duration', 3 * 3600) + ojob.add_condor_cmd('periodic_remove', '(JobStatus == 1 && MemoryUsage >= 7000 || (HoldReasonCode =?= 46 ))') ojob.add_condor_cmd('allowed_job_duration', 3 * 3600) - 
ojob.add_condor_cmd('periodic_remove', '(JobStatus == 1) && MemoryUsage >= 7G') - ojob.add_condor_cmd('+OmicronProcess', f'"{group}"') + ojob.add_condor_cmd('my.OmicronProcess', f'"{group}"') # create post-processing jobs - ppjob = condor.OmicronProcessJob(args.universe, shutil.which('bash'), + ppjob = condor.OmicronProcessJob(args.universe, conda_exe, subdir=condir, logdir=logdir, tag='post-processing', **condorcmds) - ppjob.add_condor_cmd('+OmicronPostProcess', f'"{group}"') + for job_arg in conda_arg_list: + ppjob.add_arg(job_arg) + + ppjob.add_arg(shutil.which('bash')) + ppjob.add_arg('-e') + ppjob.add_condor_cmd('my.OmicronPostProcess', f'"{group}"') ppmem = 1024 - ppjob.add_condor_cmd('+InitialRequestMemory', f'{ppmem}') + ppjob.add_condor_cmd('my.InitialRequestMemory', f'{ppmem}') ppjob.add_condor_cmd('request_memory', - f'ifthenelse(isUndefined(MemoryUsage), {ppmem}, int(3*MemoryUsage))') - ojob.add_condor_cmd('allowed_job_duration', 3 * 3600) - ppjob.add_condor_cmd('periodic_release', - '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34 ' - '|| HoldReasonCode =?= 46) && (JobStatus == 5)') + f'ifthenelse(isUndefined(MemoryUsage), {ppmem}, int(1.5*MemoryUsage))') + ppjob.add_condor_cmd('periodic_release', '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34) ' + '&& (JobStatus == 5) && (time() - EnteredCurrentStatus > 10)') - ppjob.add_condor_cmd('periodic_remove', '(JobStatus == 1) && MemoryUsage >= 7G') + ppjob.add_condor_cmd('periodic_remove', '((JobStatus == 1) && MemoryUsage >= 7000) || (HoldReasonCode =?= 46)') ppjob.add_condor_cmd('environment', '"HDF5_USE_FILE_LOCKING=FALSE"') - ppjob.add_short_opt('e', '') ppnodes = [] prog_path = dict() prog_path['omicron-merge'] = shutil.which('omicron-merge-with-gaps') @@ -1068,11 +1181,12 @@ def main(args=None): prog_path['ligolw_add'] = shutil.which('ligolw_add') prog_path['gzip'] = shutil.which('gzip') prog_path['omicron_archive'] = shutil.which('omicron-archive') + prog_path['clean_logs'] = shutil.which('omicron-clean-logs') goterr = list() for exe in prog_path.keys(): if not prog_path[exe]: - logger.critical(f'required program: {prog_path[exe]} not found') + logger.critical(f'required program: {exe} not found') goterr.append(exe) if goterr: raise ValueError(f'Required programs not found in current environment: {", ".join(goterr)}') @@ -1081,15 +1195,27 @@ def main(args=None): rmfiles = [] if not args.skip_rm: rmjob = condor.OmicronProcessJob( - args.universe, str(condir / "post-process-rm.sh"), + args.universe, conda_exe, subdir=condir, logdir=logdir, tag='post-processing-rm', **condorcmds) rm = shutil.which('rm') + + for job_arg in conda_arg_list: + rmjob.add_arg(job_arg) + rmscript = condir / "post-process-rm.sh" + rmjob.add_arg(str(rmscript)) + rmjob.add_condor_cmd('+OmicronPostProcess', '"%s"' % group) if args.archive: archivejob = condor.OmicronProcessJob( - args.universe, str(condir / "archive.sh"), + args.universe, conda_exe, subdir=condir, logdir=logdir, tag='archive', **condorcmds) + for job_arg in conda_arg_list: + archivejob.add_arg(job_arg) + + archive_script = condir / "archive.sh" + archivejob.add_arg(str(archive_script)) + archivejob.add_condor_cmd('+OmicronPostProcess', '"%s"' % group) archivefiles = {} else: @@ -1126,10 +1252,19 @@ def main(args=None): node.add_var_arg(str(subseg[0])) node.add_var_arg(str(subseg[1])) node.add_file_arg(pf) - # we need to ignore errors in individual nodes - node.set_post_script(shutil.which('bash')) - node.add_post_script_arg('-c') - node.add_post_script_arg('exit 0') + # we need to ignore 
errors in individual omicron jobs after retrying + node.set_post_script(shutil.which('omicron-post-script')) + # -vvv --return $RETURN --retry $RETRY --max-retry $MAX_RETRIES --log pscript.log + node.add_post_script_arg('-vvv') + node.add_post_script_arg('--return') + node.add_post_script_arg('$RETURN') + node.add_post_script_arg('--retry') + node.add_post_script_arg('$RETRY') + node.add_post_script_arg('--max-retry') + node.add_post_script_arg('$MAX_RETRIES') + node.add_post_script_arg('--log') + post_script_log_file = condir / 'post_script.log' + node.add_post_script_arg(str(post_script_log_file.absolute())) for chan in chanlist: for form, flist in nodefiles[chan].items(): @@ -1151,7 +1286,7 @@ def main(args=None): # post-process (one post-processing job per channel # per data segment) if not args.skip_postprocessing: - script = condir / "post-process-{}-{}-{}.sh".format(i, s, e) + script = condir / f"post-process-{i}-{s}-{e}.sh" ppnode = pipeline.CondorDAGNode(ppjob) ppnode.add_var_arg(str(script)) operations = [] @@ -1174,7 +1309,7 @@ def main(args=None): ppnode.add_input_file(f) no_merge = '--no-merge' if args.skip_root_merge else '' - operations.append(f' {prog_path["omicron-merge"]} {no_merge} ' + operations.append(f' {conda_run_prefix} {prog_path["omicron-merge"]} {no_merge} ' f'--out-dir {mergepath} {rootfiles} ') rmfiles.append(rootfiles) @@ -1186,7 +1321,7 @@ def main(args=None): no_merge = '--no-merge' if args.skip_hdf5_merge else '' operations.append( - f' {prog_path["omicron-merge"]} {no_merge} ' + f' {conda_run_prefix} {prog_path["omicron-merge"]} {no_merge} ' f' --out-dir {mergepath} {hdf5files} ') rmfiles.append(hdf5files) @@ -1199,7 +1334,7 @@ def main(args=None): no_merge = '--no-merge' if args.skip_ligolw_add else '' no_gzip = '--no-gzip' if args.skip_gzip else '' operations.append( - f' {prog_path["omicron-merge"]} {no_merge} {no_gzip} --uint-bug ' + f' {conda_run_prefix} {prog_path["omicron-merge"]} {no_merge} {no_gzip} --uint-bug ' f' --out-dir {mergepath} {xmlfiles} ') rmfiles.append(xmlfiles) @@ -1231,12 +1366,7 @@ def main(args=None): # add header print('#!/bin/bash -e\n#', file=f) print("# omicron-process post-processing", file=f) - print( - '#\n# File created by\n# {}\n#'.format( - ' '.join(sys.argv), - ), - file=f, - ) + print(f'#\n# File created by\n# {sys.argv[0]} version: {__version__}\n#', file=f) print("# Group: %s" % group, file=f) print("# Segment: [%d, %d)" % (s, e), file=f) print("# Channels:\n#", file=f) @@ -1260,12 +1390,16 @@ def main(args=None): acache = {fmt: list() for fmt in fileformats} if newdag: # write shell script to seed archive - with open(archivejob.get_executable(), 'w') as f: + with archive_script.open('w') as f: print('#!/bin/bash -e\n', file=f) print('# Archive all trigger files saved in the merge directory ', file=f) - print(f'{prog_path["omicron_archive"]} --indir {str(mergedir.absolute())} -vv', file=f) + print(f'#\n# File created by this command:\n# {" ".join(sys.argv)}\n#', file=f) + print(f'# Running: {sys.argv[0]} version {__version__}\n', file=f) + + print(f'{conda_exe} {conda_args} ', end=' ', file=f) + print(f'{prog_path["omicron_archive"]} --indir {str(mergedir.absolute())} -vvv', file=f) - os.chmod(archivejob.get_executable(), 0o755) + archive_script.chmod(0o755) # write caches to disk for fmt, fcache in acache.items(): cachefile = cachedir / "omicron-{0}.lcf".format(fmt) @@ -1277,13 +1411,12 @@ def main(args=None): archivenode.set_retry(args.condor_retry) archivenode.set_category('archive') dag.add_node(archivenode) - 
tempfiles.append(archivejob.get_executable()) + tempfiles.append(archive_script) # add rm job right at the end rmnode = None if not args.skip_rm: rmnode = pipeline.CondorDAGNode(rmjob) - rmscript = rmjob.get_executable() with open(rmscript, 'w') as f: print('#!/bin/bash -e\n#', file=f) print("# omicron-process post-processing-rm", file=f) @@ -1297,7 +1430,7 @@ def main(args=None): for rmset in rmfiles: print('%s -f %s' % (rm, rmset), file=f) if newdag: - os.chmod(rmscript, 0o755) + rmscript.chmod(0o755) tempfiles.append(rmscript) rmnode.set_category('postprocessing') if rmnode: @@ -1441,11 +1574,21 @@ def main(args=None): # clean up temporary files tempfiles.extend(trigdir.glob("ffconvert.*.ffl")) - clean_tempfiles(tempfiles) + if tempfiles is not None: + logger.debug('Temporary files that cn be deleted') + for f in tempfiles: + isd = 'dir:' if Path(f).is_dir() else 'fil:' + logger.debug(f'Delete {isd} {f}') + + # clean_tempfiles(tempfiles) # and exit - logger.info(f"--- Processing complete. Elapsed: {time.time() - prog_start} seconds ----------------") + logger.info(f"--- Processing complete. Elapsed: {(time.time() - prog_start):1f} seconds ----------------") if __name__ == "__main__": - main() + try: + main() + except (ValueError, TypeError, OSError, NameError, ArithmeticError, RuntimeError) as ex: + print(ex, file=sys.stderr) + traceback.print_exc(file=sys.stderr) diff --git a/omicron/condor.py b/omicron/condor.py index 2f506b9..e676e1b 100644 --- a/omicron/condor.py +++ b/omicron/condor.py @@ -424,8 +424,7 @@ def find_dagman_id(group, classad="OmicronDAGMan", user=getuser(), **constraints) clusterid = job['ClusterId'] if get_job_status(job) >= 3: - raise RuntimeError("DAGMan cluster %d found, but in state %r" - % JOB_STATUS[job['JobStatus']]) + raise RuntimeError(f"DAGMan cluster {clusterid} found, but in state {JOB_STATUS[job['JobStatus']]}") return clusterid diff --git a/omicron/tests/test_parameters.py b/omicron/tests/test_parameters.py index f9c9b1c..73d5ef4 100644 --- a/omicron/tests/test_parameters.py +++ b/omicron/tests/test_parameters.py @@ -94,6 +94,8 @@ def test_validate_parameters(pars): def test_from_channel_list_config(): + # I disabled this test because the Omicron program segfaults when + # it is run from pytest cp = ConfigParser() section = 'test' cp.add_section(section) @@ -103,8 +105,7 @@ def test_from_channel_list_config(): with tempfile.NamedTemporaryFile(suffix='.ini', mode='w') as f: cp.write(f) pars = OmicronParameters.from_channel_list_config(cp, section) - assert pars.getlist('DATA', 'CHANNELS') == ['X1:TEST-CHANNEL', - 'X1:TEST-CHANNEL_2'] + assert pars.getlist('DATA', 'CHANNELS') == ['X1:TEST-CHANNEL', 'X1:TEST-CHANNEL_2'] assert tuple(pars.getfloats('PARAMETER', 'FREQUENCYRANGE')) == (10., 100.) diff --git a/omicron/utils.py b/omicron/utils.py index 39562ab..65dd896 100644 --- a/omicron/utils.py +++ b/omicron/utils.py @@ -22,10 +22,12 @@ import os import subprocess import sys +import time from pathlib import Path from shutil import which from tempfile import gettempdir +from gwpy.time import from_gps from packaging.version import Version from . 
import const @@ -118,3 +120,30 @@ def astropy_config_path(parent, update_environ=True): if update_environ: os.environ["XDG_CONFIG_HOME"] = str(confpath) return confpath + + +def gps_to_hr(gps): + """ + Convert a gps time to a human readable string for our log files + @param LIGOTimeGPS | int | float gps: time to consider + @return str: hr string eg: "1386527433 (12/13/23 18:30:16)" + """ + dt = from_gps(int(gps)) + dt_str = dt.strftime('%x %X') + ret = f'{int(gps)} ({dt_str})' + return ret + + +def deltat_to_hr(dt): + """ + Convert a time in seconds to a human readable string + @param int dt: delta t + @return str: [] HH:MM:SS + """ + ret = f'{dt}' + if dt > 0: + day = f'{int(dt) / 86400}' if dt >= 86400 else '' + time_str = time.strftime('%H:%M:%S', time.gmtime(int(dt))) + ret += f' - {day} {time_str}' + + return ret diff --git a/setup.cfg b/setup.cfg index 016240c..0d1325d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,7 +30,6 @@ classifiers = Operating System :: Unix Operating System :: MacOS Programming Language :: Python :: 3 - Programming Language :: Python :: 3.9 Programming Language :: Python :: 3.10 Programming Language :: Python :: 3.11 Topic :: Scientific/Engineering @@ -60,7 +59,7 @@ per-file-ignores = [options] packages = find: -python_requires = >=3.9 +python_requires = >=3.10 install_requires = dqsegdb2 >= 1.2.0 gwdatafind @@ -75,6 +74,7 @@ install_requires = packaging pycondor python-ligo-lw >= 1.4.0 + omicron_utils [options.extras_require] test = @@ -93,13 +93,15 @@ conda = [options.entry_points] console_scripts = + omicron-archive = omicron.cli.archive:main omicron-hdf5-merge = omicron.cli.hdf5_merge:main - omicron-show = omicron.cli.show:main + omicron-merge-with-gaps = omicron.cli.merge_with_gaps:main + omicron-post-script = omicron.cli.omicron_post_script:main omicron-process = omicron.cli.process:main omicron-root-merge = omicron.cli.root_merge:main - omicron-archive = omicron.cli.archive:main + omicron-show = omicron.cli.show:main omicron-status = omicron.cli.status:main - omicron-merge-with-gaps = omicron.cli.merge_with_gaps:main + # -- tools ------------------
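
For reference, the retry policy introduced by the new `omicron/cli/omicron_post_script.py` comes down to one rule: while DAGMan retries remain, the POST script passes the Omicron job's exit code back so a transient failure is retried; once the retry budget is exhausted, a persistent non-zero code is converted to success so one permanently failing node cannot block completion of the DAG. Below is a minimal sketch of that rule; the helper name is illustrative and not part of the patch.

```python
def post_script_exit_code(return_code: int, retry: int, max_retry: int) -> int:
    """Decision rule used by the POST script (illustrative helper, not in the patch).

    While retries remain (retry < max_retry), pass the real exit code back to
    DAGMan so a failing node is retried; once retries are exhausted, swallow
    the error (return 0) so the rest of the DAG can finish.
    """
    if return_code == 0 or retry < max_retry:
        return return_code
    return 0


# transient failure on the first attempt (retry counts from 0): DAGMan retries
assert post_script_exit_code(1, retry=0, max_retry=3) == 1
# still failing after the last allowed retry: report success so the DAG completes
assert post_script_exit_code(1, retry=3, max_retry=3) == 0
```

In the generated DAG, each Omicron node is given this program as its POST script with `-vvv --return $RETURN --retry $RETRY --max-retry $MAX_RETRIES --log <condor-dir>/post_script.log`, matching the `add_post_script_arg` calls added in `process.py` above.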