Skip to content

Commit

Permalink
Talk to ingest safely (#154)
Browse files Browse the repository at this point in the history
* Make submission code include auth headers in PUT requests.

* Pass required runtime-env and path to service account key params to the submission tasks from the top level workflow.

* Prepare for the next release.

* Pre-release.
  • Loading branch information
rexwangcc authored Jul 3, 2019
1 parent a73acd9 commit 8695d76
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
12 changes: 9 additions & 3 deletions adapter_pipelines/submit.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ task create_submission {
String pipeline_tools_version
Boolean add_md5s
String runtime_environment
File service_account_key_path = "gs://broad-dsde-mint-${runtime_environment}-credentials/caas_key.json"
File service_account_key_path
command <<<
export RECORD_HTTP_REQUESTS="${record_http}"
Expand Down Expand Up @@ -205,6 +205,7 @@ task confirm_submission {
Int? individual_request_timeout
Boolean record_http
String pipeline_tools_version
File service_account_key_path

command <<<
set -e
Expand Down Expand Up @@ -261,6 +262,8 @@ workflow submit {
String pipeline_version
# Disk space to allocate for stage_files task
Int disk_space
# Service account key for generating JWT for submission to Ingest
File service_account_key_path = "gs://broad-dsde-mint-${runtime_environment}-credentials/caas_key.json"

call get_metadata {
input:
Expand Down Expand Up @@ -300,7 +303,8 @@ workflow submit {
record_http = record_http,
pipeline_tools_version = pipeline_tools_version,
add_md5s = add_md5s,
runtime_environment = runtime_environment
runtime_environment = runtime_environment,
service_account_key_path = service_account_key_path
}

call stage_files {
Expand All @@ -326,7 +330,9 @@ workflow submit {
retry_multiplier = retry_multiplier,
retry_max_interval = retry_max_interval,
record_http = record_http,
pipeline_tools_version = pipeline_tools_version
pipeline_tools_version = pipeline_tools_version,
runtime_environment = runtime_environment,
service_account_key_path = service_account_key_path
}

output {
Expand Down
39 changes: 34 additions & 5 deletions pipeline_tools/shared/submission/confirm_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from tenacity import retry_if_result, RetryError
from datetime import datetime
from pipeline_tools.shared.http_requests import HttpRequests
from pipeline_tools.shared import auth_utils


def wait_for_valid_status(envelope_url, http_requests):
Expand Down Expand Up @@ -41,21 +42,30 @@ def status_is_invalid(response):
return True


def confirm(envelope_url, http_requests):
def confirm(envelope_url, http_requests, runtime_environment, service_account_key_path):
"""Confirms the submission.
Args:
envelope_url (str): the url for the envelope
http_requests (HttpRequests): HttpRequests object
envelope_url (str): the url for the envelope.
http_requests (HttpRequests): HttpRequests object.
runtime_environment (str): Environment where the pipeline is running ('dev', 'test', 'staging' or 'prod').
service_account_key_path (str): Path to the JSON service account key for generating a JWT.
Returns:
str: The text of the response
Raises:
requests.HTTPError: if the response status indicates an error
"""
print('Making auth headers')
dcp_auth_client = auth_utils.DCPAuthClient(
service_account_key_path, runtime_environment
)
auth_headers = dcp_auth_client.get_auth_header()

print('Confirming submission')
headers = {'Content-type': 'application/json'}
headers.update(auth_headers)
response = http_requests.put(
'{}/submissionEvent'.format(envelope_url), headers=headers
)
Expand All @@ -66,7 +76,21 @@ def confirm(envelope_url, http_requests):

def main():
parser = argparse.ArgumentParser()
parser.add_argument('--envelope_url', required=True)
parser.add_argument(
'--envelope_url',
required=True,
help='The url of the submission envelope in Ingest service.',
)
parser.add_argument(
'--runtime_environment',
required=True,
help='Environment where the pipeline is running ("dev", "test", "staging" or "prod").',
)
parser.add_argument(
'--service_account_key_path',
required=True,
help='Path to the JSON service account key for generating a JWT.',
)
args = parser.parse_args()
http_requests = HttpRequests()
try:
Expand All @@ -75,7 +99,12 @@ def main():
message = 'Timed out while waiting for Valid status.'
raise ValueError(message)

confirm(args.envelope_url, http_requests)
confirm(
args.envelope_url,
http_requests,
args.runtime_environment,
args.service_account_key_path,
)


if __name__ == '__main__':
Expand Down
9 changes: 8 additions & 1 deletion pipeline_tools/shared/submission/create_envelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def build_envelope(
)
)
link_analysis_protocol_to_analysis_process(
auth_headers=auth_headers,
link_url=link_url,
analysis_protocol_url=analysis_protocol_entity_url,
http_requests=http_requests,
Expand Down Expand Up @@ -387,22 +388,28 @@ def add_file_reference(file_ref, file_refs_url, auth_headers, http_requests):


def link_analysis_protocol_to_analysis_process(
link_url, analysis_protocol_url, http_requests
auth_headers, link_url, analysis_protocol_url, http_requests
):
"""Make the analysis process to be associated with the analysis_protocol to let Ingest create the links.json.
Args:
auth_headers (dict): Dict representing headers to use for auth.
link_url (str): The url for link protocols to processes.
analysis_protocol_url (str): The url for creating the analysis_protocol.
http_requests (http_requests.HttpRequests): The HttpRequests object to use for talking to Ingest.
Returns:
dict: Dict representing the JSON response to the request that adding the file reference.
Raises:
requests.HTTPError: For 4xx errors or 5xx errors beyond timeout.
"""
link_headers = {'content-type': 'text/uri-list'}
link_headers.update(auth_headers)
response = http_requests.put(
link_url, headers=link_headers, data=analysis_protocol_url
)
return response.json()


def main():
Expand Down

0 comments on commit 8695d76

Please sign in to comment.