spark-instrumented-optimizer/python/pyspark/java_gateway.py
HyukjinKwon 4ec04e5ef3 [SPARK-22340][PYTHON] Add a mode to pin Python thread into JVM's
## What changes were proposed in this pull request?

This PR proposes to add a **single threading model design (pinned thread model)** mode, an experimental mode that syncs threads between the PVM and the JVM. See https://www.py4j.org/advanced_topics.html#using-single-threading-model-pinned-thread

### Multi threading model

Currently, PySpark uses this model. Threads on the PVM and the JVM are independent: for instance, callbacks are received in a different Python thread, where the relevant Python code is executed. JVM threads are reused when possible.

Py4J creates a new thread whenever a command is received and no thread is available. See the model we currently use: https://www.py4j.org/advanced_topics.html#the-multi-threading-model

One problem with this model is that we cannot sync threads between the PVM and the JVM out of the box. This causes issues, in particular in code paths that rely on thread-local state on the JVM side. See:
7056e004ee/core/src/main/scala/org/apache/spark/SparkContext.scala (L334)
Because JVM threads are reused, the job group set from a Python thread cannot be reliably applied per thread, as described in the JIRA.
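
For illustration, the kind of code affected looks roughly like the sketch below (a simplified, hypothetical reproduction of the symptom described in the JIRA, not the exact JIRA snippet; `spark` is the shell's `SparkSession`). `setJobGroup` sets a thread-local property on whichever JVM thread happens to serve that Py4J call, so the job submitted afterwards from the same Python thread may be served by a different, reused JVM thread and miss the group id:

```python
import threading
import time

def run_job():
    # Sets a thread-local property on whichever JVM thread serves this call.
    spark.sparkContext.setJobGroup("group-a", "job that should be cancellable")
    # The job below may be submitted through a different, reused JVM thread,
    # so it may not carry the "group-a" job group.
    spark.sparkContext.parallelize(range(4)).foreach(lambda _: time.sleep(10))

threading.Thread(target=run_job).start()
time.sleep(2)
# May have no effect in the multi-threading model.
spark.sparkContext.cancelJobGroup("group-a")
```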

### Single threading model design (pinned thread model)

This mode pins and syncs the threads on the PVM and the JVM to work around the problem above. For instance, callbacks are received and the relevant Python code is executed in the same Python thread. See https://www.py4j.org/advanced_topics.html#the-single-threading-model

Even though this mode syncs threads between the PVM and the JVM for other thread-related code paths, it might introduce another problem: local properties do not seem to be inherited across threads, as shown below (since the multi-threading mode still creates new threads when existing threads are busy, I suspect this issue already exists when multiple jobs are submitted in multi-threading mode; however, it can always be reproduced in single-threading mode):

```bash
$ PYSPARK_PIN_THREAD=true ./bin/pyspark
```

```python
import threading

spark.sparkContext.setLocalProperty("a", "hi")
def print_prop():
    print(spark.sparkContext.getLocalProperty("a"))

threading.Thread(target=print_prop).start()
```

```
None
```

Unlike the Scala side:

```scala
spark.sparkContext.setLocalProperty("a", "hi")
new Thread(new Runnable {
  def run() = println(spark.sparkContext.getLocalProperty("a"))
}).start()
```

```
hi
```

This behaviour could potentially cause subtle issues, but this PR does not target fixing it for now since the mode is experimental.

### How does this PR fix it?

Py4J provides two types of servers: `GatewayServer` and `ClientServer`. The former is for the multi-threading model and the latter for the single-threading model. This PR adds a switch to use the latter.

On the Scala side:
The logic to select a server is encapsulated in `Py4JServer`, which is used by `PythonRunner` for Spark submit and by `PythonGatewayServer` for the Spark shell. Each uses `ClientServer` when `PYSPARK_PIN_THREAD` is `true` and `GatewayServer` otherwise.

On the Python side:
A simple if-else switches which server to talk to. It uses `ClientServer` when `PYSPARK_PIN_THREAD` is `true` and `GatewayServer` otherwise.
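
Condensed, the Python-side switch looks roughly like the sketch below; it mirrors the change in `python/pyspark/java_gateway.py` shown in full further down, with `gateway_port` and `gateway_secret` standing in for the connection info already read from the JVM:

```python
import os
from py4j.clientserver import ClientServer, JavaParameters, PythonParameters
from py4j.java_gateway import JavaGateway, GatewayParameters

# gateway_port / gateway_secret: placeholders for the connection info the
# driver reads from the launched JVM.
if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
    # Pinned-thread mode: Python and JVM threads are paired one-to-one.
    gateway = ClientServer(
        java_parameters=JavaParameters(
            port=gateway_port, auth_token=gateway_secret, auto_convert=True),
        python_parameters=PythonParameters(port=0, eager_load=False))
else:
    # Default multi-threading model.
    gateway = JavaGateway(
        gateway_parameters=GatewayParameters(
            port=gateway_port, auth_token=gateway_secret, auto_convert=True))
```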

This is disabled by default for now.

## How was this patch tested?

Manually tested. This can be tested via:

```bash
PYSPARK_PIN_THREAD=true ./bin/pyspark
```

and/or

```bash
cd python
./run-tests --python-executables=python --testnames "pyspark.tests.test_pin_thread"
```

Also, ran the Jenkins tests with `PYSPARK_PIN_THREAD` enabled.

Closes #24898 from HyukjinKwon/pinned-thread.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-11-08 06:44:58 +09:00


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import atexit
import os
import sys
import signal
import shlex
import shutil
import socket
import platform
import tempfile
import time
from subprocess import Popen, PIPE

if sys.version >= '3':
    xrange = range

from py4j.java_gateway import java_import, JavaGateway, JavaObject, GatewayParameters
from py4j.clientserver import ClientServer, JavaParameters, PythonParameters

from pyspark.find_spark_home import _find_spark_home
from pyspark.serializers import read_int, write_with_length, UTF8Deserializer
from pyspark.util import _exception_message


def launch_gateway(conf=None, popen_kwargs=None):
    """
    launch jvm gateway
    :param conf: spark configuration passed to spark-submit
    :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).
    :return:
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        # Process already exists
        proc = None
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]

        if conf:
            for k, v in conf.getAll():
                command += ['--conf', '%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)

        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)

            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

            # Launch the Java gateway.
            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            popen_kwargs['stdin'] = PIPE
            # We always set the necessary environment variables.
            popen_kwargs['env'] = env
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                popen_kwargs['preexec_fn'] = preexec_func
                proc = Popen(command, **popen_kwargs)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, **popen_kwargs)

            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)

            if not os.path.isfile(conn_info_file):
                raise Exception("Java gateway process exited before sending its port number")

            with open(conn_info_file, "rb") as info:
                gateway_port = read_int(info)
                gateway_secret = UTF8Deserializer().loads(info)
        finally:
            shutil.rmtree(conn_info_dir)

        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
            atexit.register(killChild)

    # Connect to the gateway (or client server to pin the thread between JVM and Python)
    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        gateway = ClientServer(
            java_parameters=JavaParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True),
            python_parameters=PythonParameters(
                port=0,
                eager_load=False))
    else:
        gateway = JavaGateway(
            gateway_parameters=GatewayParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True))

    # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
    gateway.proc = proc

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")

    return gateway


def _do_server_auth(conn, auth_secret):
    """
    Performs the authentication protocol defined by the SocketAuthHelper class on the given
    file-like object 'conn'.
    """
    write_with_length(auth_secret.encode("utf-8"), conn)
    conn.flush()
    reply = UTF8Deserializer().loads(conn)
    if reply != "ok":
        conn.close()
        raise Exception("Unexpected reply from iterator server.")


def local_connect_and_auth(port, auth_secret):
    """
    Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection.
    Handles IPV4 & IPV6, does some error handling.
    :param port
    :param auth_secret
    :return: a tuple with (sockfile, sock)
    """
    sock = None
    errors = []
    # Support for both IPv4 and IPv6.
    # On most of IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(15)
            sock.connect(sa)
            sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
            _do_server_auth(sockfile, auth_secret)
            return (sockfile, sock)
        except socket.error as e:
            emsg = _exception_message(e)
            errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg))
            sock.close()
            sock = None
    raise Exception("could not open socket: %s" % errors)


def ensure_callback_server_started(gw):
    """
    Start callback server if not already started. The callback server is needed if the Java
    driver process needs to callback into the Python driver process to execute Python code.
    """

    # getattr will fallback to JVM, so we cannot test by hasattr()
    if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
        gw.callback_server_parameters.eager_load = True
        gw.callback_server_parameters.daemonize = True
        gw.callback_server_parameters.daemonize_connections = True
        gw.callback_server_parameters.port = 0
        gw.start_callback_server(gw.callback_server_parameters)
        cbport = gw._callback_server.server_socket.getsockname()[1]
        gw._callback_server.port = cbport
        # gateway with real port
        gw._python_proxy_port = gw._callback_server.port
        # get the GatewayServer object in JVM by ID
        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
        # update the port of CallbackClient with real port
        jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)