Merge pull request #603 from pwendell/ec2-updates

Several Improvements to EC2 Scripts
Patrick Wendell 2013-08-24 15:19:56 -07:00
commit f9fc5c160a
4 changed files with 146 additions and 165 deletions

View file

@@ -4,10 +4,11 @@ title: Running Spark on EC2
---
The `spark-ec2` script, located in Spark's `ec2` directory, allows you
-to launch, manage and shut down Spark clusters on Amazon EC2. It automatically sets up Mesos, Spark and HDFS
-on the cluster for you.
-This guide describes how to use `spark-ec2` to launch clusters, how to run jobs on them, and how to shut them down.
-It assumes you've already signed up for an EC2 account on the [Amazon Web Services site](http://aws.amazon.com/).
+to launch, manage and shut down Spark clusters on Amazon EC2. It automatically
+sets up Spark, Shark and HDFS on the cluster for you. This guide describes
+how to use `spark-ec2` to launch clusters, how to run jobs on them, and how
+to shut them down. It assumes you've already signed up for an EC2 account
+on the [Amazon Web Services site](http://aws.amazon.com/).

`spark-ec2` is designed to manage multiple named clusters. You can
launch a new cluster (telling the script its size and giving it a name),
@@ -59,18 +60,22 @@ RAM). Refer to the Amazon pages about [EC2 instance
types](http://aws.amazon.com/ec2/instance-types) and [EC2
pricing](http://aws.amazon.com/ec2/#pricing) for information about other
instance types.
+- `--region=<EC2_REGION>` specifies an EC2 region in which to launch
+instances. The default region is `us-east-1`.
- `--zone=<EC2_ZONE>` can be used to specify an EC2 availability zone
to launch instances in. Sometimes, you will get an error because there
is not enough capacity in one zone, and you should try to launch in
-another. This happens mostly with the `m1.large` instance types;
-extra-large (both `m1.xlarge` and `c1.xlarge`) instances tend to be more
-available.
+another.
- `--ebs-vol-size=GB` will attach an EBS volume with a given amount
of space to each node so that you can have a persistent HDFS cluster
on your nodes across cluster restarts (see below).
- `--spot-price=PRICE` will launch the worker nodes as
[Spot Instances](http://aws.amazon.com/ec2/spot-instances/),
bidding for the given maximum price (in dollars).
+- `--spark-version=VERSION` will pre-load the cluster with the
+specified version of Spark. VERSION can be a version number
+(e.g. "0.7.3") or a specific git hash. By default, a recent
+version will be used.
- If one of your launches fails due to e.g. not having the right
permissions on your private key file, you can run `launch` with the
`--resume` option to restart the setup process on an existing cluster.
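The flags above can be combined in a single `launch` command. As an illustrative sketch only (the cluster name, key pair and identity file are placeholders, and the `-k`, `-i` and `-s` key-pair, identity-file and slave-count options are assumed from the full script rather than shown in this excerpt):

    # Launch a 5-slave cluster running Spark 0.7.3, bidding up to $0.15 for each
    # spot slave and attaching a 50 GB EBS volume to every node (names are placeholders)
    ./spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 5 \
      --region=us-east-1 --zone=us-east-1b \
      --spark-version=0.7.3 --ebs-vol-size=50 --spot-price=0.15 \
      launch my-spark-cluster
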
@@ -99,9 +104,8 @@ permissions on your private key file, you can run `launch` with the
`spark-ec2` to attach a persistent EBS volume to each node for
storing the persistent HDFS.
- Finally, if you get errors while running your jobs, look at the slave's logs
-for that job inside of the Mesos work directory (/mnt/mesos-work). You can
-also view the status of the cluster using the Mesos web UI
-(`http://<master-hostname>:8080`).
+for that job inside of the scheduler work directory (/root/spark/work). You can
+also view the status of the cluster using the web UI: `http://<master-hostname>:8080`.
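The updated advice above can be followed roughly like this (a sketch only: cluster, key and host names are placeholders, and the exact layout beneath the work directory may vary from job to job):

    # Log into the master with the script's login action, then hop to a slave
    # (its hostname is listed in spark/conf/slaves on the master) and inspect
    # the per-job logs under the scheduler work directory
    ./spark-ec2 -k my-keypair -i ~/my-keypair.pem login my-spark-cluster
    ssh <slave-hostname> 'ls /root/spark/work && tail -n 100 /root/spark/work/*/stderr'
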
# Configuration
@@ -140,22 +144,14 @@ section.
# Limitations

-- `spark-ec2` currently only launches machines in the US-East region of EC2.
-It should not be hard to make it launch VMs in other zones, but you will need
-to create your own AMIs in them.
- Support for "cluster compute" nodes is limited -- there's no way to specify a
locality group. However, you can launch slave nodes in your
`<clusterName>-slaves` group manually and then use `spark-ec2 launch
--resume` to start a cluster with them.
-- Support for spot instances is limited.

If you have a patch or suggestion for one of these limitations, feel free to
[contribute](contributing-to-spark.html) it!

-# Using a Newer Spark Version
-The Spark EC2 machine images may not come with the latest version of Spark. To use a newer version, you can run `git pull` to pull in `/root/spark` to pull in the latest version of Spark from `git`, and build it using `sbt/sbt compile`. You will also need to copy it to all the other nodes in the cluster using `~/spark-ec2/copy-dir /root/spark`.

# Accessing Data in S3

Spark's file interface allows it to process data in Amazon S3 using the same URI formats that are supported for Hadoop. You can specify a path in S3 as input through a URI of the form `s3n://<bucket>/path`. You will also need to set your Amazon security credentials, either by setting the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` before your program or through `SparkContext.hadoopConfiguration`. Full instructions on S3 access using the Hadoop input libraries can be found on the [Hadoop S3 page](http://wiki.apache.org/hadoop/AmazonS3).
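As a minimal sketch of the environment-variable route described above (the key values and bucket are placeholders):

    # Export S3 credentials before starting a Spark shell or program; it can
    # then read input URIs such as s3n://my-bucket/path/to/data
    export AWS_ACCESS_KEY_ID=<your-access-key-id>
    export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>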

View file

@@ -1,9 +0,0 @@
#!/usr/bin/env bash
# These variables are automatically filled in by the mesos-ec2 script.
export MESOS_MASTERS="{{master_list}}"
export MESOS_SLAVES="{{slave_list}}"
export MESOS_ZOO_LIST="{{zoo_list}}"
export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
export MESOS_SPARK_LOCAL_DIRS="{{spark_local_dirs}}"

View file

@@ -1,11 +1,13 @@
#!/usr/bin/env bash
-# These variables are automatically filled in by the mesos-ec2 script.
-export MESOS_MASTERS="{{master_list}}"
-export MESOS_SLAVES="{{slave_list}}"
-export MESOS_ZOO_LIST="{{zoo_list}}"
-export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
-export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
-export MESOS_SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
+# These variables are automatically filled in by the spark-ec2 script.
+export MASTERS="{{master_list}}"
+export SLAVES="{{slave_list}}"
+export HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
+export MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
+export SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
export MODULES="{{modules}}"
+export SPARK_VERSION="{{spark_version}}"
+export SHARK_VERSION="{{shark_version}}"
+export HADOOP_MAJOR_VERSION="{{hadoop_major_version}}"
export SWAP_MB="{{swap}}"
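To make the templating concrete, below is a hypothetical sketch of how this file might look once `spark-ec2` has substituted the `{{...}}` placeholders on a launched cluster. The hostnames, local directories and swap size are invented for illustration; the version values simply mirror the script's defaults:

    #!/usr/bin/env bash
    # Illustrative rendered values only, not taken from a real cluster
    export MASTERS="ec2-203-0-113-10.compute-1.amazonaws.com"
    export SLAVES="ec2-203-0-113-11.compute-1.amazonaws.com
    ec2-203-0-113-12.compute-1.amazonaws.com"
    export HDFS_DATA_DIRS="/mnt/hdfs/data,/mnt2/hdfs/data"
    export MAPRED_LOCAL_DIRS="/mnt/hadoop/mrlocal,/mnt2/hadoop/mrlocal"
    export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
    export MODULES="spark
    shark
    ephemeral-hdfs
    persistent-hdfs
    spark-standalone
    ganglia"
    export SPARK_VERSION="0.7.3"
    export SHARK_VERSION="0.7.0"
    export HADOOP_MAJOR_VERSION="1"
    export SWAP_MB="1024"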

View file

@@ -36,9 +36,8 @@ import boto
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
from boto import ec2

-# A static URL from which to figure out the latest Mesos EC2 AMI
-LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.7"
+# A URL prefix from which to fetch AMI information
+AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"

# Configure and parse our command-line arguments
def parse_args():
@@ -66,10 +65,15 @@ def parse_args():
help="Availability zone to launch instances in, or 'all' to spread " +
"slaves across multiple (an additional $0.01/Gb for bandwidth" +
"between zones applies)")
-parser.add_option("-a", "--ami", default="latest",
-help="Amazon Machine Image ID to use, or 'latest' to use latest " +
-"available AMI (default: latest)")
+parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
+parser.add_option("-v", "--spark-version", default="0.7.3",
+help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
+parser.add_option("--spark-git-repo",
+default="https://github.com/mesos/spark",
+help="Github repo from which to checkout supplied commit hash")
+parser.add_option("--hadoop-major-version", default="1",
+help="Major version of Hadoop (default: 1)")
parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)")
parser.add_option("--resume", action="store_true", default=False,
@@ -84,17 +88,11 @@ def parse_args():
parser.add_option("--spot-price", metavar="PRICE", type="float",
help="If specified, launch slaves as spot instances with the given " +
"maximum price (in dollars)")
-parser.add_option("--cluster-type", type="choice", metavar="TYPE",
-choices=["mesos", "standalone"], default="standalone",
-help="'mesos' for a Mesos cluster, 'standalone' for a standalone " +
-"Spark cluster (default: standalone)")
parser.add_option("--ganglia", action="store_true", default=True,
help="Setup Ganglia monitoring on cluster (default: on). NOTE: " +
"the Ganglia page will be publicly accessible")
parser.add_option("--no-ganglia", action="store_false", dest="ganglia",
help="Disable Ganglia monitoring for the cluster")
-parser.add_option("--old-scripts", action="store_true", default=False,
-help="Use old mesos-ec2 scripts, for Spark <= 0.6 AMIs")
parser.add_option("-u", "--user", default="root",
help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
@@ -109,10 +107,7 @@ def parse_args():
print >> stderr, ("ERROR: The -i or --identity-file argument is " +
"required for " + action)
sys.exit(1)
-if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
-print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
-sys.exit(1)

# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
home_dir = os.getenv('HOME')
@@ -158,67 +153,96 @@ def wait_for_instances(conn, instances):
def is_active(instance):
return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
# Return correct versions of Spark and Shark, given the supplied Spark version
def get_spark_shark_version(opts):
spark_shark_map = {"0.7.3": "0.7.0"}
version = opts.spark_version.replace("v", "")
if version not in spark_shark_map:
print >> stderr, "Don't know about Spark version: %s" % version
sys.exit(1)
return (version, spark_shark_map[version])
# Attempt to resolve an appropriate AMI given the architecture and
# region of the request.
def get_spark_ami(opts):
instance_types = {
"m1.small": "pvm",
"m1.medium": "pvm",
"m1.large": "pvm",
"m1.xlarge": "pvm",
"t1.micro": "pvm",
"c1.medium": "pvm",
"c1.xlarge": "pvm",
"m2.xlarge": "pvm",
"m2.2xlarge": "pvm",
"m2.4xlarge": "pvm",
"cc1.4xlarge": "hvm",
"cc2.8xlarge": "hvm",
"cg1.4xlarge": "hvm",
"hs1.8xlarge": "hvm",
"hi1.4xlarge": "hvm",
"m3.xlarge": "hvm",
"m3.2xlarge": "hvm",
"cr1.8xlarge": "hvm"
}
if opts.instance_type in instance_types:
instance_type = instance_types[opts.instance_type]
else:
instance_type = "pvm"
print >> stderr,\
"Don't recognize %s, assuming type is pvm" % opts.instance_type
ami_path = "%s/%s/%s" % (AMI_PREFIX, opts.region, instance_type)
try:
ami = urllib2.urlopen(ami_path).read().strip()
print "Spark AMI: " + ami
except:
print >> stderr, "Could not resolve AMI at: " + ami_path
sys.exit(1)
return ami
# Launch a cluster of the given name, by setting up its security groups,
# and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master, slave
-# and zookeeper instances (in that order).
+# Returns a tuple of EC2 reservation objects for the master and slaves
# Fails if there already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
print "Setting up security groups..."
master_group = get_or_make_group(conn, cluster_name + "-master")
slave_group = get_or_make_group(conn, cluster_name + "-slaves")
-zoo_group = get_or_make_group(conn, cluster_name + "-zoo")
if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
-master_group.authorize(src_group=zoo_group)
master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
master_group.authorize('tcp', 33000, 33000, '0.0.0.0/0')
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
master_group.authorize('tcp', 3030, 3035, '0.0.0.0/0')
-if opts.cluster_type == "mesos":
-master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia:
master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
-slave_group.authorize(src_group=zoo_group)
slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
-if zoo_group.rules == []: # Group was just now created
-zoo_group.authorize(src_group=master_group)
-zoo_group.authorize(src_group=slave_group)
-zoo_group.authorize(src_group=zoo_group)
-zoo_group.authorize('tcp', 22, 22, '0.0.0.0/0')
-zoo_group.authorize('tcp', 2181, 2181, '0.0.0.0/0')
-zoo_group.authorize('tcp', 2888, 2888, '0.0.0.0/0')
-zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')

# Check if instances are already running in our groups
active_nodes = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if any(active_nodes):
print >> stderr, ("ERROR: There are already instances running in " +
-"group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+"group %s or %s" % (master_group.name, slave_group.name))
sys.exit(1)
-# Figure out the latest AMI from our static URL
-if opts.ami == "latest":
-try:
-opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
-print "Latest Spark AMI: " + opts.ami
-except:
-print >> stderr, "Could not read " + LATEST_AMI_URL
-sys.exit(1)
+# Figure out Spark AMI
+if opts.ami is None:
+opts.ami = get_spark_ami(opts)

print "Launching instances..."
try:
@@ -285,9 +309,9 @@ def launch_cluster(conn, opts, cluster_name):
print "Canceling spot instance requests"
conn.cancel_spot_instance_requests(my_req_ids)
# Log a warning if any of these requests actually launched instances:
-(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
-running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+running = len(master_nodes) + len(slave_nodes)
if running:
print >> stderr, ("WARNING: %d instances are still running" % running)
sys.exit(0)
@@ -328,21 +352,17 @@ def launch_cluster(conn, opts, cluster_name):
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)

-zoo_nodes = []

# Return all the instances
-return (master_nodes, slave_nodes, zoo_nodes)
+return (master_nodes, slave_nodes)
# Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters,
-# slaves and zookeeper nodes (in that order).
+# Returns a tuple of lists of EC2 instance objects for the masters and slaves
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print "Searching for existing cluster " + cluster_name + "..."
reservations = conn.get_all_instances()
master_nodes = []
slave_nodes = []
-zoo_nodes = []
for res in reservations:
active = [i for i in res.instances if is_active(i)]
if len(active) > 0:
@@ -351,13 +371,11 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
master_nodes += res.instances
elif group_names == [cluster_name + "-slaves"]:
slave_nodes += res.instances
-elif group_names == [cluster_name + "-zoo"]:
-zoo_nodes += res.instances
-if any((master_nodes, slave_nodes, zoo_nodes)):
-print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
-(len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+if any((master_nodes, slave_nodes)):
+print ("Found %d master(s), %d slaves" %
+(len(master_nodes), len(slave_nodes)))
if (master_nodes != [] and slave_nodes != []) or not die_on_error:
-return (master_nodes, slave_nodes, zoo_nodes)
+return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
print "ERROR: Could not find master in group " + cluster_name + "-master"
@@ -370,7 +388,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
+def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
@@ -378,38 +396,26 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_k
scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
-if opts.cluster_type == "mesos":
-modules = ['ephemeral-hdfs', 'persistent-hdfs', 'mesos']
-elif opts.cluster_type == "standalone":
-modules = ['ephemeral-hdfs', 'persistent-hdfs', 'spark-standalone']
+modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
+'mapreduce', 'spark-standalone']
+if opts.hadoop_major_version == "1":
+modules = filter(lambda x: x != "mapreduce", modules)

if opts.ganglia:
modules.append('ganglia')

-if not opts.old_scripts:
-# NOTE: We should clone the repository before running deploy_files to
-# prevent ec2-variables.sh from being overwritten
-ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git")
+# NOTE: We should clone the repository before running deploy_files to
+# prevent ec2-variables.sh from being overwritten
+ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git -b v2")

print "Deploying files to master..."
-deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes,
-zoo_nodes, modules)
+deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, modules)
print "Running setup on master..."
-if opts.old_scripts:
-if opts.cluster_type == "mesos":
-setup_mesos_cluster(master, opts)
-elif opts.cluster_type == "standalone":
-setup_standalone_cluster(master, slave_nodes, opts)
-else:
-setup_spark_cluster(master, opts)
+setup_spark_cluster(master, opts)
print "Done!"

-def setup_mesos_cluster(master, opts):
-ssh(master, opts, "chmod u+x mesos-ec2/setup")
-ssh(master, opts, "mesos-ec2/setup %s %s %s %s" %
-("generic", "none", "master", opts.swap))

def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
@@ -418,23 +424,18 @@ def setup_standalone_cluster(master, slave_nodes, opts):
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
-if opts.cluster_type == "mesos":
-print "Mesos cluster started at http://%s:8080" % master
-elif opts.cluster_type == "standalone":
-print "Spark standalone cluster started at http://%s:8080" % master
+print "Spark standalone cluster started at http://%s:8080" % master
if opts.ganglia:
print "Ganglia started at http://%s:5080/ganglia" % master

# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
-def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
+def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
print "Waiting for instances to start up..."
time.sleep(5)
wait_for_instances(conn, master_nodes)
wait_for_instances(conn, slave_nodes)
-if zoo_nodes != []:
-wait_for_instances(conn, zoo_nodes)
print "Waiting %d more seconds..." % wait_secs
time.sleep(wait_secs)
@@ -455,7 +456,12 @@ def get_num_disks(instance_type):
"m2.4xlarge": 2,
"cc1.4xlarge": 2,
"cc2.8xlarge": 4,
-"cg1.4xlarge": 2
+"cg1.4xlarge": 2,
+"hs1.8xlarge": 24,
+"cr1.8xlarge": 2,
+"hi1.4xlarge": 2,
+"m3.xlarge": 0,
+"m3.2xlarge": 0
}
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
@@ -470,8 +476,7 @@ def get_num_disks(instance_type):
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
-modules):
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
active_master = master_nodes[0].public_dns_name
num_disks = get_num_disks(opts.instance_type)
@@ -484,28 +489,30 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
spark_local_dirs += ",/mnt%d/spark" % i

-if zoo_nodes != []:
-zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
-cluster_url = "zoo://" + ",".join(
-["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
-elif opts.cluster_type == "mesos":
-zoo_list = "NONE"
-cluster_url = "%s:5050" % active_master
-elif opts.cluster_type == "standalone":
-zoo_list = "NONE"
-cluster_url = "%s:7077" % active_master
+cluster_url = "%s:7077" % active_master
+if "." in opts.spark_version:
+# Pre-built spark & shark deploy
+(spark_v, shark_v) = get_spark_shark_version(opts)
+else:
+# Spark-only custom deploy
+spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
+shark_v = ""
+modules = filter(lambda x: x != "shark", modules)

template_vars = {
"master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
"active_master": active_master,
"slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
-"zoo_list": zoo_list,
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
"mapred_local_dirs": mapred_local_dirs,
"spark_local_dirs": spark_local_dirs,
"swap": str(opts.swap),
-"modules": '\n'.join(modules)
+"modules": '\n'.join(modules),
+"spark_version": spark_v,
+"shark_version": shark_v,
+"hadoop_major_version": opts.hadoop_major_version
}

# Create a temp directory in which we will place all the files to be
@@ -555,7 +562,7 @@ def ssh(host, opts, command):
except subprocess.CalledProcessError as e:
if (tries > 2):
raise e
-print "Error connecting to host {0}, sleeping 30".format(e)
+print "Couldn't connect to host {0}, waiting 30 seconds".format(e)
time.sleep(30)
tries = tries + 1
@@ -594,20 +601,20 @@ def main():
if action == "launch":
if opts.resume:
-(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name)
else:
-(master_nodes, slave_nodes, zoo_nodes) = launch_cluster(
+(master_nodes, slave_nodes) = launch_cluster(
conn, opts, cluster_name)
-wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
-setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True)
+wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+setup_cluster(conn, master_nodes, slave_nodes, opts, True)

elif action == "destroy":
response = raw_input("Are you sure you want to destroy the cluster " +
cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
"Destroy cluster " + cluster_name + " (y/N): ")
if response == "y":
-(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
print "Terminating master..."
for inst in master_nodes:
@@ -615,16 +622,12 @@ def main():
print "Terminating slaves..."
for inst in slave_nodes:
inst.terminate()
-if zoo_nodes != []:
-print "Terminating zoo..."
-for inst in zoo_nodes:
-inst.terminate()

# Delete security groups as well
if opts.delete_groups:
print "Deleting security groups (this will take some time)..."
-group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+group_names = [cluster_name + "-master", cluster_name + "-slaves"]
attempt = 1;
while attempt <= 3:
print "Attempt %d" % attempt
@@ -663,7 +666,7 @@ def main():
print "Try re-running in a few minutes."

elif action == "login":
-(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name)
master = master_nodes[0].public_dns_name
print "Logging into master " + master + "..."
@@ -674,7 +677,7 @@ def main():
(opts.identity_file, proxy_opt, opts.user, master), shell=True)

elif action == "get-master":
-(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
+(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
print master_nodes[0].public_dns_name

elif action == "stop":
@@ -684,7 +687,7 @@ def main():
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
-(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
print "Stopping master..."
for inst in master_nodes:
@@ -694,15 +697,9 @@ def main():
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
-if zoo_nodes != []:
-print "Stopping zoo..."
-for inst in zoo_nodes:
-if inst.state not in ["shutting-down", "terminated"]:
-inst.stop()

elif action == "start":
-(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
-conn, opts, cluster_name)
+(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
print "Starting slaves..."
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
@@ -711,13 +708,8 @@ def main():
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
-if zoo_nodes != []:
-print "Starting zoo..."
-for inst in zoo_nodes:
-if inst.state not in ["shutting-down", "terminated"]:
-inst.start()
-wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
-setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, False)
+wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else:
print >> stderr, "Invalid action: %s" % action