spark-instrumented-optimizer/python/pyspark/java_gateway.py
Marcelo Vanzin 517975d89d [SPARK-4924] Add a library for launching Spark jobs programmatically.
This change encapsulates all the logic involved in launching a Spark job
into a small Java library that can be easily embedded into other applications.

The overall goal of this change is twofold, as described in the bug:

- Provide a public API for launching Spark processes. This is a common request
  from users and currently there's no good answer for it.

- Remove a lot of the duplicated code and other coupling that exists in the
  different parts of Spark that deal with launching processes.

A lot of the duplication was due to different code needed to build an
application's classpath (and the bootstrapper needed to run the driver in
certain situations), and also different code needed to parse spark-submit
command line options in different contexts. The change centralizes those
as much as possible so that all code paths can rely on the library for
handling those appropriately.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #3916 from vanzin/SPARK-4924 and squashes the following commits:

18c7e4d [Marcelo Vanzin] Fix make-distribution.sh.
2ce741f [Marcelo Vanzin] Add lots of quotes.
3b28a75 [Marcelo Vanzin] Update new pom.
a1b8af1 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
897141f [Marcelo Vanzin] Review feedback.
e2367d2 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
28cd35e [Marcelo Vanzin] Remove stale comment.
b1d86b0 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
00505f9 [Marcelo Vanzin] Add blurb about new API in the programming guide.
5f4ddcc [Marcelo Vanzin] Better usage messages.
92a9cfb [Marcelo Vanzin] Fix Win32 launcher, usage.
6184c07 [Marcelo Vanzin] Rename field.
4c19196 [Marcelo Vanzin] Update comment.
7e66c18 [Marcelo Vanzin] Fix pyspark tests.
0031a8e [Marcelo Vanzin] Review feedback.
c12d84b [Marcelo Vanzin] Review feedback. And fix spark-submit on Windows.
e2d4d71 [Marcelo Vanzin] Simplify some code used to launch pyspark.
43008a7 [Marcelo Vanzin] Don't make builder extend SparkLauncher.
b4d6912 [Marcelo Vanzin] Use spark-submit script in SparkLauncher.
28b1434 [Marcelo Vanzin] Add a comment.
304333a [Marcelo Vanzin] Fix propagation of properties file arg.
bb67b93 [Marcelo Vanzin] Remove unrelated Yarn change (that is also wrong).
8ec0243 [Marcelo Vanzin] Add missing newline.
95ddfa8 [Marcelo Vanzin] Fix handling of --help for spark-class command builder.
72da7ec [Marcelo Vanzin] Rename SparkClassLauncher.
62978e4 [Marcelo Vanzin] Minor cleanup of Windows code path.
9cd5b44 [Marcelo Vanzin] Make all non-public APIs package-private.
e4c80b6 [Marcelo Vanzin] Reorganize the code so that only SparkLauncher is public.
e50dc5e [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
de81da2 [Marcelo Vanzin] Fix CommandUtils.
86a87bf [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
2061967 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
46d46da [Marcelo Vanzin] Clean up a test and make it more future-proof.
b93692a [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
ad03c48 [Marcelo Vanzin] Revert "Fix a thread-safety issue in "local" mode."
0b509d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
23aa2a9 [Marcelo Vanzin] Read java-opts from conf dir, not spark home.
7cff919 [Marcelo Vanzin] Javadoc updates.
eae4d8e [Marcelo Vanzin] Fix new unit tests on Windows.
e570fb5 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
44cd5f7 [Marcelo Vanzin] Add package-info.java, clean up javadocs.
f7cacff [Marcelo Vanzin] Remove "launch Spark in new thread" feature.
7ed8859 [Marcelo Vanzin] Some more feedback.
54cd4fd [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
61919df [Marcelo Vanzin] Clean leftover debug statement.
aae5897 [Marcelo Vanzin] Use launcher classes instead of jars in non-release mode.
e584fc3 [Marcelo Vanzin] Rework command building a little bit.
525ef5b [Marcelo Vanzin] Rework Unix spark-class to handle argument with newlines.
8ac4e92 [Marcelo Vanzin] Minor test cleanup.
e946a99 [Marcelo Vanzin] Merge PySparkLauncher into SparkSubmitCliLauncher.
c617539 [Marcelo Vanzin] Review feedback round 1.
fc6a3e2 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
f26556b [Marcelo Vanzin] Fix a thread-safety issue in "local" mode.
2f4e8b4 [Marcelo Vanzin] Changes needed to make this work with SPARK-4048.
799fc20 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
bb5d324 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
53faef1 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
a7936ef [Marcelo Vanzin] Fix pyspark tests.
656374e [Marcelo Vanzin] Mima fixes.
4d511e7 [Marcelo Vanzin] Fix tools search code.
7a01e4a [Marcelo Vanzin] Fix pyspark on Yarn.
1b3f6e9 [Marcelo Vanzin] Call SparkSubmit from spark-class launcher for unknown classes.
25c5ae6 [Marcelo Vanzin] Centralize SparkSubmit command line parsing.
27be98a [Marcelo Vanzin] Modify Spark to use launcher lib.
6f70eea [Marcelo Vanzin] [SPARK-4924] Add a library for launching Spark jobs programatically.
2015-03-11 01:03:01 -07:00

111 lines
5.2 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import atexit
import os
import select
import signal
import shlex
import socket
import platform
from subprocess import Popen, PIPE
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from pyspark.serializers import read_int
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
if "PYSPARK_GATEWAY_PORT" in os.environ:
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
else:
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
submit_args = submit_args if submit_args is not None else ""
submit_args = shlex.split(submit_args)
command = [os.path.join(SPARK_HOME, script)] + submit_args
# Start a socket that will be used by PythonGatewayServer to communicate its port to us
callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
callback_socket.bind(('127.0.0.1', 0))
callback_socket.listen(1)
callback_host, callback_port = callback_socket.getsockname()
env = dict(os.environ)
env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host
env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port)
# Launch the Java gateway.
# We open a pipe to stdin so that the Java gateway can die when the pipe is broken
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdin=PIPE, env=env)
gateway_port = None
# We use select() here in order to avoid blocking indefinitely if the subprocess dies
# before connecting
while gateway_port is None and proc.poll() is None:
timeout = 1 # (seconds)
readable, _, _ = select.select([callback_socket], [], [], timeout)
if callback_socket in readable:
gateway_connection = callback_socket.accept()[0]
# Determine which ephemeral port the server started on:
gateway_port = read_int(gateway_connection.makefile())
gateway_connection.close()
callback_socket.close()
if gateway_port is None:
raise Exception("Java gateway process exited before sending the driver its port number")
# In Windows, ensure the Java child processes do not linger after Python has exited.
# In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
# the parent process' stdin sends an EOF). In Windows, however, this is not possible
# because java.lang.Process reads directly from the parent process' stdin, contending
# with any opportunity to read an EOF from the parent. Note that this is only best
# effort and will not take effect if the python process is violently terminated.
if on_windows:
# In Windows, the child process here is "spark-submit.cmd", not the JVM itself
# (because the UNIX "exec" command is not available). This means we cannot simply
# call proc.kill(), which kills only the "spark-submit.cmd" process but not the
# JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
# child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
def killChild():
Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
atexit.register(killChild)
# Connect to the gateway
gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")
return gateway