2020-09-24 01:15:36 -04:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
# or more contributor license agreements. See the NOTICE file
|
|
|
|
# distributed with this work for additional information
|
|
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
|
|
# to you under the Apache License, Version 2.0 (the
|
|
|
|
# "License"); you may not use this file except in compliance
|
|
|
|
# with the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing,
|
|
|
|
# software distributed under the License is distributed on an
|
|
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
# KIND, either express or implied. See the License for the
|
|
|
|
# specific language governing permissions and limitations
|
|
|
|
# under the License.
|
|
|
|
|
|
|
|
from typing import overload
|
|
|
|
from typing import Any, Callable, Dict, List, Optional, Union
|
|
|
|
|
|
|
|
from pyspark.sql._typing import SupportsProcess, OptionalPrimitiveType
|
|
|
|
from pyspark.sql.context import SQLContext
|
|
|
|
from pyspark.sql.dataframe import DataFrame
|
|
|
|
from pyspark.sql.readwriter import OptionUtils
|
|
|
|
from pyspark.sql.types import Row, StructType
|
|
|
|
from pyspark.sql.utils import StreamingQueryException
|
|
|
|
|
|
|
|
from py4j.java_gateway import JavaObject # type: ignore[import]
|
|
|
|
|
|
|
|
class StreamingQuery:
|
|
|
|
def __init__(self, jsq: JavaObject) -> None: ...
|
|
|
|
@property
|
|
|
|
def id(self) -> str: ...
|
|
|
|
@property
|
|
|
|
def runId(self) -> str: ...
|
|
|
|
@property
|
|
|
|
def name(self) -> str: ...
|
|
|
|
@property
|
|
|
|
def isActive(self) -> bool: ...
|
|
|
|
def awaitTermination(self, timeout: Optional[int] = ...) -> Optional[bool]: ...
|
|
|
|
@property
|
|
|
|
def status(self) -> Dict[str, Any]: ...
|
|
|
|
@property
|
|
|
|
def recentProgress(self) -> List[Dict[str, Any]]: ...
|
|
|
|
@property
|
|
|
|
def lastProgress(self) -> Optional[Dict[str, Any]]: ...
|
|
|
|
def processAllAvailable(self) -> None: ...
|
|
|
|
def stop(self) -> None: ...
|
|
|
|
def explain(self, extended: bool = ...) -> None: ...
|
|
|
|
def exception(self) -> Optional[StreamingQueryException]: ...
|
|
|
|
|
|
|
|
class StreamingQueryManager:
|
|
|
|
def __init__(self, jsqm: JavaObject) -> None: ...
|
|
|
|
@property
|
|
|
|
def active(self) -> List[StreamingQuery]: ...
|
|
|
|
def get(self, id: str) -> StreamingQuery: ...
|
|
|
|
def awaitAnyTermination(self, timeout: Optional[int] = ...) -> bool: ...
|
|
|
|
def resetTerminated(self) -> None: ...
|
|
|
|
|
|
|
|
class DataStreamReader(OptionUtils):
|
|
|
|
def __init__(self, spark: SQLContext) -> None: ...
|
|
|
|
def format(self, source: str) -> DataStreamReader: ...
|
|
|
|
def schema(self, schema: Union[StructType, str]) -> DataStreamReader: ...
|
|
|
|
def option(self, key: str, value: OptionalPrimitiveType) -> DataStreamReader: ...
|
|
|
|
def options(self, **options: OptionalPrimitiveType) -> DataStreamReader: ...
|
|
|
|
def load(
|
|
|
|
self,
|
|
|
|
path: Optional[str] = ...,
|
|
|
|
format: Optional[str] = ...,
|
2020-11-02 20:00:49 -05:00
|
|
|
schema: Optional[Union[StructType, str]] = ...,
|
2020-09-24 01:15:36 -04:00
|
|
|
**options: OptionalPrimitiveType
|
|
|
|
) -> DataFrame: ...
|
|
|
|
def json(
|
|
|
|
self,
|
|
|
|
path: str,
|
|
|
|
schema: Optional[Union[StructType, str]] = ...,
|
|
|
|
primitivesAsString: Optional[Union[bool, str]] = ...,
|
|
|
|
prefersDecimal: Optional[Union[bool, str]] = ...,
|
|
|
|
allowComments: Optional[Union[bool, str]] = ...,
|
|
|
|
allowUnquotedFieldNames: Optional[Union[bool, str]] = ...,
|
|
|
|
allowSingleQuotes: Optional[Union[bool, str]] = ...,
|
|
|
|
allowNumericLeadingZero: Optional[Union[bool, str]] = ...,
|
|
|
|
allowBackslashEscapingAnyCharacter: Optional[Union[bool, str]] = ...,
|
|
|
|
mode: Optional[str] = ...,
|
|
|
|
columnNameOfCorruptRecord: Optional[str] = ...,
|
|
|
|
dateFormat: Optional[str] = ...,
|
|
|
|
timestampFormat: Optional[str] = ...,
|
|
|
|
multiLine: Optional[Union[bool, str]] = ...,
|
|
|
|
allowUnquotedControlChars: Optional[Union[bool, str]] = ...,
|
|
|
|
lineSep: Optional[str] = ...,
|
|
|
|
locale: Optional[str] = ...,
|
|
|
|
dropFieldIfAllNull: Optional[Union[bool, str]] = ...,
|
|
|
|
encoding: Optional[str] = ...,
|
2020-11-02 20:00:49 -05:00
|
|
|
pathGlobFilter: Optional[Union[bool, str]] = ...,
|
|
|
|
recursiveFileLookup: Optional[Union[bool, str]] = ...,
|
|
|
|
allowNonNumericNumbers: Optional[Union[bool, str]] = ...,
|
2020-09-24 01:15:36 -04:00
|
|
|
) -> DataFrame: ...
|
|
|
|
def orc(
|
|
|
|
self,
|
|
|
|
path: str,
|
|
|
|
mergeSchema: Optional[bool] = ...,
|
2020-11-02 20:00:49 -05:00
|
|
|
pathGlobFilter: Optional[Union[bool, str]] = ...,
|
|
|
|
recursiveFileLookup: Optional[Union[bool, str]] = ...,
|
2020-09-24 01:15:36 -04:00
|
|
|
) -> DataFrame: ...
|
|
|
|
def parquet(
|
|
|
|
self,
|
|
|
|
path: str,
|
|
|
|
mergeSchema: Optional[bool] = ...,
|
2020-11-02 20:00:49 -05:00
|
|
|
pathGlobFilter: Optional[Union[bool, str]] = ...,
|
|
|
|
recursiveFileLookup: Optional[Union[bool, str]] = ...,
|
2020-09-24 01:15:36 -04:00
|
|
|
) -> DataFrame: ...
|
|
|
|
def text(
|
|
|
|
self,
|
|
|
|
path: str,
|
|
|
|
wholetext: bool = ...,
|
|
|
|
lineSep: Optional[str] = ...,
|
2020-11-02 20:00:49 -05:00
|
|
|
pathGlobFilter: Optional[Union[bool, str]] = ...,
|
|
|
|
recursiveFileLookup: Optional[Union[bool, str]] = ...,
|
2020-09-24 01:15:36 -04:00
|
|
|
) -> DataFrame: ...
|
|
|
|
def csv(
|
|
|
|
self,
|
|
|
|
path: str,
|
|
|
|
schema: Optional[Union[StructType, str]] = ...,
|
|
|
|
sep: Optional[str] = ...,
|
|
|
|
encoding: Optional[str] = ...,
|
|
|
|
quote: Optional[str] = ...,
|
|
|
|
escape: Optional[str] = ...,
|
|
|
|
comment: Optional[str] = ...,
|
|
|
|
header: Optional[Union[bool, str]] = ...,
|
|
|
|
inferSchema: Optional[Union[bool, str]] = ...,
|
|
|
|
ignoreLeadingWhiteSpace: Optional[Union[bool, str]] = ...,
|
|
|
|
ignoreTrailingWhiteSpace: Optional[Union[bool, str]] = ...,
|
|
|
|
nullValue: Optional[str] = ...,
|
|
|
|
nanValue: Optional[str] = ...,
|
|
|
|
positiveInf: Optional[str] = ...,
|
|
|
|
negativeInf: Optional[str] = ...,
|
|
|
|
dateFormat: Optional[str] = ...,
|
|
|
|
timestampFormat: Optional[str] = ...,
|
|
|
|
maxColumns: Optional[Union[int, str]] = ...,
|
|
|
|
maxCharsPerColumn: Optional[Union[int, str]] = ...,
|
|
|
|
mode: Optional[str] = ...,
|
|
|
|
columnNameOfCorruptRecord: Optional[str] = ...,
|
|
|
|
multiLine: Optional[Union[bool, str]] = ...,
|
|
|
|
charToEscapeQuoteEscaping: Optional[Union[bool, str]] = ...,
|
|
|
|
enforceSchema: Optional[Union[bool, str]] = ...,
|
|
|
|
emptyValue: Optional[str] = ...,
|
|
|
|
locale: Optional[str] = ...,
|
|
|
|
lineSep: Optional[str] = ...,
|
2020-11-02 20:00:49 -05:00
|
|
|
pathGlobFilter: Optional[Union[bool, str]] = ...,
|
|
|
|
recursiveFileLookup: Optional[Union[bool, str]] = ...,
|
2020-11-27 01:47:39 -05:00
|
|
|
unescapedQuoteHandling: Optional[str] = ...,
|
2020-09-24 01:15:36 -04:00
|
|
|
) -> DataFrame: ...
|
[SPARK-33836][SS][PYTHON] Expose DataStreamReader.table and DataStreamWriter.toTable
### What changes were proposed in this pull request?
This PR proposes to expose `DataStreamReader.table` (SPARK-32885) and `DataStreamWriter.toTable` (SPARK-32896) to PySpark, which are the only way to read and write with table in Structured Streaming.
### Why are the changes needed?
Please refer SPARK-32885 and SPARK-32896 for rationalizations of these public APIs. This PR only exposes them to PySpark.
### Does this PR introduce _any_ user-facing change?
Yes, PySpark users will be able to read and write with table in Structured Streaming query.
### How was this patch tested?
Manually tested.
> v1 table
>> create table A and ingest to the table A
```
spark.sql("""
create table table_pyspark_parquet (
value long,
`timestamp` timestamp
) USING parquet
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.writeStream.toTable('table_pyspark_parquet', checkpointLocation='/tmp/checkpoint5')
query.lastProgress
query.stop()
```
>> read table A and ingest to the table B which doesn't exist
```
df2 = spark.readStream.table('table_pyspark_parquet')
query2 = df2.writeStream.toTable('table_pyspark_parquet_nonexist', format='parquet', checkpointLocation='/tmp/checkpoint2')
query2.lastProgress
query2.stop()
```
>> select tables
```
spark.sql("DESCRIBE TABLE table_pyspark_parquet").show()
spark.sql("SELECT * FROM table_pyspark_parquet").show()
spark.sql("DESCRIBE TABLE table_pyspark_parquet_nonexist").show()
spark.sql("SELECT * FROM table_pyspark_parquet_nonexist").show()
```
> v2 table (leveraging Apache Iceberg as it provides V2 table and custom catalog as well)
>> create table A and ingest to the table A
```
spark.sql("""
create table iceberg_catalog.default.table_pyspark_v2table (
value long,
`timestamp` timestamp
) USING iceberg
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table', checkpointLocation='/tmp/checkpoint_v2table_1')
query.lastProgress
query.stop()
```
>> ingest to the non-exist table B
```
df2 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query2 = df2.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist', checkpointLocation='/tmp/checkpoint_v2table_2')
query2.lastProgress
query2.stop()
```
>> ingest to the non-exist table C partitioned by `value % 10`
```
df3 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
df3a = df3.selectExpr('value', 'timestamp', 'value % 10 AS partition').repartition('partition')
query3 = df3a.writeStream.partitionBy('partition').toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned', checkpointLocation='/tmp/checkpoint_v2table_3')
query3.lastProgress
query3.stop()
```
>> select tables
```
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
```
Closes #30835 from HeartSaVioR/SPARK-33836.
Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-12-21 05:42:59 -05:00
|
|
|
def table(self, tableName: str) -> DataFrame: ...
|
2020-09-24 01:15:36 -04:00
|
|
|
|
|
|
|
class DataStreamWriter:
|
|
|
|
def __init__(self, df: DataFrame) -> None: ...
|
|
|
|
def outputMode(self, outputMode: str) -> DataStreamWriter: ...
|
|
|
|
def format(self, source: str) -> DataStreamWriter: ...
|
|
|
|
def option(self, key: str, value: OptionalPrimitiveType) -> DataStreamWriter: ...
|
|
|
|
def options(self, **options: OptionalPrimitiveType) -> DataStreamWriter: ...
|
|
|
|
@overload
|
|
|
|
def partitionBy(self, *cols: str) -> DataStreamWriter: ...
|
|
|
|
@overload
|
|
|
|
def partitionBy(self, __cols: List[str]) -> DataStreamWriter: ...
|
|
|
|
def queryName(self, queryName: str) -> DataStreamWriter: ...
|
|
|
|
@overload
|
|
|
|
def trigger(self, processingTime: str) -> DataStreamWriter: ...
|
|
|
|
@overload
|
|
|
|
def trigger(self, once: bool) -> DataStreamWriter: ...
|
|
|
|
@overload
|
|
|
|
def trigger(self, continuous: bool) -> DataStreamWriter: ...
|
|
|
|
def start(
|
|
|
|
self,
|
|
|
|
path: Optional[str] = ...,
|
|
|
|
format: Optional[str] = ...,
|
|
|
|
outputMode: Optional[str] = ...,
|
|
|
|
partitionBy: Optional[Union[str, List[str]]] = ...,
|
|
|
|
queryName: Optional[str] = ...,
|
|
|
|
**options: OptionalPrimitiveType
|
|
|
|
) -> StreamingQuery: ...
|
|
|
|
@overload
|
|
|
|
def foreach(self, f: Callable[[Row], None]) -> DataStreamWriter: ...
|
|
|
|
@overload
|
|
|
|
def foreach(self, f: SupportsProcess) -> DataStreamWriter: ...
|
|
|
|
def foreachBatch(
|
|
|
|
self, func: Callable[[DataFrame, int], None]
|
|
|
|
) -> DataStreamWriter: ...
|
[SPARK-33836][SS][PYTHON] Expose DataStreamReader.table and DataStreamWriter.toTable
### What changes were proposed in this pull request?
This PR proposes to expose `DataStreamReader.table` (SPARK-32885) and `DataStreamWriter.toTable` (SPARK-32896) to PySpark, which are the only way to read and write with table in Structured Streaming.
### Why are the changes needed?
Please refer SPARK-32885 and SPARK-32896 for rationalizations of these public APIs. This PR only exposes them to PySpark.
### Does this PR introduce _any_ user-facing change?
Yes, PySpark users will be able to read and write with table in Structured Streaming query.
### How was this patch tested?
Manually tested.
> v1 table
>> create table A and ingest to the table A
```
spark.sql("""
create table table_pyspark_parquet (
value long,
`timestamp` timestamp
) USING parquet
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.writeStream.toTable('table_pyspark_parquet', checkpointLocation='/tmp/checkpoint5')
query.lastProgress
query.stop()
```
>> read table A and ingest to the table B which doesn't exist
```
df2 = spark.readStream.table('table_pyspark_parquet')
query2 = df2.writeStream.toTable('table_pyspark_parquet_nonexist', format='parquet', checkpointLocation='/tmp/checkpoint2')
query2.lastProgress
query2.stop()
```
>> select tables
```
spark.sql("DESCRIBE TABLE table_pyspark_parquet").show()
spark.sql("SELECT * FROM table_pyspark_parquet").show()
spark.sql("DESCRIBE TABLE table_pyspark_parquet_nonexist").show()
spark.sql("SELECT * FROM table_pyspark_parquet_nonexist").show()
```
> v2 table (leveraging Apache Iceberg as it provides V2 table and custom catalog as well)
>> create table A and ingest to the table A
```
spark.sql("""
create table iceberg_catalog.default.table_pyspark_v2table (
value long,
`timestamp` timestamp
) USING iceberg
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table', checkpointLocation='/tmp/checkpoint_v2table_1')
query.lastProgress
query.stop()
```
>> ingest to the non-exist table B
```
df2 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query2 = df2.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist', checkpointLocation='/tmp/checkpoint_v2table_2')
query2.lastProgress
query2.stop()
```
>> ingest to the non-exist table C partitioned by `value % 10`
```
df3 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
df3a = df3.selectExpr('value', 'timestamp', 'value % 10 AS partition').repartition('partition')
query3 = df3a.writeStream.partitionBy('partition').toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned', checkpointLocation='/tmp/checkpoint_v2table_3')
query3.lastProgress
query3.stop()
```
>> select tables
```
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
```
Closes #30835 from HeartSaVioR/SPARK-33836.
Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-12-21 05:42:59 -05:00
|
|
|
def toTable(
|
|
|
|
self,
|
|
|
|
tableName: str,
|
|
|
|
format: Optional[str] = ...,
|
|
|
|
outputMode: Optional[str] = ...,
|
|
|
|
partitionBy: Optional[Union[str, List[str]]] = ...,
|
|
|
|
queryName: Optional[str] = ...,
|
|
|
|
**options: OptionalPrimitiveType
|
|
|
|
) -> StreamingQuery: ...
|