This repository has been archived by the owner on Apr 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathemr.js
101 lines (72 loc) · 2.51 KB
/
emr.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
const AWS = require('aws-sdk');
const config = require('./config.dev.json');
const emr = new AWS.EMR({
region: config.region || 'us-east-1'
});
const sns = new AWS.SNS({
region: config.region || 'us-east-1'
});
const CLUSTER_ID = config.emr_cluster_id;
const INSTANCE_GROUP_ID = config.emr_instance_group_id;
const TIMEOUT = config.emr_timeout;
exports.scaleOut = async (event, context) => {
console.info(JSON.stringify(event));
console.info(JSON.stringify(context));
let _timeout = TIMEOUT;
const params = {
ClusterId: CLUSTER_ID,
InstanceGroups: [
{
InstanceGroupId: INSTANCE_GROUP_ID,
InstanceCount: config.emr_desired_instance_count
}
]
};
const aws_partition = config.region.startsWith('cn-') ? 'aws-cn': 'aws';
const topicArn = `arn:${aws_partition}:sns:${config.region}:${getAccountId(context)}:emr-scale-out-failed`;
// Scale out instance group
await emr.modifyInstanceGroups(params).promise();
// wait for cluster to run into resizing status, wait for 2 minutes
const errMessage = `EMR cluster ${CLUSTER_ID} scale out failed`;
// Loop to get instance group status
while (1) {
await sleep(10 * 1000);
_timeout -= 10;
if (_timeout < 0) {
const snsParams = {
TopicArn: topicArn,
Message: errMessage
};
await sns.publish(snsParams).promise();
throw new Error(errMessage);
}
const instanceGroup = await getInstanceGroup(CLUSTER_ID, INSTANCE_GROUP_ID);
console.log(`status: ${instanceGroup.Status.State}, requested: ${instanceGroup.RequestedInstanceCount}, current: ${instanceGroup.RunningInstanceCount}`);
if (instanceGroup.Status.State === 'RUNNING' && instanceGroup.RequestedInstanceCount <= instanceGroup.RunningInstanceCount) {
// TODO: INSERT CODE HERE TO submit EMR step
return 'OK'
}
}
};
exports.scaleIn = async () => {
const params = {
ClusterId: CLUSTER_ID,
InstanceGroups: [
{
InstanceGroupId: INSTANCE_GROUP_ID,
InstanceCount: config.emr_initial_instance_count
}
]
};
return await emr.modifyInstanceGroups(params).promise();
};
async function getInstanceGroup(clusterId, groupId) {
const groupsData = await emr.listInstanceGroups({ ClusterId: clusterId }).promise();
return groupsData.InstanceGroups.find(group => group.Id === groupId );
}
function sleep(millis) {
return new Promise(resolve => setTimeout(resolve, millis));
}
function getAccountId(context) {
return context.invokedFunctionArn.split(':')[4]
}