Skip to content

Commit

Permalink
Merge pull request #94 from opensds/development
Browse files Browse the repository at this point in the history
Sync Development to Master
  • Loading branch information
wisererik authored Jul 19, 2019
2 parents 43d8022 + 910551b commit 27d70a9
Show file tree
Hide file tree
Showing 27 changed files with 1,058 additions and 153 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ include =
orchestration/*
omit =
tests/*
orchestration/connectionmanager/credcreator.py
2 changes: 1 addition & 1 deletion contrib/st2/opensds/actions/create_bucket_migration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ parameters:
user_id:
type: string
description: "User ID."
required: true
required: false
name:
type: string
description: "Name for the Bucket Migration Dataflow."
Expand Down
52 changes: 52 additions & 0 deletions contrib/st2/opensds/actions/data_analysis.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
description: Migrate and Data Analysis Service
enabled: true
runner_type: mistral-v2
entry_point: workflows/data_analysis.yaml
name: data-analysis
pack: opensds
parameters:
ip_addr:
required: true
description: "Host IP for the OpenSDS."
type: string
port:
required: true
description: "Port for the service."
type: string
tenant_id:
required: true
description: "Tenant ID."
type: string
user_id:
required: false
description: "User ID."
type: string
name:
required: true
description: "Instance Name, Will Be Used To Create Migration Plan."
type: string
dest_backend_name:
required: true
description: "Destination Backend Storage Name."
type: string
src_bucket_name:
required: true
description: "Source Bucket Name."
type: string
dest_bucket_name:
required: true
description: "Destination Bucket Name."
type: string
analysis_engine_type:
required: true
    description: "Data Analysis Engine Type."
type: string
analysis_args:
required: true
description: "Arguments Used for Data Analysis."
type: object
auth_token:
required: false
description: "Authentication Token."
type: string
31 changes: 31 additions & 0 deletions contrib/st2/opensds/actions/delete_bucket_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2019 The OpenSDS Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import requests

from st2common.runners.base_action import Action


class DeleteMigrationPlanAction(Action):
    """Delete a bucket migration dataflow plan via the multi-cloud API."""

    def run(self, url, tenant_id, user_id, auth_token):
        """Issue an HTTP DELETE against *url*.

        tenant_id and user_id are accepted for workflow compatibility but
        are not sent in the request; auth_token is forwarded in the
        x-auth-token header. Raises requests.HTTPError on a non-2xx
        response.
        """
        request_headers = {
            'accept': 'application/json',
            'content-type': 'application/json',
            'x-auth-token': auth_token,
        }
        response = requests.delete(url=url, headers=request_headers)
        response.raise_for_status()
23 changes: 23 additions & 0 deletions contrib/st2/opensds/actions/delete_bucket_migration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
description: Delete Bucket Migration Dataflow Plan from Multi-cloud
enabled: true
entry_point: delete_bucket_migration.py
name: delete-bucket-migration
parameters:
url:
type: string
description: "URL for Deleting Bucket Migration Dataflow Plan."
required: true
tenant_id:
type: string
description: "Tenant ID."
required: true
user_id:
type: string
description: "User ID."
required: false
auth_token:
type: string
description: "Authentication Token."
required: false
runner_type: "python-script"
159 changes: 159 additions & 0 deletions contrib/st2/opensds/actions/execute_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright 2019 The OpenSDS Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import boto3
import time

from st2common.runners.base_action import Action

RET_SUCCEED = "success"
RET_FAILED = "failed"


# format of args:
# {
# 'Name': 'string'
# 'AK': 'string'
# 'SK': 'string'
# 'Region': 'string' #example: 'us-east-1'
# 'ReleaseLabel': 'string' #example: 'emr-5.20.0'
# 'Instances': {
# 'MasterInstanceType': 'string' #example: 'm3.xlarge'
# 'SlaveInstanceType': 'string' #example: 'm3.xlarge'
# 'InstanceCount': 123 #example: 4 (means 1 master node and 3 slave nodes)
# 'KeepJobFlowAliveWhenNoSteps': True|False
# 'TerminationProtected': True|False
# }
# 'JobFlowRole': 'string' #example: 'EMR_EC2_DefaultRole'
# 'ServiceRole': 'string' #example: 'EMR_DefaultRole'
# 'VisibleToAllUsers': True|False
# 'Applications': [
# {
# 'Name': 'string' #example: 'Hadoop'
# }
# ]
# 'Steps': [
# {
# 'Name': 'string'
# 'ActionOnFailure': 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE'
# 'HadoopJarStep': {
# 'Jar': 'string' #example: 's3n://elasticmapreduce/samples/cloudfront/logprocessor.jar' or
# 'command-runner.jar'
# 'Args':['string'] #example: ['-input', 's3n://mytest/analysis_bucket/input', '-output',
# 's3n://mytest/analysis_bucket/output'] or ['state-pusher-script']
# }
# }
# ]
# }


class ExecAnalysisAction(Action):
    """Dispatch a data-analysis job to the selected cloud engine."""

    def run(self, analysis_engine_type="", args="", auth_token=""):
        """Run the analysis described by *args* on the given engine.

        Returns RET_SUCCEED when the engine reports a successful run,
        RET_FAILED otherwise (including unsupported engine types).
        """
        if analysis_engine_type != 'aws emr':
            # Only AWS EMR is implemented at the moment.
            print("unsupported analysis engine type:%s" % analysis_engine_type)
            return RET_FAILED

        status = aws_analysis(args)
        return RET_SUCCEED if status == 'SUCCEED' else RET_FAILED


def get_job_end_status(client, jid, poll_interval=30):
    """Poll an EMR cluster until it terminates and report the outcome.

    Blocks, sleeping *poll_interval* seconds between ``describe_cluster``
    calls, until the cluster reaches TERMINATED or
    TERMINATED_WITH_ERRORS.

    Args:
        client: boto3 EMR client (anything exposing ``describe_cluster``).
        jid: cluster / job-flow id to watch.
        poll_interval: seconds between polls. Defaults to the original
            hard-coded 30; exposed as a parameter so callers and tests
            can poll without sleeping.

    Returns:
        'SUCCEED' if the cluster terminated with ALL_STEPS_COMPLETED,
        otherwise 'FAILED'.
    """
    status = 'FAILED'
    while True:
        time.sleep(poll_interval)
        resp = client.describe_cluster(ClusterId=jid)
        print(resp)
        # State is one of: STARTING | BOOTSTRAPPING | RUNNING | WAITING |
        # TERMINATING | TERMINATED | TERMINATED_WITH_ERRORS
        state = resp.get("Cluster").get("Status").get("State")
        if state in ('TERMINATED', 'TERMINATED_WITH_ERRORS'):
            # StateChangeReason.Code is one of: INTERNAL_ERROR |
            # VALIDATION_ERROR | INSTANCE_FAILURE | INSTANCE_FLEET_TIMEOUT |
            # BOOTSTRAP_FAILURE | USER_REQUEST | STEP_FAILURE |
            # ALL_STEPS_COMPLETED
            terminate_code = resp.get("Cluster").get(
                "Status").get("StateChangeReason").get("Code")
            print("job terminate state: %s" % state)
            print("job terminate reason: %s" % terminate_code)
            if terminate_code == 'ALL_STEPS_COMPLETED':
                status = 'SUCCEED'
            break

    return status


def aws_analysis(args):
    """Run an AWS EMR job flow described by *args* and wait for completion.

    Expects *args* to carry 'AK'/'SK'/'Region' credentials plus the
    run_job_flow fields 'Name', 'ReleaseLabel', 'Instances',
    'JobFlowRole', 'ServiceRole', 'VisibleToAllUsers', 'Applications'
    and 'Steps' (see the module-level format comment).

    Returns:
        The final job status string — 'SUCCEED' on success.
        BUG FIX: the function previously fell off the end and returned
        None, so the caller's ``status != 'SUCCEED'`` check reported
        failure even when the job flow succeeded.

    Raises:
        RuntimeError: if job submission returns a non-200 HTTP code or
            the job flow itself does not complete all steps.
    """
    # create the EMR client from the caller-supplied credentials
    client = boto3.client(
        'emr',
        aws_access_key_id=args['AK'],
        aws_secret_access_key=args['SK'],
        region_name=args["Region"]
    )

    apps = []
    for app in args['Applications']:
        apps.append(app)

    steps = []
    for step in args['Steps']:
        steps.append(step)

    count = args['Instances']['InstanceCount']
    instances = {
        'InstanceCount': count,
        'MasterInstanceType': args['Instances']['MasterInstanceType'],
        'KeepJobFlowAliveWhenNoSteps': False,  # TODO: support True
        'TerminationProtected': False,  # TODO: support True
    }
    # A slave instance type is only meaningful with more than one node.
    if count > 1:
        instances['SlaveInstanceType'] = args['Instances']['SlaveInstanceType']

    response = client.run_job_flow(
        Name=args['Name'],
        ReleaseLabel=args['ReleaseLabel'],
        Instances=instances,
        JobFlowRole=args['JobFlowRole'],
        ServiceRole=args['ServiceRole'],
        VisibleToAllUsers=args['VisibleToAllUsers'],
        Applications=apps,
        Steps=steps
    )

    # get returned status
    # response example: {'ResponseMetadata': {'RetryAttempts': 0, 'HTTPStatusCode': 200,
    # 'RequestId': '026ec48e-8da3-11e9-baa7-5170a521c0b0',
    # 'HTTPHeaders': {'x-amzn-requestid': '026ec48e-8da3-11e9-baa7-5170a521c0b0',
    # 'date': 'Thu, 13 Jun 2019 06:18:07 GMT', 'content-length': '31',
    # 'content-type': 'application/x-amz-json-1.1'}}, u'JobFlowId': u'j-3IVHGVODNQK78'}
    http_code = response['ResponseMetadata']['HTTPStatusCode']
    if http_code != 200:
        print("aws emr: run job flow failed, http_code is %d" % http_code)
        raise RuntimeError('run aws emr job flow failed')

    print(response)

    flowid = response['JobFlowId']
    final_status = get_job_end_status(client, flowid)
    if final_status != 'SUCCEED':
        print(
            "aws emr: job flow run failed, id=%s, job_status=%s" %
            (flowid, final_status))
        raise RuntimeError('aws emr job run failed')

    print("aws emr: job flow run succeed, id=%s" % flowid)
    # BUG FIX: return the status so the caller can compare against 'SUCCEED'.
    return final_status
56 changes: 56 additions & 0 deletions contrib/st2/opensds/actions/execute_analysis.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
description: execute data analysis by calling APIs of cloud vendor
enabled: true
entry_point: execute_analysis.py
name: execute-analysis
parameters:
timeout:
default: 99999999
analysis_engine_type:
type: string
description: "aws-emr, huawei-mrs, etc. but only aws-emr supported now."
required: true
args:
type: object
description: "cloud specific arguments for data analysis, in json string, example is as below."
# aws input format:
# {
# 'Name': 'string'
# 'AK': 'string'
# 'SK': 'string'
# 'Region': 'string' #example: 'us-east-1'
# 'ReleaseLabel': 'string' #example: 'emr-5.20.0'
# 'Instances': {
# 'MasterInstanceType': 'string' #example: 'm3.xlarge'
# 'SlaveInstanceType': 'string' #example: 'm3.xlarge'
# 'InstanceCount': 123 #example: 4 (means 1 master node and 3 slave nodes)
# 'KeepJobFlowAliveWhenNoSteps': True|False
# 'TerminationProtected': True|False
# }
# 'JobFlowRole': 'string' #example: 'EMR_EC2_DefaultRole'
# 'ServiceRole': 'string' #example: 'EMR_DefaultRole'
# 'VisibleToAllUsers': True|False
# 'Applications': [
# {
# 'Name': 'string' #example: 'Hadoop'
# }
# ]
# 'Steps': [
# {
# 'Name': 'string'
# 'ActionOnFailure': 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE'
# 'HadoopJarStep': {
# # example: 's3n://elasticmapreduce/logprocessor.jar' or 'command-runner.jar'
# 'Jar': 'string'
# # example: ['-input', 's3n://test/input', '-output', 's3n://test/output'] or ['state-pusher-script']
# 'Args':['string']
# }
# }
# ]
# }
required: true
auth_token:
type: string
description: "Authentication Token."
required: false
runner_type: "python-script"
3 changes: 2 additions & 1 deletion contrib/st2/opensds/actions/get_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
class GetBucketAction(Action):
def run(self, url, bucket_name, auth_token):
headers = {
'x-auth-token': auth_token
'x-auth-token': auth_token,
'Content-type': 'application/xml'
}
r = requests.get(url=url, headers=headers)
r.raise_for_status()
Expand Down
2 changes: 2 additions & 0 deletions contrib/st2/opensds/actions/get_bucket_migration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ enabled: true
entry_point: get_bucket_migration.py
name: get-bucket-migration
parameters:
timeout:
default: 7200
url:
type: string
description: "URL for getting the Bucket Migration Information for multi-cloud S3 Interface."
Expand Down
38 changes: 38 additions & 0 deletions contrib/st2/opensds/actions/load_objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2019 The OpenSDS Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import requests
import xml.etree.ElementTree as ET

from st2common.runners.base_action import Action


class LoadObjsAction(Action):
    """Ask multi-cloud to load objects from a backend into a bucket."""

    def run(self, url, backend_name, prefix, auth_token):
        """PUT a LoadObjectsReq XML body to *url* and print the reply Msg.

        Raises requests.HTTPError on a non-2xx response.
        """
        body = """<?xml version='1.0' encoding='utf-8'?>
        <LoadObjectsReq>
            <Backend>{}</Backend>
            <Prefix>{}</Prefix>
        </LoadObjectsReq>""".format(backend_name, prefix)
        headers = {'content-type': 'application/xml',
                   'x-auth-token': auth_token,
                   'accept': 'application/xml'
                   }
        r = requests.put(url=url, data=body, headers=headers)
        print(r.content)
        r.raise_for_status()
        if r.status_code == requests.codes.ok:
            response = ET.fromstring(r.text)
            msg = response.find("Msg")
            # BUG FIX: print the element's text, not the Element repr;
            # guard against a response with no <Msg> node.
            print(msg.text if msg is not None else None)
Loading

0 comments on commit 27d70a9

Please sign in to comment.