#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import with_statement

import logging
import os
import pipes
import random
import shutil
import subprocess
import sys
import tempfile
import time
import urllib2
from optparse import OptionParser
from sys import stderr

import boto
from boto import ec2
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
2013-09-11 17:59:42 -04:00
class UsageError(Exception):
    """Raised for user-correctable errors (e.g. bad SSH credentials);
    caught in main() and printed without a traceback."""
    pass
2013-04-19 01:31:24 -04:00
# A URL prefix from which to fetch AMI information
# (restored: the literal had been corrupted with padding whitespace, which
# would have produced an invalid URL when passed to urllib2.urlopen)
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
2012-06-11 02:06:15 -04:00
# Configure and parse our command-line arguments
def parse_args():
    """Parse the spark-ec2 command line.

    Returns a tuple (opts, action, cluster_name).  Exits with status 1 on
    bad usage, or when no AWS credentials can be found in ~/.boto,
    /etc/boto.cfg or the AWS_* environment variables.
    """
    parser = OptionParser(usage="spark-ec2 [options] <action> <cluster_name>"
        + "\n\n<action> can be: launch, destroy, login, stop, start, get-master",
        add_help_option=False)
    parser.add_option("-h", "--help", action="help",
                      help="Show this help message and exit")
    parser.add_option("-s", "--slaves", type="int", default=1,
                      help="Number of slaves to launch (default: 1)")
    parser.add_option("-w", "--wait", type="int", default=120,
                      help="Seconds to wait for nodes to start (default: 120)")
    parser.add_option("-k", "--key-pair",
                      help="Key pair to use on instances")
    parser.add_option("-i", "--identity-file",
                      help="SSH private key file to use for logging into instances")
    parser.add_option("-t", "--instance-type", default="m1.large",
                      help="Type of instance to launch (default: m1.large). " +
                           "WARNING: must be 64-bit; small instances won't work")
    parser.add_option("-m", "--master-instance-type", default="",
                      help="Master instance type (leave empty for same as instance-type)")
    parser.add_option("-r", "--region", default="us-east-1",
                      help="EC2 region zone to launch instances in")
    parser.add_option("-z", "--zone", default="",
                      help="Availability zone to launch instances in, or 'all' to spread " +
                           "slaves across multiple (an additional $0.01/Gb for bandwidth " +
                           "between zones applies)")
    parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
    parser.add_option("-v", "--spark-version", default="0.8.0",
                      help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
    parser.add_option("--spark-git-repo",
                      default="https://github.com/apache/incubator-spark",
                      help="Github repo from which to checkout supplied commit hash")
    parser.add_option("--hadoop-major-version", default="1",
                      help="Major version of Hadoop (default: 1)")
    parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
                      help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
                           "the given local address (for use with login)")
    parser.add_option("--resume", action="store_true", default=False,
                      help="Resume installation on a previously launched cluster " +
                           "(for debugging)")
    parser.add_option("--ebs-vol-size", metavar="SIZE", type="int", default=0,
                      help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
                           "/vol. The volumes will be deleted when the instances terminate. " +
                           "Only possible on EBS-backed AMIs.")
    parser.add_option("--swap", metavar="SWAP", type="int", default=1024,
                      help="Swap space to set up per node, in MB (default: 1024)")
    parser.add_option("--spot-price", metavar="PRICE", type="float",
                      help="If specified, launch slaves as spot instances with the given " +
                           "maximum price (in dollars)")
    parser.add_option("--ganglia", action="store_true", default=True,
                      help="Setup Ganglia monitoring on cluster (default: on). NOTE: " +
                           "the Ganglia page will be publicly accessible")
    parser.add_option("--no-ganglia", action="store_false", dest="ganglia",
                      help="Disable Ganglia monitoring for the cluster")
    parser.add_option("-u", "--user", default="root",
                      help="The SSH user you want to connect as (default: root)")
    parser.add_option("--delete-groups", action="store_true", default=False,
                      help="When destroying a cluster, delete the security groups that were created")
    parser.add_option("--use-existing-master", action="store_true", default=False,
                      help="Launch fresh slaves, but use an existing stopped master if possible")

    (opts, args) = parser.parse_args()
    if len(args) != 2:
        parser.print_help()
        sys.exit(1)
    (action, cluster_name) = args

    # Boto config check
    # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
    # Only require the AWS_* env variables when no boto config file exists.
    home_dir = os.getenv('HOME')
    if home_dir is None or not os.path.isfile(os.path.join(home_dir, '.boto')):
        if not os.path.isfile('/etc/boto.cfg'):
            if os.getenv('AWS_ACCESS_KEY_ID') is None:
                print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
                                  "must be set")
                sys.exit(1)
            if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
                print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
                                  "must be set")
                sys.exit(1)
    return (opts, action, cluster_name)
# Get the EC2 security group of the given name, creating it if it doesn't exist
def get_or_make_group ( conn , name ) :
groups = conn . get_all_security_groups ( )
group = [ g for g in groups if g . name == name ]
if len ( group ) > 0 :
return group [ 0 ]
else :
print " Creating security group " + name
2012-08-02 18:23:52 -04:00
return conn . create_security_group ( name , " Spark EC2 group " )
2012-06-11 02:06:15 -04:00
# Wait for a set of launched instances to exit the "pending" state
# (i.e. either to start running or to fail and be terminated)
def wait_for_instances(conn, instances):
    """Poll EC2 every 5 seconds until no instance is still 'pending'."""
    while True:
        for inst in instances:
            inst.update()
        if not any(inst.state == 'pending' for inst in instances):
            return
        time.sleep(5)
# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
def is_active(instance):
    """Return True if `instance.state` is one of the EC2 states we can
    keep or restart (restored: the state literals had been corrupted with
    padding whitespace and would never have matched real EC2 states)."""
    return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
2013-06-07 17:54:26 -04:00
# Return correct versions of Spark and Shark, given the supplied Spark version
def get_spark_shark_version(opts):
    """Map a released Spark version to its matching Shark version.

    Exits with status 1 when the requested version is not a known release.
    """
    # Each supported Spark release paired with the Shark release built for it.
    shark_for_spark = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"}
    version = opts.spark_version.replace("v", "")
    try:
        return (version, shark_for_spark[version])
    except KeyError:
        print >> stderr, "Don't know about Spark version: %s" % version
        sys.exit(1)
2013-04-19 01:31:24 -04:00
# Attempt to resolve an appropriate AMI given the architecture and
# region of the request.
def get_spark_ami ( opts ) :
instance_types = {
" m1.small " : " pvm " ,
" m1.medium " : " pvm " ,
" m1.large " : " pvm " ,
" m1.xlarge " : " pvm " ,
" t1.micro " : " pvm " ,
" c1.medium " : " pvm " ,
" c1.xlarge " : " pvm " ,
" m2.xlarge " : " pvm " ,
" m2.2xlarge " : " pvm " ,
" m2.4xlarge " : " pvm " ,
" cc1.4xlarge " : " hvm " ,
" cc2.8xlarge " : " hvm " ,
" cg1.4xlarge " : " hvm " ,
" hs1.8xlarge " : " hvm " ,
" hi1.4xlarge " : " hvm " ,
" m3.xlarge " : " hvm " ,
" m3.2xlarge " : " hvm " ,
" cr1.8xlarge " : " hvm "
}
if opts . instance_type in instance_types :
instance_type = instance_types [ opts . instance_type ]
else :
instance_type = " pvm "
print >> stderr , \
" Don ' t recognize %s , assuming type is pvm " % opts . instance_type
2013-11-10 01:52:23 -05:00
2013-06-07 14:54:16 -04:00
ami_path = " %s / %s / %s " % ( AMI_PREFIX , opts . region , instance_type )
2013-04-19 01:31:24 -04:00
try :
ami = urllib2 . urlopen ( ami_path ) . read ( ) . strip ( )
print " Spark AMI: " + ami
except :
2013-05-08 20:18:21 -04:00
print >> stderr , " Could not resolve AMI at: " + ami_path
2013-04-19 01:31:24 -04:00
sys . exit ( 1 )
return ami
2012-06-11 02:06:15 -04:00
# Launch a cluster of the given name, by setting up its security groups,
# and then starting new instances in them.
# Returns a tuple of EC2 reservation objects for the master and slaves
# Fails if there already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
    # Create (or reuse) one security group for the master and one for the
    # slaves; rules are only added when a group is brand new (empty rules).
    print "Setting up security groups..."
    master_group = get_or_make_group(conn, cluster_name + "-master")
    slave_group = get_or_make_group(conn, cluster_name + "-slaves")
    if master_group.rules == []:  # Group was just now created
        master_group.authorize(src_group=master_group)
        master_group.authorize(src_group=slave_group)
        master_group.authorize('tcp', 22, 22, '0.0.0.0/0')          # SSH
        master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')      # Spark web UIs
        master_group.authorize('tcp', 19999, 19999, '0.0.0.0/0')
        master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')    # Hadoop JobTracker UI
        master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')    # HDFS NameNode UI
        master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
        master_group.authorize('tcp', 4040, 4045, '0.0.0.0/0')      # Spark application UIs
        if opts.ganglia:
            master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')  # Ganglia (public!)
    if slave_group.rules == []:  # Group was just now created
        slave_group.authorize(src_group=master_group)
        slave_group.authorize(src_group=slave_group)
        slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
        slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
        slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
        slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
        slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
        slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')

    # Check if instances are already running in our groups.  A stopped
    # master is tolerated when --use-existing-master was given.
    existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
                                                             die_on_error=False)
    if existing_slaves or (existing_masters and not opts.use_existing_master):
        print >> stderr, ("ERROR: There are already instances running in " +
                          "group %s or %s" % (master_group.name, slave_group.name))
        sys.exit(1)

    # Figure out Spark AMI
    if opts.ami is None:
        opts.ami = get_spark_ami(opts)
    print "Launching instances..."
    try:
        image = conn.get_all_images(image_ids=[opts.ami])[0]
    except:
        print >> stderr, "Could not find AMI " + opts.ami
        sys.exit(1)

    # Create block device mapping so that we can add an EBS volume if asked to
    block_map = BlockDeviceMapping()
    if opts.ebs_vol_size > 0:
        device = EBSBlockDeviceType()
        device.size = opts.ebs_vol_size
        device.delete_on_termination = True
        block_map["/dev/sdv"] = device

    # Launch slaves
    if opts.spot_price != None:
        # Launch spot instances with the requested price, spreading them
        # across the requested zones via get_partition().
        print ("Requesting %d slaves as spot instances with price $%.3f" %
               (opts.slaves, opts.spot_price))
        zones = get_zones(conn, opts)
        num_zones = len(zones)
        i = 0
        my_req_ids = []
        for zone in zones:
            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
            slave_reqs = conn.request_spot_instances(
                price=opts.spot_price,
                image_id=opts.ami,
                launch_group="launch-group-%s" % cluster_name,
                placement=zone,
                count=num_slaves_this_zone,
                key_name=opts.key_pair,
                security_groups=[slave_group],
                instance_type=opts.instance_type,
                block_device_map=block_map)
            my_req_ids += [req.id for req in slave_reqs]
            i += 1

        print "Waiting for spot instances to be granted..."
        try:
            # Poll every 10s until every request we issued is "active".
            while True:
                time.sleep(10)
                reqs = conn.get_all_spot_instance_requests()
                id_to_req = {}
                for r in reqs:
                    id_to_req[r.id] = r
                active_instance_ids = []
                for i in my_req_ids:
                    if i in id_to_req and id_to_req[i].state == "active":
                        active_instance_ids.append(id_to_req[i].instance_id)
                if len(active_instance_ids) == opts.slaves:
                    print "All %d slaves granted" % opts.slaves
                    reservations = conn.get_all_instances(active_instance_ids)
                    slave_nodes = []
                    for r in reservations:
                        slave_nodes += r.instances
                    break
                else:
                    print "%d of %d slaves granted, waiting longer" % (
                        len(active_instance_ids), opts.slaves)
        except:
            # Interrupted (e.g. Ctrl-C) while waiting: cancel outstanding
            # requests so we don't leak spot instances, then bail out.
            print "Canceling spot instance requests"
            conn.cancel_spot_instance_requests(my_req_ids)
            # Log a warning if any of these requests actually launched instances:
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            running = len(master_nodes) + len(slave_nodes)
            if running:
                print >> stderr, ("WARNING: %d instances are still running" % running)
            sys.exit(0)
    else:
        # Launch non-spot instances
        zones = get_zones(conn, opts)
        num_zones = len(zones)
        i = 0
        slave_nodes = []
        for zone in zones:
            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
            if num_slaves_this_zone > 0:
                slave_res = image.run(key_name=opts.key_pair,
                                      security_groups=[slave_group],
                                      instance_type=opts.instance_type,
                                      placement=zone,
                                      min_count=num_slaves_this_zone,
                                      max_count=num_slaves_this_zone,
                                      block_device_map=block_map)
                slave_nodes += slave_res.instances
                print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
                                                                zone, slave_res.id)
            i += 1

    # Launch or resume masters
    if existing_masters:
        print "Starting master..."
        for inst in existing_masters:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.start()
        master_nodes = existing_masters
    else:
        master_type = opts.master_instance_type
        if master_type == "":
            master_type = opts.instance_type
        if opts.zone == 'all':
            opts.zone = random.choice(conn.get_all_zones()).name
        master_res = image.run(key_name=opts.key_pair,
                               security_groups=[master_group],
                               instance_type=master_type,
                               placement=opts.zone,
                               min_count=1,
                               max_count=1,
                               block_device_map=block_map)
        master_nodes = master_res.instances
        # NOTE(review): `zone` here is whatever value the slave-launch loop
        # left behind, while the master is actually placed in opts.zone, so
        # this message can report the wrong zone — confirm before changing.
        print "Launched master in %s, regid = %s" % (zone, master_res.id)

    # Return all the instances
    return (master_nodes, slave_nodes)
2012-06-11 02:06:15 -04:00
# Get the EC2 instances in an existing cluster if available.
2013-06-26 14:15:48 -04:00
# Returns a tuple of lists of EC2 instance objects for the masters and slaves
2012-10-18 13:01:38 -04:00
def get_existing_cluster ( conn , opts , cluster_name , die_on_error = True ) :
2012-06-11 02:06:15 -04:00
print " Searching for existing cluster " + cluster_name + " ... "
reservations = conn . get_all_instances ( )
master_nodes = [ ]
slave_nodes = [ ]
for res in reservations :
active = [ i for i in res . instances if is_active ( i ) ]
2013-09-19 17:09:26 -04:00
for inst in active :
group_names = [ g . name for g in inst . groups ]
2012-06-11 02:06:15 -04:00
if group_names == [ cluster_name + " -master " ] :
2013-09-19 17:09:26 -04:00
master_nodes . append ( inst )
2012-06-11 02:06:15 -04:00
elif group_names == [ cluster_name + " -slaves " ] :
2013-09-19 17:09:26 -04:00
slave_nodes . append ( inst )
2013-06-26 14:15:48 -04:00
if any ( ( master_nodes , slave_nodes ) ) :
print ( " Found %d master(s), %d slaves " %
( len ( master_nodes ) , len ( slave_nodes ) ) )
2012-10-18 13:01:38 -04:00
if ( master_nodes != [ ] and slave_nodes != [ ] ) or not die_on_error :
2013-06-26 14:15:48 -04:00
return ( master_nodes , slave_nodes )
2012-06-11 02:06:15 -04:00
else :
if master_nodes == [ ] and slave_nodes != [ ] :
print " ERROR: Could not find master in group " + cluster_name + " -master "
elif master_nodes != [ ] and slave_nodes == [ ] :
print " ERROR: Could not find slaves in group " + cluster_name + " -slaves "
else :
print " ERROR: Could not find any existing cluster "
sys . exit ( 1 )
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
2013-06-26 14:15:48 -04:00
def setup_cluster ( conn , master_nodes , slave_nodes , opts , deploy_ssh_key ) :
2013-01-28 14:16:14 -05:00
master = master_nodes [ 0 ] . public_dns_name
if deploy_ssh_key :
2013-07-03 19:57:22 -04:00
print " Generating cluster ' s SSH key on master... "
key_setup = """
[ - f ~ / . ssh / id_rsa ] | |
( ssh - keygen - q - t rsa - N ' ' - f ~ / . ssh / id_rsa & &
cat ~ / . ssh / id_rsa . pub >> ~ / . ssh / authorized_keys )
"""
ssh ( master , opts , key_setup )
dot_ssh_tar = ssh_read ( master , opts , [ ' tar ' , ' c ' , ' .ssh ' ] )
print " Transferring cluster ' s SSH key to slaves... "
for slave in slave_nodes :
print slave . public_dns_name
ssh_write ( slave . public_dns_name , opts , [ ' tar ' , ' x ' ] , dot_ssh_tar )
2013-01-28 14:16:14 -05:00
2013-11-10 01:52:23 -05:00
modules = [ ' spark ' , ' shark ' , ' ephemeral-hdfs ' , ' persistent-hdfs ' ,
' mapreduce ' , ' spark-standalone ' , ' tachyon ' ]
2013-01-27 01:48:39 -05:00
2013-07-30 17:55:00 -04:00
if opts . hadoop_major_version == " 1 " :
modules = filter ( lambda x : x != " mapreduce " , modules )
2013-01-27 03:26:00 -05:00
if opts . ganglia :
modules . append ( ' ganglia ' )
2013-06-11 20:52:10 -04:00
# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
2013-08-01 15:27:43 -04:00
ssh ( master , opts , " rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git -b v2 " )
2013-01-27 01:48:39 -05:00
print " Deploying files to master... "
2013-06-26 14:15:48 -04:00
deploy_files ( conn , " deploy.generic " , opts , master_nodes , slave_nodes , modules )
2013-01-28 14:16:14 -05:00
2012-06-11 02:06:15 -04:00
print " Running setup on master... "
2013-06-11 20:52:10 -04:00
setup_spark_cluster ( master , opts )
2012-08-02 18:23:52 -04:00
print " Done! "
def setup_standalone_cluster(master, slave_nodes, opts):
    """Write the slave list into spark/conf/slaves on the master and start
    the standalone cluster with Spark's start-all script."""
    slave_ips = '\n'.join(i.public_dns_name for i in slave_nodes)
    ssh(master, opts, 'echo "%s" > spark/conf/slaves' % (slave_ips))
    ssh(master, opts, "/root/spark/bin/start-all.sh")
2013-07-29 20:19:33 -04:00
2013-01-28 14:16:14 -05:00
def setup_spark_cluster ( master , opts ) :
ssh ( master , opts , " chmod u+x spark-ec2/setup.sh " )
2013-06-11 22:53:23 -04:00
ssh ( master , opts , " spark-ec2/setup.sh " )
2013-06-26 14:15:48 -04:00
print " Spark standalone cluster started at http:// %s :8080 " % master
2013-02-18 21:30:36 -05:00
2013-02-18 20:15:22 -05:00
if opts . ganglia :
print " Ganglia started at http:// %s :5080/ganglia " % master
2013-01-28 14:16:14 -05:00
2012-06-11 02:06:15 -04:00
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
2013-06-26 14:15:48 -04:00
def wait_for_cluster ( conn , wait_secs , master_nodes , slave_nodes ) :
2012-06-11 02:06:15 -04:00
print " Waiting for instances to start up... "
time . sleep ( 5 )
wait_for_instances ( conn , master_nodes )
wait_for_instances ( conn , slave_nodes )
print " Waiting %d more seconds... " % wait_secs
time . sleep ( wait_secs )
# Get number of local disks available for a given EC2 instance type.
def get_num_disks(instance_type):
    """Return the number of instance-store disks for `instance_type`,
    warning and assuming 1 when the type is unknown."""
    # From http://docs.amazonwebservices.com/AWSEC2/latest/UserGuide/index.html?InstanceStorage.html
    disks_by_instance = {
        "m1.small":    1,
        "m1.medium":   1,
        "m1.large":    2,
        "m1.xlarge":   4,
        "t1.micro":    1,
        "c1.medium":   1,
        "c1.xlarge":   4,
        "m2.xlarge":   1,
        "m2.2xlarge":  1,
        "m2.4xlarge":  2,
        "cc1.4xlarge": 2,
        "cc2.8xlarge": 4,
        "cg1.4xlarge": 2,
        "hs1.8xlarge": 24,
        "cr1.8xlarge": 2,
        "hi1.4xlarge": 2,
        "m3.xlarge":   0,
        "m3.2xlarge":  0
    }
    try:
        return disks_by_instance[instance_type]
    except KeyError:
        print >> stderr, ("WARNING: Don't know number of disks on instance type %s; assuming 1"
                          % instance_type)
        return 1
# Deploy the configuration file templates in a given local directory to
# a cluster, filling in any template parameters with information about the
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
    active_master = master_nodes[0].public_dns_name

    # Build comma-separated directory lists covering every local disk
    # (/mnt, /mnt2, ... /mntN) for HDFS, MapReduce and Spark scratch space.
    num_disks = get_num_disks(opts.instance_type)
    hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
    mapred_local_dirs = "/mnt/hadoop/mrlocal"
    spark_local_dirs = "/mnt/spark"
    if num_disks > 1:
        for i in range(2, num_disks + 1):
            hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
            mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
            spark_local_dirs += ",/mnt%d/spark" % i

    cluster_url = "%s:7077" % active_master

    if "." in opts.spark_version:
        # Pre-built spark & shark deploy
        (spark_v, shark_v) = get_spark_shark_version(opts)
    else:
        # Spark-only custom deploy: "repo|hash" tells the setup scripts to
        # build the given git commit; Shark is skipped in this mode.
        spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
        shark_v = ""
        modules = filter(lambda x: x != "shark", modules)

    # Values substituted for {{key}} placeholders in the template files.
    template_vars = {
        "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
        "active_master": active_master,
        "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
        "cluster_url": cluster_url,
        "hdfs_data_dirs": hdfs_data_dirs,
        "mapred_local_dirs": mapred_local_dirs,
        "spark_local_dirs": spark_local_dirs,
        "swap": str(opts.swap),
        "modules": '\n'.join(modules),
        "spark_version": spark_v,
        "shark_version": shark_v,
        "hadoop_major_version": opts.hadoop_major_version
    }

    # Create a temp directory in which we will place all the files to be
    # deployed after we substitue template parameters in them
    tmp_dir = tempfile.mkdtemp()
    for path, dirs, files in os.walk(root_dir):
        if path.find(".svn") == -1:
            # Mirror root_dir's layout under tmp_dir, rooted at '/'.
            dest_dir = os.path.join('/', path[len(root_dir):])
            local_dir = tmp_dir + dest_dir
            if not os.path.exists(local_dir):
                os.makedirs(local_dir)
            for filename in files:
                # Skip editor backups and hidden/junk files (#*, .*, *~).
                if filename[0] not in '#.~' and filename[-1] != '~':
                    dest_file = os.path.join(dest_dir, filename)
                    local_file = tmp_dir + dest_file
                    with open(os.path.join(path, filename)) as src:
                        with open(local_file, "w") as dest:
                            text = src.read()
                            for key in template_vars:
                                text = text.replace("{{" + key + "}}", template_vars[key])
                            dest.write(text)
                            # NOTE(review): redundant — the `with` block
                            # already closes `dest` on exit.
                            dest.close()
    # rsync the whole directory over to the master machine
    command = [
        'rsync', '-rv',
        '-e', stringify_command(ssh_command(opts)),
        "%s/" % tmp_dir,
        "%s@%s:/" % (opts.user, active_master)
    ]
    subprocess.check_call(command)
    # Remove the temp directory we created above
    shutil.rmtree(tmp_dir)
2013-05-17 20:10:47 -04:00
def stringify_command(parts):
    """Turn a command given as either a string or an argv list into a
    single shell-quoted command string."""
    if isinstance(parts, str):
        return parts
    return ' '.join(pipes.quote(part) for part in parts)
def ssh_args(opts):
    """Return the common ssh option list: skip host-key checking (fresh
    instances have unknown keys) and add -i when an identity file was given.

    (Restored: the option literals had been corrupted with padding
    whitespace, which would have produced invalid ssh arguments.)
    """
    parts = ['-o', 'StrictHostKeyChecking=no']
    if opts.identity_file is not None:
        parts += ['-i', opts.identity_file]
    return parts
def ssh_command(opts):
    """Return the full ssh argv prefix (binary plus common options)."""
    cmd = ['ssh']
    cmd.extend(ssh_args(opts))
    return cmd
2012-06-11 02:06:15 -04:00
2013-04-10 00:37:02 -04:00
# Run a command on a host through ssh, retrying up to two times
# and then throwing an exception if ssh continues to fail.
def ssh(host, opts, command):
    """Run `command` on `host` via ssh, retrying twice (30s apart) before
    giving up.  Raises UsageError on a persistent ssh failure (exit 255),
    otherwise re-raises the CalledProcessError."""
    attempt = 0
    while True:
        try:
            return subprocess.check_call(
                ssh_command(opts) +
                ['-t', '%s@%s' % (opts.user, host), stringify_command(command)])
        except subprocess.CalledProcessError as err:
            if attempt > 2:
                # If this was an ssh failure, provide the user with hints.
                if err.returncode == 255:
                    raise UsageError("Failed to SSH to remote host {0}.\nPlease check that you have provided the correct --identity-file and --key-pair parameters and try again.".format(host))
                else:
                    raise err
            print >> stderr, "Error executing remote command, retrying after 30 seconds: {0}".format(err)
            time.sleep(30)
            attempt = attempt + 1
2013-07-29 20:19:33 -04:00
2013-07-03 19:57:22 -04:00
def ssh_read(host, opts, command):
    """Run `command` on `host` via ssh and return its captured stdout."""
    full_cmd = ssh_command(opts) + ['%s@%s' % (opts.user, host),
                                    stringify_command(command)]
    return subprocess.check_output(full_cmd)
2013-07-29 20:19:33 -04:00
2012-06-11 02:06:15 -04:00
2013-07-03 19:57:22 -04:00
def ssh_write(host, opts, command, input):
    """Run `command` on `host` via ssh, feeding `input` to its stdin.

    Retries twice (30s apart) on a non-zero exit; raises RuntimeError when
    it still fails after that.
    """
    attempts = 0
    while True:
        proc = subprocess.Popen(
            ssh_command(opts) + ['%s@%s' % (opts.user, host),
                                 stringify_command(command)],
            stdin=subprocess.PIPE)
        proc.stdin.write(input)
        proc.stdin.close()
        status = proc.wait()
        if status == 0:
            break
        if attempts > 2:
            raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
        print >> stderr, "Error {0} while executing remote command, retrying after 30 seconds".format(status)
        time.sleep(30)
        attempts = attempts + 1
2012-06-11 02:06:15 -04:00
2012-11-16 20:25:28 -05:00
# Gets a list of zones to launch instances in
def get_zones(conn, opts):
    """Return the availability zones to use: every zone in the region when
    opts.zone is 'all', otherwise just the requested zone.

    (Restored: the 'all' literal had been corrupted with padding
    whitespace, so zone spreading could never trigger.)
    """
    if opts.zone == 'all':
        zones = [z.name for z in conn.get_all_zones()]
    else:
        zones = [opts.zone]
    return zones
# Gets the number of items in a partition
def get_partition(total, num_partitions, current_partitions):
    """Return how many of `total` items partition index `current_partitions`
    receives when splitting as evenly as possible over `num_partitions`:
    the first `total % num_partitions` partitions get one extra item.
    """
    # Explicit floor division: `/` on ints would become float division if
    # this file is ever run under Python 3; `//` is correct on both.
    num_slaves_this_zone = total // num_partitions
    if (total % num_partitions) - current_partitions > 0:
        num_slaves_this_zone += 1
    return num_slaves_this_zone
2013-09-11 17:59:42 -04:00
def real_main():
    """Dispatch the requested action (launch/destroy/login/get-master/
    stop/start) against the named cluster."""
    (opts, action, cluster_name) = parse_args()
    try:
        conn = ec2.connect_to_region(opts.region)
    except Exception as e:
        print >> stderr, (e)
        sys.exit(1)

    # Select an AZ at random if it was not specified.
    if opts.zone == "":
        opts.zone = random.choice(conn.get_all_zones()).name

    if action == "launch":
        # --resume reuses already-launched instances and only reruns setup.
        if opts.resume:
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name)
        else:
            (master_nodes, slave_nodes) = launch_cluster(
                conn, opts, cluster_name)
            wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
        setup_cluster(conn, master_nodes, slave_nodes, opts, True)

    elif action == "destroy":
        response = raw_input("Are you sure you want to destroy the cluster " +
            cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
            "Destroy cluster " + cluster_name + " (y/N): ")
        if response == "y":
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            print "Terminating master..."
            for inst in master_nodes:
                inst.terminate()
            print "Terminating slaves..."
            for inst in slave_nodes:
                inst.terminate()

            # Delete security groups as well
            if opts.delete_groups:
                print "Deleting security groups (this will take some time)..."
                group_names = [cluster_name + "-master", cluster_name + "-slaves"]

                attempt = 1;
                while attempt <= 3:
                    print "Attempt %d" % attempt
                    groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
                    success = True
                    # Delete individual rules in all groups before deleting groups to
                    # remove dependencies between them
                    for group in groups:
                        print "Deleting rules in security group " + group.name
                        for rule in group.rules:
                            for grant in rule.grants:
                                success &= group.revoke(ip_protocol=rule.ip_protocol,
                                                        from_port=rule.from_port,
                                                        to_port=rule.to_port,
                                                        src_group=grant)

                    # Sleep for AWS eventual-consistency to catch up, and for instances
                    # to terminate
                    time.sleep(30)  # Yes, it does have to be this long :-(
                    for group in groups:
                        try:
                            conn.delete_security_group(group.name)
                            print "Deleted security group " + group.name
                        except boto.exception.EC2ResponseError:
                            success = False;
                            print "Failed to delete security group " + group.name

                    # Unfortunately, group.revoke() returns True even if a rule was not
                    # deleted, so this needs to be rerun if something fails
                    if success: break;

                    attempt += 1

                if not success:
                    print "Failed to delete all security groups after 3 tries."
                    print "Try re-running in a few minutes."

    elif action == "login":
        (master_nodes, slave_nodes) = get_existing_cluster(
            conn, opts, cluster_name)
        master = master_nodes[0].public_dns_name
        print "Logging into master " + master + "..."
        proxy_opt = []
        # -D sets up SSH dynamic port forwarding (SOCKS proxy) when asked.
        if opts.proxy_port != None:
            proxy_opt = ['-D', opts.proxy_port]
        subprocess.check_call(
            ssh_command(opts) + proxy_opt + ['-t', "%s@%s" % (opts.user, master)])

    elif action == "get-master":
        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
        print master_nodes[0].public_dns_name

    elif action == "stop":
        response = raw_input("Are you sure you want to stop the cluster " +
            cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
            "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
            "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
            "All data on spot-instance slaves will be lost.\n" +
            "Stop cluster " + cluster_name + " (y/N): ")
        if response == "y":
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            print "Stopping master..."
            for inst in master_nodes:
                if inst.state not in ["shutting-down", "terminated"]:
                    inst.stop()
            print "Stopping slaves..."
            for inst in slave_nodes:
                if inst.state not in ["shutting-down", "terminated"]:
                    # Spot instances cannot be stopped, only terminated.
                    if inst.spot_instance_request_id:
                        inst.terminate()
                    else:
                        inst.stop()

    elif action == "start":
        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
        print "Starting slaves..."
        for inst in slave_nodes:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.start()
        print "Starting master..."
        for inst in master_nodes:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.start()
        wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
        setup_cluster(conn, master_nodes, slave_nodes, opts, False)

    else:
        print >> stderr, "Invalid action: %s" % action
        sys.exit(1)
2013-09-11 17:59:42 -04:00
def main():
    """Entry point: run real_main() and print UsageError messages cleanly
    (no traceback) for user-correctable problems."""
    try:
        real_main()
    # Use the `except ... as e` form for consistency with the
    # CalledProcessError handler in ssh() (and Python 3 compatibility).
    except UsageError as e:
        print >> stderr, "\nError:\n", e
2012-06-11 02:06:15 -04:00
# Script entry point: the guard literal had been corrupted to " __main__ "
# (with padding spaces), which would never match and made the script a no-op.
if __name__ == "__main__":
    logging.basicConfig()
    main()