[SPARK-12652][PYSPARK] Upgrade Py4J to 0.9.1
- [x] Upgrade Py4J to 0.9.1
- [x] SPARK-12657: Revert SPARK-12617
- [x] SPARK-12658: Revert SPARK-12511
- Still keep the change that reads the checkpoint only once. This is a manual change and is worth a careful look: bfd4b5c040
- [x] Verify there are no more leaks after reverting our workarounds
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10692 from zsxwing/py4j-0.9.1.
parent 8ed5f12d2b
commit 4f60651cbe
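For context, the key API change behind the reverts: Py4J 0.9.1 adds GatewayServer.resetCallbackClient, so the streaming gateway setup can point the callback client at the real Python proxy port through a public call instead of patching the private "port" field via reflection. A minimal sketch of that pattern, using the names from the diff below (the helper function itself is hypothetical and not part of the patch):

    from py4j.java_gateway import JavaObject

    def _reset_callback_port(gw):
        # Look up the JVM GatewayServer registered under the "GATEWAY_SERVER" id.
        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
        # Py4J 0.9.1 exposes resetCallbackClient, replacing the reflection-based
        # PythonDStream.updatePythonGatewayPort helper removed in this commit.
        jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(),
                                 gw._python_proxy_port)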
LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
 (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
 (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
 (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
 (BSD licence) sbt and sbt-launch-lib.bash
 (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
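A quick way to confirm the updated PYTHONPATH actually resolves to the new archive is to import Py4J and check the version it reports; a small sketch (assuming the standard py4j.version module is present in the bundled source zip):

    # Illustrative check, not part of the patch: with the 0.9.1 source zip on
    # PYTHONPATH, the bundled Py4J should report version 0.9.1.
    from py4j import version

    assert version.__version__ == "0.9.1", version.__version__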
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
@@ -350,7 +350,7 @@
    <dependency>
      <groupId>net.sf.py4j</groupId>
      <artifactId>py4j</artifactId>
-     <version>0.9</version>
+     <version>0.9.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)
@@ -160,7 +160,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar
@@ -158,7 +158,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
 PAPER =
 BUILDDIR = _build
 
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)
 
 # User-friendly check for sphinx-build
 ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
BIN python/lib/py4j-0.9.1-src.zip (new file; binary content not shown)
@@ -19,7 +19,6 @@ from __future__ import print_function
 
 import os
 import sys
-from threading import RLock, Timer
 
 from py4j.java_gateway import java_import, JavaObject
 
@@ -33,63 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
 __all__ = ["StreamingContext"]
 
 
-class Py4jCallbackConnectionCleaner(object):
-
-    """
-    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
-    It will scan all callback connections every 30 seconds and close the dead connections.
-    """
-
-    def __init__(self, gateway):
-        self._gateway = gateway
-        self._stopped = False
-        self._timer = None
-        self._lock = RLock()
-
-    def start(self):
-        if self._stopped:
-            return
-
-        def clean_closed_connections():
-            from py4j.java_gateway import quiet_close, quiet_shutdown
-
-            callback_server = self._gateway._callback_server
-            if callback_server:
-                with callback_server.lock:
-                    try:
-                        closed_connections = []
-                        for connection in callback_server.connections:
-                            if not connection.isAlive():
-                                quiet_close(connection.input)
-                                quiet_shutdown(connection.socket)
-                                quiet_close(connection.socket)
-                                closed_connections.append(connection)
-
-                        for closed_connection in closed_connections:
-                            callback_server.connections.remove(closed_connection)
-                    except Exception:
-                        import traceback
-                        traceback.print_exc()
-
-            self._start_timer(clean_closed_connections)
-
-        self._start_timer(clean_closed_connections)
-
-    def _start_timer(self, f):
-        with self._lock:
-            if not self._stopped:
-                self._timer = Timer(30.0, f)
-                self._timer.daemon = True
-                self._timer.start()
-
-    def stop(self):
-        with self._lock:
-            self._stopped = True
-            if self._timer:
-                self._timer.cancel()
-                self._timer = None
-
-
 class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
@@ -105,9 +47,6 @@ class StreamingContext(object):
     # Reference to a currently active StreamingContext
     _activeContext = None
 
-    # A cleaner to clean leak sockets of callback server every 30 seconds
-    _py4j_cleaner = None
-
     def __init__(self, sparkContext, batchDuration=None, jssc=None):
         """
         Create a new StreamingContext.
@@ -155,34 +94,12 @@ class StreamingContext(object):
             # get the GatewayServer object in JVM by ID
             jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
             # update the port of CallbackClient with real port
-            gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port)
-            _py4j_cleaner = Py4jCallbackConnectionCleaner(gw)
-            _py4j_cleaner.start()
+            jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)
 
         # register serializer for TransformFunction
         # it happens before creating SparkContext when loading from checkpointing
-        if cls._transformerSerializer is None:
-            transformer_serializer = TransformFunctionSerializer()
-            transformer_serializer.init(
-                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
-            # SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM
-            # There is an issue that Py4J's PythonProxyHandler.finalize blocks forever.
-            # (https://github.com/bartdag/py4j/pull/184)
-            #
-            # Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when
-            # calling "registerSerializer". If we call "registerSerializer" twice, the second
-            # PythonProxyHandler will override the first one, then the first one will be GCed and
-            # trigger "PythonProxyHandler.finalize". To avoid that, we should not call
-            # "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't
-            # be GCed.
-            #
-            # TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version.
-            transformer_serializer.gateway.jvm.PythonDStream.registerSerializer(
-                transformer_serializer)
-            cls._transformerSerializer = transformer_serializer
-        else:
-            cls._transformerSerializer.init(
-                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+        cls._transformerSerializer = TransformFunctionSerializer(
+            SparkContext._active_spark_context, CloudPickleSerializer(), gw)
 
     @classmethod
     def getOrCreate(cls, checkpointPath, setupFunc):
@@ -89,10 +89,11 @@ class TransformFunctionSerializer(object):
     it uses this class to invoke Python, which returns the serialized function
     as a byte array.
     """
-    def init(self, ctx, serializer, gateway=None):
+    def __init__(self, ctx, serializer, gateway=None):
         self.ctx = ctx
         self.serializer = serializer
         self.gateway = gateway or self.ctx._gateway
+        self.gateway.jvm.PythonDStream.registerSerializer(self)
         self.failure = None
 
     def dumps(self, id):
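Taken together, the two streaming hunks above drop the init()/re-registration guard: registerSerializer now runs inside TransformFunctionSerializer.__init__, so a single construction both creates and registers the serializer. Since Py4J 0.9.1 includes the PythonProxyHandler.finalize fix referenced in the removed comment (py4j PR 184), repeated registration is no longer the OOM hazard that SPARK-12511 worked around. The resulting call site, repeating the lines added above:

    # One construction per gateway; registration happens in __init__.
    cls._transformerSerializer = TransformFunctionSerializer(
        SparkContext._active_spark_context, CloudPickleSerializer(), gw)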
@@ -27,4 +27,4 @@ fi
 export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
 # Add the PySpark classes to the PYTHONPATH:
 export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"
@@ -169,16 +169,6 @@ private[spark] object PythonDStream {
     PythonTransformFunctionSerializer.register(ser)
   }
 
-  /**
-   * Update the port of callback client to `port`
-   */
-  def updatePythonGatewayPort(gws: GatewayServer, port: Int): Unit = {
-    val cl = gws.getCallbackClient
-    val f = cl.getClass.getDeclaredField("port")
-    f.setAccessible(true)
-    f.setInt(cl, port)
-  }
-
   /**
    * helper function for DStream.foreachRDD(),
    * cannot be `foreachRDD`, it will confusing py4j
@@ -1044,9 +1044,9 @@ private[spark] class Client(
         val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
         require(pyArchivesFile.exists(),
           "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-        val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
+        val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
         require(py4jFile.exists(),
-          "py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
+          "py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
         Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
       }
     }
@@ -151,9 +151,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     // When running tests, let's not assume the user has built the assembly module, which also
     // creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the
     // needed locations.
-    val sparkHome = sys.props("spark.test.home");
+    val sparkHome = sys.props("spark.test.home")
     val pythonPath = Seq(
-        s"$sparkHome/python/lib/py4j-0.9-src.zip",
+        s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
         s"$sparkHome/python")
     val extraEnv = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),