spark-instrumented-optimizer/python/pyspark/java_gateway.py
Matei Zaharia 951a5d9398 [SPARK-1549] Add Python support to spark-submit
This PR updates spark-submit to allow submitting Python scripts (currently only with deploy-mode=client, but that's all that was supported before) and updates the PySpark code to properly find various paths, etc. One significant change is that we assume we can always find the Python files either from the Spark assembly JAR (which will happen with the Maven assembly build in make-distribution.sh) or from SPARK_HOME (which will exist in local mode even if you use sbt assembly, and should be enough for testing). This means we no longer need a weird hack to modify the environment for YARN.

This patch also updates the Python worker manager to run python with -u, which means unbuffered output (send it to our logs right away instead of waiting a while after stuff was written); this should simplify debugging.

In addition, it fixes https://issues.apache.org/jira/browse/SPARK-1709, setting the main class from a JAR's Main-Class attribute if not specified by the user, and fixes a few help strings and style issues in spark-submit.

In the future we may want to make the `pyspark` shell use spark-submit as well, but it seems unnecessary for 1.0.

Author: Matei Zaharia <matei@databricks.com>

Closes #664 from mateiz/py-submit and squashes the following commits:

15e9669 [Matei Zaharia] Fix some uses of path.separator property
051278c [Matei Zaharia] Small style fixes
0afe886 [Matei Zaharia] Add license headers
4650412 [Matei Zaharia] Add pyFiles to PYTHONPATH in executors, remove old YARN stuff, add tests
15f8e1e [Matei Zaharia] Set PYTHONPATH in PythonWorkerFactory in case it wasn't set from outside
47c0655 [Matei Zaharia] More work to make spark-submit work with Python:
d4375bd [Matei Zaharia] Clean up description of spark-submit args a bit and add Python ones
2014-05-06 15:12:35 -07:00

79 lines
3.3 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import signal
import platform
from subprocess import Popen, PIPE
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
gateway_port = -1
if "PYSPARK_GATEWAY_PORT" in os.environ:
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
else:
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdout=PIPE, stdin=PIPE)
# Determine which ephemeral port the server started on:
gateway_port = int(proc.stdout.readline())
# Create a thread to echo output from the GatewayServer, which is required
# for Java log output to show up:
class EchoOutputThread(Thread):
def __init__(self, stream):
Thread.__init__(self)
self.daemon = True
self.stream = stream
def run(self):
while True:
line = self.stream.readline()
sys.stderr.write(line)
EchoOutputThread(proc.stdout).start()
# Connect to the gateway
gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
return gateway