8d4d433191
### What changes were proposed in this pull request?

This PR proposes to expose `DataStreamReader.table` (SPARK-32885) and `DataStreamWriter.toTable` (SPARK-32896) to PySpark. These are the only way to read from and write to tables in Structured Streaming.

### Why are the changes needed?

Please refer to SPARK-32885 and SPARK-32896 for the rationale behind these public APIs. This PR only exposes them to PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, PySpark users will be able to read from and write to tables in a Structured Streaming query.

### How was this patch tested?

Manually tested.

> v1 table

>> create table A and ingest to the table A

```
spark.sql("""
create table table_pyspark_parquet (
    value long,
    `timestamp` timestamp
) USING parquet
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.writeStream.toTable('table_pyspark_parquet', checkpointLocation='/tmp/checkpoint5')
query.lastProgress
query.stop()
```

>> read table A and ingest to the table B which doesn't exist

```
df2 = spark.readStream.table('table_pyspark_parquet')
query2 = df2.writeStream.toTable('table_pyspark_parquet_nonexist', format='parquet', checkpointLocation='/tmp/checkpoint2')
query2.lastProgress
query2.stop()
```

>> select tables

```
spark.sql("DESCRIBE TABLE table_pyspark_parquet").show()
spark.sql("SELECT * FROM table_pyspark_parquet").show()

spark.sql("DESCRIBE TABLE table_pyspark_parquet_nonexist").show()
spark.sql("SELECT * FROM table_pyspark_parquet_nonexist").show()
```

> v2 table (leveraging Apache Iceberg as it provides V2 table and custom catalog as well)

>> create table A and ingest to the table A

```
spark.sql("""
create table iceberg_catalog.default.table_pyspark_v2table (
    value long,
    `timestamp` timestamp
) USING iceberg
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table', checkpointLocation='/tmp/checkpoint_v2table_1')
query.lastProgress
query.stop()
```

>> ingest to the nonexistent table B

```
df2 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query2 = df2.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist', checkpointLocation='/tmp/checkpoint_v2table_2')
query2.lastProgress
query2.stop()
```

>> ingest to the nonexistent table C partitioned by `value % 10`

```
df3 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
df3a = df3.selectExpr('value', 'timestamp', 'value % 10 AS partition').repartition('partition')
query3 = df3a.writeStream.partitionBy('partition').toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned', checkpointLocation='/tmp/checkpoint_v2table_3')
query3.lastProgress
query3.stop()
```

>> select tables

```
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table").show()

spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist").show()

spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
```

Closes #30835 from HeartSaVioR/SPARK-33836.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
||
---|---|---|
cloudpickle | ||
ml | ||
mllib | ||
resource | ||
sql | ||
streaming | ||
testing | ||
tests | ||
__init__.py | ||
__init__.pyi | ||
_globals.py | ||
_typing.pyi | ||
accumulators.py | ||
accumulators.pyi | ||
broadcast.py | ||
broadcast.pyi | ||
conf.py | ||
conf.pyi | ||
context.py | ||
context.pyi | ||
daemon.py | ||
files.py | ||
files.pyi | ||
find_spark_home.py | ||
install.py | ||
java_gateway.py | ||
join.py | ||
profiler.py | ||
profiler.pyi | ||
py.typed | ||
rdd.py | ||
rdd.pyi | ||
rddsampler.py | ||
resultiterable.py | ||
resultiterable.pyi | ||
serializers.py | ||
shell.py | ||
shuffle.py | ||
statcounter.py | ||
statcounter.pyi | ||
status.py | ||
status.pyi | ||
storagelevel.py | ||
storagelevel.pyi | ||
taskcontext.py | ||
taskcontext.pyi | ||
traceback_utils.py | ||
util.py | ||
version.py | ||
version.pyi | ||
worker.py |