Skip to content

Commit

Permalink
feat(cli): add "igneous wait" command
Browse files Browse the repository at this point in the history
This helps with scripting multiple commands together.
  • Loading branch information
william-silversmith committed Dec 18, 2024
1 parent 36ab477 commit c62a96e
Showing 1 changed file with 54 additions and 32 deletions.
86 changes: 54 additions & 32 deletions igneous_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,58 @@ def ccl_auto(
if clean:
igneous.tasks.image.ccl.clean_intermediate_files(src, mip)

def is_empty(tq, sqs_sec_to_wait=120):
start_time = time.time()
last_empty = False

# Offical Amazon docs state that to determine if a queue is empty,
# you have to test whether it's consistently empty for several
# minutes. So we pick 120 seconds to wait somewhat arbitrarily.
# https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/confirm-queue-is-empty.html
def _is_empty():
nonlocal start_time
nonlocal sqs_sec_to_wait
nonlocal last_empty

if tq.path.protocol == "sqs":
if tq.is_empty():
if last_empty:
elapsed_time = time.time() - start_time
if elapsed_time >= sqs_sec_to_wait:
return True
else:
return False
else:
print(f"Queue appearently empty. Waiting {sqs_sec_to_wait} sec. to confirm.")
start_time = time.time()
last_empty = True
return False
else:
last_empty = False
return False
else:
return tq.is_empty()

return _is_empty

@main.command()
@click.argument("queue", type=str)
@click.option('--aws-region', default=SQS_REGION_NAME, help=f"AWS region in which the SQS queue resides.", show_default=True)
@click.option('--rate', default=30.0, help=f"Number of seconds between each poll of the queue.", show_default=True)
@click.pass_context
def wait(ctx, queue, aws_region, rate):
"""Wait for a queue to empty without executing tasks.
This can be useful for filling a queue and then waiting
for a cluster to finish processing before loading the next
set of tasks in a script.
"""
tq = TaskQueue(normalize_path(queue), region_name=aws_region)
empty_fn = is_empty(tq)

while not empty_fn():
time.sleep(rate)

@main.command()
@click.argument("queue", type=str)
@click.option('--aws-region', default=SQS_REGION_NAME, help=f"AWS region in which the SQS queue resides.", show_default=True)
Expand Down Expand Up @@ -843,40 +895,10 @@ def execute_helper(
):
tq = TaskQueue(normalize_path(queue), region_name=aws_region)

sqs_sec_to_wait = 120
start_time = time.time()
last_empty = False

# Offical Amazon docs state that to determine if a queue is empty,
# you have to test whether it's consistently empty for several
# minutes. So we pick 120 seconds to wait somewhat arbitrarily.
# https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/confirm-queue-is-empty.html
def is_empty():
nonlocal start_time
nonlocal sqs_sec_to_wait
nonlocal last_empty

if tq.path.protocol == "sqs":
if tq.is_empty():
if last_empty:
elapsed_time = time.time() - start_time
if elapsed_time >= sqs_sec_to_wait:
return True
else:
return False
else:
print(f"Queue appearently empty. Waiting {sqs_sec_to_wait} sec. to confirm.")
start_time = time.time()
last_empty = True
return False
else:
last_empty = False
return False
else:
return tq.is_empty()
empty_fn = is_empty(tq)

def stop_after_elapsed_time(tries, executed, elapsed_time):
if exit_on_empty and is_empty():
if exit_on_empty and empty_fn():
return True

if num_tasks >= 0 and executed >= num_tasks:
Expand Down

0 comments on commit c62a96e

Please sign in to comment.