Commit graph

4 commits

Author SHA1 Message Date
Jungtaek Lim 8d4d433191 [SPARK-33836][SS][PYTHON] Expose DataStreamReader.table and DataStreamWriter.toTable
### What changes were proposed in this pull request?

This PR proposes to expose `DataStreamReader.table` (SPARK-32885) and `DataStreamWriter.toTable` (SPARK-32896) to PySpark, which are the only way to read and write with table in Structured Streaming.

### Why are the changes needed?

Please refer SPARK-32885 and SPARK-32896 for rationalizations of these public APIs. This PR only exposes them to PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, PySpark users will be able to read and write with table in Structured Streaming query.

### How was this patch tested?

Manually tested.

> v1 table

>> create table A and ingest to the table A

```
spark.sql("""
create table table_pyspark_parquet (
    value long,
    `timestamp` timestamp
) USING parquet
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.writeStream.toTable('table_pyspark_parquet', checkpointLocation='/tmp/checkpoint5')
query.lastProgress
query.stop()
```

>> read table A and ingest to the table B which doesn't exist

```
df2 = spark.readStream.table('table_pyspark_parquet')
query2 = df2.writeStream.toTable('table_pyspark_parquet_nonexist', format='parquet', checkpointLocation='/tmp/checkpoint2')
query2.lastProgress
query2.stop()
```

>> select tables

```
spark.sql("DESCRIBE TABLE table_pyspark_parquet").show()
spark.sql("SELECT * FROM table_pyspark_parquet").show()

spark.sql("DESCRIBE TABLE table_pyspark_parquet_nonexist").show()
spark.sql("SELECT * FROM table_pyspark_parquet_nonexist").show()
```

> v2 table (leveraging Apache Iceberg as it provides V2 table and custom catalog as well)

>> create table A and ingest to the table A

```
spark.sql("""
create table iceberg_catalog.default.table_pyspark_v2table (
    value long,
    `timestamp` timestamp
) USING iceberg
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table', checkpointLocation='/tmp/checkpoint_v2table_1')
query.lastProgress
query.stop()
```

>> ingest to the non-exist table B

```
df2 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query2 = df2.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist', checkpointLocation='/tmp/checkpoint_v2table_2')
query2.lastProgress
query2.stop()
```

>> ingest to the non-exist table C partitioned by `value % 10`

```
df3 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
df3a = df3.selectExpr('value', 'timestamp', 'value % 10 AS partition').repartition('partition')
query3 = df3a.writeStream.partitionBy('partition').toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned', checkpointLocation='/tmp/checkpoint_v2table_3')
query3.lastProgress
query3.stop()
```

>> select tables

```
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table").show()

spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist").show()

spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
```

Closes #30835 from HeartSaVioR/SPARK-33836.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-12-21 19:42:59 +09:00
yangjie01 433ae9064f [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when read CSV
### What changes were proposed in this pull request?
There are some differences between Spark CSV, opencsv and commons-csv, the typical case are described in SPARK-33566, When there are both unescaped quotes and unescaped qualifier in value,  the results of parsing are different.

The reason for the difference is Spark use `STOP_AT_DELIMITER` as default `UnescapedQuoteHandling` to build `CsvParser` and it not configurable.

On the other hand, opencsv and commons-csv use the parsing mechanism similar to `STOP_AT_CLOSING_QUOTE ` by default.

So this pr make `unescapedQuoteHandling` option configurable to get the same parsing result as opencsv and commons-csv.

### Why are the changes needed?
Make unescapedQuoteHandling option configurable when read CSV to make parsing more flexible。

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action

- Add a new case similar to that described in SPARK-33566

Closes #30518 from LuciferYang/SPARK-33566.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-11-27 15:47:39 +09:00
HyukjinKwon 3959f0d987 [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*)
### What changes were proposed in this pull request?

This PR proposes to migrate to [NumPy documentation style](https://numpydoc.readthedocs.io/en/latest/format.html), see also SPARK-33243.
While I am migrating, I also fixed some Python type hints accordingly.

### Why are the changes needed?

For better documentation as text itself, and generated HTMLs

### Does this PR introduce _any_ user-facing change?

Yes, they will see a better format of HTMLs, and better text format. See SPARK-33243.

### How was this patch tested?

Manually tested via running `./dev/lint-python`.

Closes #30181 from HyukjinKwon/SPARK-33250.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-11-03 10:00:49 +09:00
zero323 31a16fbb40 [SPARK-32714][PYTHON] Initial pyspark-stubs port
### What changes were proposed in this pull request?

This PR proposes migration of [`pyspark-stubs`](https://github.com/zero323/pyspark-stubs) into Spark codebase.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds type annotations directly to Spark source.

This can impact interaction with development tools for users, which haven't used `pyspark-stubs`.

### How was this patch tested?

- [x] MyPy tests of the PySpark source
    ```
    mypy --no-incremental --config python/mypy.ini python/pyspark
    ```
- [x] MyPy tests of Spark examples
    ```
   MYPYPATH=python/ mypy --no-incremental --config python/mypy.ini examples/src/main/python/ml examples/src/main/python/sql examples/src/main/python/sql/streaming
    ```
- [x] Existing Flake8 linter

- [x] Existing unit tests

Tested against:

- `mypy==0.790+dev.e959952d9001e9713d329a2f9b196705b028f894`
- `mypy==0.782`

Closes #29591 from zero323/SPARK-32681.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-24 14:15:36 +09:00