Skip to content

Commit

Permalink
[SPARK-3398] [EC2] Have spark-ec2 intelligently wait for specific clu…
Browse files Browse the repository at this point in the history
…ster states

Instead of waiting arbitrary amounts of time for the cluster to reach a specific state, this patch lets `spark-ec2` explicitly wait for a cluster to reach a desired state.

This is useful in a couple of situations:
* The cluster is launching and you want to wait until SSH is available before installing stuff.
* The cluster is being terminated and you want to wait until all the instances are terminated before trying to delete security groups.

This patch removes the need for the `--wait` option and removes some of the time-based retry logic that was being used.

Author: Nicholas Chammas <[email protected]>

Closes apache#2339 from nchammas/spark-ec2-wait-properly and squashes the following commits:

43a69f0 [Nicholas Chammas] short-circuit SSH check; linear backoff
9a9e035 [Nicholas Chammas] remove extraneous comment
26c5ed0 [Nicholas Chammas] replace print with write()
bb67c06 [Nicholas Chammas] deprecate wait option; remove dead code
7969265 [Nicholas Chammas] fix long line (PEP 8)
126e4cf [Nicholas Chammas] wait for specific cluster states
  • Loading branch information
nchammas authored and JoshRosen committed Oct 7, 2014
1 parent b32bb72 commit 5912ca6
Showing 1 changed file with 86 additions and 25 deletions.
111 changes: 86 additions & 25 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tempfile
import time
import urllib2
import warnings
from optparse import OptionParser
from sys import stderr
import boto
Expand Down Expand Up @@ -61,8 +62,8 @@ def parse_args():
"-s", "--slaves", type="int", default=1,
help="Number of slaves to launch (default: %default)")
parser.add_option(
"-w", "--wait", type="int", default=120,
help="Seconds to wait for nodes to start (default: %default)")
"-w", "--wait", type="int",
help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
parser.add_option(
"-k", "--key-pair",
help="Key pair to use on instances")
Expand Down Expand Up @@ -195,18 +196,6 @@ def get_or_make_group(conn, name):
return conn.create_security_group(name, "Spark EC2 group")


# Wait for a set of launched instances to exit the "pending" state
# (i.e. either to start running or to fail and be terminated)
def wait_for_instances(conn, instances):
while True:
for i in instances:
i.update()
if len([i for i in instances if i.state == 'pending']) > 0:
time.sleep(5)
else:
return


# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
Expand Down Expand Up @@ -619,14 +608,64 @@ def setup_spark_cluster(master, opts):
print "Ganglia started at http://%s:5080/ganglia" % master


# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
print "Waiting for instances to start up..."
time.sleep(5)
wait_for_instances(conn, master_nodes)
wait_for_instances(conn, slave_nodes)
print "Waiting %d more seconds..." % wait_secs
time.sleep(wait_secs)
def is_ssh_available(host, opts):
"Checks if SSH is available on the host."
try:
with open(os.devnull, 'w') as devnull:
ret = subprocess.check_call(
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
'%s@%s' % (opts.user, host), stringify_command('true')],
stdout=devnull,
stderr=devnull
)
return ret == 0
except subprocess.CalledProcessError as e:
return False


def is_cluster_ssh_available(cluster_instances, opts):
for i in cluster_instances:
if not is_ssh_available(host=i.ip_address, opts=opts):
return False
else:
return True


def wait_for_cluster_state(cluster_instances, cluster_state, opts):
"""
cluster_instances: a list of boto.ec2.instance.Instance
cluster_state: a string representing the desired state of all the instances in the cluster
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
'running', 'terminated', etc.
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
"""
sys.stdout.write(
"Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
)
sys.stdout.flush()

num_attempts = 0

while True:
time.sleep(3 * num_attempts)

for i in cluster_instances:
s = i.update() # capture output to suppress print to screen in newer versions of boto

if cluster_state == 'ssh-ready':
if all(i.state == 'running' for i in cluster_instances) and \
is_cluster_ssh_available(cluster_instances, opts):
break
else:
if all(i.state == cluster_state for i in cluster_instances):
break

num_attempts += 1

sys.stdout.write(".")
sys.stdout.flush()

sys.stdout.write("\n")


# Get number of local disks available for a given EC2 instance type.
Expand Down Expand Up @@ -868,6 +907,16 @@ def real_main():
(opts, action, cluster_name) = parse_args()

# Input parameter validation
if opts.wait is not None:
# NOTE: DeprecationWarnings are silent in 2.7+ by default.
# To show them, run Python with the -Wdefault switch.
# See: https://docs.python.org/3.5/whatsnew/2.7.html
warnings.warn(
"This option is deprecated and has no effect. "
"spark-ec2 automatically waits as long as necessary for clusters to startup.",
DeprecationWarning
)

if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)
Expand All @@ -890,7 +939,11 @@ def real_main():
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, True)

elif action == "destroy":
Expand Down Expand Up @@ -919,7 +972,11 @@ def real_main():
else:
group_names = [opts.security_group_prefix + "-master",
opts.security_group_prefix + "-slaves"]

wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated',
opts=opts
)
attempt = 1
while attempt <= 3:
print "Attempt %d" % attempt
Expand Down Expand Up @@ -1019,7 +1076,11 @@ def real_main():
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, False)

else:
Expand Down

0 comments on commit 5912ca6

Please sign in to comment.