## What changes were proposed in this pull request?
This PR deals with four points as below:
- Reuse existing DDL parser APIs rather than reimplementing within PySpark
- Support DDL formatted string, `field type, field type`.
- Support case-insensitivity for parsing.
- Support nested data types as below:
**Before**
```
>>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
...
ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
```
```
>>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
...
ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
```
```
>>> spark.createDataFrame([[1]], "a int").show()
...
ValueError: Could not parse datatype: a int
```
**After**
```
>>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
+---+
| a|
+---+
|[1]|
+---+
```
```
>>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
+---+
| a|
+---+
|[1]|
+---+
```
```
>>> spark.createDataFrame([[1]], "a int").show()
+---+
| a|
+---+
| 1|
+---+
```
## How was this patch tested?
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18590 from HyukjinKwon/deduplicate-python-ddl.
## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`. This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process. The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame. Data types except complex, date, timestamp, and decimal are currently supported, otherwise an `UnsupportedOperation` exception is thrown.
Additions to Spark include a Scala package private method `Dataset.toArrowPayload` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served. A package private class/object `ArrowConverters` that provide data type mappings and conversion routines. In Python, a private method `DataFrame._collectAsArrow` is added to collect Arrow payloads and a SQLConf "spark.sql.execution.arrow.enable" can be used in `toPandas()` to enable using Arrow (uses the old conversion by default).
## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types. The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data. This will ensure that the schema and data has been converted correctly.
Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow. A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas.
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Li Jin <li.jin@twosigma.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes#18459 from BryanCutler/toPandas_with_arrow-SPARK-13534.
## What changes were proposed in this pull request?
Currently `ArrayConstructor` handles an array of typecode `'l'` as `int` when converting Python object in Python 2 into Java object, so if the value is larger than `Integer.MAX_VALUE` or smaller than `Integer.MIN_VALUE` then the overflow occurs.
```python
import array
data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
df = spark.createDataFrame(data)
df.show(truncate=False)
```
```
+----------+
|longarray |
+----------+
|[0, 0, -1]|
+----------+
```
This should be:
```
+----------------------------------------------+
|longarray |
+----------------------------------------------+
|[-9223372036854775808, 0, 9223372036854775807]|
+----------------------------------------------+
```
## How was this patch tested?
Added a test and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18553 from ueshin/issues/SPARK-21327.
## What changes were proposed in this pull request?
Support register Java UDAFs in PySpark so that user can use Java UDAF in PySpark. Besides that I also add api in `UDFRegistration`
## How was this patch tested?
Unit test is added
Author: Jeff Zhang <zjffdu@apache.org>
Closes#17222 from zjffdu/SPARK-19439.
## What changes were proposed in this pull request?
**Context**
While reviewing https://github.com/apache/spark/pull/17227, I realised here we type-dispatch per record. The PR itself is fine in terms of performance as is but this prints a prefix, `"obj"` in exception message as below:
```
from pyspark.sql.types import *
schema = StructType([StructField('s', IntegerType(), nullable=False)])
spark.createDataFrame([["1"]], schema)
...
TypeError: obj.s: IntegerType can not accept object '1' in type <type 'str'>
```
I suggested to get rid of this but during investigating this, I realised my approach might bring a performance regression as it is a hot path.
Only for SPARK-19507 and https://github.com/apache/spark/pull/17227, It needs more changes to cleanly get rid of the prefix and I rather decided to fix both issues together.
**Propersal**
This PR tried to
- get rid of per-record type dispatch as we do in many code paths in Scala so that it improves the performance (roughly ~25% improvement) - SPARK-21296
This was tested with a simple code `spark.createDataFrame(range(1000000), "int")`. However, I am quite sure the actual improvement in practice is larger than this, in particular, when the schema is complicated.
- improve error message in exception describing field information as prose - SPARK-19507
## How was this patch tested?
Manually tested and unit tests were added in `python/pyspark/sql/tests.py`.
Benchmark - codes: https://gist.github.com/HyukjinKwon/c3397469c56cb26c2d7dd521ed0bc5a3
Error message - codes: https://gist.github.com/HyukjinKwon/b1b2c7f65865444c4a8836435100e398
**Before**
Benchmark:
- Results: https://gist.github.com/HyukjinKwon/4a291dab45542106301a0c1abcdca924
Error message
- Results: https://gist.github.com/HyukjinKwon/57b1916395794ce924faa32b14a3fe19
**After**
Benchmark
- Results: https://gist.github.com/HyukjinKwon/21496feecc4a920e50c4e455f836266e
Error message
- Results: https://gist.github.com/HyukjinKwon/7a494e4557fe32a652ce1236e504a395Closes#17227
Author: hyukjinkwon <gurwls223@gmail.com>
Author: David Gingrich <david@textio.com>
Closes#18521 from HyukjinKwon/python-type-dispatch.
## What changes were proposed in this pull request?
Currently, it throws a NPE when missing columns but join type is speicified in join at PySpark as below:
```python
spark.conf.set("spark.sql.crossJoin.enabled", "false")
spark.range(1).join(spark.range(1), how="inner").show()
```
```
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o66.join.
: java.lang.NullPointerException
at org.apache.spark.sql.Dataset.join(Dataset.scala:931)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
```
```python
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.range(1).join(spark.range(1), how="inner").show()
```
```
...
py4j.protocol.Py4JJavaError: An error occurred while calling o84.join.
: java.lang.NullPointerException
at org.apache.spark.sql.Dataset.join(Dataset.scala:931)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
```
This PR suggests to follow Scala's one as below:
```scala
scala> spark.conf.set("spark.sql.crossJoin.enabled", "false")
scala> spark.range(1).join(spark.range(1), Seq.empty[String], "inner").show()
```
```
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Range (0, 1, step=1, splits=Some(8))
and
Range (0, 1, step=1, splits=Some(8))
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
...
```
```scala
scala> spark.conf.set("spark.sql.crossJoin.enabled", "true")
scala> spark.range(1).join(spark.range(1), Seq.empty[String], "inner").show()
```
```
+---+---+
| id| id|
+---+---+
| 0| 0|
+---+---+
```
**After**
```python
spark.conf.set("spark.sql.crossJoin.enabled", "false")
spark.range(1).join(spark.range(1), how="inner").show()
```
```
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: u'Detected cartesian product for INNER join between logical plans\nRange (0, 1, step=1, splits=Some(8))\nand\nRange (0, 1, step=1, splits=Some(8))\nJoin condition is missing or trivial.\nUse the CROSS JOIN syntax to allow cartesian products between these relations.;'
```
```python
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.range(1).join(spark.range(1), how="inner").show()
```
```
+---+---+
| id| id|
+---+---+
| 0| 0|
+---+---+
```
## How was this patch tested?
Added tests in `python/pyspark/sql/tests.py`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18484 from HyukjinKwon/SPARK-21264.
## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`. This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process. The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame. All non-complex data types are currently supported, otherwise an `UnsupportedOperation` exception is thrown.
Additions to Spark include a Scala package private method `Dataset.toArrowPayloadBytes` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served. A package private class/object `ArrowConverters` that provide data type mappings and conversion routines. In Python, a public method `DataFrame.collectAsArrow` is added to collect Arrow payloads and an optional flag in `toPandas(useArrow=False)` to enable using Arrow (uses the old conversion by default).
## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types. The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data. This will ensure that the schema and data has been converted correctly.
Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow. A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas.
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Li Jin <li.jin@twosigma.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes#15821 from BryanCutler/wip-toPandas_with_arrow-SPARK-13534.
## What changes were proposed in this pull request?
Currently we convert a spark DataFrame to Pandas Dataframe by `pd.DataFrame.from_records`. It infers the data type from the data and doesn't respect the spark DataFrame Schema. This PR fixes it.
## How was this patch tested?
a new regression test
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@gmail.com>
Closes#18378 from cloud-fan/to_pandas.
## What changes were proposed in this pull request?
Add Python wrappers for `o.a.s.sql.functions.explode_outer` and `o.a.s.sql.functions.posexplode_outer`.
## How was this patch tested?
Unit tests, doctests.
Author: zero323 <zero323@users.noreply.github.com>
Closes#18049 from zero323/SPARK-20830.
### What changes were proposed in this pull request?
The current option name `wholeFile` is misleading for CSV users. Currently, it is not representing a record per file. Actually, one file could have multiple records. Thus, we should rename it. Now, the proposal is `multiLine`.
### How was this patch tested?
N/A
Author: Xiao Li <gatorsmile@gmail.com>
Closes#18202 from gatorsmile/renameCVSOption.
## What changes were proposed in this pull request?
Allow fill/replace of NAs with booleans, both in Python and Scala
## How was this patch tested?
Unit tests, doctests
This PR is original work from me and I license this work to the Spark project
Author: Ruben Berenguel Montoro <ruben@mostlymaths.net>
Author: Ruben Berenguel <ruben@mostlymaths.net>
Closes#18164 from rberenguel/SPARK-19732-fillna-bools.
## What changes were proposed in this pull request?
This PR proposes three things as below:
- Use casting rules to a timestamp in `to_timestamp` by default (it was `yyyy-MM-dd HH:mm:ss`).
- Support single argument for `to_timestamp` similarly with APIs in other languages.
For example, the one below works
```
import org.apache.spark.sql.functions._
Seq("2016-12-31 00:12:00.00").toDF("a").select(to_timestamp(col("a"))).show()
```
prints
```
+----------------------------------------+
|to_timestamp(`a`, 'yyyy-MM-dd HH:mm:ss')|
+----------------------------------------+
| 2016-12-31 00:12:00|
+----------------------------------------+
```
whereas this does not work in SQL.
**Before**
```
spark-sql> SELECT to_timestamp('2016-12-31 00:12:00');
Error in query: Invalid number of arguments for function to_timestamp; line 1 pos 7
```
**After**
```
spark-sql> SELECT to_timestamp('2016-12-31 00:12:00');
2016-12-31 00:12:00
```
- Related document improvement for SQL function descriptions and other API descriptions accordingly.
**Before**
```
spark-sql> DESCRIBE FUNCTION extended to_date;
...
Usage: to_date(date_str, fmt) - Parses the `left` expression with the `fmt` expression. Returns null with invalid input.
Extended Usage:
Examples:
> SELECT to_date('2016-12-31', 'yyyy-MM-dd');
2016-12-31
```
```
spark-sql> DESCRIBE FUNCTION extended to_timestamp;
...
Usage: to_timestamp(timestamp, fmt) - Parses the `left` expression with the `format` expression to a timestamp. Returns null with invalid input.
Extended Usage:
Examples:
> SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd');
2016-12-31 00:00:00.0
```
**After**
```
spark-sql> DESCRIBE FUNCTION extended to_date;
...
Usage:
to_date(date_str[, fmt]) - Parses the `date_str` expression with the `fmt` expression to
a date. Returns null with invalid input. By default, it follows casting rules to a date if
the `fmt` is omitted.
Extended Usage:
Examples:
> SELECT to_date('2009-07-30 04:17:52');
2009-07-30
> SELECT to_date('2016-12-31', 'yyyy-MM-dd');
2016-12-31
```
```
spark-sql> DESCRIBE FUNCTION extended to_timestamp;
...
Usage:
to_timestamp(timestamp[, fmt]) - Parses the `timestamp` expression with the `fmt` expression to
a timestamp. Returns null with invalid input. By default, it follows casting rules to
a timestamp if the `fmt` is omitted.
Extended Usage:
Examples:
> SELECT to_timestamp('2016-12-31 00:12:00');
2016-12-31 00:12:00
> SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd');
2016-12-31 00:00:00
```
## How was this patch tested?
Added tests in `datetime.sql`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17901 from HyukjinKwon/to_timestamp_arg.
## What changes were proposed in this pull request?
There's a latent corner-case bug in PySpark UDF evaluation where executing a `BatchPythonEvaluation` with a single multi-argument UDF where _at least one argument value is repeated_ will crash at execution with a confusing error.
This problem was introduced in #12057: the code there has a fast path for handling a "batch UDF evaluation consisting of a single Python UDF", but that branch incorrectly assumes that a single UDF won't have repeated arguments and therefore skips the code for unpacking arguments from the input row (whose schema may not necessarily match the UDF inputs due to de-duplication of repeated arguments which occurred in the JVM before sending UDF inputs to Python).
This fix here is simply to remove this special-casing: it turns out that the code in the "multiple UDFs" branch just so happens to work for the single-UDF case because Python treats `(x)` as equivalent to `x`, not as a single-argument tuple.
## How was this patch tested?
New regression test in `pyspark.python.sql.tests` module (tested and confirmed that it fails before my fix).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#17927 from JoshRosen/SPARK-20685.
## What changes were proposed in this pull request?
Adds Python wrappers for `DataFrameWriter.bucketBy` and `DataFrameWriter.sortBy` ([SPARK-16931](https://issues.apache.org/jira/browse/SPARK-16931))
## How was this patch tested?
Unit tests covering new feature.
__Note__: Based on work of GregBowyer (f49b9a23468f7af32cb53d2b654272757c151725)
CC HyukjinKwon
Author: zero323 <zero323@users.noreply.github.com>
Author: Greg Bowyer <gbowyer@fastmail.co.uk>
Closes#17077 from zero323/SPARK-16931.
## What changes were proposed in this pull request?
- Move udf wrapping code from `functions.udf` to `functions.UserDefinedFunction`.
- Return wrapped udf from `catalog.registerFunction` and dependent methods.
- Update docstrings in `catalog.registerFunction` and `SQLContext.registerFunction`.
- Unit tests.
## How was this patch tested?
- Existing unit tests and docstests.
- Additional tests covering new feature.
Author: zero323 <zero323@users.noreply.github.com>
Closes#17831 from zero323/SPARK-18777.
## What changes were proposed in this pull request?
Adds `hint` method to PySpark `DataFrame`.
## How was this patch tested?
Unit tests, doctests.
Author: zero323 <zero323@users.noreply.github.com>
Closes#17850 from zero323/SPARK-20584.
## What changes were proposed in this pull request?
Adds Python bindings for `Column.eqNullSafe`
## How was this patch tested?
Manual tests, existing unit tests, doc build.
Author: zero323 <zero323@users.noreply.github.com>
Closes#17605 from zero323/SPARK-20290.
## What changes were proposed in this pull request?
Currently pyspark Dataframe.fillna API supports boolean type when we pass dict, but it is missing in documentation.
## How was this patch tested?
>>> spark.createDataFrame([Row(a=True),Row(a=None)]).fillna({"a" : True}).show()
+----+
| a|
+----+
|true|
|true|
+----+
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Srinivasa Reddy Vundela <vsr@cloudera.com>
Closes#17688 from vundela/fillna_doc_fix.
## What changes were proposed in this pull request?
- Allows skipping `value` argument if `to_replace` is a `dict`:
```python
df = sc.parallelize([("Alice", 1, 3.0)]).toDF()
df.replace({"Alice": "Bob"}).show()
````
- Adds validation step to ensure homogeneous values / replacements.
- Simplifies internal control flow.
- Improves unit tests coverage.
## How was this patch tested?
Existing unit tests, additional unit tests, manual testing.
Author: zero323 <zero323@users.noreply.github.com>
Closes#16793 from zero323/SPARK-19454.
## What changes were proposed in this pull request?
This PR proposes to match minor documentations changes in https://github.com/apache/spark/pull/17399 and https://github.com/apache/spark/pull/17380 to R/Python.
## How was this patch tested?
Manual tests in Python , Python tests via `./python/run-tests.py --module=pyspark-sql` and lint-checks for Python/R.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17429 from HyukjinKwon/minor-match-doc.
## What changes were proposed in this pull request?
An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.
In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log will be used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself to determine what batch to process next after restart; using the offset log to determine this would process the previously logged batch, always, thus not permitting a OneTime trigger feature.
## How was this patch tested?
A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.
In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.
marmbrus tdas zsxwing
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tyson Condie <tcondie@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#17219 from tcondie/stream-commit.
## What changes were proposed in this pull request?
This PR proposes to support _not_ trimming the white spaces when writing out. These are `false` by default in CSV reading path but these are `true` by default in CSV writing in univocity parser.
Both `ignoreLeadingWhiteSpace` and `ignoreTrailingWhiteSpace` options are not being used for writing and therefore, we are always trimming the white spaces.
It seems we should provide a way to keep this white spaces easily.
WIth the data below:
```scala
val df = spark.read.csv(Seq("a , b , c").toDS)
df.show()
```
```
+---+----+---+
|_c0| _c1|_c2|
+---+----+---+
| a | b | c|
+---+----+---+
```
**Before**
```scala
df.write.csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```
```
+-----+
|value|
+-----+
|a,b,c|
+-----+
```
It seems this can't be worked around via `quoteAll` too.
```scala
df.write.option("quoteAll", true).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```
```
+-----------+
| value|
+-----------+
|"a","b","c"|
+-----------+
```
**After**
```scala
df.write.option("ignoreLeadingWhiteSpace", false).option("ignoreTrailingWhiteSpace", false).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```
```
+----------+
| value|
+----------+
|a , b , c|
+----------+
```
Note that this case is possible in R
```r
> system("cat text.csv")
f1,f2,f3
a , b , c
> df <- read.csv(file="text.csv")
> df
f1 f2 f3
1 a b c
> write.csv(df, file="text1.csv", quote=F, row.names=F)
> system("cat text1.csv")
f1,f2,f3
a , b , c
```
## How was this patch tested?
Unit tests in `CSVSuite` and manual tests for Python.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17310 from HyukjinKwon/SPARK-18579.
Beside the issue in spark api, also fix 2 minor issues in pyspark
- support read from multiple input paths for orc
- support read from multiple input paths for text
Author: Jeff Zhang <zjffdu@apache.org>
Closes#10307 from zjffdu/SPARK-12334.
## What changes were proposed in this pull request?
Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int.
These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range.
Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3.
## How was this patch tested?
Added a new PySpark-side test that fails without the change.
The contribution is my original work and I license the work to the project under the project’s open source license.
Resubmission of https://github.com/apache/spark/pull/16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun
cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks.
Author: Jason White <jason.white@shopify.com>
Closes#17200 from JasonMWhite/SPARK-19561.
## What changes were proposed in this pull request?
Cast the output of `TimestampType.toInternal` to long to allow for proper Timestamp creation in DataFrames near the epoch.
## How was this patch tested?
Added a new test that fails without the change.
dongjoon-hyun davies Mind taking a look?
The contribution is my original work and I license the work to the project under the project’s open source license.
Author: Jason White <jason.white@shopify.com>
Closes#16896 from JasonMWhite/SPARK-19561.
## What changes were proposed in this pull request?
This PR proposes to remove incorrect implementation that has been not executed so far (at least from Spark 1.5.2) for `in` operator and throw a correct exception rather than saying it is a bool. I tested the codes above in 1.5.2, 1.6.3, 2.1.0 and in the master branch as below:
**1.5.2**
```python
>>> df = sqlContext.createDataFrame([[1]])
>>> 1 in df._1
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark-1.5.2-bin-hadoop2.6/python/pyspark/sql/column.py", line 418, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**1.6.3**
```python
>>> 1 in sqlContext.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/column.py", line 447, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**2.1.0**
```python
>>> 1 in spark.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.py", line 426, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**Current Master**
```python
>>> 1 in spark.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/column.py", line 452, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**After**
```python
>>> 1 in spark.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/column.py", line 184, in __contains__
raise ValueError("Cannot apply 'in' operator against a column: please use 'contains' "
ValueError: Cannot apply 'in' operator against a column: please use 'contains' in a string column or 'array_contains' function for an array column.
```
In more details,
It seems the implementation intended to support this
```python
1 in df.column
```
However, currently, it throws an exception as below:
```python
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/column.py", line 426, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
What happens here is as below:
```python
class Column(object):
def __contains__(self, item):
print "I am contains"
return Column()
def __nonzero__(self):
raise Exception("I am nonzero.")
>>> 1 in Column()
I am contains
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 6, in __nonzero__
Exception: I am nonzero.
```
It seems it calls `__contains__` first and then `__nonzero__` or `__bool__` is being called against `Column()` to make this a bool (or int to be specific).
It seems `__nonzero__` (for Python 2), `__bool__` (for Python 3) and `__contains__` forcing the the return into a bool unlike other operators. There are few references about this as below:
https://bugs.python.org/issue16011http://stackoverflow.com/questions/12244074/python-source-code-for-built-in-in-operator/12244378#12244378http://stackoverflow.com/questions/38542543/functionality-of-python-in-vs-contains/38542777
It seems we can't overwrite `__nonzero__` or `__bool__` as a workaround to make this working because these force the return type as a bool as below:
```python
class Column(object):
def __contains__(self, item):
print "I am contains"
return Column()
def __nonzero__(self):
return "a"
>>> 1 in Column()
I am contains
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: __nonzero__ should return bool or int, returned str
```
## How was this patch tested?
Added unit tests in `tests.py`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17160 from HyukjinKwon/SPARK-19701.
## What changes were proposed in this pull request?
This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file).
So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory.
## How was this patch tested?
Unit tests in `CSVSuite` and `tests.py`
Manual tests with a single 9GB CSV file in local file system, for example,
```scala
spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count()
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16976 from HyukjinKwon/SPARK-19610.
## What changes were proposed in this pull request?
Replaces `UserDefinedFunction` object returned from `udf` with a function wrapper providing docstring and arguments information as proposed in [SPARK-19161](https://issues.apache.org/jira/browse/SPARK-19161).
### Backward incompatible changes:
- `pyspark.sql.functions.udf` will return a `function` instead of `UserDefinedFunction`. To ensure backward compatible public API we use function attributes to mimic `UserDefinedFunction` API (`func` and `returnType` attributes). This should have a minimal impact on the user code.
An alternative implementation could use dynamical sub-classing. This would ensure full backward compatibility but is more fragile in practice.
### Limitations:
Full functionality (retained docstring and argument list) is achieved only in the recent Python version. Legacy Python version will preserve only docstrings, but not argument list. This should be an acceptable trade-off between achieved improvements and overall complexity.
### Possible impact on other tickets:
This can affect [SPARK-18777](https://issues.apache.org/jira/browse/SPARK-18777).
## How was this patch tested?
Existing unit tests to ensure backward compatibility, additional tests targeting proposed changes.
Author: zero323 <zero323@users.noreply.github.com>
Closes#16534 from zero323/SPARK-19161.
## What changes were proposed in this pull request?
to be consistent with the scala API, we should also add `contains` to `Column` in pyspark.
## How was this patch tested?
updated unit test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#17036 from cloud-fan/pyspark.
## What changes were proposed in this pull request?
If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory.
Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.
These changes have allowed types other than `String` to be parsed. Support for `UTF8String` and `Text` have been added (alongside `String` and `InputFormat`) and no longer require a conversion to `String` just for parsing.
I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits, let me know if they should be flattened into this PR or moved to a new one.
## How was this patch tested?
New and existing unit tests. No performance or load tests have been run.
Author: Nathan Howell <nhowell@godaddy.com>
Closes#16386 from NathanHowell/SPARK-18352.
## What changes were proposed in this pull request?
This PR adds `udf` decorator syntax as proposed in [SPARK-19160](https://issues.apache.org/jira/browse/SPARK-19160).
This allows users to define UDF using simplified syntax:
```python
from pyspark.sql.decorators import udf
udf(IntegerType())
def add_one(x):
"""Adds one"""
if x is not None:
return x + 1
```
without need to define a separate function and udf.
## How was this patch tested?
Existing unit tests to ensure backward compatibility and additional unit tests covering new functionality.
Author: zero323 <zero323@users.noreply.github.com>
Closes#16533 from zero323/SPARK-19160.
## What changes were proposed in this pull request?
Add a `metadata` keyword parameter to `pyspark.sql.Column.alias()` to allow users to mix-in metadata while manipulating `DataFrame`s in `pyspark`. Without this, I believe it was necessary to pass back through `SparkSession.createDataFrame` each time a user wanted to manipulate `StructField.metadata` in `pyspark`.
This pull request also improves consistency between the Scala and Python APIs (i.e. I did not add any functionality that was not already in the Scala API).
Discussed ahead of time on JIRA with marmbrus
## How was this patch tested?
Added unit tests (and doc tests). Ran the pertinent tests manually.
Author: Sheamus K. Parkes <shea.parkes@milliman.com>
Closes#16094 from shea-parkes/pyspark-column-alias-metadata.
## What changes were proposed in this pull request?
UDF constructor checks if `func` argument is callable and if it is not, fails fast instead of waiting for an action.
## How was this patch tested?
Unit tests.
Author: zero323 <zero323@users.noreply.github.com>
Closes#16535 from zero323/SPARK-19162.
## What changes were proposed in this pull request?
- Add support for `slice` arguments in `Column.__getitem__`.
- Remove obsolete `__getslice__` bindings.
## How was this patch tested?
Existing unit tests, additional tests covering `[]` with `slice`.
Author: zero323 <zero323@users.noreply.github.com>
Closes#16771 from zero323/SPARK-19429.
## What changes were proposed in this pull request?
Add support for data type string as a return type argument of `UserDefinedFunction`:
```python
f = udf(lambda x: x, "integer")
f.returnType
## IntegerType
```
## How was this patch tested?
Existing unit tests, additional unit tests covering new feature.
Author: zero323 <zero323@users.noreply.github.com>
Closes#16769 from zero323/SPARK-19427.
## What changes were proposed in this pull request?
This pull request adds two new user facing functions:
- `to_date` which accepts an expression and a format and returns a date.
- `to_timestamp` which accepts an expression and a format and returns a timestamp.
For example, Given a date in format: `2016-21-05`. (YYYY-dd-MM)
### Date Function
*Previously*
```
to_date(unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp"))
```
*Current*
```
to_date(lit("2016-21-05"), "yyyy-dd-MM")
```
### Timestamp Function
*Previously*
```
unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp")
```
*Current*
```
to_timestamp(lit("2016-21-05"), "yyyy-dd-MM")
```
### Tasks
- [X] Add `to_date` to Scala Functions
- [x] Add `to_date` to Python Functions
- [x] Add `to_date` to SQL Functions
- [X] Add `to_timestamp` to Scala Functions
- [x] Add `to_timestamp` to Python Functions
- [x] Add `to_timestamp` to SQL Functions
- [x] Add function to R
## How was this patch tested?
- [x] Add Functions to `DateFunctionsSuite`
- Test new `ParseToTimestamp` Expression (*not necessary*)
- Test new `ParseToDate` Expression (*not necessary*)
- [x] Add test for R
- [x] Add test for Python in test.py
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: anabranch <wac.chambers@gmail.com>
Author: Bill Chambers <bill@databricks.com>
Author: anabranch <bill@databricks.com>
Closes#16138 from anabranch/SPARK-16609.
## What changes were proposed in this pull request?
1, add the multi-cols support based on current private api
2, add the multi-cols support to pyspark
## How was this patch tested?
unit tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Author: Ruifeng Zheng <ruifengz@foxmail.com>
Closes#12135 from zhengruifeng/quantile4multicols.
## What changes were proposed in this pull request?
Defer `UserDefinedFunction._judf` initialization to the first call. This prevents unintended `SparkSession` initialization. This allows users to define and import UDF without creating a context / session as a side effect.
[SPARK-19163](https://issues.apache.org/jira/browse/SPARK-19163)
## How was this patch tested?
Unit tests.
Author: zero323 <zero323@users.noreply.github.com>
Closes#16536 from zero323/SPARK-19163.
### What changes were proposed in this pull request?
It is weird to create Hive source tables when using InMemoryCatalog. We are unable to operate it. This PR is to block users to create Hive source tables.
### How was this patch tested?
Fixed the test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16587 from gatorsmile/blockHiveTable.
## What changes were proposed in this pull request?
PythonUDF is unevaluable, which can not be used inside a join condition, currently the optimizer will push a PythonUDF which accessing both side of join into the join condition, then the query will fail to plan.
This PR fix this issue by checking the expression is evaluable or not before pushing it into Join.
## How was this patch tested?
Add a regression test.
Author: Davies Liu <davies@databricks.com>
Closes#16581 from davies/pyudf_join.
## What changes were proposed in this pull request?
For some datasources which are based on HadoopRDD or NewHadoopRDD, such as spark-xml, InputFileBlockHolder doesn't work with Python UDF.
The method to reproduce it is, running the following codes with `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`:
from pyspark.sql.functions import udf,input_file_name
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
def filename(path):
return path
session = SparkSession.builder.appName('APP').getOrCreate()
session.udf.register('sameText', filename)
sameText = udf(filename, StringType())
df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file'))
df.select('file').show() # works
df.select(sameText(df['file'])).show() # returns empty content
The issue is because in `HadoopRDD` and `NewHadoopRDD` we set the file block's info in `InputFileBlockHolder` before the returned iterator begins consuming. `InputFileBlockHolder` will record this info into thread local variable. When running Python UDF in batch, we set up another thread to consume the iterator from child plan's output rdd, so we can't read the info back in another thread.
To fix this, we have to set the info in `InputFileBlockHolder` after the iterator begins consuming. So the info can be read in correct thread.
## How was this patch tested?
Manual test with above example codes for spark-xml package on pyspark: `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`.
Added pyspark test.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16585 from viirya/fix-inputfileblock-hadooprdd.
Change is for SQLContext to reuse the active SparkSession during construction if the sparkContext supplied is the same as the currently active SparkContext. Without this change, a new SparkSession is instantiated that results in a Derby error when attempting to create a dataframe using a new SQLContext object even though the SparkContext supplied to the new SQLContext is same as the currently active one. Refer https://issues.apache.org/jira/browse/SPARK-18687 for details on the error and a repro.
Existing unit tests and a new unit test added to pyspark-sql:
/python/run-tests --python-executables=python --modules=pyspark-sql
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Vinayak <vijoshi5@in.ibm.com>
Author: Vinayak Joshi <vijoshi@users.noreply.github.com>
Closes#16119 from vijoshi/SPARK-18687_master.
## What changes were proposed in this pull request?
In SparkSession initialization, we store created the instance of SparkSession into a class variable _instantiatedContext. Next time we can use SparkSession.builder.getOrCreate() to retrieve the existing SparkSession instance.
However, when the active SparkContext is stopped and we create another new SparkContext to use, the existing SparkSession is still associated with the stopped SparkContext. So the operations with this existing SparkSession will be failed.
We need to detect such case in SparkSession and renew the class variable _instantiatedContext if needed.
## How was this patch tested?
New test added in PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16454 from viirya/fix-pyspark-sparksession.
## What changes were proposed in this pull request?
`_to_seq` wasn't imported.
## How was this patch tested?
Added partitionBy to existing write path unit test
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#16297 from brkyvz/SPARK-18888.
## What changes were proposed in this pull request?
Right now `StreamingQuery.lastProgress` throws NoSuchElementException and it's hard to be used in Python since Python user will just see Py4jError.
This PR just makes it return null instead.
## How was this patch tested?
`test("lastProgress should be null when recentProgress is empty")`
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16273 from zsxwing/SPARK-18852.
### What changes were proposed in this pull request?
Currently, when users use Python UDF in Filter, BatchEvalPython is always generated below FilterExec. However, not all the predicates need to be evaluated after Python UDF execution. Thus, this PR is to push down the determinisitc predicates through `BatchEvalPython`.
```Python
>>> df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>>> from pyspark.sql.functions import udf, col
>>> from pyspark.sql.types import BooleanType
>>> my_filter = udf(lambda a: a < 2, BooleanType())
>>> sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
>>> sel.explain(True)
```
Before the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]
== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter ((isnotnull(value#1) && pythonUDF0#9) && (value#1 < 2))
+- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
+- Scan ExistingRDD[key#0L,value#1]
```
After the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]
== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter pythonUDF0#9: boolean
+- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
+- *Filter (isnotnull(value#1) && (value#1 < 2))
+- Scan ExistingRDD[key#0L,value#1]
```
### How was this patch tested?
Added both unit test cases for `BatchEvalPythonExec` and also add an end-to-end test case in Python test suite.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16193 from gatorsmile/pythonUDFPredicatePushDown.
## What changes were proposed in this pull request?
`input_file_name` doesn't return filename when working with UDF in PySpark. An example shows the problem:
from pyspark.sql.functions import *
from pyspark.sql.types import *
def filename(path):
return path
sourceFile = udf(filename, StringType())
spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()
+---------------------------+
|filename(input_file_name())|
+---------------------------+
| |
+---------------------------+
The cause of this issue is, we group rows in `BatchEvalPythonExec` for batching processing of PythonUDF. Currently we group rows first and then evaluate expressions on the rows. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we will unset input filename. So the input_file_name expression can't return correct filename.
This patch fixes the approach to group the batch of rows. We evaluate the expression first and then group evaluated results to batch.
## How was this patch tested?
Added unit test to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16115 from viirya/fix-py-udf-input-filename.
Based on an informal survey, users find this option easier to understand / remember.
Author: Michael Armbrust <michael@databricks.com>
Closes#16182 from marmbrus/renameRecentProgress.