SPARK-1004. PySpark on YARN

This reopens https://github.com/apache/incubator-spark/pull/640 against the new repo

Author: Sandy Ryza <sandy@cloudera.com>

Closes #30 from sryza/sandy-spark-1004 and squashes the following commits:

89889d4 [Sandy Ryza] Move unzipping py4j to the generate-resources phase so that it gets included in the jar the first time
5165a02 [Sandy Ryza] Fix docs
fd0df79 [Sandy Ryza] PySpark on YARN
Authored by Sandy Ryza on 2014-04-29 23:24:34 -07:00; committed by Patrick Wendell
parent 7025dda8fa
commit ff5be9a41e
11 changed files with 86 additions and 20 deletions

@@ -46,6 +46,7 @@ export PYSPARK_PYTHON
 # Add the PySpark classes to the Python path:
 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP

@@ -45,6 +45,7 @@ rem Figure out which Python to use.
 if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py

@@ -294,6 +294,48 @@
           </environmentVariables>
         </configuration>
       </plugin>
+      <!-- Unzip py4j so we can include its files in the jar -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>unzip</executable>
+          <workingDirectory>../python</workingDirectory>
+          <arguments>
+            <argument>-o</argument>
+            <argument>lib/py4j*.zip</argument>
+            <argument>-d</argument>
+            <argument>build</argument>
+          </arguments>
+        </configuration>
+      </plugin>
     </plugins>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../python</directory>
+        <includes>
+          <include>pyspark/*.py</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>../python/build</directory>
+        <includes>
+          <include>py4j/*.py</include>
+        </includes>
+      </resource>
+    </resources>
   </build>
 </project>

@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
       // Create and start the worker
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
       // Redirect the worker's stderr to ours

@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     try {
       // Create and start the daemon
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       daemon = pb.start()
       // Redirect the stderr to ours
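
Aside (not part of the patch): the Scala change above drops the hard-coded $SPARK_HOME/python path and starts the worker and daemon as modules, so they resolve through whatever PYTHONPATH the launching environment already provides, such as the py4j zip added by the scripts earlier in this diff. A rough Python sketch of the equivalent launch, with everything here illustrative:

# Hedged sketch: start the daemon the way the new ProcessBuilder call does,
# relying on an inherited PYTHONPATH instead of computing one from SPARK_HOME.
import os
import subprocess

env = dict(os.environ)  # assumed to already contain the pyspark/py4j PYTHONPATH entries
daemon = subprocess.Popen(["python", "-m", "pyspark.daemon"], env=env)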

@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
 Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
 The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
+# Running PySpark on YARN
+
+To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
 # Interactive Use
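
Not part of the patch, but a minimal sketch of the yarn-client usage described in the doc addition above, assuming a driver launched through bin/pyspark with YARN configured; the app name and fallback master below are only illustrative:

# Hedged example: read the master from MASTER (e.g. MASTER=yarn-client when
# submitting to YARN), falling back to local execution when it is unset.
import os
from pyspark import SparkConf, SparkContext

master = os.environ.get("MASTER", "local[2]")
conf = SparkConf().setMaster(master).setAppName("pyspark-yarn-example")
sc = SparkContext(conf=conf)
print sc.parallelize(range(100)).sum()  # Python 2 print, matching this era of PySpark
sc.stop()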

python/.gitignore

@@ -1,2 +1,5 @@
 *.pyc
 docs/
+pyspark.egg-info
+build/
+dist/

@@ -1 +0,0 @@
-b7924aabe9c5e63f0a4d8bbd17019534c7ec014e

@@ -49,13 +49,6 @@ Hive:
       Main entry point for accessing data stored in Apache Hive..
 """
-import sys
-import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 from pyspark.sql import SQLContext

@@ -24,10 +24,11 @@ from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient

-SPARK_HOME = os.environ["SPARK_HOME"]

 def launch_gateway():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    set_env_vars_for_yarn()
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"

@@ -70,3 +71,27 @@ def launch_gateway():
     java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
     java_import(gateway.jvm, "scala.Tuple2")
     return gateway
+
+def set_env_vars_for_yarn():
+    # Add the spark jar, which includes the pyspark files, to the python path
+    env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
+    if "PYTHONPATH" in env_map:
+        env_map["PYTHONPATH"] += ":spark.jar"
+    else:
+        env_map["PYTHONPATH"] = "spark.jar"
+    os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
+
+def parse_env(env_str):
+    # Turns a comma-separated of env settings into a dict that maps env vars to
+    # their values.
+    env = {}
+    for var_str in env_str.split(","):
+        parts = var_str.split("=")
+        if len(parts) == 2:
+            env[parts[0]] = parts[1]
+        elif len(var_str) > 0:
+            print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
+            sys.exit(1)
+    return env
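
For illustration only (not in the commit): the SPARK_YARN_USER_ENV value that parse_env above consumes is a comma-separated list of KEY=VALUE pairs; the concrete paths below are made up.

# Hypothetical SPARK_YARN_USER_ENV value and the result of parsing it.
import os

os.environ["SPARK_YARN_USER_ENV"] = "PYTHONPATH=/opt/extra/libs,SPARK_LOG_DIR=/var/log/spark"
# parse_env would return {"PYTHONPATH": "/opt/extra/libs", "SPARK_LOG_DIR": "/var/log/spark"},
# and set_env_vars_for_yarn would then rewrite the variable so that the
# PYTHONPATH entry ends in ":spark.jar" (dict ordering of the joined pairs is
# not guaranteed).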

@@ -30,10 +30,12 @@ import unittest
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
-from pyspark.java_gateway import SPARK_HOME
 from pyspark.serializers import read_int

+SPARK_HOME = os.environ["SPARK_HOME"]

 class PySparkTestCase(unittest.TestCase):

     def setUp(self):

@@ -34,3 +34,6 @@ this="$config_bin/$script"
 export SPARK_PREFIX=`dirname "$this"`/..
 export SPARK_HOME=${SPARK_PREFIX}
 export SPARK_CONF_DIR="$SPARK_HOME/conf"
+# Add the PySpark classes to the PYTHONPATH:
+export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH