[SPARK-10447][SPARK-3842][PYSPARK] Upgrade PySpark to Py4J 0.9

Upgrade to Py4J 0.9.

Author: Holden Karau <holden@pigscanfly.ca>
Author: Holden Karau <holden@us.ibm.com>

Closes #8615 from holdenk/SPARK-10447-upgrade-pyspark-to-py4j0.9.
parent 94139557c5
commit e18b571c33
LICENSE (2 changed lines: 1 addition, 1 deletion)
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
      (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
      (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-     (The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
+     (The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
      (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
      (BSD licence) sbt and sbt-launch-lib.bash
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
bin/pyspark
@@ -65,7 +65,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
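These export lines work because the bundled Py4J source zip is directly importable via zipimport. A quick sanity check of what ends up on the path (a sketch only; that Py4J records its version in a py4j.version module with a __version__ attribute is an assumption about Py4J's layout, not something this diff shows):

import os
import sys

# Mimic what bin/pyspark exports, assuming SPARK_HOME is set in the environment.
spark_home = os.environ["SPARK_HOME"]
sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.9-src.zip"))
sys.path.insert(0, os.path.join(spark_home, "python"))

import py4j.version  # assumed module; ships inside the source zip
print(py4j.version.__version__)  # expect "0.9" after this upgrade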
bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
core/pom.xml
@@ -350,7 +350,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.8.2.1</version>
+      <version>0.9</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)
python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
 PAPER =
 BUILDDIR = _build
 
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.8.2.1-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
 
 # User-friendly check for sphinx-build
 ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
BIN python/lib/py4j-0.9-src.zip (new file; binary contents not shown)
python/pyspark/streaming/context.py
@@ -32,48 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer
 
 __all__ = ["StreamingContext"]
 
-
-def _daemonize_callback_server():
-    """
-    Hack Py4J to daemonize callback server
-
-    The thread of callback server has daemon=False, it will block the driver
-    from exiting if it's not shutdown. The following code replace `start()`
-    of CallbackServer with a new version, which set daemon=True for this
-    thread.
-
-    Also, it will update the port number (0) with real port
-    """
-    # TODO: create a patch for Py4J
-    import socket
-    import py4j.java_gateway
-    logger = py4j.java_gateway.logger
-    from py4j.java_gateway import Py4JNetworkError
-    from threading import Thread
-
-    def start(self):
-        """Starts the CallbackServer. This method should be called by the
-        client instead of run()."""
-        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
-                                      1)
-        try:
-            self.server_socket.bind((self.address, self.port))
-            if not self.port:
-                # update port with real port
-                self.port = self.server_socket.getsockname()[1]
-        except Exception as e:
-            msg = 'An error occurred while trying to start the callback server: %s' % e
-            logger.exception(msg)
-            raise Py4JNetworkError(msg)
-
-        # Maybe thread needs to be cleanup up?
-        self.thread = Thread(target=self.run)
-        self.thread.daemon = True
-        self.thread.start()
-
-    py4j.java_gateway.CallbackServer.start = start
-
-
 class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
@@ -123,10 +81,14 @@ class StreamingContext(object):
 
         # start callback server
         # getattr will fallback to JVM, so we cannot test by hasattr()
-        if "_callback_server" not in gw.__dict__:
-            _daemonize_callback_server()
-            # use random port
-            gw._start_callback_server(0)
+        if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
+            gw.callback_server_parameters.eager_load = True
+            gw.callback_server_parameters.daemonize = True
+            gw.callback_server_parameters.daemonize_connections = True
+            gw.callback_server_parameters.port = 0
+            gw.start_callback_server(gw.callback_server_parameters)
+            cbport = gw._callback_server.server_socket.getsockname()[1]
+            gw._callback_server.port = cbport
             # gateway with real port
             gw._python_proxy_port = gw._callback_server.port
             # get the GatewayServer object in JVM by ID
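For context: Py4J 0.9 exposes the callback server's threading behaviour through CallbackServerParameters, which is why the monkey-patched start() above can be deleted outright. A minimal sketch of the new API in isolation (the bare JavaGateway construction is illustrative only; PySpark builds its gateway elsewhere, and the parameter values mirror what the hunk above sets):

from py4j.java_gateway import CallbackServerParameters, JavaGateway

params = CallbackServerParameters(
    port=0,                      # bind an ephemeral port, resolved at bind time
    daemonize=True,              # server thread will not block interpreter exit
    daemonize_connections=True,  # nor will per-connection threads
    eager_load=True)             # start listening immediately
gateway = JavaGateway(callback_server_parameters=params)

# Because port=0, the real port is only known after bind(); it is read back
# from the server socket, exactly as the hunk above does.
real_port = gateway._callback_server.server_socket.getsockname()[1]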
python/pyspark/streaming/flume.py
@@ -20,7 +20,7 @@ if sys.version >= "3":
     from io import BytesIO
 else:
     from StringIO import StringIO
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
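The remaining Python changes are all the same one-line fix: the import of Py4JJavaError from py4j.java_gateway that worked under 0.8.2.1 no longer resolves under 0.9, so the class is imported from its defining module, py4j.protocol. A hedged usage sketch, where run_jvm_call is a hypothetical stand-in for any call through the gateway:

from py4j.protocol import Py4JJavaError  # import path required as of Py4J 0.9

def describe_failure(run_jvm_call):
    """Invoke a JVM call and report the Java-side exception, if any."""
    try:
        return run_jvm_call()
    except Py4JJavaError as e:
        # java_exception is a proxy for the JVM Throwable that was raised
        return str(e.java_exception)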
python/pyspark/streaming/kafka.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.rdd import RDD
 from pyspark.storagelevel import StorageLevel
python/pyspark/streaming/kinesis.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.serializers import PairDeserializer, NoOpSerializer
 from pyspark.storagelevel import StorageLevel
python/pyspark/streaming/mqtt.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import UTF8Deserializer
python/pyspark/streaming/tests.py
@@ -61,9 +61,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
     def tearDownClass(cls):
         cls.sc.stop()
         # Clean up in the JVM just in case there has been some issues in Python API
-        jSparkContextOption = SparkContext._jvm.SparkContext.get()
-        if jSparkContextOption.nonEmpty():
-            jSparkContextOption.get().stop()
+        try:
+            jSparkContextOption = SparkContext._jvm.SparkContext.get()
+            if jSparkContextOption.nonEmpty():
+                jSparkContextOption.get().stop()
+        except:
+            pass
 
     def setUp(self):
         self.ssc = StreamingContext(self.sc, self.duration)
@@ -72,9 +75,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
         if self.ssc is not None:
             self.ssc.stop(False)
         # Clean up in the JVM just in case there has been some issues in Python API
-        jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
-        if jStreamingContextOption.nonEmpty():
-            jStreamingContextOption.get().stop(False)
+        try:
+            jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
+            if jStreamingContextOption.nonEmpty():
+                jStreamingContextOption.get().stop(False)
+        except:
+            pass
 
     def wait_for(self, result, n):
         start_time = time.time()
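The new try/except blocks are defensive: once the gateway or the JVM has already gone away, these cleanup calls can themselves throw, and a failed best-effort cleanup should not fail the test run. A slightly narrower sketch of the same idea (catching Py4JError instead of the diff's bare except is an assumption on my part, not what the commit does):

from py4j.protocol import Py4JError

def stop_jvm_spark_context(sc_class):
    """Best-effort stop of the JVM-side SparkContext, mirroring tearDownClass."""
    try:
        option = sc_class._jvm.SparkContext.get()  # JVM-side Option[SparkContext]
        if option.nonEmpty():
            option.get().stop()
    except Py4JError:
        pass  # gateway already shut down; nothing left to clean up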
sbin/spark-config.sh
@@ -36,4 +36,4 @@ export SPARK_HOME="${SPARK_PREFIX}"
 export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"$SPARK_HOME/conf"}"
 # Add the PySpark classes to the PYTHONPATH:
 export PYTHONPATH="$SPARK_HOME/python:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1008,9 +1008,9 @@ private[spark] class Client(
       val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
       require(pyArchivesFile.exists(),
         "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-      val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+      val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
       require(py4jFile.exists(),
-        "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+        "py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
       Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
     }
   }
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -153,7 +153,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     // needed locations.
     val sparkHome = sys.props("spark.test.home");
     val pythonPath = Seq(
-      s"$sparkHome/python/lib/py4j-0.8.2.1-src.zip",
+      s"$sparkHome/python/lib/py4j-0.9-src.zip",
       s"$sparkHome/python")
     val extraEnv = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),