2013-09-02 15:23:03 -04:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
# (the "License"); you may not use this file except in compliance with
|
|
|
|
# the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
#
|
|
|
|
|
2013-01-01 16:52:14 -05:00
|
|
|
"""
|
2013-09-02 15:23:03 -04:00
|
|
|
PySpark is the Python API for Spark.
|
2013-01-01 16:52:14 -05:00
|
|
|
|
|
|
|
Public classes:
|
|
|
|
|
2014-10-07 21:09:27 -04:00
|
|
|
- :class:`SparkContext`:
|
2013-12-29 20:15:07 -05:00
|
|
|
Main entry point for Spark functionality.
|
2015-02-17 16:36:43 -05:00
|
|
|
- :class:`RDD`:
|
2013-12-29 20:15:07 -05:00
|
|
|
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
|
2015-02-17 16:36:43 -05:00
|
|
|
- :class:`Broadcast`:
|
2013-12-29 20:15:07 -05:00
|
|
|
A broadcast variable that gets reused across tasks.
|
2015-02-17 16:36:43 -05:00
|
|
|
- :class:`Accumulator`:
|
2013-12-29 20:15:07 -05:00
|
|
|
An "add-only" shared variable that tasks can only add values to.
|
2015-02-17 16:36:43 -05:00
|
|
|
- :class:`SparkConf`:
|
2013-12-29 20:15:07 -05:00
|
|
|
For configuring Spark.
|
2015-02-17 16:36:43 -05:00
|
|
|
- :class:`SparkFiles`:
|
2013-12-29 20:15:07 -05:00
|
|
|
Access files shipped with jobs.
|
2015-02-17 16:36:43 -05:00
|
|
|
- :class:`StorageLevel`:
|
2013-12-29 20:15:07 -05:00
|
|
|
Finer-grained cache persistence levels.
|
2016-12-20 18:51:21 -05:00
|
|
|
- :class:`TaskContext`:
|
2017-06-19 15:35:58 -04:00
|
|
|
Information about the currently running task, available on the workers. Experimental.
|
2018-08-29 10:22:03 -04:00
|
|
|
- :class:`RDDBarrier`:
|
|
|
|
Wraps an RDD under a barrier stage for barrier execution.
|
|
|
|
- :class:`BarrierTaskContext`:
|
|
|
|
A :class:`TaskContext` that provides extra info and tooling for barrier execution.
|
|
|
|
- :class:`BarrierTaskInfo`:
|
|
|
|
Information about a barrier task.
|
[SPARK-32010][PYTHON][CORE] Add InheritableThread for local properties and fixing a thread leak issue in pinned thread mode
### What changes were proposed in this pull request?
This PR proposes:
1. To introduce `InheritableThread` class, that works identically with `threading.Thread` but it can inherit the inheritable attributes of a JVM thread such as `InheritableThreadLocal`.
This was a problem from the pinned thread mode, see also https://github.com/apache/spark/pull/24898. Now it works as below:
```python
import pyspark
spark.sparkContext.setLocalProperty("a", "hi")
def print_prop():
    print(spark.sparkContext.getLocalProperty("a"))
pyspark.InheritableThread(target=print_prop).start()
```
```
hi
```
2. Also, it adds the resource leak fix into `InheritableThread`. Py4J leaks the thread and does not close the connection from Python to JVM. In `InheritableThread`, it manually closes the connections when PVM garbage collection happens. So, JVM threads finish safely. I manually verified by profiling but there's also another easy way to verify:
```bash
PYSPARK_PIN_THREAD=true ./bin/pyspark
```
```python
>>> from threading import Thread
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> spark._jvm._gateway_client.deque
deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> spark._jvm._gateway_client.deque
deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc08d0>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
```
This issue is fixed now.
3. Because now we have a fix for the issue here, it also proposes to deprecate `collectWithJobGroup` which was a temporary workaround added to avoid this leak issue.
### Why are the changes needed?
To support pinned thread mode properly without a resource leak, and a proper inheritable local properties.
### Does this PR introduce _any_ user-facing change?
Yes, it adds an API `InheritableThread` class for pinned thread mode.
### How was this patch tested?
Manually tested as described above, and unit test was added as well.
Closes #28968 from HyukjinKwon/SPARK-32010.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-29 21:15:25 -04:00
|
|
|
- :class:`InheritableThread`:
|
|
|
|
An inheritable thread to use in Spark when the pinned thread mode is on.
|
2013-01-01 16:52:14 -05:00
|
|
|
"""
|
2013-12-29 20:15:07 -05:00
|
|
|
|
2016-04-20 13:32:01 -04:00
|
|
|
from functools import wraps
|
2016-03-14 22:25:49 -04:00
|
|
|
import types
|
|
|
|
|
2013-12-29 14:03:39 -05:00
|
|
|
from pyspark.conf import SparkConf
|
2018-08-29 10:22:03 -04:00
|
|
|
from pyspark.rdd import RDD, RDDBarrier
|
2013-01-21 19:42:24 -05:00
|
|
|
from pyspark.files import SparkFiles
|
2020-08-30 22:23:31 -04:00
|
|
|
from pyspark.status import StatusTracker, SparkJobInfo, SparkStageInfo
|
2020-07-29 21:15:25 -04:00
|
|
|
from pyspark.util import InheritableThread
|
2013-09-07 17:41:31 -04:00
|
|
|
from pyspark.storagelevel import StorageLevel
|
2014-09-03 14:49:45 -04:00
|
|
|
from pyspark.accumulators import Accumulator, AccumulatorParam
|
|
|
|
from pyspark.broadcast import Broadcast
|
|
|
|
from pyspark.serializers import MarshalSerializer, PickleSerializer
|
2018-08-29 10:22:03 -04:00
|
|
|
from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
|
2015-01-28 16:48:06 -05:00
|
|
|
from pyspark.profiler import Profiler, BasicProfiler
|
2021-04-14 17:36:25 -04:00
|
|
|
from pyspark.version import __version__
|
2020-08-08 11:51:57 -04:00
|
|
|
from pyspark._globals import _NoValue # noqa: F401
|
2012-12-29 01:51:28 -05:00
|
|
|
|
2015-09-08 23:56:22 -04:00
|
|
|
|
|
|
|
def since(version):
    """
    A decorator that appends to a function's docstring the version of Spark in which it was added.
    """
    import re
    indent_p = re.compile(r'\n( +)')

    def deco(f):
        indents = indent_p.findall(f.__doc__)
        indent = ' ' * (min(len(m) for m in indents) if indents else 0)
        f.__doc__ = f.__doc__.rstrip() + "\n\n%s.. versionadded:: %s" % (indent, version)
        return f
    return deco
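To make the effect of `since` concrete, the following is a minimal sketch that repeats the decorator so it runs without Spark installed. The function `example` and the version string `"9.9.9"` are hypothetical and exist only for illustration.

```python
import re


def since(version):
    """Standalone copy of the decorator above, so the sketch runs without Spark."""
    indent_p = re.compile(r'\n( +)')

    def deco(f):
        # Collect the leading indentation of every docstring line.
        indents = indent_p.findall(f.__doc__)
        # Reuse the smallest indentation so the directive lines up with the text.
        indent = ' ' * (min(len(m) for m in indents) if indents else 0)
        f.__doc__ = f.__doc__.rstrip() + "\n\n%s.. versionadded:: %s" % (indent, version)
        return f
    return deco


@since("9.9.9")  # hypothetical version string, for illustration only
def example():
    """
    A hypothetical function with an indented docstring.
    """
    return None


# The docstring now ends with an indented ``.. versionadded::`` directive.
print(example.__doc__)
```

Note that the decorator reads `f.__doc__` directly, so it assumes the decorated function has a docstring; a function without one would raise a `TypeError` inside `findall`.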
|
|
|
|
|
|
|
|
|
2016-03-14 22:25:49 -04:00
|
|
|
def copy_func(f, name=None, sinceversion=None, doc=None):
    """
    Returns a function with same code, globals, defaults, closure, and
    name (or provide a new name).
    """
    # See
    # http://stackoverflow.com/questions/6527633/how-can-i-make-a-deepcopy-of-a-function-in-python
    fn = types.FunctionType(f.__code__, f.__globals__, name or f.__name__, f.__defaults__,
                            f.__closure__)
    # in case f was given attrs (note this dict is a shallow copy):
    fn.__dict__.update(f.__dict__)
    if doc is not None:
        fn.__doc__ = doc
    if sinceversion is not None:
        fn = since(sinceversion)(fn)
    return fn
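As a rough illustration of what `copy_func` does, here is a simplified standalone variant that leaves out the `sinceversion` argument so it has no dependency on `since`. The names `greet` and `salute` are hypothetical and used only for this demo.

```python
import types


def copy_func(f, name=None, doc=None):
    """Simplified standalone variant of the helper above (no ``sinceversion``)."""
    fn = types.FunctionType(f.__code__, f.__globals__, name or f.__name__,
                            f.__defaults__, f.__closure__)
    # Carry over any attributes set on f (the dict itself is copied shallowly).
    fn.__dict__.update(f.__dict__)
    if doc is not None:
        fn.__doc__ = doc
    return fn


def greet(who="world"):  # hypothetical function used only for this demo
    """Original docstring."""
    return "hello " + who


alias = copy_func(greet, name="salute", doc="Copied docstring.")

# The copy is a distinct function object with its own name and docstring,
# while the original keeps its own.
print(alias.__name__, "|", alias.__doc__)
print(greet.__name__, "|", greet.__doc__)
print(alias(), "|", alias("spark"))
```

Because the copy is a separate object, changing its name or docstring leaves the original function untouched, which is presumably why a copy is made instead of assigning the same function under a second name.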
|
|
|
|
|
|
|
|
|
2016-04-20 13:32:01 -04:00
|
|
|
def keyword_only(func):
    """
    A decorator that forces keyword arguments in the wrapped method
    and saves actual input keyword arguments in `_input_kwargs`.

    Notes
    -----
    Should only be used to wrap a method where first arg is `self`
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if len(args) > 0:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        self._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper
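The behaviour of `keyword_only` can be sketched with a standalone copy of the decorator applied to a small class. `Scaler` and its parameters are hypothetical and not part of PySpark; they only show how `_input_kwargs` records the arguments the caller actually passed.

```python
from functools import wraps


def keyword_only(func):
    """Standalone copy of the decorator above, so the sketch runs without Spark."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if len(args) > 0:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        self._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper


class Scaler:  # hypothetical class, not part of PySpark
    @keyword_only
    def __init__(self, factor=1.0, offset=0.0):
        # Only explicitly passed arguments appear in _input_kwargs;
        # defaults that were not overridden are absent.
        self.params = dict(self._input_kwargs)


s = Scaler(factor=2.0)
print(s.params)

# Positional arguments are rejected with a TypeError.
try:
    Scaler(2.0)
except TypeError as exc:
    error = str(exc)
    print(error)
```

Since the wrapper stores the keyword arguments on `self`, the decorator only makes sense on methods, which matches the note in its docstring.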
|
|
|
|
|
[SPARK-33017][PYTHON] Add getCheckpointDir method to PySpark Context
### What changes were proposed in this pull request?
Adding a method to get the checkpoint directory from the PySpark context to match the Scala API
### Why are the changes needed?
To make the Scala and Python APIs consistent and remove the need to use the JavaObject
### Does this PR introduce _any_ user-facing change?
Yes, there is a new method which makes it easier to get the checkpoint directory directly rather than using the JavaObject
#### Previous behaviour:
```python
>>> spark.sparkContext.setCheckpointDir('/tmp/spark/checkpoint/')
>>> sc._jsc.sc().getCheckpointDir().get()
'file:/tmp/spark/checkpoint/63f7b67c-e5dc-4d11-a70c-33554a71717a'
```
This method returns a confusing Scala error if it has not been set
```python
>>> sc._jsc.sc().getCheckpointDir().get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/paul/Desktop/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/paul/Desktop/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/paul/Desktop/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o25.get.
: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```
#### New method:
```python
>>> spark.sparkContext.setCheckpointDir('/tmp/spark/checkpoint/')
>>> spark.sparkContext.getCheckpointDir()
'file:/tmp/spark/checkpoint/b38aca2e-8ace-44fc-a4c4-f4e36c2da2a7'
```
``getCheckpointDir()`` returns ``None`` if it has not been set
```python
>>> print(spark.sparkContext.getCheckpointDir())
None
```
### How was this patch tested?
Added to existing unit tests. But I'm not sure how to add a test for the case where ``getCheckpointDir()`` should return ``None`` since the existing checkpoint tests set the checkpoint directory in the ``setUp`` method before any tests are run as far as I can tell.
Closes #29918 from reidy-p/SPARK-33017.
Authored-by: reidy-p <paul_reidy@outlook.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-04 22:48:28 -04:00
|
|
|
# To avoid circular dependencies
|
|
|
|
from pyspark.context import SparkContext
|
2016-04-20 13:32:01 -04:00
|
|
|
|
2014-09-03 14:49:45 -04:00
|
|
|
# for back compatibility
|
2020-08-08 11:51:57 -04:00
|
|
|
from pyspark.sql import SQLContext, HiveContext, Row # noqa: F401
|
2012-12-29 01:51:28 -05:00
|
|
|
|
2014-09-03 14:49:45 -04:00
|
|
|
__all__ = [
    "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
    "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
    "StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", "BasicProfiler", "TaskContext",
    "RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo", "InheritableThread",
    "__version__",
]
|