#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import division, print_function, with_statement

import codecs
import hashlib
import itertools
import logging
import os
import os.path
import pipes
import random
import shutil
import string
from stat import S_IRUSR
import subprocess
import sys
import tarfile
import tempfile
import textwrap
import time
import warnings
from datetime import datetime
from optparse import OptionParser
from sys import stderr

if sys.version < "3":
    from urllib2 import urlopen, Request, HTTPError
else:
    from urllib.request import urlopen, Request
    from urllib.error import HTTPError
    # Alias the Python 2 names so the rest of the script works unchanged on Python 3.
    raw_input = input
    xrange = range

SPARK_EC2_VERSION = "1.4.0"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

VALID_SPARK_VERSIONS = set([
    "0.7.3",
    "0.8.0",
    "0.8.1",
    "0.9.0",
    "0.9.1",
    "0.9.2",
    "1.0.0",
    "1.0.1",
    "1.0.2",
    "1.1.0",
    "1.1.1",
    "1.2.0",
    "1.2.1",
    "1.3.0",
    "1.3.1",
    "1.4.0",
])

SPARK_TACHYON_MAP = {
    "1.0.0": "0.4.1",
    "1.0.1": "0.4.1",
    "1.0.2": "0.4.1",
    "1.1.0": "0.5.0",
    "1.1.1": "0.5.0",
    "1.2.0": "0.5.0",
    "1.2.1": "0.5.0",
    "1.3.0": "0.5.0",
    "1.3.1": "0.5.0",
    "1.4.0": "0.6.4",
}

DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"

# Default location to get the spark-ec2 scripts (and ami-list) from
DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.4"


def setup_external_libs(libs):
    """
    Download external libraries from PyPI to SPARK_EC2_DIR/lib/ and prepend them to sys.path.
    """
    PYPI_URL_PREFIX = "https://pypi.python.org/packages/source"
    SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib")

    if not os.path.exists(SPARK_EC2_LIB_DIR):
        print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
            path=SPARK_EC2_LIB_DIR
        ))
        print("This should be a one-time operation.")
        os.mkdir(SPARK_EC2_LIB_DIR)

    for lib in libs:
        versioned_lib_name = "{n}-{v}".format(n=lib["name"], v=lib["version"])
        lib_dir = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name)

        if not os.path.isdir(lib_dir):
            tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz")
            print(" - Downloading {lib}...".format(lib=lib["name"]))
            download_stream = urlopen(
                "{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format(
                    prefix=PYPI_URL_PREFIX,
                    first_letter=lib["name"][:1],
                    lib_name=lib["name"],
                    lib_version=lib["version"]
                )
            )
            with open(tgz_file_path, "wb") as tgz_file:
                tgz_file.write(download_stream.read())
            # Read back in binary mode so the MD5 check hashes bytes (required on Python 3).
            with open(tgz_file_path, "rb") as tar:
                if hashlib.md5(tar.read()).hexdigest() != lib["md5"]:
                    print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr)
                    sys.exit(1)
            tar = tarfile.open(tgz_file_path)
            tar.extractall(path=SPARK_EC2_LIB_DIR)
            tar.close()
            os.remove(tgz_file_path)
            print(" - Finished downloading {lib}.".format(lib=lib["name"]))

        sys.path.insert(1, lib_dir)


# Only PyPI libraries are supported.
external_libs = [
    {
        "name": "boto",
        "version": "2.34.0",
        "md5": "5556223d2d0cc4d06dd4829e671dcecd"
    }
]

setup_external_libs(external_libs)
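
# Extending this list is straightforward; the SPARK-6191 PR description gives an
# illustrative (not active) example with PyYAML (for YAML-persisted configs,
# SPARK-925) and argparse (to modernize option parsing):
#
#     external_libs = [
#         {"name": "boto", "version": "2.34.0", "md5": "5556223d2d0cc4d06dd4829e671dcecd"},
#         {"name": "PyYAML", "version": "3.11", "md5": "f50e08ef0fe55178479d3a618efe21db"},
#         {"name": "argparse", "version": "1.3.0", "md5": "9bcf7f612190885c8c85e30ba41db3c7"},
#     ]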

import boto
from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
from boto import ec2


class UsageError(Exception):
    pass


# Configure and parse our command-line arguments
def parse_args():
    parser = OptionParser(
        prog="spark-ec2",
        version="%prog {v}".format(v=SPARK_EC2_VERSION),
        usage="%prog [options] <action> <cluster_name>\n\n"
        + "<action> can be: launch, destroy, login, stop, start, get-master, reboot-slaves")

    parser.add_option(
        "-s", "--slaves", type="int", default=1,
        help="Number of slaves to launch (default: %default)")
    parser.add_option(
        "-w", "--wait", type="int",
        help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
    parser.add_option(
        "-k", "--key-pair",
        help="Key pair to use on instances")
    parser.add_option(
        "-i", "--identity-file",
        help="SSH private key file to use for logging into instances")
    parser.add_option(
        "-t", "--instance-type", default="m1.large",
        help="Type of instance to launch (default: %default). " +
             "WARNING: must be 64-bit; small instances won't work")
    parser.add_option(
        "-m", "--master-instance-type", default="",
        help="Master instance type (leave empty for same as instance-type)")
    parser.add_option(
        "-r", "--region", default="us-east-1",
        help="EC2 region used to launch instances in, or to find them in (default: %default)")
    parser.add_option(
        "-z", "--zone", default="",
        help="Availability zone to launch instances in, or 'all' to spread " +
             "slaves across multiple (an additional $0.01/Gb for bandwidth " +
             "between zones applies) (default: a single zone chosen at random)")
    parser.add_option(
        "-a", "--ami",
        help="Amazon Machine Image ID to use")
    parser.add_option(
        "-v", "--spark-version", default=DEFAULT_SPARK_VERSION,
        help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
    parser.add_option(
        "--spark-git-repo",
        default=DEFAULT_SPARK_GITHUB_REPO,
        help="GitHub repo from which to checkout supplied commit hash (default: %default)")
    parser.add_option(
        "--spark-ec2-git-repo",
        default=DEFAULT_SPARK_EC2_GITHUB_REPO,
        help="GitHub repo from which to checkout spark-ec2 (default: %default)")
    parser.add_option(
        "--spark-ec2-git-branch",
        default=DEFAULT_SPARK_EC2_BRANCH,
        help="GitHub repo branch of spark-ec2 to use (default: %default)")
    parser.add_option(
        "--deploy-root-dir",
        default=None,
        help="A directory to copy into / on the first master. " +
             "Must be absolute. Note that a trailing slash is handled as per rsync: " +
             "If you omit it, the last directory of the --deploy-root-dir path will be created " +
             "in / before copying its contents. If you append the trailing slash, " +
             "the directory is not created and its contents are copied directly into /. " +
             "(default: %default).")
    parser.add_option(
        "--hadoop-major-version", default="1",
        help="Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0), yarn " +
             "(Hadoop 2.4.0) (default: %default)")
    parser.add_option(
        "-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
        help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
             "the given local address (for use with login)")
    parser.add_option(
        "--resume", action="store_true", default=False,
        help="Resume installation on a previously launched cluster " +
             "(for debugging)")
    parser.add_option(
        "--ebs-vol-size", metavar="SIZE", type="int", default=0,
        help="Size (in GB) of each EBS volume.")
    parser.add_option(
        "--ebs-vol-type", default="standard",
        help="EBS volume type (e.g. 'gp2', 'standard').")
    parser.add_option(
        "--ebs-vol-num", type="int", default=1,
        help="Number of EBS volumes to attach to each node as /vol[x]. " +
             "The volumes will be deleted when the instances terminate. " +
             "Only possible on EBS-backed AMIs. " +
             "EBS volumes are only attached if --ebs-vol-size > 0. " +
             "Only supports up to 8 EBS volumes.")
    parser.add_option(
        "--placement-group", type="string", default=None,
        help="Which placement group to try and launch " +
             "instances into. Assumes placement group is already " +
             "created.")
    parser.add_option(
        "--swap", metavar="SWAP", type="int", default=1024,
        help="Swap space to set up per node, in MB (default: %default)")
    parser.add_option(
        "--spot-price", metavar="PRICE", type="float",
        help="If specified, launch slaves as spot instances with the given " +
             "maximum price (in dollars)")
    parser.add_option(
        "--ganglia", action="store_true", default=True,
        help="Setup Ganglia monitoring on cluster (default: %default). NOTE: " +
             "the Ganglia page will be publicly accessible")
    parser.add_option(
        "--no-ganglia", action="store_false", dest="ganglia",
        help="Disable Ganglia monitoring for the cluster")
    parser.add_option(
        "-u", "--user", default="root",
        help="The SSH user you want to connect as (default: %default)")
    parser.add_option(
        "--delete-groups", action="store_true", default=False,
        help="When destroying a cluster, delete the security groups that were created")
    parser.add_option(
        "--use-existing-master", action="store_true", default=False,
        help="Launch fresh slaves, but use an existing stopped master if possible")
    parser.add_option(
        "--worker-instances", type="int", default=1,
        help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " +
             "is used as Hadoop major version (default: %default)")
    parser.add_option(
        "--master-opts", type="string", default="",
        help="Extra options to give to master through SPARK_MASTER_OPTS variable " +
             "(e.g. -Dspark.worker.timeout=180)")
    parser.add_option(
        "--user-data", type="string", default="",
        help="Path to a user-data file (most AMIs interpret this as an initialization script)")
    parser.add_option(
        "--authorized-address", type="string", default="0.0.0.0/0",
        help="Address to authorize on created security groups (default: %default)")
    parser.add_option(
        "--additional-security-group", type="string", default="",
        help="Additional security group to place the machines in")
    parser.add_option(
        "--copy-aws-credentials", action="store_true", default=False,
        help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
    parser.add_option(
        "--subnet-id", default=None,
        help="VPC subnet to launch instances in")
    parser.add_option(
        "--vpc-id", default=None,
        help="VPC to launch instances in")
    parser.add_option(
        "--private-ips", action="store_true", default=False,
        help="Use private IPs for instances rather than public if VPC/subnet " +
             "requires that.")

    (opts, args) = parser.parse_args()
    if len(args) != 2:
        parser.print_help()
        sys.exit(1)
    (action, cluster_name) = args

    # Boto config check
    # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
    home_dir = os.getenv('HOME')
    if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
        if not os.path.isfile('/etc/boto.cfg'):
            if os.getenv('AWS_ACCESS_KEY_ID') is None:
                print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set",
                      file=stderr)
                sys.exit(1)
            if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
                print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set",
                      file=stderr)
                sys.exit(1)
    return (opts, action, cluster_name)
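
# A typical invocation (illustrative; the key name and paths are placeholders):
#
#     ./spark-ec2 -k my-key -i ~/.ssh/my-key.pem -s 2 launch my-cluster
#
# parses to action == "launch" and cluster_name == "my-cluster", with the
# remaining flags available as attributes on opts.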


# Get the EC2 security group of the given name, creating it if it doesn't exist
def get_or_make_group(conn, name, vpc_id):
    groups = conn.get_all_security_groups()
    group = [g for g in groups if g.name == name]
    if len(group) > 0:
        return group[0]
    else:
        print("Creating security group " + name)
        return conn.create_security_group(name, "Spark EC2 group", vpc_id)
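
# Illustrative call (mirrors how launch_cluster() uses it below): for a cluster
# named "my-cluster" in EC2-classic,
#
#     master_group = get_or_make_group(conn, "my-cluster-master", vpc_id=None)
#
# returns the existing group if one matches by name, creating it otherwise.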


def get_validate_spark_version(version, repo):
    if "." in version:
        # A dotted version string, e.g. "1.4.0" or "v1.4.0": strip any leading "v"
        # and check it against the releases this script knows how to launch.
        version = version.replace("v", "")
        if version not in VALID_SPARK_VERSIONS:
            print("Don't know about Spark version: {v}".format(v=version), file=stderr)
            sys.exit(1)
        return version
    else:
        # Otherwise, treat the version as a git commit hash and validate it with
        # a HEAD request against the GitHub commit URL.
        github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
        request = Request(github_commit_url)
        request.get_method = lambda: 'HEAD'
        try:
            response = urlopen(request)
        except HTTPError as e:
            print("Couldn't validate Spark commit: {url}".format(url=github_commit_url),
                  file=stderr)
            print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
            sys.exit(1)
        return version
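
# Example behavior (per the branches above): "v1.2.0" normalizes to "1.2.0" and
# is checked against VALID_SPARK_VERSIONS, while a bare git hash (say, a
# hypothetical "3e20f26") is accepted only if a HEAD request to
# {repo}/commit/3e20f26 succeeds.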


# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2015-05-08
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
EC2_INSTANCE_TYPES = {
    "c1.medium": "pvm",
    "c1.xlarge": "pvm",
    "c3.large": "pvm",
    "c3.xlarge": "pvm",
    "c3.2xlarge": "pvm",
    "c3.4xlarge": "pvm",
    "c3.8xlarge": "pvm",
    "c4.large": "hvm",
    "c4.xlarge": "hvm",
    "c4.2xlarge": "hvm",
    "c4.4xlarge": "hvm",
    "c4.8xlarge": "hvm",
    "cc1.4xlarge": "hvm",
    "cc2.8xlarge": "hvm",
    "cg1.4xlarge": "hvm",
    "cr1.8xlarge": "hvm",
    "d2.xlarge": "hvm",
    "d2.2xlarge": "hvm",
    "d2.4xlarge": "hvm",
    "d2.8xlarge": "hvm",
    "g2.2xlarge": "hvm",
    "g2.8xlarge": "hvm",
    "hi1.4xlarge": "pvm",
    "hs1.8xlarge": "pvm",
    "i2.xlarge": "hvm",
    "i2.2xlarge": "hvm",
    "i2.4xlarge": "hvm",
    "i2.8xlarge": "hvm",
    "m1.small": "pvm",
    "m1.medium": "pvm",
    "m1.large": "pvm",
    "m1.xlarge": "pvm",
    "m2.xlarge": "pvm",
    "m2.2xlarge": "pvm",
    "m2.4xlarge": "pvm",
    "m3.medium": "hvm",
    "m3.large": "hvm",
    "m3.xlarge": "hvm",
    "m3.2xlarge": "hvm",
    "r3.large": "hvm",
    "r3.xlarge": "hvm",
    "r3.2xlarge": "hvm",
    "r3.4xlarge": "hvm",
    "r3.8xlarge": "hvm",
    "t1.micro": "pvm",
    "t2.micro": "hvm",
    "t2.small": "hvm",
    "t2.medium": "hvm",
}
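
# "pvm" (paravirtual) vs. "hvm" (hardware-assisted virtualization) selects which
# AMI variant get_spark_ami() below looks up for the chosen instance type.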


def get_tachyon_version(spark_version):
    return SPARK_TACHYON_MAP.get(spark_version, "")
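
# Example: get_tachyon_version("1.4.0") returns "0.6.4"; Spark versions absent
# from SPARK_TACHYON_MAP (e.g. "0.9.2") fall back to the empty string.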


# Attempt to resolve an appropriate AMI given the architecture and region of the request.
def get_spark_ami(opts):
    if opts.instance_type in EC2_INSTANCE_TYPES:
        instance_type = EC2_INSTANCE_TYPES[opts.instance_type]
    else:
        instance_type = "pvm"
        print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)

    # URL prefix from which to fetch AMI information
    ami_prefix = "{r}/{b}/ami-list".format(
        r=opts.spark_ec2_git_repo.replace("https://github.com", "https://raw.github.com", 1),
        b=opts.spark_ec2_git_branch)

    ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
    reader = codecs.getreader("ascii")
    try:
        ami = reader(urlopen(ami_path)).read().strip()
    except:
        print("Could not resolve AMI at: " + ami_path, file=stderr)
        sys.exit(1)

    print("Spark AMI: " + ami)
    return ami
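
# With the defaults above, the resolved lookup URL looks like (illustrative):
#
#     https://raw.github.com/mesos/spark-ec2/branch-1.4/ami-list/us-east-1/pvm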


# Launch a cluster of the given name, by setting up its security groups,
# and then starting new instances in them.
# Returns a tuple of EC2 reservation objects for the master and slaves
# Fails if there are already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
    if opts.identity_file is None:
        print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr)
        sys.exit(1)

    if opts.key_pair is None:
        print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr)
        sys.exit(1)

    user_data_content = None
    if opts.user_data:
        with open(opts.user_data) as user_data_file:
            user_data_content = user_data_file.read()

    print("Setting up security groups...")
    master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
    slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
    authorized_address = opts.authorized_address
    if master_group.rules == []:  # Group was just now created
        if opts.vpc_id is None:
            master_group.authorize(src_group=master_group)
            master_group.authorize(src_group=slave_group)
        else:
            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                   src_group=master_group)
            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                   src_group=master_group)
            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                   src_group=master_group)
            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                   src_group=slave_group)
            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                   src_group=slave_group)
            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                   src_group=slave_group)
        master_group.authorize('tcp', 22, 22, authorized_address)
        master_group.authorize('tcp', 8080, 8081, authorized_address)
        master_group.authorize('tcp', 18080, 18080, authorized_address)
        master_group.authorize('tcp', 19999, 19999, authorized_address)
        master_group.authorize('tcp', 50030, 50030, authorized_address)
        master_group.authorize('tcp', 50070, 50070, authorized_address)
        master_group.authorize('tcp', 60070, 60070, authorized_address)
        master_group.authorize('tcp', 4040, 4045, authorized_address)
        # HDFS NFS gateway requires 111, 2049, 4242 for tcp & udp
        master_group.authorize('tcp', 111, 111, authorized_address)
        master_group.authorize('udp', 111, 111, authorized_address)
        master_group.authorize('tcp', 2049, 2049, authorized_address)
        master_group.authorize('udp', 2049, 2049, authorized_address)
        master_group.authorize('tcp', 4242, 4242, authorized_address)
        master_group.authorize('udp', 4242, 4242, authorized_address)
        # RM in YARN mode uses 8088
        master_group.authorize('tcp', 8088, 8088, authorized_address)
        if opts.ganglia:
            master_group.authorize('tcp', 5080, 5080, authorized_address)

    if slave_group.rules == []:  # Group was just now created
        if opts.vpc_id is None:
            slave_group.authorize(src_group=master_group)
            slave_group.authorize(src_group=slave_group)
        else:
            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                  src_group=master_group)
            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                  src_group=master_group)
            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                  src_group=master_group)
            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                  src_group=slave_group)
            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                  src_group=slave_group)
            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                  src_group=slave_group)
        slave_group.authorize('tcp', 22, 22, authorized_address)
        slave_group.authorize('tcp', 8080, 8081, authorized_address)
        slave_group.authorize('tcp', 50060, 50060, authorized_address)
        slave_group.authorize('tcp', 50075, 50075, authorized_address)
        slave_group.authorize('tcp', 60060, 60060, authorized_address)
        slave_group.authorize('tcp', 60075, 60075, authorized_address)

    # Check if instances are already running in our groups
    existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
                                                             die_on_error=False)
    if existing_slaves or (existing_masters and not opts.use_existing_master):
        print("ERROR: There are already instances running in group %s or %s" %
              (master_group.name, slave_group.name), file=stderr)
        sys.exit(1)

    # Figure out Spark AMI
    if opts.ami is None:
        opts.ami = get_spark_ami(opts)

    # we use group ids to work around https://github.com/boto/boto/issues/350
    additional_group_ids = []
    if opts.additional_security_group:
        additional_group_ids = [sg.id
                                for sg in conn.get_all_security_groups()
                                if opts.additional_security_group in (sg.name, sg.id)]
    print("Launching instances...")

    try:
        image = conn.get_all_images(image_ids=[opts.ami])[0]
    except:
        print("Could not find AMI " + opts.ami, file=stderr)
        sys.exit(1)

    # Create block device mapping so that we can add EBS volumes if asked to.
    # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
    block_map = BlockDeviceMapping()
    if opts.ebs_vol_size > 0:
        for i in range(opts.ebs_vol_num):
            device = EBSBlockDeviceType()
            device.size = opts.ebs_vol_size
            device.volume_type = opts.ebs_vol_type
            device.delete_on_termination = True
            block_map["/dev/sd" + chr(ord('s') + i)] = device

    # AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
    if opts.instance_type.startswith('m3.'):
        for i in range(get_num_disks(opts.instance_type)):
            dev = BlockDeviceType()
            dev.ephemeral_name = 'ephemeral%d' % i
            # The first ephemeral drive is /dev/sdb; string.ascii_letters (rather
            # than the Python 2-only string.letters) keeps this working on Python 3,
            # and index i + 1 maps disk 0 to 'b', disk 1 to 'c', and so on.
            name = '/dev/sd' + string.ascii_letters[i + 1]
            block_map[name] = dev
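
    # Illustrative result (assumed flag values): with --ebs-vol-size 100 and
    # --ebs-vol-num 3, block_map now maps "/dev/sds", "/dev/sdt", and "/dev/sdu"
    # to 100 GB EBS volumes that are deleted when the instance terminates.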

    # Launch slaves
    if opts.spot_price is not None:
        # Launch spot instances with the requested price
        print("Requesting %d slaves as spot instances with price $%.3f" %
              (opts.slaves, opts.spot_price))
        zones = get_zones(conn, opts)
        num_zones = len(zones)
        i = 0
        my_req_ids = []
        for zone in zones:
            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
            slave_reqs = conn.request_spot_instances(
                price=opts.spot_price,
                image_id=opts.ami,
                launch_group="launch-group-%s" % cluster_name,
                placement=zone,
                count=num_slaves_this_zone,
                key_name=opts.key_pair,
                security_group_ids=[slave_group.id] + additional_group_ids,
                instance_type=opts.instance_type,
                block_device_map=block_map,
                subnet_id=opts.subnet_id,
                placement_group=opts.placement_group,
                user_data=user_data_content)
            my_req_ids += [req.id for req in slave_reqs]
            i += 1

        print("Waiting for spot instances to be granted...")
        try:
            while True:
                time.sleep(10)
                reqs = conn.get_all_spot_instance_requests()
                id_to_req = {}
                for r in reqs:
                    id_to_req[r.id] = r
                active_instance_ids = []
                for i in my_req_ids:
                    if i in id_to_req and id_to_req[i].state == "active":
                        active_instance_ids.append(id_to_req[i].instance_id)
                if len(active_instance_ids) == opts.slaves:
                    print("All %d slaves granted" % opts.slaves)
                    reservations = conn.get_all_reservations(active_instance_ids)
                    slave_nodes = []
                    for r in reservations:
                        slave_nodes += r.instances
                    break
                else:
                    print("%d of %d slaves granted, waiting longer" % (
                        len(active_instance_ids), opts.slaves))
        except:
            print("Canceling spot instance requests")
            conn.cancel_spot_instance_requests(my_req_ids)
            # Log a warning if any of these requests actually launched instances:
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            running = len(master_nodes) + len(slave_nodes)
            if running:
                print(("WARNING: %d instances are still running" % running), file=stderr)
            sys.exit(0)

    else:
        # Launch non-spot instances
        zones = get_zones(conn, opts)
        num_zones = len(zones)
        i = 0
        slave_nodes = []
        for zone in zones:
            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
            if num_slaves_this_zone > 0:
                slave_res = image.run(key_name=opts.key_pair,
                                      security_group_ids=[slave_group.id] + additional_group_ids,
                                      instance_type=opts.instance_type,
                                      placement=zone,
                                      min_count=num_slaves_this_zone,
                                      max_count=num_slaves_this_zone,
                                      block_device_map=block_map,
                                      subnet_id=opts.subnet_id,
                                      placement_group=opts.placement_group,
                                      user_data=user_data_content)
                slave_nodes += slave_res.instances
                print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
                    s=num_slaves_this_zone,
                    plural_s=('' if num_slaves_this_zone == 1 else 's'),
                    z=zone,
                    r=slave_res.id))
            i += 1

    # Launch or resume masters
    if existing_masters:
        print("Starting master...")
        for inst in existing_masters:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.start()
        master_nodes = existing_masters
    else:
        master_type = opts.master_instance_type
        if master_type == "":
            master_type = opts.instance_type
        if opts.zone == 'all':
            opts.zone = random.choice(conn.get_all_zones()).name
        master_res = image.run(key_name=opts.key_pair,
                               security_group_ids=[master_group.id] + additional_group_ids,
                               instance_type=master_type,
                               placement=opts.zone,
                               min_count=1,
                               max_count=1,
                               block_device_map=block_map,
                               subnet_id=opts.subnet_id,
                               placement_group=opts.placement_group,
                               user_data=user_data_content)

        master_nodes = master_res.instances
        # Report the zone the master was actually placed in (opts.zone, not the
        # stale loop variable left over from the slave-launch loop above).
        print("Launched master in %s, regid = %s" % (opts.zone, master_res.id))

    # This wait time corresponds to SPARK-4983
    print("Waiting for AWS to propagate instance metadata...")
    time.sleep(5)

    # Give the instances descriptive names
    for master in master_nodes:
        master.add_tag(
            key='Name',
            value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
    for slave in slave_nodes:
        slave.add_tag(
            key='Name',
            value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))

    # Return all the instances
    return (master_nodes, slave_nodes)


def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
    """
    Get the EC2 instances in an existing cluster if available.
    Returns a tuple of lists of EC2 instance objects for the masters and slaves.
    """
    print("Searching for existing cluster {c} in region {r}...".format(
        c=cluster_name, r=opts.region))

    def get_instances(group_names):
        """
        Get all non-terminated instances that belong to any of the provided security groups.

        EC2 reservation filters and instance states are documented here:
        http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
        """
        reservations = conn.get_all_reservations(
            filters={"instance.group-name": group_names})
        instances = itertools.chain.from_iterable(r.instances for r in reservations)
        return [i for i in instances if i.state not in ["shutting-down", "terminated"]]
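
    # Pushing the filter to EC2 ("instance.group-name") instead of fetching every
    # reservation and filtering locally is what keeps this fast on busy accounts.
    # The SPARK-6193 PR measured, for 10 calls against an account with hundreds
    # of active instances:
    #
    #     conn.get_all_reservations()                          -> ~117 s
    #     conn.get_all_reservations(filters={
    #         "instance.group-name": ["my-cluster-master"]})   -> ~4.6 s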

    master_instances = get_instances([cluster_name + "-master"])
    slave_instances = get_instances([cluster_name + "-slaves"])

    if any((master_instances, slave_instances)):
        print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
            m=len(master_instances),
            plural_m=('' if len(master_instances) == 1 else 's'),
            s=len(slave_instances),
            plural_s=('' if len(slave_instances) == 1 else 's')))
|
2015-03-08 10:01:26 -04:00
|
|
|
|
|
|
|
if not master_instances and die_on_error:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
|
|
|
|
c=cluster_name, r=opts.region), file=sys.stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
sys.exit(1)
|
2012-06-11 02:06:15 -04:00
|
|
|
|
2015-03-08 10:01:26 -04:00
|
|
|
return (master_instances, slave_instances)
|
|
|
|
|
2014-11-25 19:07:09 -05:00
|
|
|
|
2012-06-11 02:06:15 -04:00
|
|
|
# Deploy configuration files and run setup scripts on a newly launched
|
|
|
|
# or started EC2 cluster.
|
2013-06-26 14:15:48 -04:00
|
|
|
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
|
2015-04-08 16:48:45 -04:00
|
|
|
master = get_dns_name(master_nodes[0], opts.private_ips)
|
2014-06-01 18:39:04 -04:00
|
|
|
if deploy_ssh_key:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Generating cluster's SSH key on master...")
|
2014-06-01 18:39:04 -04:00
|
|
|
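# Generate an SSH key on the master if one doesn't already exist,
# and authorize it for password-less logins within the cluster.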
key_setup = """
|
|
|
|
[ -f ~/.ssh/id_rsa ] ||
|
|
|
|
(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
|
|
|
|
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
|
|
|
|
"""
|
|
|
|
ssh(master, opts, key_setup)
|
|
|
|
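# Bundle the master's ~/.ssh directory as a tar stream so it can be replayed verbatim on each slave.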
dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Transferring cluster's SSH key to slaves...")
|
2014-06-01 18:39:04 -04:00
|
|
|
for slave in slave_nodes:
|
2015-04-08 16:48:45 -04:00
|
|
|
slave_address = get_dns_name(slave, opts.private_ips)
|
2015-04-16 19:20:57 -04:00
|
|
|
print(slave_address)
|
2015-04-08 16:48:45 -04:00
|
|
|
ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
|
2014-06-01 18:39:04 -04:00
|
|
|
|
2015-01-08 20:42:08 -05:00
|
|
|
modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
|
2014-06-01 18:39:04 -04:00
|
|
|
'mapreduce', 'spark-standalone', 'tachyon']
|
|
|
|
|
|
|
|
if opts.hadoop_major_version == "1":
|
2015-05-26 12:02:25 -04:00
|
|
|
modules = list(filter(lambda x: x != "mapreduce", modules))
|
2014-06-01 18:39:04 -04:00
|
|
|
|
|
|
|
if opts.ganglia:
|
|
|
|
modules.append('ganglia')
|
|
|
|
|
2015-06-03 18:14:38 -04:00
|
|
|
# Clear SPARK_WORKER_INSTANCES if running on YARN
|
|
|
|
if opts.hadoop_major_version == "yarn":
|
|
|
|
opts.worker_instances = ""
|
|
|
|
|
2014-06-01 18:39:04 -04:00
|
|
|
# NOTE: We should clone the repository before running deploy_files to
|
|
|
|
# prevent ec2-variables.sh from being overwritten
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
|
|
|
|
r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch))
|
2014-11-03 12:02:35 -05:00
|
|
|
ssh(
|
|
|
|
host=master,
|
|
|
|
opts=opts,
|
|
|
|
command="rm -rf spark-ec2"
|
|
|
|
+ " && "
|
2015-02-09 18:47:07 -05:00
|
|
|
+ "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo,
|
|
|
|
b=opts.spark_ec2_git_branch)
|
2014-11-03 12:02:35 -05:00
|
|
|
)
|
2014-06-01 18:39:04 -04:00
|
|
|
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Deploying files to master...")
|
2014-11-05 23:45:35 -05:00
|
|
|
deploy_files(
|
|
|
|
conn=conn,
|
|
|
|
root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
|
|
|
|
opts=opts,
|
|
|
|
master_nodes=master_nodes,
|
|
|
|
slave_nodes=slave_nodes,
|
|
|
|
modules=modules
|
|
|
|
)
|
2014-06-01 18:39:04 -04:00
|
|
|
|
2015-03-07 07:56:59 -05:00
|
|
|
if opts.deploy_root_dir is not None:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Deploying {s} to master...".format(s=opts.deploy_root_dir))
|
2015-03-07 07:56:59 -05:00
|
|
|
deploy_user_files(
|
|
|
|
root_dir=opts.deploy_root_dir,
|
|
|
|
opts=opts,
|
|
|
|
master_nodes=master_nodes
|
|
|
|
)
|
|
|
|
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Running setup on master...")
|
2014-06-01 18:39:04 -04:00
|
|
|
setup_spark_cluster(master, opts)
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Done!")
|
2014-06-01 18:39:04 -04:00
|
|
|
|
2012-08-02 18:23:52 -04:00
|
|
|
|
2013-01-28 14:16:14 -05:00
|
|
|
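# Run the spark-ec2 setup script on the master and print the resulting web UI URLs.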
def setup_spark_cluster(master, opts):
|
2014-06-01 18:39:04 -04:00
|
|
|
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
|
|
|
|
ssh(master, opts, "spark-ec2/setup.sh")
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Spark standalone cluster started at http://%s:8080" % master)
|
2013-02-18 21:30:36 -05:00
|
|
|
|
2014-06-01 18:39:04 -04:00
|
|
|
if opts.ganglia:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Ganglia started at http://%s:5080/ganglia" % master)
|
2013-01-28 14:16:14 -05:00
|
|
|
|
2012-06-11 02:06:15 -04:00
|
|
|
|
2015-02-09 04:44:53 -05:00
|
|
|
def is_ssh_available(host, opts, print_ssh_output=True):
|
2014-11-29 03:31:06 -05:00
|
|
|
"""
|
|
|
|
Check if SSH is available on a host.
|
|
|
|
"""
|
2015-02-09 04:44:53 -05:00
|
|
|
s = subprocess.Popen(
|
|
|
|
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
|
|
|
|
'%s@%s' % (opts.user, host), stringify_command('true')],
|
|
|
|
stdout=subprocess.PIPE,
|
|
|
|
stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order
|
|
|
|
)
|
|
|
|
cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout
|
|
|
|
|
|
|
|
if s.returncode != 0 and print_ssh_output:
|
|
|
|
# extra leading newline is for spacing in wait_for_cluster_state()
|
2015-04-16 19:20:57 -04:00
|
|
|
print(textwrap.dedent("""\n
|
2015-02-09 04:44:53 -05:00
|
|
|
Warning: SSH connection error. (This could be temporary.)
|
|
|
|
Host: {h}
|
|
|
|
SSH return code: {r}
|
|
|
|
SSH output: {o}
|
|
|
|
""").format(
|
|
|
|
h=host,
|
|
|
|
r=s.returncode,
|
|
|
|
o=cmd_output.strip()
|
2015-04-16 19:20:57 -04:00
|
|
|
))
|
2015-02-09 04:44:53 -05:00
|
|
|
|
|
|
|
return s.returncode == 0
|
2014-10-07 19:54:32 -04:00
|
|
|
|
|
|
|
|
|
|
|
def is_cluster_ssh_available(cluster_instances, opts):
|
2014-11-29 03:31:06 -05:00
|
|
|
"""
|
|
|
|
Check if SSH is available on all the instances in a cluster.
|
|
|
|
"""
|
2014-10-07 19:54:32 -04:00
|
|
|
for i in cluster_instances:
|
2015-04-08 16:48:45 -04:00
|
|
|
dns_name = get_dns_name(i, opts.private_ips)
|
|
|
|
if not is_ssh_available(host=dns_name, opts=opts):
|
2014-10-07 19:54:32 -04:00
|
|
|
return False
|
|
|
|
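# The for/else falls through to 'return True' only after every instance has passed the check.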
else:
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
2014-11-29 03:31:06 -05:00
|
|
|
def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
|
2014-10-07 19:54:32 -04:00
|
|
|
"""
|
2014-11-29 03:31:06 -05:00
|
|
|
Wait for all the instances in the cluster to reach a designated state.
|
|
|
|
|
2014-10-07 19:54:32 -04:00
|
|
|
cluster_instances: a list of boto.ec2.instance.Instance
|
|
|
|
cluster_state: a string representing the desired state of all the instances in the cluster
|
|
|
|
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
|
|
|
|
'running', 'terminated', etc.
|
|
|
|
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
|
|
|
|
"""
|
|
|
|
sys.stdout.write(
|
2014-11-29 03:31:06 -05:00
|
|
|
"Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
|
2014-10-07 19:54:32 -04:00
|
|
|
)
|
|
|
|
sys.stdout.flush()
|
|
|
|
|
2014-11-29 03:31:06 -05:00
|
|
|
start_time = datetime.now()
|
2014-10-07 19:54:32 -04:00
|
|
|
num_attempts = 0
|
|
|
|
|
|
|
|
while True:
|
2014-11-29 03:31:06 -05:00
|
|
|
time.sleep(5 * num_attempts) # seconds
|
2014-10-07 19:54:32 -04:00
|
|
|
|
|
|
|
for i in cluster_instances:
|
2014-11-29 03:31:06 -05:00
|
|
|
i.update()
|
|
|
|
|
|
|
|
statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances])
|
2014-10-07 19:54:32 -04:00
|
|
|
|
|
|
|
if cluster_state == 'ssh-ready':
|
|
|
|
if all(i.state == 'running' for i in cluster_instances) and \
|
2014-11-29 03:31:06 -05:00
|
|
|
all(s.system_status.status == 'ok' for s in statuses) and \
|
|
|
|
all(s.instance_status.status == 'ok' for s in statuses) and \
|
2014-10-07 19:54:32 -04:00
|
|
|
is_cluster_ssh_available(cluster_instances, opts):
|
|
|
|
break
|
|
|
|
else:
|
|
|
|
if all(i.state == cluster_state for i in cluster_instances):
|
|
|
|
break
|
|
|
|
|
|
|
|
num_attempts += 1
|
|
|
|
|
|
|
|
sys.stdout.write(".")
|
|
|
|
sys.stdout.flush()
|
|
|
|
|
|
|
|
sys.stdout.write("\n")
|
2012-06-11 02:06:15 -04:00
|
|
|
|
2014-11-29 03:31:06 -05:00
|
|
|
end_time = datetime.now()
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Cluster is now in '{s}' state. Waited {t} seconds.".format(
|
2014-11-29 03:31:06 -05:00
|
|
|
s=cluster_state,
|
|
|
|
t=(end_time - start_time).seconds
|
2015-04-16 19:20:57 -04:00
|
|
|
))
|
2014-11-29 03:31:06 -05:00
|
|
|
|
2012-06-11 02:06:15 -04:00
|
|
|
|
|
|
|
# Get number of local disks available for a given EC2 instance type.
|
|
|
|
def get_num_disks(instance_type):
|
2014-09-29 13:45:08 -04:00
|
|
|
# Source: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html
|
2015-05-08 18:59:34 -04:00
|
|
|
# Last Updated: 2015-05-08
|
2014-09-29 00:55:09 -04:00
|
|
|
# For easy maintainability, please keep this manually maintained dictionary sorted by key.
|
2014-06-01 18:39:04 -04:00
|
|
|
disks_by_instance = {
|
|
|
|
"c1.medium": 1,
|
|
|
|
"c1.xlarge": 4,
|
2015-05-08 18:59:34 -04:00
|
|
|
"c3.large": 2,
|
|
|
|
"c3.xlarge": 2,
|
2014-09-29 00:55:09 -04:00
|
|
|
"c3.2xlarge": 2,
|
|
|
|
"c3.4xlarge": 2,
|
|
|
|
"c3.8xlarge": 2,
|
2015-05-08 18:59:34 -04:00
|
|
|
"c4.large": 0,
|
|
|
|
"c4.xlarge": 0,
|
|
|
|
"c4.2xlarge": 0,
|
|
|
|
"c4.4xlarge": 0,
|
|
|
|
"c4.8xlarge": 0,
|
2014-06-01 18:39:04 -04:00
|
|
|
"cc1.4xlarge": 2,
|
|
|
|
"cc2.8xlarge": 4,
|
|
|
|
"cg1.4xlarge": 2,
|
|
|
|
"cr1.8xlarge": 2,
|
2015-05-08 18:59:34 -04:00
|
|
|
"d2.xlarge": 3,
|
|
|
|
"d2.2xlarge": 6,
|
|
|
|
"d2.4xlarge": 12,
|
|
|
|
"d2.8xlarge": 24,
|
2014-09-29 00:55:09 -04:00
|
|
|
"g2.2xlarge": 1,
|
2015-05-08 18:59:34 -04:00
|
|
|
"g2.8xlarge": 2,
|
2014-06-01 18:39:04 -04:00
|
|
|
"hi1.4xlarge": 2,
|
2014-09-29 00:55:09 -04:00
|
|
|
"hs1.8xlarge": 24,
|
2015-05-08 18:59:34 -04:00
|
|
|
"i2.xlarge": 1,
|
2014-06-01 18:39:04 -04:00
|
|
|
"i2.2xlarge": 2,
|
|
|
|
"i2.4xlarge": 4,
|
|
|
|
"i2.8xlarge": 8,
|
2014-09-29 00:55:09 -04:00
|
|
|
"m1.small": 1,
|
2015-05-08 18:59:34 -04:00
|
|
|
"m1.medium": 1,
|
|
|
|
"m1.large": 2,
|
2014-09-29 00:55:09 -04:00
|
|
|
"m1.xlarge": 4,
|
2015-05-08 18:59:34 -04:00
|
|
|
"m2.xlarge": 1,
|
2014-09-29 00:55:09 -04:00
|
|
|
"m2.2xlarge": 1,
|
|
|
|
"m2.4xlarge": 2,
|
|
|
|
"m3.medium": 1,
|
2015-05-08 18:59:34 -04:00
|
|
|
"m3.large": 1,
|
2014-09-29 00:55:09 -04:00
|
|
|
"m3.xlarge": 2,
|
2015-05-08 18:59:34 -04:00
|
|
|
"m3.2xlarge": 2,
|
|
|
|
"r3.large": 1,
|
|
|
|
"r3.xlarge": 1,
|
2014-06-04 19:01:56 -04:00
|
|
|
"r3.2xlarge": 1,
|
|
|
|
"r3.4xlarge": 1,
|
2014-06-26 18:21:29 -04:00
|
|
|
"r3.8xlarge": 2,
|
2014-09-29 00:55:09 -04:00
|
|
|
"t1.micro": 0,
|
2015-05-08 18:59:34 -04:00
|
|
|
"t2.micro": 0,
|
|
|
|
"t2.small": 0,
|
|
|
|
"t2.medium": 0,
|
2014-06-01 18:39:04 -04:00
|
|
|
}
|
|
|
|
if instance_type in disks_by_instance:
|
|
|
|
return disks_by_instance[instance_type]
|
|
|
|
else:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("WARNING: Don't know number of disks on instance type %s; assuming 1"
|
|
|
|
% instance_type, file=stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
return 1
|
2012-06-11 02:06:15 -04:00
|
|
|
|
|
|
|
|
|
|
|
# Deploy the configuration file templates in a given local directory to
|
|
|
|
# a cluster, filling in any template parameters with information about the
|
|
|
|
# cluster (e.g. lists of masters and slaves). Files are only deployed to
|
|
|
|
# the first master instance in the cluster, and we expect the setup
|
|
|
|
# script to be run on that instance to copy them to other nodes.
|
2014-11-05 23:45:35 -05:00
|
|
|
#
|
|
|
|
# root_dir should be an absolute path to the directory with the files we want to deploy.
|
2013-06-26 14:15:48 -04:00
|
|
|
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
|
2015-04-08 16:48:45 -04:00
|
|
|
active_master = get_dns_name(master_nodes[0], opts.private_ips)
|
2014-06-01 18:39:04 -04:00
|
|
|
|
|
|
|
num_disks = get_num_disks(opts.instance_type)
|
|
|
|
hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
|
|
|
|
mapred_local_dirs = "/mnt/hadoop/mrlocal"
|
|
|
|
spark_local_dirs = "/mnt/spark"
|
|
|
|
if num_disks > 1:
|
|
|
|
for i in range(2, num_disks + 1):
|
|
|
|
hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
|
|
|
|
mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
|
|
|
|
spark_local_dirs += ",/mnt%d/spark" % i
|
|
|
|
|
|
|
|
cluster_url = "%s:7077" % active_master
|
|
|
|
|
|
|
|
if "." in opts.spark_version:
|
2015-01-08 20:42:08 -05:00
|
|
|
# Pre-built Spark deploy
|
|
|
|
spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
|
2015-03-10 07:02:12 -04:00
|
|
|
tachyon_v = get_tachyon_version(spark_v)
|
2014-06-01 18:39:04 -04:00
|
|
|
else:
|
|
|
|
# Spark-only custom deploy
|
|
|
|
spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
|
2015-03-10 07:02:12 -04:00
|
|
|
tachyon_v = ""
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Deploying Spark via git hash; Tachyon won't be set up")
|
2015-03-10 07:02:12 -04:00
|
|
|
modules = list(filter(lambda x: x != "tachyon", modules))
|
2014-06-01 18:39:04 -04:00
|
|
|
|
2015-04-08 16:48:45 -04:00
|
|
|
master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
|
|
|
|
slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
|
2015-06-03 18:14:38 -04:00
|
|
|
worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else ""
|
2014-06-01 18:39:04 -04:00
|
|
|
template_vars = {
|
2015-04-08 16:48:45 -04:00
|
|
|
"master_list": '\n'.join(master_addresses),
|
2014-06-01 18:39:04 -04:00
|
|
|
"active_master": active_master,
|
2015-04-08 16:48:45 -04:00
|
|
|
"slave_list": '\n'.join(slave_addresses),
|
2014-06-01 18:39:04 -04:00
|
|
|
"cluster_url": cluster_url,
|
|
|
|
"hdfs_data_dirs": hdfs_data_dirs,
|
|
|
|
"mapred_local_dirs": mapred_local_dirs,
|
|
|
|
"spark_local_dirs": spark_local_dirs,
|
|
|
|
"swap": str(opts.swap),
|
|
|
|
"modules": '\n'.join(modules),
|
|
|
|
"spark_version": spark_v,
|
2015-03-10 07:02:12 -04:00
|
|
|
"tachyon_version": tachyon_v,
|
2014-06-01 18:39:04 -04:00
|
|
|
"hadoop_major_version": opts.hadoop_major_version,
|
2015-06-03 18:14:38 -04:00
|
|
|
"spark_worker_instances": worker_instances_str,
|
2014-06-01 18:39:04 -04:00
|
|
|
"spark_master_opts": opts.master_opts
|
|
|
|
}
|
|
|
|
|
2014-09-16 16:40:16 -04:00
|
|
|
if opts.copy_aws_credentials:
|
|
|
|
template_vars["aws_access_key_id"] = conn.aws_access_key_id
|
|
|
|
template_vars["aws_secret_access_key"] = conn.aws_secret_access_key
|
|
|
|
else:
|
|
|
|
template_vars["aws_access_key_id"] = ""
|
|
|
|
template_vars["aws_secret_access_key"] = ""
|
|
|
|
|
2014-06-01 18:39:04 -04:00
|
|
|
# Create a temp directory in which we will place all the files to be
|
|
|
|
# deployed after we substitute template parameters in them
|
|
|
|
tmp_dir = tempfile.mkdtemp()
|
|
|
|
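# Walk the template tree, skipping .svn metadata, hidden files, and editor backups.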
for path, dirs, files in os.walk(root_dir):
|
|
|
|
if path.find(".svn") == -1:
|
|
|
|
dest_dir = os.path.join('/', path[len(root_dir):])
|
|
|
|
local_dir = tmp_dir + dest_dir
|
|
|
|
if not os.path.exists(local_dir):
|
|
|
|
os.makedirs(local_dir)
|
|
|
|
for filename in files:
|
|
|
|
if filename[0] not in '#.~' and filename[-1] != '~':
|
|
|
|
dest_file = os.path.join(dest_dir, filename)
|
|
|
|
local_file = tmp_dir + dest_file
|
|
|
|
with open(os.path.join(path, filename)) as src:
|
|
|
|
with open(local_file, "w") as dest:
|
|
|
|
text = src.read()
|
|
|
|
for key in template_vars:
|
|
|
|
text = text.replace("{{" + key + "}}", template_vars[key])
|
|
|
|
dest.write(text)
|
|
|
|
|
|
|
|
# rsync the whole directory over to the master machine
|
|
|
|
command = [
|
|
|
|
'rsync', '-rv',
|
|
|
|
'-e', stringify_command(ssh_command(opts)),
|
|
|
|
"%s/" % tmp_dir,
|
|
|
|
"%s@%s:/" % (opts.user, active_master)
|
2013-05-17 20:10:47 -04:00
|
|
|
]
|
2014-06-01 18:39:04 -04:00
|
|
|
subprocess.check_call(command)
|
|
|
|
# Remove the temp directory we created above
|
|
|
|
shutil.rmtree(tmp_dir)
|
2012-06-11 02:06:15 -04:00
|
|
|
|
|
|
|
|
2015-03-07 07:56:59 -05:00
|
|
|
# Deploy a given local directory to a cluster, WITHOUT parameter substitution.
|
|
|
|
# Note that unlike deploy_files, this works for binary files.
|
|
|
|
# Also, it is up to the user to add (or not) the trailing slash in root_dir.
|
|
|
|
# Files are only deployed to the first master instance in the cluster.
|
|
|
|
#
|
|
|
|
# root_dir should be an absolute path.
|
|
|
|
def deploy_user_files(root_dir, opts, master_nodes):
|
2015-04-08 16:48:45 -04:00
|
|
|
active_master = get_dns_name(master_nodes[0], opts.private_ips)
|
2015-03-07 07:56:59 -05:00
|
|
|
command = [
|
|
|
|
'rsync', '-rv',
|
|
|
|
'-e', stringify_command(ssh_command(opts)),
|
|
|
|
"%s" % root_dir,
|
|
|
|
"%s@%s:/" % (opts.user, active_master)
|
|
|
|
]
|
|
|
|
subprocess.check_call(command)
|
|
|
|
|
|
|
|
|
2013-05-17 20:10:47 -04:00
|
|
|
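# Flatten a command given as either a string or a list of arguments into a single shell-safe string.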
def stringify_command(parts):
|
2014-06-01 18:39:04 -04:00
|
|
|
if isinstance(parts, str):
|
|
|
|
return parts
|
|
|
|
else:
|
|
|
|
return ' '.join(map(pipes.quote, parts))
|
2013-05-17 20:10:47 -04:00
|
|
|
|
|
|
|
|
|
|
|
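# Common SSH options: skip host-key checks (cluster hosts are freshly launched) and use the identity file, if given.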
def ssh_args(opts):
|
2014-06-01 18:39:04 -04:00
|
|
|
parts = ['-o', 'StrictHostKeyChecking=no']
|
2015-02-06 18:43:58 -05:00
|
|
|
parts += ['-o', 'UserKnownHostsFile=/dev/null']
|
2014-06-01 18:39:04 -04:00
|
|
|
if opts.identity_file is not None:
|
|
|
|
parts += ['-i', opts.identity_file]
|
|
|
|
return parts
|
2013-05-17 20:10:47 -04:00
|
|
|
|
|
|
|
|
|
|
|
def ssh_command(opts):
|
2014-06-01 18:39:04 -04:00
|
|
|
return ['ssh'] + ssh_args(opts)
|
2012-06-11 02:06:15 -04:00
|
|
|
|
|
|
|
|
2014-05-05 00:59:10 -04:00
|
|
|
# Run a command on a host through ssh, retrying up to five times
|
2013-04-10 00:37:02 -04:00
|
|
|
# and then throwing an exception if ssh continues to fail.
|
2012-06-11 02:06:15 -04:00
|
|
|
def ssh(host, opts, command):
|
2014-06-01 18:39:04 -04:00
|
|
|
tries = 0
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
return subprocess.check_call(
|
|
|
|
ssh_command(opts) + ['-t', '-t', '%s@%s' % (opts.user, host),
|
|
|
|
stringify_command(command)])
|
|
|
|
except subprocess.CalledProcessError as e:
|
2014-09-29 00:55:09 -04:00
|
|
|
if tries > 5:
|
2014-06-01 18:39:04 -04:00
|
|
|
# If this was an ssh failure, provide the user with hints.
|
|
|
|
if e.returncode == 255:
|
|
|
|
raise UsageError(
|
|
|
|
"Failed to SSH to remote host {0}.\n" +
|
|
|
|
"Please check that you have provided the correct --identity-file and " +
|
|
|
|
"--key-pair parameters and try again.".format(host))
|
|
|
|
else:
|
|
|
|
raise e
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Error executing remote command, retrying after 30 seconds: {0}".format(e),
|
|
|
|
file=stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
time.sleep(30)
|
|
|
|
tries = tries + 1
|
2013-07-29 20:19:33 -04:00
|
|
|
|
2014-07-10 15:56:00 -04:00
|
|
|
|
2014-06-17 02:42:27 -04:00
|
|
|
# Backported from Python 2.7 for compatibility with 2.6 (See SPARK-1990)
|
|
|
|
def _check_output(*popenargs, **kwargs):
|
|
|
|
if 'stdout' in kwargs:
|
|
|
|
raise ValueError('stdout argument not allowed, it will be overridden.')
|
2014-06-17 18:09:24 -04:00
|
|
|
process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
|
2014-06-17 02:42:27 -04:00
|
|
|
output, unused_err = process.communicate()
|
|
|
|
retcode = process.poll()
|
|
|
|
if retcode:
|
|
|
|
cmd = kwargs.get("args")
|
|
|
|
if cmd is None:
|
|
|
|
cmd = popenargs[0]
|
|
|
|
raise subprocess.CalledProcessError(retcode, cmd, output=output)
|
|
|
|
return output
|
|
|
|
|
2013-07-29 20:19:33 -04:00
|
|
|
|
2013-07-03 19:57:22 -04:00
|
|
|
def ssh_read(host, opts, command):
|
2014-06-17 02:42:27 -04:00
|
|
|
return _check_output(
|
2014-06-01 18:39:04 -04:00
|
|
|
ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])
|
2013-07-29 20:19:33 -04:00
|
|
|
|
2012-06-11 02:06:15 -04:00
|
|
|
|
2014-09-29 00:55:09 -04:00
|
|
|
def ssh_write(host, opts, command, arguments):
|
2014-06-01 18:39:04 -04:00
|
|
|
tries = 0
|
|
|
|
while True:
|
|
|
|
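# Launch the remote command and stream 'arguments' into its stdin.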
proc = subprocess.Popen(
|
|
|
|
ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)],
|
|
|
|
stdin=subprocess.PIPE)
|
2014-09-29 00:55:09 -04:00
|
|
|
proc.stdin.write(arguments)
|
2014-06-01 18:39:04 -04:00
|
|
|
proc.stdin.close()
|
|
|
|
status = proc.wait()
|
|
|
|
if status == 0:
|
|
|
|
break
|
2014-09-29 00:55:09 -04:00
|
|
|
elif tries > 5:
|
2014-06-01 18:39:04 -04:00
|
|
|
raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
|
|
|
|
else:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Error {0} while executing remote command, retrying after 30 seconds".
|
|
|
|
format(status), file=stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
time.sleep(30)
|
|
|
|
tries = tries + 1
|
2013-07-03 19:57:22 -04:00
|
|
|
|
2012-06-11 02:06:15 -04:00
|
|
|
|
2012-11-16 20:25:28 -05:00
|
|
|
# Gets a list of zones to launch instances in
|
|
|
|
def get_zones(conn, opts):
|
2014-06-01 18:39:04 -04:00
|
|
|
if opts.zone == 'all':
|
|
|
|
zones = [z.name for z in conn.get_all_zones()]
|
|
|
|
else:
|
|
|
|
zones = [opts.zone]
|
|
|
|
return zones
|
2012-11-16 20:25:28 -05:00
|
|
|
|
|
|
|
|
|
|
|
# Gets the number of items in a partition
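# e.g., distributing 10 slaves across 3 zones yields partitions of sizes 4, 3, and 3.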
|
|
|
|
def get_partition(total, num_partitions, current_partitions):
|
2015-05-26 12:02:25 -04:00
|
|
|
num_slaves_this_zone = total // num_partitions
|
2014-06-01 18:39:04 -04:00
|
|
|
if (total % num_partitions) - current_partitions > 0:
|
|
|
|
num_slaves_this_zone += 1
|
|
|
|
return num_slaves_this_zone
|
2012-11-16 20:25:28 -05:00
|
|
|
|
|
|
|
|
2015-04-08 16:48:45 -04:00
|
|
|
# Gets the IP address, taking into account the --private-ips flag
|
|
|
|
def get_ip_address(instance, private_ips=False):
|
|
|
|
ip = instance.ip_address if not private_ips else \
|
|
|
|
instance.private_ip_address
|
|
|
|
return ip
|
|
|
|
|
|
|
|
|
|
|
|
# Gets the DNS name, taking into account the --private-ips flag
|
|
|
|
def get_dns_name(instance, private_ips=False):
|
|
|
|
dns = instance.public_dns_name if not private_ips else \
|
|
|
|
instance.private_ip_address
|
|
|
|
return dns
|
|
|
|
|
|
|
|
|
2013-09-11 17:59:42 -04:00
|
|
|
def real_main():
|
2014-06-01 18:39:04 -04:00
|
|
|
(opts, action, cluster_name) = parse_args()
|
2014-09-05 02:34:58 -04:00
|
|
|
|
|
|
|
# Input parameter validation
|
2015-01-08 20:42:08 -05:00
|
|
|
get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
|
|
|
|
|
2014-10-07 19:54:32 -04:00
|
|
|
if opts.wait is not None:
|
|
|
|
# NOTE: DeprecationWarnings are silent in 2.7+ by default.
|
|
|
|
# To show them, run Python with the -Wdefault switch.
|
|
|
|
# See: https://docs.python.org/3.5/whatsnew/2.7.html
|
|
|
|
warnings.warn(
|
|
|
|
"This option is deprecated and has no effect. "
|
2014-11-29 03:31:06 -05:00
|
|
|
"spark-ec2 automatically waits as long as necessary for clusters to start up.",
|
2014-10-07 19:54:32 -04:00
|
|
|
DeprecationWarning
|
|
|
|
)
|
|
|
|
|
2015-02-08 05:08:51 -05:00
|
|
|
if opts.identity_file is not None:
|
|
|
|
if not os.path.exists(opts.identity_file):
|
2015-04-16 19:20:57 -04:00
|
|
|
print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
|
|
|
|
file=stderr)
|
2015-02-08 05:08:51 -05:00
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
file_mode = os.stat(opts.identity_file).st_mode
|
|
|
|
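# Only the owner may read the key, e.g. a mode of 400 or 600 (the last two permission digits must be '00').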
if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
|
2015-04-16 19:20:57 -04:00
|
|
|
print("ERROR: The identity file must be accessible only by you.", file=stderr)
|
|
|
|
print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
|
|
|
|
file=stderr)
|
2015-02-08 05:08:51 -05:00
|
|
|
sys.exit(1)
|
|
|
|
|
2015-02-10 10:45:38 -05:00
|
|
|
if opts.instance_type not in EC2_INSTANCE_TYPES:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
|
|
|
|
t=opts.instance_type), file=stderr)
|
2015-02-10 10:45:38 -05:00
|
|
|
|
|
|
|
if opts.master_instance_type != "":
|
|
|
|
if opts.master_instance_type not in EC2_INSTANCE_TYPES:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
|
|
|
|
t=opts.master_instance_type), file=stderr)
|
2015-02-10 10:45:38 -05:00
|
|
|
# Since we try instance types even if we can't resolve them, we check if they resolve first
|
|
|
|
# and, if they do, see if they resolve to the same virtualization type.
|
|
|
|
if opts.instance_type in EC2_INSTANCE_TYPES and \
|
|
|
|
opts.master_instance_type in EC2_INSTANCE_TYPES:
|
|
|
|
if EC2_INSTANCE_TYPES[opts.instance_type] != \
|
|
|
|
EC2_INSTANCE_TYPES[opts.master_instance_type]:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Error: spark-ec2 currently does not support having a master and slaves "
|
|
|
|
"with different AMI virtualization types.", file=stderr)
|
|
|
|
print("master instance virtualization type: {t}".format(
|
|
|
|
t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
|
|
|
|
print("slave instance virtualization type: {t}".format(
|
|
|
|
t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
|
2015-02-10 10:45:38 -05:00
|
|
|
sys.exit(1)
|
|
|
|
|
2014-09-05 02:34:58 -04:00
|
|
|
if opts.ebs_vol_num > 8:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("ebs-vol-num cannot be greater than 8", file=stderr)
|
2014-09-05 02:34:58 -04:00
|
|
|
sys.exit(1)
|
|
|
|
|
2015-02-09 18:47:07 -05:00
|
|
|
# Prevent repo URLs that would break ami_prefix (trailing /, .git, and the startswith checks below).
|
|
|
|
# Prevent forks with non-spark-ec2 names for now.
|
|
|
|
if opts.spark_ec2_git_repo.endswith("/") or \
|
|
|
|
opts.spark_ec2_git_repo.endswith(".git") or \
|
|
|
|
not opts.spark_ec2_git_repo.startswith("https://github.com") or \
|
|
|
|
not opts.spark_ec2_git_repo.endswith("spark-ec2"):
|
2015-04-16 19:20:57 -04:00
|
|
|
print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. "
|
|
|
|
"Furthermore, we currently only support forks named spark-ec2.", file=stderr)
|
2015-02-09 18:47:07 -05:00
|
|
|
sys.exit(1)
|
|
|
|
|
2015-03-07 07:56:59 -05:00
|
|
|
if not (opts.deploy_root_dir is None or
|
|
|
|
(os.path.isabs(opts.deploy_root_dir) and
|
|
|
|
os.path.isdir(opts.deploy_root_dir) and
|
|
|
|
os.path.exists(opts.deploy_root_dir))):
|
2015-04-16 19:20:57 -04:00
|
|
|
print("--deploy-root-dir must be an absolute path to a directory that exists "
|
|
|
|
"on the local file system", file=stderr)
|
2015-03-07 07:56:59 -05:00
|
|
|
sys.exit(1)
|
|
|
|
|
2014-06-01 18:39:04 -04:00
|
|
|
try:
|
|
|
|
conn = ec2.connect_to_region(opts.region)
|
|
|
|
except Exception as e:
|
2015-04-16 19:20:57 -04:00
|
|
|
print(e, file=stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
# Select an AZ at random if it was not specified.
|
|
|
|
if opts.zone == "":
|
|
|
|
opts.zone = random.choice(conn.get_all_zones()).name
|
|
|
|
|
|
|
|
if action == "launch":
|
|
|
|
if opts.slaves <= 0:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("ERROR: You have to start at least 1 slave", file=sys.stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
sys.exit(1)
|
|
|
|
if opts.resume:
|
|
|
|
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
|
|
|
|
else:
|
|
|
|
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
|
2014-10-07 19:54:32 -04:00
|
|
|
wait_for_cluster_state(
|
2014-11-29 03:31:06 -05:00
|
|
|
conn=conn,
|
|
|
|
opts=opts,
|
2014-10-07 19:54:32 -04:00
|
|
|
cluster_instances=(master_nodes + slave_nodes),
|
2014-11-29 03:31:06 -05:00
|
|
|
cluster_state='ssh-ready'
|
2014-10-07 19:54:32 -04:00
|
|
|
)
|
2014-06-01 18:39:04 -04:00
|
|
|
setup_cluster(conn, master_nodes, slave_nodes, opts, True)
|
|
|
|
|
|
|
|
elif action == "destroy":
|
2014-06-22 23:52:02 -04:00
|
|
|
(master_nodes, slave_nodes) = get_existing_cluster(
|
|
|
|
conn, opts, cluster_name, die_on_error=False)
|
|
|
|
|
2015-03-07 07:33:41 -05:00
|
|
|
if any(master_nodes + slave_nodes):
|
2015-04-16 19:20:57 -04:00
|
|
|
print("The following instances will be terminated:")
|
2015-03-07 07:33:41 -05:00
|
|
|
for inst in master_nodes + slave_nodes:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("> %s" % get_dns_name(inst, opts.private_ips))
|
|
|
|
print("ALL DATA ON ALL NODES WILL BE LOST!!")
|
2015-03-07 07:33:41 -05:00
|
|
|
|
|
|
|
msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
|
2014-06-22 23:52:02 -04:00
|
|
|
response = raw_input(msg)
|
2014-06-01 18:39:04 -04:00
|
|
|
if response == "y":
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Terminating master...")
|
2014-06-01 18:39:04 -04:00
|
|
|
for inst in master_nodes:
|
|
|
|
inst.terminate()
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Terminating slaves...")
|
2014-06-01 18:39:04 -04:00
|
|
|
for inst in slave_nodes:
|
|
|
|
inst.terminate()
|
|
|
|
|
|
|
|
# Delete security groups as well
|
|
|
|
if opts.delete_groups:
|
2014-11-25 19:07:09 -05:00
|
|
|
group_names = [cluster_name + "-master", cluster_name + "-slaves"]
|
2014-10-07 19:54:32 -04:00
|
|
|
wait_for_cluster_state(
|
2014-11-29 03:31:06 -05:00
|
|
|
conn=conn,
|
|
|
|
opts=opts,
|
2014-10-07 19:54:32 -04:00
|
|
|
cluster_instances=(master_nodes + slave_nodes),
|
2014-11-29 03:31:06 -05:00
|
|
|
cluster_state='terminated'
|
2014-10-07 19:54:32 -04:00
|
|
|
)
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Deleting security groups (this will take some time)...")
|
2014-06-01 18:39:04 -04:00
|
|
|
attempt = 1
|
|
|
|
while attempt <= 3:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Attempt %d" % attempt)
|
2014-06-01 18:39:04 -04:00
|
|
|
groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
|
|
|
|
success = True
|
|
|
|
# Delete individual rules in all groups before deleting groups to
|
|
|
|
# remove dependencies between them
|
|
|
|
for group in groups:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Deleting rules in security group " + group.name)
|
2014-06-01 18:39:04 -04:00
|
|
|
for rule in group.rules:
|
|
|
|
for grant in rule.grants:
|
|
|
|
success &= group.revoke(ip_protocol=rule.ip_protocol,
|
|
|
|
from_port=rule.from_port,
|
|
|
|
to_port=rule.to_port,
|
|
|
|
src_group=grant)
|
|
|
|
|
|
|
|
# Sleep for AWS eventual-consistency to catch up, and for instances
|
|
|
|
# to terminate
|
|
|
|
time.sleep(30) # Yes, it does have to be this long :-(
|
|
|
|
for group in groups:
|
|
|
|
try:
|
2015-02-12 18:26:24 -05:00
|
|
|
# Use group_id so that deletion also works for VPC security groups.
|
|
|
|
conn.delete_security_group(group_id=group.id)
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Deleted security group %s" % group.name)
|
2014-06-01 18:39:04 -04:00
|
|
|
except boto.exception.EC2ResponseError:
|
|
|
|
success = False
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Failed to delete security group %s" % group.name)
|
2014-06-01 18:39:04 -04:00
|
|
|
|
|
|
|
# Unfortunately, group.revoke() returns True even if a rule was not
|
|
|
|
# deleted, so this needs to be rerun if something fails
|
|
|
|
if success:
|
|
|
|
break
|
|
|
|
|
|
|
|
attempt += 1
|
|
|
|
|
|
|
|
if not success:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Failed to delete all security groups after 3 tries.")
|
|
|
|
print("Try re-running in a few minutes.")
|
2014-06-01 18:39:04 -04:00
|
|
|
|
|
|
|
elif action == "login":
|
|
|
|
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
|
2015-04-08 16:48:45 -04:00
|
|
|
if not master_nodes[0].public_dns_name and not opts.private_ips:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
|
2015-04-08 16:48:45 -04:00
|
|
|
else:
|
|
|
|
master = get_dns_name(master_nodes[0], opts.private_ips)
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Logging into master " + master + "...")
|
2015-04-08 16:48:45 -04:00
|
|
|
proxy_opt = []
|
|
|
|
if opts.proxy_port is not None:
|
|
|
|
proxy_opt = ['-D', opts.proxy_port]
|
|
|
|
subprocess.check_call(
|
|
|
|
ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
|
2014-06-01 18:39:04 -04:00
|
|
|
|
2014-09-16 00:09:58 -04:00
|
|
|
elif action == "reboot-slaves":
|
|
|
|
response = raw_input(
|
|
|
|
"Are you sure you want to reboot the cluster " +
|
|
|
|
cluster_name + " slaves?\n" +
|
|
|
|
"Reboot cluster slaves " + cluster_name + " (y/N): ")
|
|
|
|
if response == "y":
|
|
|
|
(master_nodes, slave_nodes) = get_existing_cluster(
|
|
|
|
conn, opts, cluster_name, die_on_error=False)
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Rebooting slaves...")
|
2014-09-16 00:09:58 -04:00
|
|
|
for inst in slave_nodes:
|
|
|
|
if inst.state not in ["shutting-down", "terminated"]:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Rebooting " + inst.id)
|
2014-09-16 00:09:58 -04:00
|
|
|
inst.reboot()
|
|
|
|
|
2014-06-01 18:39:04 -04:00
|
|
|
elif action == "get-master":
|
|
|
|
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
|
2015-04-08 16:48:45 -04:00
|
|
|
if not master_nodes[0].public_dns_name and not opts.private_ips:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
|
2015-04-08 16:48:45 -04:00
|
|
|
else:
|
2015-04-16 19:20:57 -04:00
|
|
|
print(get_dns_name(master_nodes[0], opts.private_ips))
|
2014-06-01 18:39:04 -04:00
|
|
|
|
|
|
|
elif action == "stop":
|
|
|
|
response = raw_input(
|
|
|
|
"Are you sure you want to stop the cluster " +
|
|
|
|
cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
|
|
|
|
"BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
|
|
|
|
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
|
|
|
|
"All data on spot-instance slaves will be lost.\n" +
|
|
|
|
"Stop cluster " + cluster_name + " (y/N): ")
|
|
|
|
if response == "y":
|
|
|
|
(master_nodes, slave_nodes) = get_existing_cluster(
|
|
|
|
conn, opts, cluster_name, die_on_error=False)
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Stopping master...")
|
2014-06-01 18:39:04 -04:00
|
|
|
for inst in master_nodes:
|
|
|
|
if inst.state not in ["shutting-down", "terminated"]:
|
|
|
|
inst.stop()
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Stopping slaves...")
|
2014-06-01 18:39:04 -04:00
|
|
|
for inst in slave_nodes:
|
|
|
|
if inst.state not in ["shutting-down", "terminated"]:
|
|
|
|
if inst.spot_instance_request_id:
|
|
|
|
inst.terminate()
|
|
|
|
else:
|
|
|
|
inst.stop()
|
|
|
|
|
|
|
|
elif action == "start":
|
|
|
|
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Starting slaves...")
|
2014-06-01 18:39:04 -04:00
|
|
|
for inst in slave_nodes:
|
|
|
|
if inst.state not in ["shutting-down", "terminated"]:
|
|
|
|
inst.start()
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Starting master...")
|
2014-06-01 18:39:04 -04:00
|
|
|
for inst in master_nodes:
|
|
|
|
if inst.state not in ["shutting-down", "terminated"]:
|
|
|
|
inst.start()
|
2014-10-07 19:54:32 -04:00
|
|
|
wait_for_cluster_state(
|
2014-11-29 03:31:06 -05:00
|
|
|
conn=conn,
|
|
|
|
opts=opts,
|
2014-10-07 19:54:32 -04:00
|
|
|
cluster_instances=(master_nodes + slave_nodes),
|
2014-11-29 03:31:06 -05:00
|
|
|
cluster_state='ssh-ready'
|
2014-10-07 19:54:32 -04:00
|
|
|
)
|
2015-03-09 10:16:07 -04:00
|
|
|
|
|
|
|
# Determine types of running instances
|
|
|
|
existing_master_type = master_nodes[0].instance_type
|
|
|
|
existing_slave_type = slave_nodes[0].instance_type
|
|
|
|
# Setting opts.master_instance_type to the empty string indicates we
|
|
|
|
# have the same instance type for the master and the slaves
|
|
|
|
if existing_master_type == existing_slave_type:
|
|
|
|
existing_master_type = ""
|
|
|
|
opts.master_instance_type = existing_master_type
|
|
|
|
opts.instance_type = existing_slave_type
|
|
|
|
|
2014-06-01 18:39:04 -04:00
|
|
|
setup_cluster(conn, master_nodes, slave_nodes, opts, False)
|
|
|
|
|
2012-06-11 02:06:15 -04:00
|
|
|
else:
|
2015-04-16 19:20:57 -04:00
|
|
|
print("Invalid action: %s" % action, file=stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
sys.exit(1)
|
2012-06-11 02:06:15 -04:00
|
|
|
|
|
|
|
|
2013-09-11 17:59:42 -04:00
|
|
|
def main():
|
2014-06-01 18:39:04 -04:00
|
|
|
try:
|
|
|
|
real_main()
|
2015-04-16 19:20:57 -04:00
|
|
|
except UsageError as e:
|
|
|
|
print("\nError:\n", e, file=stderr)
|
2014-06-01 18:39:04 -04:00
|
|
|
sys.exit(1)
|
2013-09-11 17:59:42 -04:00
|
|
|
|
|
|
|
|
2012-06-11 02:06:15 -04:00
|
|
|
if __name__ == "__main__":
|
2014-06-01 18:39:04 -04:00
|
|
|
logging.basicConfig()
|
|
|
|
main()
|