Commit graph

80 commits

Author SHA1 Message Date
HyukjinKwon e5abbab0ed [SPARK-30128][DOCS][PYTHON][SQL] Document/promote 'recursiveFileLookup' and 'pathGlobFilter' in file sources 'mergeSchema' in ORC
### What changes were proposed in this pull request?

This PR adds and exposes the options, 'recursiveFileLookup' and 'pathGlobFilter' in file sources 'mergeSchema' in ORC, into documentation.

- `recursiveFileLookup` at file sources: https://github.com/apache/spark/pull/24830 ([SPARK-27627](https://issues.apache.org/jira/browse/SPARK-27627))
- `pathGlobFilter` at file sources: https://github.com/apache/spark/pull/24518 ([SPARK-27990](https://issues.apache.org/jira/browse/SPARK-27990))
- `mergeSchema` at ORC: https://github.com/apache/spark/pull/24043 ([SPARK-11412](https://issues.apache.org/jira/browse/SPARK-11412))

**Note that** `timeZone` option was not moved from `DataFrameReader.options` as I assume it will likely affect other datasources as well once DSv2 is complete.

### Why are the changes needed?

To document available options in sources properly.

### Does this PR introduce any user-facing change?

In PySpark, `pathGlobFilter` can be set via `DataFrameReader.(text|orc|parquet|json|csv)` and `DataStreamReader.(text|orc|parquet|json|csv)`.

### How was this patch tested?

Manually built the doc and checked the output. Option setting in PySpark is rather a logical change. I manually tested one only:

```bash
$ ls -al tmp
...
-rw-r--r--   1 hyukjin.kwon  staff     3 Dec 20 12:19 aa
-rw-r--r--   1 hyukjin.kwon  staff     3 Dec 20 12:19 ab
-rw-r--r--   1 hyukjin.kwon  staff     3 Dec 20 12:19 ac
-rw-r--r--   1 hyukjin.kwon  staff     3 Dec 20 12:19 cc
```

```python
>>> spark.read.text("tmp", pathGlobFilter="*c").show()
```

```
+-----+
|value|
+-----+
|   ac|
|   cc|
+-----+
```

Closes #26958 from HyukjinKwon/doc-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-23 09:57:42 +09:00
Nicholas Chammas c8922d9145 [SPARK-30113][SQL][PYTHON] Expose mergeSchema option in PySpark's ORC APIs
### What changes were proposed in this pull request?

This PR is a follow-up to #24043 and cousin of #26730. It exposes the `mergeSchema` option directly in the ORC APIs.

### Why are the changes needed?

So the Python API matches the Scala API.

### Does this PR introduce any user-facing change?

Yes, it adds a new option directly in the ORC reader method signatures.

### How was this patch tested?

I tested this manually as follows:

```
>>> spark.range(3).write.orc('test-orc')
>>> spark.range(3).withColumnRenamed('id', 'name').write.orc('test-orc/nested')
>>> spark.read.orc('test-orc', recursiveFileLookup=True, mergeSchema=True)
DataFrame[id: bigint, name: bigint]
>>> spark.read.orc('test-orc', recursiveFileLookup=True, mergeSchema=False)
DataFrame[id: bigint]
>>> spark.conf.set('spark.sql.orc.mergeSchema', True)
>>> spark.read.orc('test-orc', recursiveFileLookup=True)
DataFrame[id: bigint, name: bigint]
>>> spark.read.orc('test-orc', recursiveFileLookup=True, mergeSchema=False)
DataFrame[id: bigint]
```

Closes #26755 from nchammas/SPARK-30113-ORC-mergeSchema.

Authored-by: Nicholas Chammas <nicholas.chammas@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-04 11:44:24 +09:00
Nicholas Chammas e766a323bc [SPARK-30091][SQL][PYTHON] Document mergeSchema option directly in the PySpark Parquet APIs
### What changes were proposed in this pull request?

This change properly documents the `mergeSchema` option directly in the Python APIs for reading Parquet data.

### Why are the changes needed?

The docstring for `DataFrameReader.parquet()` mentions `mergeSchema` but doesn't show it in the API. It seems like a simple oversight.

Before this PR, you'd have to do this to use `mergeSchema`:

```python
spark.read.option('mergeSchema', True).parquet('test-parquet').show()
```

After this PR, you can use the option as (I believe) it was intended to be used:

```python
spark.read.parquet('test-parquet', mergeSchema=True).show()
```

### Does this PR introduce any user-facing change?

Yes, this PR changes the signatures of `DataFrameReader.parquet()` and `DataStreamReader.parquet()` to match their docstrings.

### How was this patch tested?

Testing the `mergeSchema` option directly seems to be left to the Scala side of the codebase. I tested my change manually to confirm the API works.

I also confirmed that setting `spark.sql.parquet.mergeSchema` at the session does not get overridden by leaving `mergeSchema` at its default when calling `parquet()`:

```
>>> spark.conf.set('spark.sql.parquet.mergeSchema', True)
>>> spark.range(3).write.parquet('test-parquet/id')
>>> spark.range(3).withColumnRenamed('id', 'name').write.parquet('test-parquet/name')
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet').show()
+----+----+
|  id|name|
+----+----+
|null|   1|
|null|   2|
|null|   0|
|   1|null|
|   2|null|
|   0|null|
+----+----+
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet', mergeSchema=False).show()
+----+
|  id|
+----+
|null|
|null|
|null|
|   1|
|   2|
|   0|
+----+
```

Closes #26730 from nchammas/parquet-merge-schema.

Authored-by: Nicholas Chammas <nicholas.chammas@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-04 11:31:57 +09:00
Nicholas Chammas 3dd3a623f2 [SPARK-27990][SPARK-29903][PYTHON] Add recursiveFileLookup option to Python DataFrameReader
### What changes were proposed in this pull request?

As a follow-up to #24830, this PR adds the `recursiveFileLookup` option to the Python DataFrameReader API.

### Why are the changes needed?

This PR maintains Python feature parity with Scala.

### Does this PR introduce any user-facing change?

Yes.

Before this PR, you'd only be able to use this option as follows:

```python
spark.read.option("recursiveFileLookup", True).text("test-data").show()
```

With this PR, you can reference the option from within the format-specific method:

```python
spark.read.text("test-data", recursiveFileLookup=True).show()
```

This option now also shows up in the Python API docs.

### How was this patch tested?

I tested this manually by creating the following directories with dummy data:

```
test-data
├── 1.txt
└── nested
   └── 2.txt
test-parquet
├── nested
│  ├── _SUCCESS
│  ├── part-00000-...-.parquet
├── _SUCCESS
├── part-00000-...-.parquet
```

I then ran the following tests and confirmed the output looked good:

```python
spark.read.parquet("test-parquet", recursiveFileLookup=True).show()
spark.read.text("test-data", recursiveFileLookup=True).show()
spark.read.csv("test-data", recursiveFileLookup=True).show()
```

`python/pyspark/sql/tests/test_readwriter.py` seems pretty sparse. I'm happy to add my tests there, though it seems we have been deferring testing like this to the Scala side of things.

Closes #26718 from nchammas/SPARK-27990-recursiveFileLookup-python.

Authored-by: Nicholas Chammas <nicholas.chammas@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-04 10:10:30 +09:00
Matt Stillwell 1e1b7302f4 [MINOR][PYSPARK][DOCS] Fix typo in example documentation
### What changes were proposed in this pull request?

I propose that we change the example code documentation to call the proper function .
For example, under the `foreachBatch` function, the example code was calling the `foreach()` function by mistake.

### Why are the changes needed?

I suppose it could confuse some people, and it is a typo

### Does this PR introduce any user-facing change?

No, there is no "meaningful" code being change, simply the documentation

### How was this patch tested?

I made the change on a fork and it still worked

Closes #26299 from mstill3/patch-1.

Authored-by: Matt Stillwell <18670089+mstill3@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-01 11:55:29 -07:00
Jeff Evans 95de93b24e [SPARK-24540][SQL] Support for multiple character delimiter in Spark CSV read
Updating univocity-parsers version to 2.8.3, which adds support for multiple character delimiters

Moving univocity-parsers version to spark-parent pom dependencyManagement section

Adding new utility method to build multi-char delimiter string, which delegates to existing one

Adding tests for multiple character delimited CSV

### What changes were proposed in this pull request?

Adds support for parsing CSV data using multiple-character delimiters.  Existing logic for converting the input delimiter string to characters was kept and invoked in a loop.  Project dependencies were updated to remove redundant declaration of `univocity-parsers` version, and also to change that version to the latest.

### Why are the changes needed?

It is quite common for people to have delimited data, where the delimiter is not a single character, but rather a sequence of characters.  Currently, it is difficult to handle such data in Spark (typically needs pre-processing).

### Does this PR introduce any user-facing change?

Yes. Specifying the "delimiter" option for the DataFrame read, and providing more than one character, will no longer result in an exception.  Instead, it will be converted as before and passed to the underlying library (Univocity), which has accepted multiple character delimiters since 2.8.0.

### How was this patch tested?

The `CSVSuite` tests were confirmed passing (including new methods), and `sbt` tests for `sql` were executed.

Closes #26027 from jeff303/SPARK-24540.

Authored-by: Jeff Evans <jeffrey.wayne.evans@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-10-15 15:44:51 -05:00
Shixiong Zhu 5bb69945e4 [SPARK-28651][SS] Force the schema of Streaming file source to be nullable
## What changes were proposed in this pull request?

Right now, batch DataFrame always changes the schema to nullable automatically (See this line: 325bc8e9c6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala (L399)). But streaming file source is missing this.

This PR updates the streaming file source schema to force it be nullable. I also added a flag `spark.sql.streaming.fileSource.schema.forceNullable` to disable this change since some users may rely on the old behavior.

## How was this patch tested?

The new unit test.

Closes #25382 from zsxwing/SPARK-28651.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-09 18:54:55 +09:00
Maxim Gekk a5a5da78cf [SPARK-28471][SQL] Replace yyyy by uuuu in date-timestamp patterns without era
## What changes were proposed in this pull request?

In the PR, I propose to use `uuuu` for years instead of `yyyy` in date/timestamp patterns without the era pattern `G` (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). **Parsing/formatting of positive years (current era) will be the same.** The difference is in formatting negative years belong to previous era - BC (Before Christ).

I replaced the `yyyy` pattern by `uuuu` everywhere except:
1. Test, Suite & Benchmark. Existing tests must work as is.
2. `SimpleDateFormat` because it doesn't support the `uuuu` pattern.
3. Comments and examples (except comments related to already replaced patterns).

Before the changes, the year of common era `100` and the year of BC era `-99`, showed similarly as `100`.  After the changes negative years will be formatted with the `-` sign.

Before:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+----------+
|     value|
+----------+
|0100-01-01|
+----------+
```

After:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+-----------+
|      value|
+-----------+
|-0099-01-01|
+-----------+
```

## How was this patch tested?

By existing test suites, and added tests for negative years to `DateFormatterSuite` and `TimestampFormatterSuite`.

Closes #25230 from MaxGekk/year-pattern-uuuu.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-28 20:36:36 -07:00
Gengliang Wang 78a403fab9 [SPARK-27627][SQL] Make option "pathGlobFilter" as a general option for all file sources
## What changes were proposed in this pull request?

### Background:
The data source option `pathGlobFilter` is introduced for Binary file format: https://github.com/apache/spark/pull/24354 , which can be used for filtering file names, e.g. reading `.png` files only while there is `.json` files in the same directory.

### Proposal:
Make the option `pathGlobFilter` as a general option for all file sources. The path filtering should happen in the path globbing on Driver.

### Motivation:
Filtering the file path names in file scan tasks on executors is kind of ugly.

### Impact:
1. The splitting of file partitions will be more balanced.
2. The metrics of file scan will be more accurate.
3. Users can use the option for reading other file sources.

## How was this patch tested?

Unit tests

Closes #24518 from gengliangwang/globFilter.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-09 08:41:43 +09:00
Gabor Somogyi fb6b19ab7c [SPARK-23014][SS] Fully remove V1 memory sink.
## What changes were proposed in this pull request?

There is a MemorySink v2 already so v1 can be removed. In this PR I've removed it completely.
What this PR contains:
* V1 memory sink removal
* V2 memory sink renamed to become the only implementation
* Since DSv2 sends exceptions in a chained format (linking them with cause field) I've made python side compliant
* Adapted all the tests

## How was this patch tested?

Existing unit tests.

Closes #24403 from gaborgsomogyi/SPARK-23014.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-29 09:44:23 -07:00
Sean Owen 0deebd3820 [SPARK-26016][DOCS] Clarify that text DataSource read/write, and RDD methods that read text, always use UTF-8
## What changes were proposed in this pull request?

Clarify that text DataSource read/write, and RDD methods that read text, always use UTF-8 as they use Hadoop's implementation underneath. I think these are all the places that this needs a mention in the user-facing docs.

## How was this patch tested?

Doc tests.

Closes #23962 from srowen/SPARK-26016.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-05 08:03:39 +09:00
Maxim Gekk 1008ab0801 [SPARK-26178][SPARK-26243][SQL][FOLLOWUP] Replacing SimpleDateFormat by DateTimeFormatter in comments
## What changes were proposed in this pull request?

The PRs #23150 and #23196 switched JSON and CSV datasources on new formatter for dates/timestamps which is based on `DateTimeFormatter`. In this PR, I replaced `SimpleDateFormat` by `DateTimeFormatter` to reflect the changes.

Closes #23374 from MaxGekk/java-time-docs.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2018-12-24 10:47:47 +08:00
Maxim Gekk 4e1d859c19 [SPARK-26303][SQL] Return partial results for bad JSON records
## What changes were proposed in this pull request?

In the PR, I propose to return partial results from JSON datasource and JSON functions in the PERMISSIVE mode if some of JSON fields are parsed and converted to desired types successfully. The changes are made only for `StructType`. Whole bad JSON records are placed into the corrupt column specified by the `columnNameOfCorruptRecord` option or SQL config.

Partial results are not returned for malformed JSON input.

## How was this patch tested?

Added new UT which checks converting JSON strings with one invalid and one valid field at the end of the string.

Closes #23253 from MaxGekk/json-bad-record.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2018-12-11 16:06:57 +08:00
Maxim Gekk 8e8d1177e6 [SPARK-26108][SQL] Support custom lineSep in CSV datasource
## What changes were proposed in this pull request?

In the PR,  I propose new options for CSV datasource - `lineSep` similar to Text and JSON datasource. The option allows to specify custom line separator of maximum length of 2 characters (because of a restriction in `uniVocity` parser). New option can be used in reading and writing CSV files.

## How was this patch tested?

Added a few tests with custom `lineSep` for enabled/disabled `multiLine` in read as well as tests in write. Also I added roundtrip tests.

Closes #23080 from MaxGekk/csv-line-sep.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-11-24 00:50:20 +09:00
Maxim Gekk aec0af4a95 [SPARK-25972][PYTHON] Missed JSON options in streaming.py
## What changes were proposed in this pull request?

Added JSON options for `json()` in streaming.py that are presented in the similar method in readwriter.py. In particular, missed options are `dropFieldIfAllNull` and `encoding`.

Closes #22973 from MaxGekk/streaming-missed-options.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-11-11 21:01:29 +08:00
Maxim Gekk 79551f558d [SPARK-25945][SQL] Support locale while parsing date/timestamp from CSV/JSON
## What changes were proposed in this pull request?

In the PR, I propose to add new option `locale` into CSVOptions/JSONOptions to make parsing date/timestamps in local languages possible. Currently the locale is hard coded to `Locale.US`.

## How was this patch tested?

Added two tests for parsing a date from CSV/JSON - `ноя 2018`.

Closes #22951 from MaxGekk/locale.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-11-09 09:45:06 +08:00
Sean Owen 08c76b5d39 [SPARK-25238][PYTHON] lint-python: Fix W605 warnings for pycodestyle 2.4
(This change is a subset of the changes needed for the JIRA; see https://github.com/apache/spark/pull/22231)

## What changes were proposed in this pull request?

Use raw strings and simpler regex syntax consistently in Python, which also avoids warnings from pycodestyle about accidentally relying Python's non-escaping of non-reserved chars in normal strings. Also, fix a few long lines.

## How was this patch tested?

Existing tests, and some manual double-checking of the behavior of regexes in Python 2/3 to be sure.

Closes #22400 from srowen/SPARK-25238.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-09-13 11:19:43 +08:00
Mario Molina c9cb393dc4 [SPARK-17916][SPARK-25241][SQL][FOLLOW-UP] Fix empty string being parsed as null when nullValue is set.
## What changes were proposed in this pull request?

In the PR, I propose new CSV option `emptyValue` and an update in the SQL Migration Guide which describes how to revert previous behavior when empty strings were not written at all. Since Spark 2.4, empty strings are saved as `""` to distinguish them from saved `null`s.

Closes #22234
Closes #22367

## How was this patch tested?

It was tested by `CSVSuite` and new tests added in the PR #22234

Closes #22389 from MaxGekk/csv-empty-value-master.

Lead-authored-by: Mario Molina <mmolimar@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-09-11 20:47:14 +08:00
cclauss 71f38ac242 [SPARK-23698][PYTHON] Resolve undefined names in Python 3
## What changes were proposed in this pull request?

Fix issues arising from the fact that builtins __file__, __long__, __raw_input()__, __unicode__, __xrange()__, etc. were all removed from Python 3.  __Undefined names__ have the potential to raise [NameError](https://docs.python.org/3/library/exceptions.html#NameError) at runtime.

## How was this patch tested?
* $ __python2 -m flake8 . --count --select=E9,F82 --show-source --statistics__
* $ __python3 -m flake8 . --count --select=E9,F82 --show-source --statistics__

holdenk

flake8 testing of https://github.com/apache/spark on Python 3.6.3

$ __python3 -m flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics__
```
./dev/merge_spark_pr.py:98:14: F821 undefined name 'raw_input'
    result = raw_input("\n%s (y/n): " % prompt)
             ^
./dev/merge_spark_pr.py:136:22: F821 undefined name 'raw_input'
    primary_author = raw_input(
                     ^
./dev/merge_spark_pr.py:186:16: F821 undefined name 'raw_input'
    pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
               ^
./dev/merge_spark_pr.py:233:15: F821 undefined name 'raw_input'
    jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
              ^
./dev/merge_spark_pr.py:278:20: F821 undefined name 'raw_input'
    fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions)
                   ^
./dev/merge_spark_pr.py:317:28: F821 undefined name 'raw_input'
            raw_assignee = raw_input(
                           ^
./dev/merge_spark_pr.py:430:14: F821 undefined name 'raw_input'
    pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): ")
             ^
./dev/merge_spark_pr.py:442:18: F821 undefined name 'raw_input'
        result = raw_input("Would you like to use the modified title? (y/n): ")
                 ^
./dev/merge_spark_pr.py:493:11: F821 undefined name 'raw_input'
    while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
          ^
./dev/create-release/releaseutils.py:58:16: F821 undefined name 'raw_input'
    response = raw_input("%s [y/n]: " % msg)
               ^
./dev/create-release/releaseutils.py:152:38: F821 undefined name 'unicode'
        author = unidecode.unidecode(unicode(author, "UTF-8")).strip()
                                     ^
./python/setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
          ^
./python/pyspark/cloudpickle.py:275:18: F821 undefined name 'buffer'
        dispatch[buffer] = save_buffer
                 ^
./python/pyspark/cloudpickle.py:807:18: F821 undefined name 'file'
        dispatch[file] = save_file
                 ^
./python/pyspark/sql/conf.py:61:61: F821 undefined name 'unicode'
        if not isinstance(obj, str) and not isinstance(obj, unicode):
                                                            ^
./python/pyspark/sql/streaming.py:25:21: F821 undefined name 'long'
    intlike = (int, long)
                    ^
./python/pyspark/streaming/dstream.py:405:35: F821 undefined name 'long'
        return self._sc._jvm.Time(long(timestamp * 1000))
                                  ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:21:10: F821 undefined name 'xrange'
for i in xrange(50):
         ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:22:14: F821 undefined name 'xrange'
    for j in xrange(5):
             ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:23:18: F821 undefined name 'xrange'
        for k in xrange(20022):
                 ^
20    F821 undefined name 'raw_input'
20
```

Closes #20838 from cclauss/fix-undefined-names.

Authored-by: cclauss <cclauss@bluewin.ch>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2018-08-22 10:06:59 -07:00
Tathagata Das 2cb976355c [SPARK-24565][SS] Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame
## What changes were proposed in this pull request?

Currently, the micro-batches in the MicroBatchExecution is not exposed to the user through any public API. This was because we did not want to expose the micro-batches, so that all the APIs we expose, we can eventually support them in the Continuous engine. But now that we have better sense of buiding a ContinuousExecution, I am considering adding APIs which will run only the MicroBatchExecution. I have quite a few use cases where exposing the microbatch output as a dataframe is useful.
- Pass the output rows of each batch to a library that is designed only the batch jobs (example, uses many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exists (e.g. redshift data source).
- Writer the output rows to multiple places by writing twice for each batch. This is not the most elegant thing to do for multiple-output streaming queries but is likely to be better than running two streaming queries processing the same data twice.

The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to Scala/Java/Python `DataStreamWriter`.

## How was this patch tested?
New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21571 from tdas/foreachBatch.
2018-06-19 13:56:51 -07:00
Tathagata Das b5ccf0d395 [SPARK-24396][SS][PYSPARK] Add Structured Streaming ForeachWriter for python
## What changes were proposed in this pull request?

This PR adds `foreach` for streaming queries in Python. Users will be able to specify their processing logic in two different ways.
- As a function that takes a row as input.
- As an object that has methods `open`, `process`, and `close` methods.

See the python docs in this PR for more details.

## How was this patch tested?
Added java and python unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21477 from tdas/SPARK-24396.
2018-06-15 12:56:39 -07:00
Maxim Gekk 1d9338bb10 [SPARK-23786][SQL] Checking column names of csv headers
## What changes were proposed in this pull request?

Currently column names of headers in CSV files are not checked against provided schema of CSV data. It could cause errors like showed in the [SPARK-23786](https://issues.apache.org/jira/browse/SPARK-23786) and https://github.com/apache/spark/pull/20894#issuecomment-375957777. I introduced new CSV option - `enforceSchema`. If it is enabled (by default `true`), Spark forcibly applies provided or inferred schema to CSV files. In that case, CSV headers are ignored and not checked against the schema. If `enforceSchema` is set to `false`, additional checks can be performed. For example, if column in CSV header and in the schema have different ordering, the following exception is thrown:

```
java.lang.IllegalArgumentException: CSV file header does not contain the expected fields
 Header: depth, temperature
 Schema: temperature, depth
CSV file: marina.csv
```

## How was this patch tested?

The changes were tested by existing tests of CSVSuite and by 2 new tests.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #20894 from MaxGekk/check-column-names.
2018-06-03 22:02:21 -07:00
hyukjinkwon 34c4b9c57e [SPARK-23765][SQL] Supports custom line separator for json datasource
## What changes were proposed in this pull request?

This PR proposes to add lineSep option for a configurable line separator in text datasource.
It supports this option by using `LineRecordReader`'s functionality with passing it to the constructor.

The approach is similar with https://github.com/apache/spark/pull/20727; however, one main difference is, it uses text datasource's `lineSep` option to parse line by line in JSON's schema inference.

## How was this patch tested?

Manually tested and unit tests were added.

Author: hyukjinkwon <gurwls223@apache.org>
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20877 from HyukjinKwon/linesep-json.
2018-03-28 19:49:27 +08:00
Bryan Cutler a9350d7095 [SPARK-23700][PYTHON] Cleanup imports in pyspark.sql
## What changes were proposed in this pull request?

This cleans up unused imports, mainly from pyspark.sql module.  Added a note in function.py that imports `UserDefinedFunction` only to maintain backwards compatibility for using `from pyspark.sql.function import UserDefinedFunction`.

## How was this patch tested?

Existing tests and built docs.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20892 from BryanCutler/pyspark-cleanup-imports-SPARK-23700.
2018-03-26 12:42:32 +09:00
hyukjinkwon 8d79113b81 [SPARK-23577][SQL] Supports custom line separator for text datasource
## What changes were proposed in this pull request?

This PR proposes to add `lineSep` option for a configurable line separator in text datasource.

It supports this option by using `LineRecordReader`'s functionality with passing it to the constructor.

## How was this patch tested?

Manual tests and unit tests were added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20727 from HyukjinKwon/linesep-text.
2018-03-21 09:46:47 -07:00
Benjamin Peterson 7013eea11c [SPARK-23522][PYTHON] always use sys.exit over builtin exit
The exit() builtin is only for interactive use. applications should use sys.exit().

## What changes were proposed in this pull request?

All usage of the builtin `exit()` function is replaced by `sys.exit()`.

## How was this patch tested?

I ran `python/run-tests`.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Benjamin Peterson <benjamin@python.org>

Closes #20682 from benjaminp/sys-exit.
2018-03-08 20:38:34 +09:00
Liang-Chi Hsieh b14993e1fc [SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document
## What changes were proposed in this pull request?

Clarify JSON and CSV reader behavior in document.

JSON doesn't support partial results for corrupted records.
CSV only supports partial results for the records with more or less tokens.

## How was this patch tested?

Pass existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20666 from viirya/SPARK-23448-2.
2018-02-28 11:00:54 +09:00
Tathagata Das 2d41f040a3 [SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger
## What changes were proposed in this pull request?
Self-explanatory.

## How was this patch tested?
New python tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20309 from tdas/SPARK-23143.
2018-01-18 12:25:52 -08:00
Felix Cheung df95a908ba [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy
## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #20129 from felixcheung/rwater.
2018-01-03 21:43:14 -08:00
soonmok-kwon ffe6fd77a4 [SPARK-22818][SQL] csv escape of quote escape
## What changes were proposed in this pull request?

Escape of escape should be considered when using the UniVocity csv encoding/decoding library.

Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters

One option is added for reading and writing CSV: `escapeQuoteEscaping`

## How was this patch tested?

Unit test added.

Author: soonmok-kwon <soonmok.kwon@navercorp.com>

Closes #20004 from ep1804/SPARK-22818.
2017-12-29 07:30:06 +08:00
Dongjoon Hyun 9962390af7 [SPARK-22781][SS] Support creating streaming dataset with ORC files
## What changes were proposed in this pull request?

Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document.

**BEFORE**

```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
       spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```

**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper678b3746

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
|  a|
+---+
|  1|
+---+
```

## How was this patch tested?

Pass the newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19975 from dongjoon-hyun/SPARK-22781.
2017-12-19 23:50:06 -08:00
vinodkc 51620e288b [SPARK-21756][SQL] Add JSON option to allow unquoted control characters
## What changes were proposed in this pull request?

This patch adds allowUnquotedControlChars option in JSON data source to allow JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters)

## How was this patch tested?
Add new test cases

Author: vinodkc <vinod.kc.in@gmail.com>

Closes #19008 from vinodkc/br_fix_SPARK-21756.
2017-08-25 10:18:03 -07:00
hyukjinkwon 7525ce98b4 [SPARK-20431][SS][FOLLOWUP] Specify a schema by using a DDL-formatted string in DataStreamReader
## What changes were proposed in this pull request?

This pr supported a DDL-formatted string in `DataStreamReader.schema`.
This fix could make users easily define a schema without importing the type classes.

For example,

```scala
scala> spark.readStream.schema("col0 INT, col1 DOUBLE").load("/tmp/abc").printSchema()
root
 |-- col0: integer (nullable = true)
 |-- col1: double (nullable = true)
```

## How was this patch tested?

Added tests in `DataStreamReaderWriterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18373 from HyukjinKwon/SPARK-20431.
2017-06-24 11:39:41 +08:00
Xiao Li 2051428173 [SPARK-20980][SQL] Rename wholeFile to multiLine for both CSV and JSON
### What changes were proposed in this pull request?
The current option name `wholeFile` is misleading for CSV users. Currently, it is not representing a record per file. Actually, one file could have multiple records. Thus, we should rename it. Now, the proposal is `multiLine`.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18202 from gatorsmile/renameCVSOption.
2017-06-15 13:18:19 +08:00
Michael Armbrust d935e0a9d9 [SPARK-20844] Remove experimental from Structured Streaming APIs
Now that Structured Streaming has been out for several Spark release and has large production use cases, the `Experimental` label is no longer appropriate.  I've left `InterfaceStability.Evolving` however, as I think we may make a few changes to the pluggable Source & Sink API in Spark 2.3.

Author: Michael Armbrust <michael@databricks.com>

Closes #18065 from marmbrus/streamingGA.
2017-05-26 13:33:23 -07:00
hyukjinkwon bca4259f12 [MINOR][DOCS] JSON APIs related documentation fixes
## What changes were proposed in this pull request?

This PR proposes corrections related to JSON APIs as below:

- Rendering links in Python documentation
- Replacing `RDD` to `Dataset` in programing guide
- Adding missing description about JSON Lines consistently in `DataFrameReader.json` in Python API
- De-duplicating little bit of `DataFrameReader.json` in Scala/Java API

## How was this patch tested?

Manually build the documentation via `jekyll build`. Corresponding snapstops will be left on the codes.

Note that currently there are Javadoc8 breaks in several places. These are proposed to be handled in https://github.com/apache/spark/pull/17477. So, this PR does not fix those.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17602 from HyukjinKwon/minor-json-documentation.
2017-04-12 09:16:39 +01:00
hyukjinkwon cff11fd20e [SPARK-20166][SQL] Use XXX for ISO 8601 timezone instead of ZZ (FastDateFormat specific) in CSV/JSON timeformat options
## What changes were proposed in this pull request?

This PR proposes to use `XXX` format instead of `ZZ`. `ZZ` seems a `FastDateFormat` specific.

`ZZ` supports "ISO 8601 extended format time zones" but it seems `FastDateFormat` specific option.
I misunderstood this is compatible format with `SimpleDateFormat` when this change is introduced.
Please see [SimpleDateFormat documentation]( https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html#iso8601timezone) and [FastDateFormat documentation](https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/time/FastDateFormat.html).

It seems we better replace `ZZ` to `XXX` because they look using the same strategy - [FastDateParser.java#L930](8767cd4f1a/src/main/java/org/apache/commons/lang3/time/FastDateParser.java (L930)), [FastDateParser.java#L932-L951 ](8767cd4f1a/src/main/java/org/apache/commons/lang3/time/FastDateParser.java (L932-L951)) and [FastDateParser.java#L596-L601](8767cd4f1a/src/main/java/org/apache/commons/lang3/time/FastDateParser.java (L596-L601)).

I also checked the codes and manually debugged it for sure. It seems both cases use the same pattern `( Z|(?:[+-]\\d{2}(?::)\\d{2}))`.

_Note that this should be rather a fix about documentation and not the behaviour change because `ZZ` seems invalid date format in `SimpleDateFormat` as documented in `DataFrameReader` and etc, and both `ZZ` and `XXX` look identically working with `FastDateFormat`_

Current documentation is as below:

```
   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
   * indicates a timestamp format. Custom date formats follow the formats at
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
```

## How was this patch tested?

Existing tests should cover this. Also, manually tested as below (BTW, I don't think these are worth being added as tests within Spark):

**Parse**

```scala
scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00")
res4: java.util.Date = Tue Mar 21 20:00:00 KST 2017

scala>  new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z")
res10: java.util.Date = Tue Mar 21 09:00:00 KST 2017

scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00")
java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000-11:00"
  at java.text.DateFormat.parse(DateFormat.java:366)
  ... 48 elided
scala>  new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z")
java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000Z"
  at java.text.DateFormat.parse(DateFormat.java:366)
  ... 48 elided
```

```scala
scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00")
res7: java.util.Date = Tue Mar 21 20:00:00 KST 2017

scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z")
res1: java.util.Date = Tue Mar 21 09:00:00 KST 2017

scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00")
res8: java.util.Date = Tue Mar 21 20:00:00 KST 2017

scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z")
res2: java.util.Date = Tue Mar 21 09:00:00 KST 2017
```

**Format**

```scala
scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00"))
res6: String = 2017-03-21T20:00:00.000+09:00
```

```scala
scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSZZ,ko_KR,Asia/Seoul]

scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00"))
res1: String = 2017-03-21T20:00:00.000+09:00

scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSXXX,ko_KR,Asia/Seoul]

scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00"))
res2: String = 2017-03-21T20:00:00.000+09:00
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17489 from HyukjinKwon/SPARK-20166.
2017-04-03 10:07:41 +01:00
Tyson Condie 746a558de2 [SPARK-19876][SS][WIP] OneTime Trigger Executor
## What changes were proposed in this pull request?

An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.

In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log will be used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself to determine what batch to process next after restart; using the offset log to determine this would process the previously logged batch, always, thus not permitting a OneTime trigger feature.

## How was this patch tested?

A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.

In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

marmbrus tdas zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17219 from tcondie/stream-commit.
2017-03-23 14:32:05 -07:00
hyukjinkwon 07c12c09a7 [SPARK-18579][SQL] Use ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options in CSV writing
## What changes were proposed in this pull request?

This PR proposes to support _not_ trimming the white spaces when writing out. These are `false` by default in CSV reading path but these are `true` by default in CSV writing in univocity parser.

Both `ignoreLeadingWhiteSpace` and `ignoreTrailingWhiteSpace` options are not being used for writing and therefore, we are always trimming the white spaces.

It seems we should provide a way to keep this white spaces easily.

WIth the data below:

```scala
val df = spark.read.csv(Seq("a , b  , c").toDS)
df.show()
```

```
+---+----+---+
|_c0| _c1|_c2|
+---+----+---+
| a | b  |  c|
+---+----+---+
```

**Before**

```scala
df.write.csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+-----+
|value|
+-----+
|a,b,c|
+-----+
```

It seems this can't be worked around via `quoteAll` too.

```scala
df.write.option("quoteAll", true).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```
```
+-----------+
|      value|
+-----------+
|"a","b","c"|
+-----------+
```

**After**

```scala
df.write.option("ignoreLeadingWhiteSpace", false).option("ignoreTrailingWhiteSpace", false).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+----------+
|     value|
+----------+
|a , b  , c|
+----------+
```

Note that this case is possible in R

```r
> system("cat text.csv")
f1,f2,f3
a , b  , c
> df <- read.csv(file="text.csv")
> df
  f1   f2 f3
1 a   b    c
> write.csv(df, file="text1.csv", quote=F, row.names=F)
> system("cat text1.csv")
f1,f2,f3
a , b  , c
```

## How was this patch tested?

Unit tests in `CSVSuite` and manual tests for Python.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17310 from HyukjinKwon/SPARK-18579.
2017-03-23 00:25:01 -07:00
hyukjinkwon 465818389a [SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related comments
## What changes were proposed in this pull request?

This PR proposes to make `mode` options in both CSV and JSON to use `cass object` and fix some related comments related previous fix.

Also, this PR modifies some tests related parse modes.

## How was this patch tested?

Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17377 from HyukjinKwon/SPARK-19949.
2017-03-22 09:52:37 -07:00
Liwei Lin e1ac553402 [SPARK-19817][SS] Make it clear that timeZone is a general option in DataStreamReader/Writer
## What changes were proposed in this pull request?

As timezone setting can also affect partition values, it works for all formats, we should make it clear.

## How was this patch tested?

N/A

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17299 from lw-lin/timezone.
2017-03-14 22:30:16 -07:00
Felix Cheung 8d6ef895ee [SPARK-18352][DOCS] wholeFile JSON update doc and programming guide
## What changes were proposed in this pull request?

Update doc for R, programming guide. Clarify default behavior for all languages.

## How was this patch tested?

manually

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17128 from felixcheung/jsonwholefiledoc.
2017-03-02 01:02:38 -08:00
hyukjinkwon 7e5359be5c [SPARK-19610][SQL] Support parsing multiline CSV files
## What changes were proposed in this pull request?

This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file).

So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory.

## How was this patch tested?

Unit tests in `CSVSuite` and `tests.py`

Manual tests with a single 9GB CSV file in local file system, for example,

```scala
spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count()
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16976 from HyukjinKwon/SPARK-19610.
2017-02-28 13:34:33 -08:00
Takeshi Yamamuro 09ed6e7711 [SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV data
## What changes were proposed in this pull request?
This pr added a logic to put malformed tokens into a new field when parsing CSV data  in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails);
```
Caused by: java.lang.IllegalArgumentException
	at java.sql.Date.valueOf(Date.java:143)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
	at scala.util.Try.getOrElse(Try.scala:79)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269)
	at
```
In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field.

## How was this patch tested?
Added tests in `CSVSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #16928 from maropu/SPARK-18699-2.
2017-02-23 12:09:36 -08:00
Nathan Howell 21fde57f15 [SPARK-18352][SQL] Support parsing multiline json files
## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory.

Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.

These changes have allowed types other than `String` to be parsed. Support for `UTF8String` and `Text` have been added (alongside `String` and `InputFormat`) and no longer require a conversion to `String` just for parsing.

I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits, let me know if they should be flattened into this PR or moved to a new one.

## How was this patch tested?

New and existing unit tests. No performance or load tests have been run.

Author: Nathan Howell <nhowell@godaddy.com>

Closes #16386 from NathanHowell/SPARK-18352.
2017-02-16 20:51:19 -08:00
Takuya UESHIN 865b2fd84c [SPARK-18937][SQL] Timezone support in CSV/JSON parsing
## What changes were proposed in this pull request?

This is a follow-up pr of #16308.

This pr enables timezone support in CSV/JSON parsing.

We should introduce `timeZone` option for CSV/JSON datasources (the default value of the option is session local timezone).

The datasources should use the `timeZone` option to format/parse to write/read timestamp values.
Notice that while reading, if the timestampFormat has the timezone info, the timezone will not be used because we should respect the timezone in the values.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can properly read these files even if the timezone option is wrong because the timestamp values have timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if `timezoneFormat` doesn't contain timezone info, we can properly read the values with setting correct timezone option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This pr also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression` to be able to evaluate values with timezone option.

## How was this patch tested?

Existing tests and added some tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #16750 from ueshin/issues/SPARK-18937.
2017-02-15 13:26:34 -08:00
Shixiong Zhu bc6c56e940 [SPARK-19140][SS] Allow update mode for non-aggregation streaming queries
## What changes were proposed in this pull request?

This PR allow update mode for non-aggregation streaming queries. It will be same as the append mode if a query has no aggregations.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16520 from zsxwing/update-without-agg.
2017-01-10 17:58:11 -08:00
Burak Yavuz 0917c8ee07 [SPARK-18888] partitionBy in DataStreamWriter in Python throws _to_seq not defined
## What changes were proposed in this pull request?

`_to_seq` wasn't imported.

## How was this patch tested?

Added partitionBy to existing write path unit test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16297 from brkyvz/SPARK-18888.
2016-12-15 14:26:54 -08:00
Shixiong Zhu 1ac6567bdb [SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty
## What changes were proposed in this pull request?

Right now `StreamingQuery.lastProgress` throws NoSuchElementException and it's hard to be used in Python since Python user will just see Py4jError.

This PR just makes it return null instead.

## How was this patch tested?

`test("lastProgress should be null when recentProgress is empty")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16273 from zsxwing/SPARK-18852.
2016-12-14 13:36:41 -08:00
Michael Armbrust 70b2bf717d [SPARK-18754][SS] Rename recentProgresses to recentProgress
Based on an informal survey, users find this option easier to understand / remember.

Author: Michael Armbrust <michael@databricks.com>

Closes #16182 from marmbrus/renameRecentProgress.
2016-12-07 15:36:29 -08:00