spark-instrumented-optimizer/python/pyspark/context.pyi

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    NoReturn,
    Optional,
    Tuple,
    Type,
    TypeVar,
)
from types import TracebackType
from py4j.java_gateway import JavaGateway, JavaObject # type: ignore[import]
from pyspark.accumulators import Accumulator, AccumulatorParam
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.profiler import Profiler # noqa: F401
from pyspark.resource.information import ResourceInformation
from pyspark.rdd import RDD
from pyspark.serializers import Serializer
from pyspark.status import StatusTracker
T = TypeVar("T")
U = TypeVar("U")
class SparkContext:
    master: str
    appName: str
    sparkHome: str
    PACKAGE_EXTENSIONS: Iterable[str]
    def __init__(
        self,
        master: Optional[str] = ...,
        appName: Optional[str] = ...,
        sparkHome: Optional[str] = ...,
        pyFiles: Optional[List[str]] = ...,
        environment: Optional[Dict[str, str]] = ...,
        batchSize: int = ...,
        serializer: Serializer = ...,
        conf: Optional[SparkConf] = ...,
        gateway: Optional[JavaGateway] = ...,
        jsc: Optional[JavaObject] = ...,
        profiler_cls: type = ...,
    ) -> None: ...
    def __getnewargs__(self) -> NoReturn: ...
    def __enter__(self) -> SparkContext: ...
    def __exit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        trace: Optional[TracebackType],
    ) -> None: ...
    @classmethod
    def getOrCreate(cls, conf: Optional[SparkConf] = ...) -> SparkContext: ...
    def setLogLevel(self, logLevel: str) -> None: ...
    @classmethod
    def setSystemProperty(cls, key: str, value: str) -> None: ...
    @property
    def version(self) -> str: ...
    @property
    def applicationId(self) -> str: ...
    @property
    def uiWebUrl(self) -> str: ...
    @property
    def startTime(self) -> int: ...
    @property
    def defaultParallelism(self) -> int: ...
    @property
    def defaultMinPartitions(self) -> int: ...
    def stop(self) -> None: ...
    def emptyRDD(self) -> RDD[Any]: ...
    def range(
        self,
        start: int,
        end: Optional[int] = ...,
        step: int = ...,
        numSlices: Optional[int] = ...,
    ) -> RDD[int]: ...
    def parallelize(self, c: Iterable[T], numSlices: Optional[int] = ...) -> RDD[T]: ...
    def pickleFile(self, name: str, minPartitions: Optional[int] = ...) -> RDD[Any]: ...
    def textFile(
        self, name: str, minPartitions: Optional[int] = ..., use_unicode: bool = ...
    ) -> RDD[str]: ...
    def wholeTextFiles(
        self, path: str, minPartitions: Optional[int] = ..., use_unicode: bool = ...
    ) -> RDD[Tuple[str, str]]: ...
    def binaryFiles(
        self, path: str, minPartitions: Optional[int] = ...
    ) -> RDD[Tuple[str, bytes]]: ...
    def binaryRecords(self, path: str, recordLength: int) -> RDD[bytes]: ...
    def sequenceFile(
        self,
        path: str,
        keyClass: Optional[str] = ...,
        valueClass: Optional[str] = ...,
        keyConverter: Optional[str] = ...,
        valueConverter: Optional[str] = ...,
        minSplits: Optional[int] = ...,
        batchSize: int = ...,
    ) -> RDD[Tuple[T, U]]: ...
    def newAPIHadoopFile(
        self,
        path: str,
        inputFormatClass: str,
        keyClass: str,
        valueClass: str,
        keyConverter: Optional[str] = ...,
        valueConverter: Optional[str] = ...,
        conf: Optional[Dict[str, str]] = ...,
        batchSize: int = ...,
    ) -> RDD[Tuple[T, U]]: ...
    def newAPIHadoopRDD(
        self,
        inputFormatClass: str,
        keyClass: str,
        valueClass: str,
        keyConverter: Optional[str] = ...,
        valueConverter: Optional[str] = ...,
        conf: Optional[Dict[str, str]] = ...,
        batchSize: int = ...,
    ) -> RDD[Tuple[T, U]]: ...
    def hadoopFile(
        self,
        path: str,
        inputFormatClass: str,
        keyClass: str,
        valueClass: str,
        keyConverter: Optional[str] = ...,
        valueConverter: Optional[str] = ...,
        conf: Optional[Dict[str, str]] = ...,
        batchSize: int = ...,
    ) -> RDD[Tuple[T, U]]: ...
    def hadoopRDD(
        self,
        inputFormatClass: str,
        keyClass: str,
        valueClass: str,
        keyConverter: Optional[str] = ...,
        valueConverter: Optional[str] = ...,
        conf: Optional[Dict[str, str]] = ...,
        batchSize: int = ...,
    ) -> RDD[Tuple[T, U]]: ...
    def union(self, rdds: Iterable[RDD[T]]) -> RDD[T]: ...
    def broadcast(self, value: T) -> Broadcast[T]: ...
    def accumulator(
        self, value: T, accum_param: Optional[AccumulatorParam[T]] = ...
    ) -> Accumulator[T]: ...
    def addFile(self, path: str, recursive: bool = ...) -> None: ...
    def addPyFile(self, path: str) -> None: ...
    def setCheckpointDir(self, dirName: str) -> None: ...
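    # Added in SPARK-33017; returns None when no checkpoint directory has been set.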
    def getCheckpointDir(self) -> Optional[str]: ...
    def setJobGroup(
        self, groupId: str, description: str, interruptOnCancel: bool = ...
    ) -> None: ...
    def setLocalProperty(self, key: str, value: str) -> None: ...
    def getLocalProperty(self, key: str) -> Optional[str]: ...
    def sparkUser(self) -> str: ...
    def setJobDescription(self, value: str) -> None: ...
    def cancelJobGroup(self, groupId: str) -> None: ...
    def cancelAllJobs(self) -> None: ...
    def statusTracker(self) -> StatusTracker: ...
    def runJob(
        self,
        rdd: RDD[T],
        partitionFunc: Callable[[Iterable[T]], Iterable[U]],
        partitions: Optional[List[int]] = ...,
        allowLocal: bool = ...,
    ) -> List[U]: ...
    def show_profiles(self) -> None: ...
    def dump_profiles(self, path: str) -> None: ...
    def getConf(self) -> SparkConf: ...
    @property
    def resources(self) -> Dict[str, ResourceInformation]: ...