#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import atexit
import os
import sys
import signal
import shlex
import shutil
import socket
import platform
import tempfile
import time
from subprocess import Popen, PIPE

if sys.version >= '3':
    xrange = range

from py4j.java_gateway import java_import, JavaGateway, JavaObject, GatewayParameters
from py4j.clientserver import ClientServer, JavaParameters, PythonParameters
from pyspark.find_spark_home import _find_spark_home
from pyspark.serializers import read_int, write_with_length, UTF8Deserializer
from pyspark.util import _exception_message


def launch_gateway(conf=None, popen_kwargs=None):
    """
    Launch a JVM gateway for PySpark.

    :param conf: spark configuration passed to spark-submit
    :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).
    :return: the py4j gateway (a JavaGateway, or a ClientServer in pinned-thread mode)
    """
if "PYSPARK_GATEWAY_PORT" in os.environ:
|
|
|
|
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
|
2018-04-13 17:28:24 -04:00
|
|
|
gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
|
2019-02-15 21:08:06 -05:00
|
|
|
# Process already exists
|
|
|
|
proc = None
|
2013-09-01 21:19:29 -04:00
|
|
|
else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]
        if conf:
            for k, v in conf.getAll():
                command += ['--conf', '%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)
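        # For example, with no explicit conf and PYSPARK_SUBMIT_ARGS unset, the command built
        # above is roughly [os.path.join(SPARK_HOME, "./bin/spark-submit"), "pyspark-shell"];
        # any "--conf k=v" pairs derived from `conf` are spliced in before the submit args.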

        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)

            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

            # Launch the Java gateway.
            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            popen_kwargs['stdin'] = PIPE
            # We always set the necessary environment variables.
            popen_kwargs['env'] = env
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                popen_kwargs['preexec_fn'] = preexec_func
                proc = Popen(command, **popen_kwargs)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, **popen_kwargs)

            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)

            if not os.path.isfile(conn_info_file):
                raise Exception("Java gateway process exited before sending its port number")

            with open(conn_info_file, "rb") as info:
                gateway_port = read_int(info)
                gateway_secret = UTF8Deserializer().loads(info)
        finally:
            shutil.rmtree(conn_info_dir)

        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
            atexit.register(killChild)

    # Connect to the gateway (or client server to pin the thread between JVM and Python)
    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        gateway = ClientServer(
            java_parameters=JavaParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True),
            python_parameters=PythonParameters(
                port=0,
                eager_load=False))
    else:
        gateway = JavaGateway(
            gateway_parameters=GatewayParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True))

    # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
    gateway.proc = proc

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")

    return gateway


def _do_server_auth(conn, auth_secret):
    """
    Performs the authentication protocol defined by the SocketAuthHelper class on the given
    file-like object 'conn'.
    """
    write_with_length(auth_secret.encode("utf-8"), conn)
    conn.flush()
    reply = UTF8Deserializer().loads(conn)
    if reply != "ok":
        conn.close()
        raise Exception("Unexpected reply from iterator server.")


def local_connect_and_auth(port, auth_secret):
    """
    Connect to the local host, authenticate with it, and return a (sockfile, sock) pair for
    that connection. Handles both IPv4 and IPv6, with some basic error handling.
    :param port: the local port to connect to
    :param auth_secret: the authentication secret to present to the server
    :return: a tuple with (sockfile, sock)
    """
    sock = None
    errors = []
    # Support for both IPv4 and IPv6.
    # On most IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(15)
            sock.connect(sa)
            sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
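            # SPARK_BUFFER_SIZE appears to mirror the spark.buffer.size configuration
            # (see SPARK-27870); 65536 bytes is only the local fallback when it is unset.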
            _do_server_auth(sockfile, auth_secret)
            return (sockfile, sock)
        except socket.error as e:
            emsg = _exception_message(e)
            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg))
            sock.close()
            sock = None
    raise Exception("could not open socket: %s" % errors)


def ensure_callback_server_started(gw):
    """
    Start callback server if not already started. The callback server is needed if the Java
    driver process needs to call back into the Python driver process to execute Python code.
    """

    # getattr will fallback to JVM, so we cannot test by hasattr()
    if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
        gw.callback_server_parameters.eager_load = True
        gw.callback_server_parameters.daemonize = True
        gw.callback_server_parameters.daemonize_connections = True
        gw.callback_server_parameters.port = 0
        gw.start_callback_server(gw.callback_server_parameters)
        cbport = gw._callback_server.server_socket.getsockname()[1]
        gw._callback_server.port = cbport
        # gateway with real port
        gw._python_proxy_port = gw._callback_server.port
        # get the GatewayServer object in JVM by ID
        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
        # update the port of CallbackClient with real port
        jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)