Commit graph

753 commits

Author SHA1 Message Date
hyukjinkwon c7622befda [SPARK-23847][FOLLOWUP][PYTHON][SQL] Actually test [desc|acs]_nulls_[first|last] functions in PySpark
## What changes were proposed in this pull request?

There was a mistake in `tests.py` missing `assertEquals`.

## How was this patch tested?

Fixed tests.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21035 from HyukjinKwon/SPARK-23847.
2018-04-11 19:42:09 +08:00
Huaxin Gao 2c1fe64757 [SPARK-23847][PYTHON][SQL] Add asc_nulls_first, asc_nulls_last to PySpark
## What changes were proposed in this pull request?

Column.scala and Functions.scala have asc_nulls_first, asc_nulls_last,  desc_nulls_first and desc_nulls_last. Add the corresponding python APIs in column.py and functions.py

## How was this patch tested?
Add doctest

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #20962 from huaxingao/spark-23847.
2018-04-08 12:09:06 +08:00
Li Jin d766ea2ff2 [SPARK-23861][SQL][DOC] Clarify default window frame with and without orderBy clause
## What changes were proposed in this pull request?

Add docstring to clarify default window frame boundaries with and without orderBy clause

## How was this patch tested?

Manually generate doc and check.

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20978 from icexelloss/SPARK-23861-window-doc.
2018-04-07 00:15:54 +08:00
hyukjinkwon 34c4b9c57e [SPARK-23765][SQL] Supports custom line separator for json datasource
## What changes were proposed in this pull request?

This PR proposes to add lineSep option for a configurable line separator in text datasource.
It supports this option by using `LineRecordReader`'s functionality with passing it to the constructor.

The approach is similar with https://github.com/apache/spark/pull/20727; however, one main difference is, it uses text datasource's `lineSep` option to parse line by line in JSON's schema inference.

## How was this patch tested?

Manually tested and unit tests were added.

Author: hyukjinkwon <gurwls223@apache.org>
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20877 from HyukjinKwon/linesep-json.
2018-03-28 19:49:27 +08:00
Bryan Cutler ed72badb04 [SPARK-23699][PYTHON][SQL] Raise same type of error caught with Arrow enabled
## What changes were proposed in this pull request?

When using Arrow for createDataFrame or toPandas and an error is encountered with fallback disabled, this will raise the same type of error instead of a RuntimeError.  This change also allows for the traceback of the error to be retained and prevents the accidental chaining of exceptions with Python 3.

## How was this patch tested?

Updated existing tests to verify error type.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20839 from BryanCutler/arrow-raise-same-error-SPARK-23699.
2018-03-27 20:06:12 -07:00
Michael (Stu) Stewart 087fb31420 [SPARK-23645][MINOR][DOCS][PYTHON] Add docs RE pandas_udf with keyword args
## What changes were proposed in this pull request?

Add documentation about the limitations of `pandas_udf` with keyword arguments and related concepts, like `functools.partial` fn objects.

NOTE: intermediate commits on this PR show some of the steps that can be taken to fix some (but not all) of these pain points.

### Survey of problems we face today:

(Initialize) Note: python 3.6 and spark 2.4snapshot.
```
 from pyspark.sql import SparkSession
 import inspect, functools
 from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit, udf

 spark = SparkSession.builder.getOrCreate()
 print(spark.version)

 df = spark.range(1,6).withColumn('b', col('id') * 2)

 def ok(a,b): return a+b
```

Using a keyword argument at the call site `b=...` (and yes, *full* stack trace below, haha):
```
---> 14 df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', b='id')).show() # no kwargs

TypeError: wrapper() got an unexpected keyword argument 'b'
```

Using partial with a keyword argument where the kw-arg is the first argument of the fn:
*(Aside: kind of interesting that lines 15,16 work great and then 17 explodes)*
```
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-e9f31b8799c1> in <module>()
     15 df.withColumn('ok', pandas_udf(f=functools.partial(ok, 7), returnType='bigint')('id')).show()
     16 df.withColumn('ok', pandas_udf(f=functools.partial(ok, b=7), returnType='bigint')('id')).show()
---> 17 df.withColumn('ok', pandas_udf(f=functools.partial(ok, a=7), returnType='bigint')('id')).show()

/Users/stu/ZZ/spark/python/pyspark/sql/functions.py in pandas_udf(f, returnType, functionType)
   2378         return functools.partial(_create_udf, returnType=return_type, evalType=eval_type)
   2379     else:
-> 2380         return _create_udf(f=f, returnType=return_type, evalType=eval_type)
   2381
   2382

/Users/stu/ZZ/spark/python/pyspark/sql/udf.py in _create_udf(f, returnType, evalType)
     54                 argspec.varargs is None:
     55             raise ValueError(
---> 56                 "Invalid function: 0-arg pandas_udfs are not supported. "
     57                 "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
     58             )

ValueError: Invalid function: 0-arg pandas_udfs are not supported. Instead, create a 1-arg pandas_udf and ignore the arg in your function.
```

Author: Michael (Stu) Stewart <mstewart141@gmail.com>

Closes #20900 from mstewart141/udfkw2.
2018-03-26 12:45:45 +09:00
Bryan Cutler a9350d7095 [SPARK-23700][PYTHON] Cleanup imports in pyspark.sql
## What changes were proposed in this pull request?

This cleans up unused imports, mainly from pyspark.sql module.  Added a note in function.py that imports `UserDefinedFunction` only to maintain backwards compatibility for using `from pyspark.sql.function import UserDefinedFunction`.

## How was this patch tested?

Existing tests and built docs.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20892 from BryanCutler/pyspark-cleanup-imports-SPARK-23700.
2018-03-26 12:42:32 +09:00
hyukjinkwon a649fcf32a [MINOR][PYTHON] Remove unused codes in schema parsing logics of PySpark
## What changes were proposed in this pull request?

This PR proposes to remove out unused codes, `_ignore_brackets_split` and `_BRACKETS`.

`_ignore_brackets_split` was introduced in d57daf1f77 to refactor and support `toDF("...")`; however, ebc124d4c4 replaced the logics here. Seems `_ignore_brackets_split` is not referred anymore.

`_BRACKETS` was introduced in 880eabec37; however, all other usages were removed out in 648a8626b8.

This is rather a followup for ebc124d4c4 which I missed in that PR.

## How was this patch tested?

Manually tested. Existing tests should cover this. I also double checked by `grep` in the whole repo.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #20878 from HyukjinKwon/minor-remove-unused.
2018-03-22 21:20:41 -07:00
hyukjinkwon 8d79113b81 [SPARK-23577][SQL] Supports custom line separator for text datasource
## What changes were proposed in this pull request?

This PR proposes to add `lineSep` option for a configurable line separator in text datasource.

It supports this option by using `LineRecordReader`'s functionality with passing it to the constructor.

## How was this patch tested?

Manual tests and unit tests were added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20727 from HyukjinKwon/linesep-text.
2018-03-21 09:46:47 -07:00
hyukjinkwon 566321852b [SPARK-23691][PYTHON] Use sql_conf util in PySpark tests where possible
## What changes were proposed in this pull request?

d6632d185e added an useful util

```python
contextmanager
def sql_conf(self, pairs):
    ...
```

to allow configuration set/unset within a block:

```python
with self.sql_conf({"spark.blah.blah.blah", "blah"})
    # test codes
```

This PR proposes to use this util where possible in PySpark tests.

Note that there look already few places affecting tests without restoring the original value back in unittest classes.

## How was this patch tested?

Manually tested via:

```
./run-tests --modules=pyspark-sql --python-executables=python2
./run-tests --modules=pyspark-sql --python-executables=python3
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20830 from HyukjinKwon/cleanup-sql-conf.
2018-03-19 21:25:37 -07:00
hyukjinkwon 61487b308b [SPARK-23706][PYTHON] spark.conf.get(value, default=None) should produce None in PySpark
## What changes were proposed in this pull request?

Scala:

```
scala> spark.conf.get("hey", null)
res1: String = null
```

```
scala> spark.conf.get("spark.sql.sources.partitionOverwriteMode", null)
res2: String = null
```

Python:

**Before**

```
>>> spark.conf.get("hey", None)
...
py4j.protocol.Py4JJavaError: An error occurred while calling o30.get.
: java.util.NoSuchElementException: hey
...
```

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode", None)
u'STATIC'
```

**After**

```
>>> spark.conf.get("hey", None) is None
True
```

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode", None) is None
True
```

*Note that this PR preserves the case below:

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode")
u'STATIC'
```

## How was this patch tested?

Manually tested and unit tests were added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20841 from HyukjinKwon/spark-conf-get.
2018-03-18 20:24:14 +09:00
Dongjoon Hyun 5414abca4f [SPARK-23553][TESTS] Tests should not assume the default value of spark.sql.sources.default
## What changes were proposed in this pull request?

Currently, some tests have an assumption that `spark.sql.sources.default=parquet`. In fact, that is a correct assumption, but that assumption makes it difficult to test new data source format.

This PR aims to
- Improve test suites more robust and makes it easy to test new data sources in the future.
- Test new native ORC data source with the full existing Apache Spark test coverage.

As an example, the PR uses `spark.sql.sources.default=orc` during reviews. The value should be `parquet` when this PR is accepted.

## How was this patch tested?

Pass the Jenkins with updated tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20705 from dongjoon-hyun/SPARK-23553.
2018-03-16 09:36:30 -07:00
Benjamin Peterson 7013eea11c [SPARK-23522][PYTHON] always use sys.exit over builtin exit
The exit() builtin is only for interactive use. applications should use sys.exit().

## What changes were proposed in this pull request?

All usage of the builtin `exit()` function is replaced by `sys.exit()`.

## How was this patch tested?

I ran `python/run-tests`.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Benjamin Peterson <benjamin@python.org>

Closes #20682 from benjaminp/sys-exit.
2018-03-08 20:38:34 +09:00
Li Jin 2cb23a8f51 [SPARK-23011][SQL][PYTHON] Support alternative function form with group aggregate pandas UDF
## What changes were proposed in this pull request?

This PR proposes to support an alternative function from with group aggregate pandas UDF.

The current form:
```
def foo(pdf):
    return ...
```
Takes a single arg that is a pandas DataFrame.

With this PR, an alternative form is supported:
```
def foo(key, pdf):
    return ...
```
The alternative form takes two argument - a tuple that presents the grouping key, and a pandas DataFrame represents the data.

## How was this patch tested?

GroupbyApplyTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20295 from icexelloss/SPARK-23011-groupby-apply-key.
2018-03-08 20:29:07 +09:00
hyukjinkwon d6632d185e [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in toPandas/createDataFrame with Pandas DataFrame
## What changes were proposed in this pull request?

This PR adds a configuration to control the fallback of Arrow optimization for `toPandas` and `createDataFrame` with Pandas DataFrame.

## How was this patch tested?

Manually tested and unit tests added.

You can test this by:

**`createDataFrame`**

```python
spark.conf.set("spark.sql.execution.arrow.enabled", False)
pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
spark.createDataFrame(pdf, "a: map<string, int>")
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", False)
pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
spark.createDataFrame(pdf, "a: map<string, int>")
```

**`toPandas`**

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
spark.createDataFrame([[{'a': 1}]]).toPandas()
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
spark.createDataFrame([[{'a': 1}]]).toPandas()
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20678 from HyukjinKwon/SPARK-23380-conf.
2018-03-08 20:22:07 +09:00
Mihaly Toth a366b950b9 [SPARK-23329][SQL] Fix documentation of trigonometric functions
## What changes were proposed in this pull request?

Provide more details in trigonometric function documentations. Referenced `java.lang.Math` for further details in the descriptions.
## How was this patch tested?

Ran full build, checked generated documentation manually

Author: Mihaly Toth <misutoth@gmail.com>

Closes #20618 from misutoth/trigonometric-doc.
2018-03-05 23:46:40 +09:00
Anirudh 5ff72ffcf4 [SPARK-23566][MINOR][DOC] Argument name mismatch fixed
Argument name mismatch fixed.

## What changes were proposed in this pull request?

`col` changed to `new` in doc string to match the argument list.

Patch file added: https://issues.apache.org/jira/browse/SPARK-23566

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Anirudh <animenon@mail.com>

Closes #20716 from animenon/master.
2018-03-05 23:17:16 +09:00
Michael (Stu) Stewart 7965c91d8a [SPARK-23569][PYTHON] Allow pandas_udf to work with python3 style type-annotated functions
## What changes were proposed in this pull request?

Check python version to determine whether to use `inspect.getargspec` or `inspect.getfullargspec` before applying `pandas_udf` core logic to a function. The former is python2.7 (deprecated in python3) and the latter is python3.x. The latter correctly accounts for type annotations, which are syntax errors in python2.x.

## How was this patch tested?

Locally, on python 2.7 and 3.6.

Author: Michael (Stu) Stewart <mstewart141@gmail.com>

Closes #20728 from mstewart141/pandas_udf_fix.
2018-03-05 13:36:42 +09:00
Liang-Chi Hsieh b14993e1fc [SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document
## What changes were proposed in this pull request?

Clarify JSON and CSV reader behavior in document.

JSON doesn't support partial results for corrupted records.
CSV only supports partial results for the records with more or less tokens.

## How was this patch tested?

Pass existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20666 from viirya/SPARK-23448-2.
2018-02-28 11:00:54 +09:00
hyukjinkwon c5857e496f [SPARK-23446][PYTHON] Explicitly check supported types in toPandas
## What changes were proposed in this pull request?

This PR explicitly specifies and checks the types we supported in `toPandas`. This was a hole. For example, we haven't finished the binary type support in Python side yet but now it allows as below:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df = spark.createDataFrame([[bytearray("a")]])
df.toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df.toPandas()
```

```
     _1
0  [97]
  _1
0  a
```

This should be disallowed. I think the same things also apply to nested timestamps too.

I also added some nicer message about `spark.sql.execution.arrow.enabled` in the error message.

## How was this patch tested?

Manually tested and tests added in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20625 from HyukjinKwon/pandas_convertion_supported_type.
2018-02-16 09:41:17 -08:00
gatorsmile 407f672496 [SPARK-20090][FOLLOW-UP] Revert the deprecation of names in PySpark
## What changes were proposed in this pull request?
Deprecating the field `name` in PySpark is not expected. This PR is to revert the change.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20595 from gatorsmile/removeDeprecate.
2018-02-13 15:05:13 +09:00
hyukjinkwon c338c8cf82 [SPARK-23352][PYTHON] Explicitly specify supported types in Pandas UDFs
## What changes were proposed in this pull request?

This PR targets to explicitly specify supported types in Pandas UDFs.
The main change here is to add a deduplicated and explicit type checking in `returnType` ahead with documenting this; however, it happened to fix multiple things.

1. Currently, we don't support `BinaryType` in Pandas UDFs, for example, see:

    ```python
    from pyspark.sql.functions import pandas_udf
    pudf = pandas_udf(lambda x: x, "binary")
    df = spark.createDataFrame([[bytearray(1)]])
    df.select(pudf("_1")).show()
    ```
    ```
    ...
    TypeError: Unsupported type in conversion to Arrow: BinaryType
    ```

    We can document this behaviour for its guide.

2. Also, the grouped aggregate Pandas UDF fails fast on `ArrayType` but seems we can support this case.

    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    foo = pandas_udf(lambda v: v.mean(), 'array<double>', PandasUDFType.GROUPED_AGG)
    df = spark.range(100).selectExpr("id", "array(id) as value")
    df.groupBy("id").agg(foo("value")).show()
    ```

    ```
    ...
     NotImplementedError: ArrayType, StructType and MapType are not supported with PandasUDFType.GROUPED_AGG
    ```

3. Since we can check the return type ahead, we can fail fast before actual execution.

    ```python
    # we can fail fast at this stage because we know the schema ahead
    pandas_udf(lambda x: x, BinaryType())
    ```

## How was this patch tested?

Manually tested and unit tests for `BinaryType` and `ArrayType(...)` were added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20531 from HyukjinKwon/pudf-cleanup.
2018-02-12 20:49:36 +09:00
xubo245 eacb62fbbe [SPARK-22624][PYSPARK] Expose range partitioning shuffle introduced by spark-22614
## What changes were proposed in this pull request?

 Expose range partitioning shuffle introduced by spark-22614

## How was this patch tested?

Unit test in dataframe.py

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: xubo245 <601450868@qq.com>

Closes #20456 from xubo245/SPARK22624_PysparkRangePartition.
2018-02-11 19:23:15 +09:00
Huaxin Gao 8acb51f08b [SPARK-23084][PYTHON] Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark
## What changes were proposed in this pull request?

Added unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark, also updated the rangeBetween API

## How was this patch tested?

did unit test on my local. Please let me know if I need to add unit test in tests.py

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #20400 from huaxingao/spark_23084.
2018-02-11 18:55:38 +09:00
Li Jin a34fce19bc [SPARK-23314][PYTHON] Add ambiguous=False when localizing tz-naive timestamps in Arrow codepath to deal with dst
## What changes were proposed in this pull request?
When tz_localize a tz-naive timetamp, pandas will throw exception if the timestamp is during daylight saving time period, e.g., `2015-11-01 01:30:00`. This PR fixes this issue by setting `ambiguous=False` when calling tz_localize, which is the same default behavior of pytz.

## How was this patch tested?
Add `test_timestamp_dst`

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20537 from icexelloss/SPARK-23314.
2018-02-11 17:31:35 +09:00
Takuya UESHIN 97a224a855 [SPARK-23360][SQL][PYTHON] Get local timezone from environment via pytz, or dateutil.
## What changes were proposed in this pull request?

Currently we use `tzlocal()` to get Python local timezone, but it sometimes causes unexpected behavior.
I changed the way to get Python local timezone to use pytz if the timezone is specified in environment variable, or timezone file via dateutil .

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20559 from ueshin/issues/SPARK-23360/master.
2018-02-11 01:08:02 +09:00
hyukjinkwon 4b4ee26010 [SPARK-23328][PYTHON] Disallow default value None in na.replace/replace when 'to_replace' is not a dictionary
## What changes were proposed in this pull request?

This PR proposes to disallow default value None when 'to_replace' is not a dictionary.

It seems weird we set the default value of `value` to `None` and we ended up allowing the case as below:

```python
>>> df.show()
```
```
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
...
```

```python
>>> df.na.replace('Alice').show()
```
```
+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|null|
...
```

**After**

This PR targets to disallow the case above:

```python
>>> df.na.replace('Alice').show()
```
```
...
TypeError: value is required when to_replace is not a dictionary.
```

while we still allow when `to_replace` is a dictionary:

```python
>>> df.na.replace({'Alice': None}).show()
```
```
+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|null|
...
```

## How was this patch tested?

Manually tested, tests were added in `python/pyspark/sql/tests.py` and doctests were fixed.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20499 from HyukjinKwon/SPARK-19454-followup.
2018-02-09 14:21:10 +08:00
Takuya UESHIN a62f30d3fa [SPARK-23319][TESTS][FOLLOWUP] Fix a test for Python 3 without pandas.
## What changes were proposed in this pull request?

This is a followup pr of #20487.

When importing module but it doesn't exists, the error message is slightly different between Python 2 and 3.

E.g., in Python 2:

```
No module named pandas
```

in Python 3:

```
No module named 'pandas'
```

So, one test to check an import error fails in Python 3 without pandas.

This pr fixes it.

## How was this patch tested?

Tested manually in my local environment.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20538 from ueshin/issues/SPARK-23319/fup1.
2018-02-08 12:46:10 +09:00
hyukjinkwon 71cfba04ae [SPARK-23319][TESTS] Explicitly specify Pandas and PyArrow versions in PySpark tests (to skip or test)
## What changes were proposed in this pull request?

This PR proposes to explicitly specify Pandas and PyArrow versions in PySpark tests to skip or test.

We declared the extra dependencies:

b8bfce51ab/python/setup.py (L204)

In case of PyArrow:

Currently we only check if pyarrow is installed or not without checking the version. It already fails to run tests. For example, if PyArrow 0.7.0 is installed:

```
======================================================================
ERROR: test_vectorized_udf_wrong_return_type (pyspark.sql.tests.ScalarPandasUDF)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests.py", line 4019, in test_vectorized_udf_wrong_return_type
    f = pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
  File "/.../spark/python/pyspark/sql/functions.py", line 2309, in pandas_udf
    return _create_udf(f=f, returnType=return_type, evalType=eval_type)
  File "/.../spark/python/pyspark/sql/udf.py", line 47, in _create_udf
    require_minimum_pyarrow_version()
  File "/.../spark/python/pyspark/sql/utils.py", line 132, in require_minimum_pyarrow_version
    "however, your version was %s." % pyarrow.__version__)
ImportError: pyarrow >= 0.8.0 must be installed on calling Python process; however, your version was 0.7.0.

----------------------------------------------------------------------
Ran 33 tests in 8.098s

FAILED (errors=33)
```

In case of Pandas:

There are few tests for old Pandas which were tested only when Pandas version was lower, and I rewrote them to be tested when both Pandas version is lower and missing.

## How was this patch tested?

Manually tested by modifying the condition:

```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 1.19.2 must be installed; however, your version was 0.19.2.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 1.19.2 must be installed; however, your version was 0.19.2.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 1.19.2 must be installed; however, your version was 0.19.2.'
```

```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
```

```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 1.8.0 must be installed; however, your version was 0.8.0.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 1.8.0 must be installed; however, your version was 0.8.0.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 1.8.0 must be installed; however, your version was 0.8.0.'
```

```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20487 from HyukjinKwon/pyarrow-pandas-skip.
2018-02-07 23:28:10 +09:00
gatorsmile 9775df67f9 [SPARK-23122][PYSPARK][FOLLOWUP] Replace registerTempTable by createOrReplaceTempView
## What changes were proposed in this pull request?
Replace `registerTempTable` by `createOrReplaceTempView`.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20523 from gatorsmile/updateExamples.
2018-02-07 23:24:16 +09:00
gatorsmile c36fecc3b4 [SPARK-23327][SQL] Update the description and tests of three external API or functions
## What changes were proposed in this pull request?
Update the description and tests of three external API or functions `createFunction `, `length` and `repartitionByRange `

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20495 from gatorsmile/updateFunc.
2018-02-06 16:46:43 -08:00
Li Jin caf3044563 [MINOR][TEST] Fix class name for Pandas UDF tests
## What changes were proposed in this pull request?

In b2ce17b4c9, I mistakenly renamed `VectorizedUDFTests` to `ScalarPandasUDF`. This PR fixes the mistake.

## How was this patch tested?

Existing tests.

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20489 from icexelloss/fix-scalar-udf-tests.
2018-02-06 12:30:04 -08:00
Takuya UESHIN 63c5bf13ce [SPARK-23334][SQL][PYTHON] Fix pandas_udf with return type StringType() to handle str type properly in Python 2.
## What changes were proposed in this pull request?

In Python 2, when `pandas_udf` tries to return string type value created in the udf with `".."`, the execution fails. E.g.,

```python
from pyspark.sql.functions import pandas_udf, col
import pandas as pd

df = spark.range(10)
str_f = pandas_udf(lambda x: pd.Series(["%s" % i for i in x]), "string")
df.select(str_f(col('id'))).show()
```

raises the following exception:

```
...

java.lang.AssertionError: assertion failed: Invalid schema from pandas_udf: expected StringType, got BinaryType
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:93)

...
```

Seems like pyarrow ignores `type` parameter for `pa.Array.from_pandas()` and consider it as binary type when the type is string type and the string values are `str` instead of `unicode` in Python 2.

This pr adds a workaround for the case.

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20507 from ueshin/issues/SPARK-23334.
2018-02-06 18:30:50 +09:00
Takuya UESHIN a24c03138a [SPARK-23290][SQL][PYTHON] Use datetime.date for date type when converting Spark DataFrame to Pandas DataFrame.
## What changes were proposed in this pull request?

In #18664, there was a change in how `DateType` is being returned to users ([line 1968 in dataframe.py](https://github.com/apache/spark/pull/18664/files#diff-6fc344560230bf0ef711bb9b5573f1faR1968)). This can cause client code which works in Spark 2.2 to fail.
See [SPARK-23290](https://issues.apache.org/jira/browse/SPARK-23290?focusedCommentId=16350917&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16350917) for an example.

This pr modifies to use `datetime.date` for date type as Spark 2.2 does.

## How was this patch tested?

Tests modified to fit the new behavior and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20506 from ueshin/issues/SPARK-23290.
2018-02-06 14:52:25 +08:00
hyukjinkwon 551dff2bcc [SPARK-21658][SQL][PYSPARK] Revert "[] Add default None for value in na.replace in PySpark"
This reverts commit 0fcde87aad.

See the discussion in [SPARK-21658](https://issues.apache.org/jira/browse/SPARK-21658),  [SPARK-19454](https://issues.apache.org/jira/browse/SPARK-19454) and https://github.com/apache/spark/pull/16793

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20496 from HyukjinKwon/revert-SPARK-21658.
2018-02-03 10:40:21 -08:00
Takuya UESHIN 07cee33736 [SPARK-22274][PYTHON][SQL][FOLLOWUP] Use assertRaisesRegexp instead of assertRaisesRegex.
## What changes were proposed in this pull request?

This is a follow-up pr of #19872 which uses `assertRaisesRegex` but it doesn't exist in Python 2, so some tests fail when running tests in Python 2 environment.
Unfortunately, we missed it because currently Python 2 environment of the pr builder doesn't have proper versions of pandas or pyarrow, so the tests were skipped.

This pr modifies to use `assertRaisesRegexp` instead of `assertRaisesRegex`.

## How was this patch tested?

Tested manually in my local environment.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20467 from ueshin/issues/SPARK-22274/fup1.
2018-01-31 22:26:27 -08:00
Henry Robinson f470df2fcf [SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkDataFrame in R comment
Author: Henry Robinson <henry@cloudera.com>

Closes #20443 from henryr/SPARK-23157.
2018-02-01 11:15:17 +09:00
jerryshao 3d0911bbe4 [SPARK-23228][PYSPARK] Add Python Created jsparkSession to JVM's defaultSession
## What changes were proposed in this pull request?

In the current PySpark code, Python created `jsparkSession` doesn't add to JVM's defaultSession, this `SparkSession` object cannot be fetched from Java side, so the below scala code will be failed when loaded in PySpark application.

```scala
class TestSparkSession extends SparkListener with Logging {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    event match {
      case CreateTableEvent(db, table) =>
        val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
        assert(session.isDefined)
        val tableInfo = session.get.sharedState.externalCatalog.getTable(db, table)
        logInfo(s"Table info ${tableInfo}")

      case e =>
        logInfo(s"event $e")

    }
  }
}
```

So here propose to add fresh create `jsparkSession` to `defaultSession`.

## How was this patch tested?

Manual verification.

Author: jerryshao <sshao@hortonworks.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Saisai Shao <sai.sai.shao@gmail.com>

Closes #20404 from jerryshao/SPARK-23228.
2018-01-31 20:04:51 +09:00
gatorsmile 7a2ada223e [SPARK-23261][PYSPARK] Rename Pandas UDFs
## What changes were proposed in this pull request?
Rename the public APIs and names of pandas udfs.

- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
- `PANDAS GROUP AGG UDF` -> `GROUPED AGG PANDAS UDF`

## How was this patch tested?
The existing tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20428 from gatorsmile/renamePandasUDFs.
2018-01-30 21:55:55 +09:00
Henry Robinson 8b983243e4 [SPARK-23157][SQL] Explain restriction on column expression in withColumn()
## What changes were proposed in this pull request?

It's not obvious from the comments that any added column must be a
function of the dataset that we are adding it to. Add a comment to
that effect to Scala, Python and R Data* methods.

Author: Henry Robinson <henry@cloudera.com>

Closes #20429 from henryr/SPARK-23157.
2018-01-29 22:19:59 -08:00
hyukjinkwon 3227d14feb [SPARK-23233][PYTHON] Reset the cache in asNondeterministic to set deterministic properly
## What changes were proposed in this pull request?

Reproducer:

```python
from pyspark.sql.functions import udf
f = udf(lambda x: x)
spark.range(1).select(f("id"))  # cache JVM UDF instance.
f = f.asNondeterministic()
spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head().deterministic()
```

It should return `False` but the current master returns `True`. Seems it's because we cache the JVM UDF instance and then we reuse it even after setting `deterministic` disabled once it's called.

## How was this patch tested?

Manually tested. I am not sure if I should add the test with a lot of JVM accesses with the intetnal stuff .. Let me know if anyone feels so. I will add.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20409 from HyukjinKwon/SPARK-23233.
2018-01-27 11:26:09 -08:00
Huaxin Gao 8480c0c576 [SPARK-23081][PYTHON] Add colRegex API to PySpark
## What changes were proposed in this pull request?

Add colRegex API to PySpark

## How was this patch tested?

add a test in sql/tests.py

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #20390 from huaxingao/spark-23081.
2018-01-26 07:50:48 +09:00
Liang-Chi Hsieh a3911cf896 [SPARK-23177][SQL][PYSPARK] Extract zero-parameter UDFs from aggregate
## What changes were proposed in this pull request?

We extract Python UDFs in logical aggregate which depends on aggregate expression or grouping key in ExtractPythonUDFFromAggregate rule. But Python UDFs which don't depend on above expressions should also be extracted to avoid the issue reported in the JIRA.

A small code snippet to reproduce that issue looks like:
```python
import pyspark.sql.functions as f

df = spark.createDataFrame([(1,2), (3,4)])
f_udf = f.udf(lambda: str("const_str"))
df2 = df.distinct().withColumn("a", f_udf())
df2.show()
```

Error exception is raised as:
```
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#50
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
```

This exception raises because `HashAggregateExec` tries to bind the aliased Python UDF expression (e.g., `pythonUDF0#50 AS a#44`) to grouping key.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20360 from viirya/SPARK-23177.
2018-01-24 11:43:48 +09:00
Li Jin b2ce17b4c9 [SPARK-22274][PYTHON][SQL] User-defined aggregation functions with pandas udf (full shuffle)
## What changes were proposed in this pull request?

Add support for using pandas UDFs with groupby().agg().

This PR introduces a new type of pandas UDF - group aggregate pandas UDF. This type of UDF defines a transformation of multiple pandas Series -> a scalar value. Group aggregate pandas UDFs can be used with groupby().agg(). Note group aggregate pandas UDF doesn't support partial aggregation, i.e., a full shuffle is required.

This PR doesn't support group aggregate pandas UDFs that return ArrayType, StructType or MapType. Support for these types is left for future PR.

## How was this patch tested?

GroupbyAggPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #19872 from icexelloss/SPARK-22274-groupby-agg.
2018-01-23 14:11:30 +09:00
gatorsmile 73281161fc [SPARK-23122][PYSPARK][FOLLOW-UP] Update the docs for UDF Registration
## What changes were proposed in this pull request?

This PR is to update the docs for UDF registration

## How was this patch tested?

N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20348 from gatorsmile/testUpdateDoc.
2018-01-22 04:27:59 -08:00
Takuya UESHIN 568055da93 [SPARK-23054][SQL][PYSPARK][FOLLOWUP] Use sqlType casting when casting PythonUserDefinedType to String.
## What changes were proposed in this pull request?

This is a follow-up of #20246.

If a UDT in Python doesn't have its corresponding Scala UDT, cast to string will be the raw string of the internal value, e.g. `"org.apache.spark.sql.catalyst.expressions.UnsafeArrayDataxxxxxxxx"` if the internal type is `ArrayType`.

This pr fixes it by using its `sqlType` casting.

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20306 from ueshin/issues/SPARK-23054/fup1.
2018-01-19 11:37:08 +08:00
Tathagata Das 2d41f040a3 [SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger
## What changes were proposed in this pull request?
Self-explanatory.

## How was this patch tested?
New python tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20309 from tdas/SPARK-23143.
2018-01-18 12:25:52 -08:00
Takuya UESHIN 5063b74811 [SPARK-23141][SQL][PYSPARK] Support data type string as a returnType for registerJavaFunction.
## What changes were proposed in this pull request?

Currently `UDFRegistration.registerJavaFunction` doesn't support data type string as a `returnType` whereas `UDFRegistration.register`, `udf`, or `pandas_udf` does.
We can support it for `UDFRegistration.registerJavaFunction` as well.

## How was this patch tested?

Added a doctest and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20307 from ueshin/issues/SPARK-23141.
2018-01-18 22:33:04 +09:00
hyukjinkwon 39d244d921 [SPARK-23122][PYTHON][SQL] Deprecate register* for UDFs in SQLContext and Catalog in PySpark
## What changes were proposed in this pull request?

This PR proposes to deprecate `register*` for UDFs in `SQLContext` and `Catalog` in Spark 2.3.0.

These are inconsistent with Scala / Java APIs and also these basically do the same things with `spark.udf.register*`.

Also, this PR moves the logcis from `[sqlContext|spark.catalog].register*` to `spark.udf.register*` and reuse the docstring.

This PR also handles minor doc corrections. It also includes https://github.com/apache/spark/pull/20158

## How was this patch tested?

Manually tested, manually checked the API documentation and tests added to check if deprecated APIs call the aliases correctly.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20288 from HyukjinKwon/deprecate-udf.
2018-01-18 14:51:05 +09:00
Henry Robinson 1f3d933e0b [SPARK-23062][SQL] Improve EXCEPT documentation
## What changes were proposed in this pull request?

Make the default behavior of EXCEPT (i.e. EXCEPT DISTINCT) more
explicit in the documentation, and call out the change in behavior
from 1.x.

Author: Henry Robinson <henry@cloudera.com>

Closes #20254 from henryr/spark-23062.
2018-01-17 16:01:41 +08:00
gatorsmile b85eb946ac [SPARK-22978][PYSPARK] Register Vectorized UDFs for SQL Statement
## What changes were proposed in this pull request?
Register Vectorized UDFs for SQL Statement. For example,

```Python
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> pandas_udf("integer", PandasUDFType.SCALAR)
... def add_one(x):
...     return x + 1
...
>>> _ = spark.udf.register("add_one", add_one)
>>> spark.sql("SELECT add_one(id) FROM range(3)").collect()
[Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
```

## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20171 from gatorsmile/supportVectorizedUDF.
2018-01-16 20:20:33 +09:00
Takeshi Yamamuro b59808385c [SPARK-23023][SQL] Cast field data to strings in showString
## What changes were proposed in this pull request?
The current `Datset.showString` prints rows thru `RowEncoder` deserializers like;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------------------------------------------+
|a                                                           |
+------------------------------------------------------------+
|[WrappedArray(1, 2), WrappedArray(3), WrappedArray(4, 5, 6)]|
+------------------------------------------------------------+
```
This result is incorrect because the correct one is;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------+
|a                       |
+------------------------+
|[[1, 2], [3], [4, 5, 6]]|
+------------------------+
```
So, this pr fixed code in `showString` to cast field data to strings before printing.

## How was this patch tested?
Added tests in `DataFrameSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20214 from maropu/SPARK-23023.
2018-01-15 16:26:52 +08:00
hyukjinkwon cd9f49a2ae [SPARK-22980][PYTHON][SQL] Clarify the length of each series is of each batch within scalar Pandas UDF
## What changes were proposed in this pull request?

This PR proposes to add a note that saying the length of a scalar Pandas UDF's `Series` is not of the whole input column but of the batch.

We are fine for a group map UDF because the usage is different from our typical UDF but scalar UDFs might cause confusion with the normal UDF.

For example, please consider this example:

```python
from pyspark.sql.functions import pandas_udf, col, lit

df = spark.range(1)
f = pandas_udf(lambda x, y: len(x) + y, LongType())
df.select(f(lit('text'), col('id'))).show()
```

```
+------------------+
|<lambda>(text, id)|
+------------------+
|                 1|
+------------------+
```

```python
from pyspark.sql.functions import udf, col, lit

df = spark.range(1)
f = udf(lambda x, y: len(x) + y, "long")
df.select(f(lit('text'), col('id'))).show()
```

```
+------------------+
|<lambda>(text, id)|
+------------------+
|                 4|
+------------------+
```

## How was this patch tested?

Manually built the doc and checked the output.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20237 from HyukjinKwon/SPARK-22980.
2018-01-13 16:13:44 +09:00
Bryan Cutler e599837248 [SPARK-23009][PYTHON] Fix for non-str col names to createDataFrame from Pandas
## What changes were proposed in this pull request?

This the case when calling `SparkSession.createDataFrame` using a Pandas DataFrame that has non-str column labels.

The column name conversion logic to handle non-string or unicode in python2 is:
```
if column is not any type of string:
    name = str(column)
else if column is unicode in Python 2:
    name = column.encode('utf-8')
```

## How was this patch tested?

Added a new test with a Pandas DataFrame that has int column labels

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20210 from BryanCutler/python-createDataFrame-int-col-error-SPARK-23009.
2018-01-10 14:55:24 +09:00
Bryan Cutler 7bcc266681 [SPARK-23018][PYTHON] Fix createDataFrame from Pandas timestamp series assignment
## What changes were proposed in this pull request?

This fixes createDataFrame from Pandas to only assign modified timestamp series back to a copied version of the Pandas DataFrame.  Previously, if the Pandas DataFrame was only a reference (e.g. a slice of another) each series will still get assigned back to the reference even if it is not a modified timestamp column.  This caused the following warning "SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame."

## How was this patch tested?

existing tests

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20213 from BryanCutler/pyspark-createDataFrame-copy-slice-warn-SPARK-23018.
2018-01-10 14:00:07 +09:00
Guilherme Berger 3e40eb3f1f [SPARK-22566][PYTHON] Better error message for _merge_type in Pandas to Spark DF conversion
## What changes were proposed in this pull request?

It provides a better error message when doing `spark_session.createDataFrame(pandas_df)` with no schema and an error occurs in the schema inference due to incompatible types.

The Pandas column names are propagated down and the error message mentions which column had the merging error.

https://issues.apache.org/jira/browse/SPARK-22566

## How was this patch tested?

Manually in the `./bin/pyspark` console, and with new tests: `./python/run-tests`

<img width="873" alt="screen shot 2017-11-21 at 13 29 49" src="https://user-images.githubusercontent.com/3977115/33080121-382274e0-cecf-11e7-808f-057a65bb7b00.png">

I state that the contribution is my original work and that I license the work to the Apache Spark project under the project’s open source license.

Author: Guilherme Berger <gberger@palantir.com>

Closes #19792 from gberger/master.
2018-01-08 14:32:05 +09:00
hyukjinkwon 993f21567a [SPARK-22901][PYTHON][FOLLOWUP] Adds the doc for asNondeterministic for wrapped UDF function
## What changes were proposed in this pull request?

This PR wraps the `asNondeterministic` attribute in the wrapped UDF function to set the docstring properly.

```python
from pyspark.sql.functions import udf
help(udf(lambda x: x).asNondeterministic)
```

Before:

```
Help on function <lambda> in module pyspark.sql.udf:

<lambda> lambda
(END
```

After:

```
Help on function asNondeterministic in module pyspark.sql.udf:

asNondeterministic()
    Updates UserDefinedFunction to nondeterministic.

    .. versionadded:: 2.3
(END)
```

## How was this patch tested?

Manually tested and a simple test was added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20173 from HyukjinKwon/SPARK-22901-followup.
2018-01-06 23:08:26 +08:00
Li Jin f2dd8b9237 [SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs for non-deterministic cases
## What changes were proposed in this pull request?

Add tests for using non deterministic UDFs in aggregate.

Update pandas_udf docstring w.r.t to determinism.

## How was this patch tested?
test_nondeterministic_udf_in_aggregate

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
2018-01-06 16:11:20 +08:00
gatorsmile 5aadbc929c [SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
## What changes were proposed in this pull request?
```Python
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
spark.catalog.registerFunction("random_udf", random_udf, StringType())
spark.sql("SELECT random_udf()").collect()
```

We will get the following error.
```
Py4JError: An error occurred while calling o29.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
```

This PR is to support it.

## How was this patch tested?
WIP

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20137 from gatorsmile/registerFunction.
2018-01-04 21:07:31 +08:00
Felix Cheung df95a908ba [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy
## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #20129 from felixcheung/rwater.
2018-01-03 21:43:14 -08:00
Bryan Cutler 1c9f95cb77 [SPARK-22530][PYTHON][SQL] Adding Arrow support for ArrayType
## What changes were proposed in this pull request?

This change adds `ArrayType` support for working with Arrow in pyspark when creating a DataFrame, calling `toPandas()`, and using vectorized `pandas_udf`.

## How was this patch tested?

Added new Python unit tests using Array data.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20114 from BryanCutler/arrow-ArrayType-support-SPARK-22530.
2018-01-02 07:13:27 +09:00
Takeshi Yamamuro f2b3525c17 [SPARK-22771][SQL] Concatenate binary inputs into a binary output
## What changes were proposed in this pull request?
This pr modified `concat` to concat binary inputs into a single binary output.
`concat` in the current master always output data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.

## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19977 from maropu/SPARK-22771.
2017-12-30 14:09:56 +08:00
Takuya UESHIN 11a849b3a7 [SPARK-22370][SQL][PYSPARK][FOLLOW-UP] Fix a test failure when xmlrunner is installed.
## What changes were proposed in this pull request?

This is a follow-up pr of #19587.

If `xmlrunner` is installed, `VectorizedUDFTests.test_vectorized_udf_check_config` fails by the following error because the `self` which is a subclass of `unittest.TestCase` in the UDF `check_records_per_batch` can't be pickled anymore.

```
PicklingError: Cannot pickle files that are not opened for reading: w
```

This changes the UDF not to refer the `self`.

## How was this patch tested?

Tested locally.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20115 from ueshin/issues/SPARK-22370_fup1.
2017-12-29 23:04:28 +09:00
soonmok-kwon ffe6fd77a4 [SPARK-22818][SQL] csv escape of quote escape
## What changes were proposed in this pull request?

Escape of escape should be considered when using the UniVocity csv encoding/decoding library.

Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters

One option is added for reading and writing CSV: `escapeQuoteEscaping`

## How was this patch tested?

Unit test added.

Author: soonmok-kwon <soonmok.kwon@navercorp.com>

Closes #20004 from ep1804/SPARK-22818.
2017-12-29 07:30:06 +08:00
Marco Gaido ff48b1b338 [SPARK-22901][PYTHON] Add deterministic flag to pyspark UDF
## What changes were proposed in this pull request?

In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for python UDF. This flag is useful for cases when the UDF's code can return different result with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.

This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.

## How was this patch tested?

Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col =  udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten =  udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello|   3|           13|
+-----+----+-------------+

```

Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19929 from mgaido91/SPARK-22629.
2017-12-26 06:39:40 -08:00
Takuya UESHIN eb386be1ed [SPARK-21552][SQL] Add DecimalType support to ArrowWriter.
## What changes were proposed in this pull request?

Decimal type is not yet supported in `ArrowWriter`.
This is adding the decimal type support.

## How was this patch tested?

Added a test to `ArrowConvertersSuite`.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18754 from ueshin/issues/SPARK-21552.
2017-12-26 21:37:25 +09:00
Takuya UESHIN 12d20dd75b [SPARK-22874][PYSPARK][SQL][FOLLOW-UP] Modify error messages to show actual versions.
## What changes were proposed in this pull request?

This is a follow-up pr of #20054 modifying error messages for both pandas and pyarrow to show actual versions.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20074 from ueshin/issues/SPARK-22874_fup1.
2017-12-25 20:29:10 +09:00
Takuya UESHIN 13190a4f60 [SPARK-22874][PYSPARK][SQL] Modify checking pandas version to use LooseVersion.
## What changes were proposed in this pull request?

Currently we check pandas version by capturing if `ImportError` for the specific imports is raised or not but we can compare `LooseVersion` of the version strings as the same as we're checking pyarrow version.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20054 from ueshin/issues/SPARK-22874.
2017-12-22 20:09:51 +09:00
Bryan Cutler 59d52631eb [SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
## What changes were proposed in this pull request?

Upgrade Spark to Arrow 0.8.0 for Java and Python.  Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.

The highlights that pertain to Spark for the update from Arrow versoin 0.4.1 to 0.8.0 include:

* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python

## How was this patch tested?

Existing tests

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
2017-12-21 20:43:56 +09:00
Dongjoon Hyun 9962390af7 [SPARK-22781][SS] Support creating streaming dataset with ORC files
## What changes were proposed in this pull request?

Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document.

**BEFORE**

```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
       spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```

**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper678b3746

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
|  a|
+---+
|  1|
+---+
```

## How was this patch tested?

Pass the newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19975 from dongjoon-hyun/SPARK-22781.
2017-12-19 23:50:06 -08:00
Fernando Pereira 13268a58f8 [SPARK-22649][PYTHON][SQL] Adding localCheckpoint to Dataset API
## What changes were proposed in this pull request?

This change adds local checkpoint support to datasets and respective bind from Python Dataframe API.

If reliability requirements can be lowered to favor performance, as in cases of further quick transformations followed by a reliable save, localCheckpoints() fit very well.
Furthermore, at the moment Reliable checkpoints still incur double computation (see #9428)
In general it makes the API more complete as well.

## How was this patch tested?

Python land quick use case:

```python
>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F

>>> def f(x):
    sleep(1)
    return x*2
   ...:

>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))

>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms
Wall time: 12.2 s

>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms
Wall time: 10.3 s

>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms

>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms
Wall time: 20.3 s
```

Author: Fernando Pereira <fernando.pereira@epfl.ch>

Closes #19805 from ferdonline/feature_dataset_localCheckpoint.
2017-12-19 20:47:12 -08:00
Youngbin Kim 6e36d8d562 [SPARK-22829] Add new built-in function date_trunc()
## What changes were proposed in this pull request?

Adding date_trunc() as a built-in function.
`date_trunc` is common in other databases, but Spark or Hive does not have support for this. `date_trunc` is commonly used by data scientists and business intelligence application such as Superset (https://github.com/apache/incubator-superset).
We do have `trunc` but this only works with 'MONTH' and 'YEAR' level on the DateType input.

date_trunc() in other databases:
AWS Redshift: http://docs.aws.amazon.com/redshift/latest/dg/r_DATE_TRUNC.html
PostgreSQL: https://www.postgresql.org/docs/9.1/static/functions-datetime.html
Presto: https://prestodb.io/docs/current/functions/datetime.html

## How was this patch tested?

Unit tests

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Youngbin Kim <ykim828@hotmail.com>

Closes #20015 from youngbink/date_trunc.
2017-12-19 20:22:33 -08:00
Prashant Sharma 40de176c93 [SPARK-16496][SQL] Add wholetext as option for reading text in SQL.
## What changes were proposed in this pull request?

In multiple text analysis problems, it is not often desirable for the rows to be split by "\n". There exists a wholeText reader for RDD API, and this JIRA just adds the same support for Dataset API.
## How was this patch tested?

Added relevant new tests for both scala and Java APIs

Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>

Closes #14151 from ScrapCodes/SPARK-16496/wholetext.
2017-12-14 11:19:34 -08:00
Takuya UESHIN 64817c423c [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone
## What changes were proposed in this pull request?

When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values behave to respect Python system timezone instead of session timezone.

For example, let's say we use `"America/Los_Angeles"` as session timezone and have a timestamp value `"1970-01-01 00:00:01"` in the timezone. Btw, I'm in Japan so Python timezone would be `"Asia/Tokyo"`.

The timestamp value from current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects Python timezone.
As we discussed in #18664, we consider this behavior is a bug and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19607 from ueshin/issues/SPARK-22395.
2017-11-28 16:45:22 +08:00
gaborgsomogyi 33d43bf1b6 [SPARK-22484][DOC] Document PySpark DataFrame csv writer behavior whe…
## What changes were proposed in this pull request?

In PySpark API Document, DataFrame.write.csv() says that setting the quote parameter to an empty string should turn off quoting. Instead, it uses the [null character](https://en.wikipedia.org/wiki/Null_character) as the quote.

This PR fixes the doc.

## How was this patch tested?

Manual.

```
cd python/docs
make html
open _build/html/pyspark.sql.html
```

Author: gaborgsomogyi <gabor.g.somogyi@gmail.com>

Closes #19814 from gaborgsomogyi/SPARK-22484.
2017-11-28 10:14:35 +09:00
Liang-Chi Hsieh 9d45e675e2 [SPARK-22541][SQL] Explicitly claim that Python udfs can't be conditionally executed with short-curcuit evaluation
## What changes were proposed in this pull request?

Besides conditional expressions such as `when` and `if`, users may want to conditionally execute python udfs by short-curcuit evaluation. We should also explicitly note that python udfs don't support this kind of conditional execution too.

## How was this patch tested?

N/A, just document change.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19787 from viirya/SPARK-22541.
2017-11-21 09:36:37 +01:00
Li Jin 7d039e0c0a [SPARK-22409] Introduce function type argument in pandas_udf
## What changes were proposed in this pull request?

* Add a "function type" argument to pandas_udf.
* Add a new public enum class `PandasUdfType` in pyspark.sql.functions
* Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
* Merge "PythonUdfType" and "PythonEvalType" into a single enum class "PythonEvalType"

Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType

pandas_udf('double', PandasUDFType.SCALAR):
def plus_one(v):
    return v + 1
```

## Design doc
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit

## How was this patch tested?

Added PandasUDFTests

## TODO:
* [x] Implement proper enum type for `PandasUDFType`
* [x] Update documentation
* [x] Add more tests in PandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #19630 from icexelloss/spark-22409-pandas-udf-type.
2017-11-17 16:43:08 +01:00
Dongjoon Hyun aa88b8dbbb [SPARK-22490][DOC] Add PySpark doc for SparkSession.builder
## What changes were proposed in this pull request?

In PySpark API Document, [SparkSession.build](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html) is not documented and shows default value description.
```
SparkSession.builder = <pyspark.sql.session.Builder object ...
```

This PR adds the doc.

![screen](https://user-images.githubusercontent.com/9700541/32705514-1bdcafaa-c7ca-11e7-88bf-05566fea42de.png)

The following is the diff of the generated result.

```
$ diff old.html new.html
95a96,101
> <dl class="attribute">
> <dt id="pyspark.sql.SparkSession.builder">
> <code class="descname">builder</code><a class="headerlink" href="#pyspark.sql.SparkSession.builder" title="Permalink to this definition">¶</a></dt>
> <dd><p>A class attribute having a <a class="reference internal" href="#pyspark.sql.SparkSession.Builder" title="pyspark.sql.SparkSession.Builder"><code class="xref py py-class docutils literal"><span class="pre">Builder</span></code></a> to construct <a class="reference internal" href="#pyspark.sql.SparkSession" title="pyspark.sql.SparkSession"><code class="xref py py-class docutils literal"><span class="pre">SparkSession</span></code></a> instances</p>
> </dd></dl>
>
212,216d217
< <dt id="pyspark.sql.SparkSession.builder">
< <code class="descname">builder</code><em class="property"> = &lt;pyspark.sql.session.SparkSession.Builder object&gt;</em><a class="headerlink" href="#pyspark.sql.SparkSession.builder" title="Permalink to this definition">¶</a></dt>
< <dd></dd></dl>
<
< <dl class="attribute">
```

## How was this patch tested?

Manual.

```
cd python/docs
make html
open _build/html/pyspark.sql.html
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19726 from dongjoon-hyun/SPARK-22490.
2017-11-15 08:59:29 -08:00
Bryan Cutler 8f0e88df03 [SPARK-20791][PYTHON][FOLLOWUP] Check for unicode column names in createDataFrame with Arrow
## What changes were proposed in this pull request?

If schema is passed as a list of unicode strings for column names, they should be re-encoded to 'utf-8' to be consistent.  This is similar to the #13097 but for creation of DataFrame using Arrow.

## How was this patch tested?

Added new test of using unicode names for schema.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19738 from BryanCutler/arrow-createDataFrame-followup-unicode-SPARK-20791.
2017-11-15 23:35:13 +09:00
Bryan Cutler 209b9361ac [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas
## What changes were proposed in this pull request?

This change uses Arrow to optimize the creation of a Spark DataFrame from a Pandas DataFrame. The input df is sliced according to the default parallelism. The optimization is enabled with the existing conf "spark.sql.execution.arrow.enabled" and is disabled by default.

## How was this patch tested?

Added new unit test to create DataFrame with and without the optimization enabled, then compare results.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19459 from BryanCutler/arrow-createDataFrame-from_pandas-SPARK-20791.
2017-11-13 13:16:01 +09:00
hyukjinkwon 695647bf2e [SPARK-21640][SQL][PYTHON][R][FOLLOWUP] Add errorifexists in SparkR and other documentations
## What changes were proposed in this pull request?

This PR proposes to add `errorifexists` to SparkR API and fix the rest of them describing the mode, mainly, in API documentations as well.

This PR also replaces `convertToJSaveMode` to `setWriteMode` so that string as is is passed to JVM and executes:

b034f2565f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (L72-L82)

and remove the duplication here:

3f958a9992/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala (L187-L194)

## How was this patch tested?

Manually checked the built documentation. These were mainly found by `` grep -r `error` `` and `grep -r 'error'`.

Also, unit tests added in `test_sparkSQL.R`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19673 from HyukjinKwon/SPARK-21640-followup.
2017-11-09 15:00:31 +09:00
ptkool d01044233c [SPARK-22456][SQL] Add support for dayofweek function
## What changes were proposed in this pull request?
This PR adds support for a new function called `dayofweek` that returns the day of the week of the given argument as an integer value in the range 1-7, where 1 represents Sunday.

## How was this patch tested?
Unit tests and manual tests.

Author: ptkool <michael.styles@shopify.com>

Closes #19672 from ptkool/day_of_week_function.
2017-11-09 14:44:39 +09:00
Bryan Cutler 1d341042d6 [SPARK-22417][PYTHON] Fix for createDataFrame from pandas.DataFrame with timestamp
## What changes were proposed in this pull request?

Currently, a pandas.DataFrame that contains a timestamp of type 'datetime64[ns]' when converted to a Spark DataFrame with `createDataFrame` will interpret the values as LongType. This fix will check for a timestamp type and convert it to microseconds which will allow Spark to read as TimestampType.

## How was this patch tested?

Added unit test to verify Spark schema is expected for TimestampType and DateType when created from pandas

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19646 from BryanCutler/pyspark-non-arrow-createDataFrame-ts-fix-SPARK-22417.
2017-11-07 21:32:37 +01:00
Marco Gaido e7adb7d7a6 [SPARK-22437][PYSPARK] default mode for jdbc is wrongly set to None
## What changes were proposed in this pull request?

When writing using jdbc with python currently we are wrongly assigning by default None as writing mode. This is due to wrongly calling mode on the `_jwrite` object instead of `self` and it causes an exception.

## How was this patch tested?

manual tests

Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19654 from mgaido91/SPARK-22437.
2017-11-04 16:59:58 +09:00
hyukjinkwon 41b60125b6 [SPARK-22369][PYTHON][DOCS] Exposes catalog API documentation in PySpark
## What changes were proposed in this pull request?

This PR proposes to add a link from `spark.catalog(..)` to `Catalog` and expose Catalog APIs in PySpark as below:

<img width="740" alt="2017-10-29 12 25 46" src="https://user-images.githubusercontent.com/6477701/32135863-f8e9b040-bc40-11e7-92ad-09c8043a1295.png">

<img width="1131" alt="2017-10-29 12 26 33" src="https://user-images.githubusercontent.com/6477701/32135849-bb257b86-bc40-11e7-9eda-4d58fc1301c2.png">

Note that this is not shown in the list on the top - https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql

<img width="674" alt="2017-10-29 12 30 58" src="https://user-images.githubusercontent.com/6477701/32135854-d50fab16-bc40-11e7-9181-812c56fd22f5.png">

This is basically similar with `DataFrameReader` and `DataFrameWriter`.

## How was this patch tested?

Manually built the doc.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19596 from HyukjinKwon/SPARK-22369.
2017-11-02 15:22:52 +01:00
Liang-Chi Hsieh 07f390a27d [SPARK-22347][PYSPARK][DOC] Add document to notice users for using udfs with conditional expressions
## What changes were proposed in this pull request?

Under the current execution mode of Python UDFs, we don't well support Python UDFs as branch values or else value in CaseWhen expression.

Since to fix it might need the change not small (e.g., #19592) and this issue has simpler workaround. We should just notice users in the document about this.

## How was this patch tested?

Only document change.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19617 from viirya/SPARK-22347-3.
2017-11-01 13:09:35 +01:00
hyukjinkwon 188b47e683 [SPARK-22379][PYTHON] Reduce duplication setUpClass and tearDownClass in PySpark SQL tests
## What changes were proposed in this pull request?

This PR propose to add `ReusedSQLTestCase` which deduplicate `setUpClass` and  `tearDownClass` in `sql/tests.py`.

## How was this patch tested?

Jenkins tests and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19595 from HyukjinKwon/reduce-dupe.
2017-10-30 11:50:22 +09:00
Takuya UESHIN 4c5269f1aa [SPARK-22370][SQL][PYSPARK] Config values should be captured in Driver.
## What changes were proposed in this pull request?

`ArrowEvalPythonExec` and `FlatMapGroupsInPandasExec` are refering config values of `SQLConf` in function for `mapPartitions`/`mapPartitionsInternal`, but we should capture them in Driver.

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19587 from ueshin/issues/SPARK-22370.
2017-10-28 18:33:09 +01:00
Bryan Cutler 17af727e38 [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp support to ArrowConverters for toPandas() Conversion
## What changes were proposed in this pull request?

Adding date and timestamp support with Arrow for `toPandas()` and `pandas_udf`s.  Timestamps are stored in Arrow as UTC and manifested to the user as timezone-naive localized to the Python system timezone.

## How was this patch tested?

Added Scala tests for date and timestamp types under ArrowConverters, ArrowUtils, and ArrowWriter suites.  Added Python tests for `toPandas()` and `pandas_udf`s with date and timestamp types.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18664 from BryanCutler/arrow-date-timestamp-SPARK-21375.
2017-10-26 23:02:46 -07:00
hyukjinkwon d9798c834f [SPARK-22313][PYTHON] Mark/print deprecation warnings as DeprecationWarning for deprecated APIs
## What changes were proposed in this pull request?

This PR proposes to mark the existing warnings as `DeprecationWarning` and print out warnings for deprecated functions.

This could be actually useful for Spark app developers. I use (old) PyCharm and this IDE can detect this specific `DeprecationWarning` in some cases:

**Before**

<img src="https://user-images.githubusercontent.com/6477701/31762664-df68d9f8-b4f6-11e7-8773-f0468f70a2cc.png" height="45" />

**After**

<img src="https://user-images.githubusercontent.com/6477701/31762662-de4d6868-b4f6-11e7-98dc-3c8446a0c28a.png" height="70" />

For console usage, `DeprecationWarning` is usually disabled (see https://docs.python.org/2/library/warnings.html#warning-categories and https://docs.python.org/3/library/warnings.html#warning-categories):

```
>>> import warnings
>>> filter(lambda f: f[2] == DeprecationWarning, warnings.filters)
[('ignore', <_sre.SRE_Pattern object at 0x10ba58c00>, <type 'exceptions.DeprecationWarning'>, <_sre.SRE_Pattern object at 0x10bb04138>, 0), ('ignore', None, <type 'exceptions.DeprecationWarning'>, None, 0)]
```

so, it won't actually mess up the terminal much unless it is intended.

If this is intendedly enabled, it'd should as below:

```
>>> import warnings
>>> warnings.simplefilter('always', DeprecationWarning)
>>>
>>> from pyspark.sql import functions
>>> functions.approxCountDistinct("a")
.../spark/python/pyspark/sql/functions.py:232: DeprecationWarning: Deprecated in 2.1, use approx_count_distinct instead.
  "Deprecated in 2.1, use approx_count_distinct instead.", DeprecationWarning)
...
```

These instances were found by:

```
cd python/pyspark
grep -r "Deprecated" .
grep -r "deprecated" .
grep -r "deprecate" .
```

## How was this patch tested?

Manually tested.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19535 from HyukjinKwon/deprecated-warning.
2017-10-24 12:44:47 +09:00
Takuya UESHIN b8624b06e5 [SPARK-20396][SQL][PYSPARK][FOLLOW-UP] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.

## How was this patch tested?

Exisiting tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19517 from ueshin/issues/SPARK-20396/fup2.
2017-10-20 12:44:30 -07:00
Zhenhua Wang 655f6f86f8 [SPARK-22208][SQL] Improve percentile_approx by not rounding up targetError and starting from index 0
## What changes were proposed in this pull request?

Currently percentile_approx never returns the first element when percentile is in (relativeError, 1/N], where relativeError default 1/10000, and N is the total number of elements. But ideally, percentiles in [0, 1/N] should all return the first element as the answer.

For example, given input data 1 to 10, if a user queries 10% (or even less) percentile, it should return 1, because the first value 1 already reaches 10%. Currently it returns 2.

Based on the paper, targetError is not rounded up, and searching index should start from 0 instead of 1. By following the paper, we should be able to fix the cases mentioned above.

## How was this patch tested?

Added a new test case and fix existing test cases.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #19438 from wzhfy/improve_percentile_approx.
2017-10-11 00:16:12 -07:00
Li Jin bfc7e1fe1a [SPARK-20396][SQL][PYSPARK] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.

Static schema
-------------------
```
schema = df.schema

pandas_udf(schema)
def normalize(df):
    df = df.assign(v1 = (df.v1 - df.v1.mean()) / df.v1.std()
    return df

df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**

Another example to use pd.DataFrame dtypes as output schema of the udf:

```
sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
      ret = # Some transformation on the input pd.DataFrame
      return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)

df.groupBy('id').apply(foo_udf)
```
In interactive use case, user usually have a sample pd.DataFrame to test function `foo` in their notebook. Having been able to use `foo(sample_df).dtypes` frees user from specifying the output schema of `foo`.

Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

## How was this patch tested?
* Added GroupbyApplyTest

Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>

Closes #18732 from icexelloss/groupby-apply-SPARK-20396.
2017-10-11 07:32:01 +09:00
Takuya UESHIN af8a34c787 [SPARK-22159][SQL][FOLLOW-UP] Make config names consistently end with "enabled".
## What changes were proposed in this pull request?

This is a follow-up of #19384.

In the previous pr, only definitions of the config names were modified, but we also need to modify the names in runtime or tests specified as string literal.

## How was this patch tested?

Existing tests but modified the config names.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19462 from ueshin/issues/SPARK-22159/fup1.
2017-10-09 22:35:34 -07:00
Bryan Cutler 7bf4da8a33 [MINOR] Fixed up pandas_udf related docs and formatting
## What changes were proposed in this pull request?

Fixed some minor issues with pandas_udf related docs and formatting.

## How was this patch tested?

NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19375 from BryanCutler/arrow-pandas_udf-cleanup-minor.
2017-09-28 10:24:51 +09:00
goldmedal 1fdfe69352 [SPARK-22112][PYSPARK] Supports RDD of strings as input in spark.read.csv in PySpark
## What changes were proposed in this pull request?
We added a method to the scala API for creating a `DataFrame` from `DataSet[String]` storing CSV in [SPARK-15463](https://issues.apache.org/jira/browse/SPARK-15463) but PySpark doesn't have `Dataset` to support this feature. Therfore, I add an API to create a `DataFrame` from `RDD[String]` storing csv and it's also consistent with PySpark's `spark.read.json`.

For example as below
```
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
```
## How was this patch tested?
add unit test cases.

Author: goldmedal <liugs963@gmail.com>

Closes #19339 from goldmedal/SPARK-22112.
2017-09-27 11:19:45 +09:00
Bryan Cutler d8e825e3bc [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_udf and add doctests
## What changes were proposed in this pull request?

This change disables the use of 0-parameter pandas_udfs due to the API being overly complex and awkward, and can easily be worked around by using an index column as an input argument.  Also added doctests for pandas_udfs which revealed bugs for handling empty partitions and using the pandas_udf decorator.

## How was this patch tested?

Reworked existing 0-parameter test to verify error is raised, added doctest for pandas_udf, added new tests for empty partition and decorator usage.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19325 from BryanCutler/arrow-pandas_udf-0-param-remove-SPARK-22106.
2017-09-26 10:54:00 +09:00
Zhenhua Wang 365a29bdbf [SPARK-22100][SQL] Make percentile_approx support date/timestamp type and change the output type to be the same as input type
## What changes were proposed in this pull request?

The `percentile_approx` function previously accepted numeric type input and output double type results.

But since all numeric types, date and timestamp types are represented as numerics internally, `percentile_approx` can support them easily.

After this PR, it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.

This change is also required when we generate equi-height histograms for these types.

## How was this patch tested?

Added a new test and modified some existing tests.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #19321 from wzhfy/approx_percentile_support_types.
2017-09-25 09:28:42 -07:00
Liang-Chi Hsieh 3e6a714c9e [SPARK-21766][PYSPARK][SQL] DataFrame toPandas() raises ValueError with nullable int columns
## What changes were proposed in this pull request?

When calling `DataFrame.toPandas()` (without Arrow enabled), if there is a `IntegralType` column (`IntegerType`, `ShortType`, `ByteType`) that has null values the following exception is thrown:

    ValueError: Cannot convert non-finite values (NA or inf) to integer

This is because the null values first get converted to float NaN during the construction of the Pandas DataFrame in `from_records`, and then it is attempted to be converted back to to an integer where it fails.

The fix is going to check if the Pandas DataFrame can cause such failure when converting, if so, we don't do the conversion and use the inferred type by Pandas.

Closes #18945

## How was this patch tested?

Added pyspark test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19319 from viirya/SPARK-21766.
2017-09-22 22:39:47 +09:00
Bryan Cutler 27fc536d9a [SPARK-21190][PYSPARK] Python Vectorized UDFs
This PR adds vectorized UDFs to the Python API

**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:

```
pandas_udf(DoubleType())
def plus(a, b)
    return a + b
```
or

```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs

0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output.  For example:

```
pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```

Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.

- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
2017-09-22 16:17:50 +08:00
Sean Owen e17901d6df [SPARK-22049][DOCS] Confusing behavior of from_utc_timestamp and to_utc_timestamp
## What changes were proposed in this pull request?

Clarify behavior of to_utc_timestamp/from_utc_timestamp with an example

## How was this patch tested?

Doc only change / existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19276 from srowen/SPARK-22049.
2017-09-20 20:47:17 +09:00
Maciej Bryński f4073020ad [SPARK-22032][PYSPARK] Speed up StructType conversion
## What changes were proposed in this pull request?

StructType.fromInternal is calling f.fromInternal(v) for every field.
We can use precalculated information about type to limit the number of function calls. (its calculated once per StructType and used in per record calculations)

Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 10000000  253.417    0.000  486.991    0.000 types.py:619(<listcomp>)
 30000000  192.272    0.000 1009.986    0.000 types.py:612(fromInternal)
100000000  176.140    0.000  176.140    0.000 types.py:88(fromInternal)
 20000000  156.832    0.000  328.093    0.000 types.py:1471(_create_row)
    14000  107.206    0.008 1237.917    0.088 {built-in method loads}
 20000000   80.176    0.000 1090.162    0.000 types.py:1468(<lambda>)
```

After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000000  215.845    0.000  698.748    0.000 types.py:612(fromInternal)
 20000000  165.042    0.000  351.572    0.000 types.py:1471(_create_row)
    14000  116.834    0.008  946.791    0.068 {built-in method loads}
 20000000   87.326    0.000  786.073    0.000 types.py:1468(<lambda>)
 20000000   85.477    0.000  134.607    0.000 types.py:1519(__new__)
 10000000   65.777    0.000  126.712    0.000 types.py:619(<listcomp>)
```

Main difference is types.py:619(<listcomp>) and types.py:88(fromInternal) (which is removed in After)
The number of function calls is 100 million less. And performance is 20% better.

Benchmark (worst case scenario.)

Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```

After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```

IMPORTANT:
The benchmark was done on top of https://github.com/apache/spark/pull/19246.
Without https://github.com/apache/spark/pull/19246 the performance improvement will be even greater.

## How was this patch tested?

Existing tests.
Performance benchmark.

Author: Maciej Bryński <maciek-github@brynski.pl>

Closes #19249 from maver1ck/spark_22032.
2017-09-18 02:34:44 +09:00
goldmedal a28728a9af [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting MapType to json for PySpark and SparkR
## What changes were proposed in this pull request?
In previous work SPARK-21513, we has allowed `MapType` and `ArrayType` of `MapType`s convert to a json string but only for Scala API. In this follow-up PR, we will make SparkSQL support it for PySpark and SparkR, too. We also fix some little bugs and comments of the previous work in this follow-up PR.

### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice")']
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.

cc viirya HyukjinKwon

Author: goldmedal <liugs963@gmail.com>

Closes #19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.
2017-09-15 11:53:10 +09:00
Peter Szalai 520d92a191 [SPARK-20098][PYSPARK] dataType's typeName fix
## What changes were proposed in this pull request?
`typeName`  classmethod has been fixed by using type -> typeName map.

## How was this patch tested?
local build

Author: Peter Szalai <szalaipeti.vagyok@gmail.com>

Closes #17435 from szalai1/datatype-gettype-fix.
2017-09-10 17:47:45 +09:00
Yanbo Liang e4d8f9a36a [MINOR][SQL] Correct DataFrame doc.
## What changes were proposed in this pull request?
Correct DataFrame doc.

## How was this patch tested?
Only doc change, no tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #19173 from yanboliang/df-doc.
2017-09-09 09:25:12 -07:00
hyukjinkwon 8598d03a00 [SPARK-15243][ML][SQL][PYTHON] Add missing support for unicode in Param methods & functions in dataframe
## What changes were proposed in this pull request?

This PR proposes to support unicodes in Param methods in ML, other missed functions in DataFrame.

For example, this causes a `ValueError` in Python 2.x when param is a unicode string:

```python
>>> from pyspark.ml.classification import LogisticRegression
>>> lr = LogisticRegression()
>>> lr.hasParam("threshold")
True
>>> lr.hasParam(u"threshold")
Traceback (most recent call last):
 ...
    raise TypeError("hasParam(): paramName must be a string")
TypeError: hasParam(): paramName must be a string
```

This PR is based on https://github.com/apache/spark/pull/13036

## How was this patch tested?

Unit tests in `python/pyspark/ml/tests.py` and `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: sethah <seth.hendrickson16@gmail.com>

Closes #17096 from HyukjinKwon/SPARK-15243.
2017-09-08 11:57:33 -07:00
Takuya UESHIN 57bc1e9eb4 [SPARK-21950][SQL][PYTHON][TEST] pyspark.sql.tests.SQLTests2 should stop SparkContext.
## What changes were proposed in this pull request?

`pyspark.sql.tests.SQLTests2` doesn't stop newly created spark context in the test and it might affect the following tests.
This pr makes `pyspark.sql.tests.SQLTests2` stop `SparkContext`.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19158 from ueshin/issues/SPARK-21950.
2017-09-08 14:26:07 +09:00
hyukjinkwon 07fd68a29f [SPARK-21897][PYTHON][R] Add unionByName API to DataFrame in Python and R
## What changes were proposed in this pull request?

This PR proposes to add a wrapper for `unionByName` API to R and Python as well.

**Python**

```python
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
```

```
+----+----+----+
|col0|col1|col3|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+
```

**R**

```R
df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
head(unionByName(limit(df1, 2), limit(df2, 2)))
```

```
  carb am gear
1    4  1    4
2    4  1    4
3    4  1    4
4    4  1    4
```

## How was this patch tested?

Doctests for Python and unit test added in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19105 from HyukjinKwon/unionByName-r-python.
2017-09-03 21:03:21 +09:00
hyukjinkwon 648a8626b8 [SPARK-21789][PYTHON] Remove obsolete codes for parsing abstract schema strings
## What changes were proposed in this pull request?

This PR proposes to remove private functions that look not used in the main codes, `_split_schema_abstract`, `_parse_field_abstract`, `_parse_schema_abstract` and `_infer_schema_type`.

## How was this patch tested?

Existing tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18647 from HyukjinKwon/remove-abstract.
2017-09-01 13:09:24 +09:00
hyukjinkwon 5cd8ea99f0 [SPARK-21779][PYTHON] Simpler DataFrame.sample API in Python
## What changes were proposed in this pull request?

This PR make `DataFrame.sample(...)` can omit `withReplacement` defaulting `False`, consistently with equivalent Scala / Java API.

In short, the following examples are allowed:

```python
>>> df = spark.range(10)
>>> df.sample(0.5).count()
7
>>> df.sample(fraction=0.5).count()
3
>>> df.sample(0.5, seed=42).count()
5
>>> df.sample(fraction=0.5, seed=42).count()
5
```

In addition, this PR also adds some type checking logics as below:

```python
>>> df = spark.range(10)
>>> df.sample().count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [].
>>> df.sample(True).count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'bool'>].
>>> df.sample(42).count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'int'>].
>>> df.sample(fraction=False, seed="a").count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'bool'>, <type 'str'>].
>>> df.sample(seed=[1]).count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'list'>].
>>> df.sample(withReplacement="a", fraction=0.5, seed=1)
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'str'>, <type 'float'>, <type 'int'>].
```

## How was this patch tested?

Manually tested, unit tests added in doc tests and manually checked the built documentation for Python.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18999 from HyukjinKwon/SPARK-21779.
2017-09-01 13:01:23 +09:00
Liang-Chi Hsieh ecf437a648 [SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray
## What changes were proposed in this pull request?

`PickleException` is thrown when creating dataframe from python row with empty bytearray

    spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx})).show()

    net.razorvine.pickle.PickleException: invalid pickle data for bytearray; expected 1 or 2 args, got 0
    	at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java
        ...

`ByteArrayConstructor` doesn't deal with empty byte array pickled by Python3.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19085 from viirya/SPARK-21534.
2017-08-31 12:55:38 +09:00
Dongjoon Hyun d8f4540863 [SPARK-21839][SQL] Support SQL config for ORC compression
## What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec` like Parquet's `spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC compression, too.

## How was this patch tested?

Pass the Jenkins with new and updated test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19055 from dongjoon-hyun/SPARK-21839.
2017-08-31 08:16:58 +09:00
vinodkc 51620e288b [SPARK-21756][SQL] Add JSON option to allow unquoted control characters
## What changes were proposed in this pull request?

This patch adds allowUnquotedControlChars option in JSON data source to allow JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters)

## How was this patch tested?
Add new test cases

Author: vinodkc <vinod.kc.in@gmail.com>

Closes #19008 from vinodkc/br_fix_SPARK-21756.
2017-08-25 10:18:03 -07:00
hyukjinkwon dc5d34d8dc [SPARK-19165][PYTHON][SQL] PySpark APIs using columns as arguments should validate input types for column
## What changes were proposed in this pull request?

While preparing to take over https://github.com/apache/spark/pull/16537, I realised a (I think) better approach to make the exception handling in one point.

This PR proposes to fix `_to_java_column` in `pyspark.sql.column`, which most of functions in `functions.py` and some other APIs use. This `_to_java_column` basically looks not working with other types than `pyspark.sql.column.Column` or string (`str` and `unicode`).

If this is not `Column`, then it calls `_create_column_from_name` which calls `functions.col` within JVM:

42b9eda80e/sql/core/src/main/scala/org/apache/spark/sql/functions.scala (L76)

And it looks we only have `String` one with `col`.

So, these should work:

```python
>>> from pyspark.sql.column import _to_java_column, Column
>>> _to_java_column("a")
JavaObject id=o28
>>> _to_java_column(u"a")
JavaObject id=o29
>>> _to_java_column(spark.range(1).id)
JavaObject id=o33
```

whereas these do not:

```python
>>> _to_java_column(1)
```
```
...
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
    ...
```

```python
>>> _to_java_column([])
```
```
...
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    ...
```

```python
>>> class A(): pass
>>> _to_java_column(A())
```
```
...
AttributeError: 'A' object has no attribute '_get_object_id'
```

Meaning most of functions using `_to_java_column` such as `udf` or `to_json` or some other APIs throw an exception as below:

```python
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)(None)
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.col.
: java.lang.NullPointerException
    ...
```

```python
>>> from pyspark.sql.functions import to_json
>>> to_json(None)
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.col.
: java.lang.NullPointerException
    ...
```

**After this PR**:

```python
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)(None)
...
```

```
TypeError: Invalid argument, not a string or column: None of type <type 'NoneType'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' functions.
```

```python
>>> from pyspark.sql.functions import to_json
>>> to_json(None)
```

```
...
TypeError: Invalid argument, not a string or column: None of type <type 'NoneType'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' functions.
```

## How was this patch tested?

Unit tests added in `python/pyspark/sql/tests.py` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: zero323 <zero323@users.noreply.github.com>

Closes #19027 from HyukjinKwon/SPARK-19165.
2017-08-24 20:29:03 +09:00
Andrew Ray 10be01848e [SPARK-21566][SQL][PYTHON] Python method for summary
## What changes were proposed in this pull request?

Adds the recently added `summary` method to the python dataframe interface.

## How was this patch tested?

Additional inline doctests.

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #18762 from aray/summary-py.
2017-08-18 18:10:54 -07:00
Nicholas Chammas 9660831050 [SPARK-21712][PYSPARK] Clarify type error for Column.substr()
Proposed changes:
* Clarify the type error that `Column.substr()` gives.

Test plan:
* Tested this manually.
* Test code:
    ```python
    from pyspark.sql.functions import col, lit
    spark.createDataFrame([['nick']], schema=['name']).select(col('name').substr(0, lit(1)))
    ```
* Before:
    ```
    TypeError: Can not mix the type
    ```
* After:
    ```
    TypeError: startPos and length must be the same type. Got <class 'int'> and
    <class 'pyspark.sql.column.Column'>, respectively.
    ```

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #18926 from nchammas/SPARK-21712-substr-type-error.
2017-08-16 11:19:15 +09:00
byakuinss 0fcde87aad [SPARK-21658][SQL][PYSPARK] Add default None for value in na.replace in PySpark
## What changes were proposed in this pull request?
JIRA issue: https://issues.apache.org/jira/browse/SPARK-21658

Add default None for value in `na.replace` since `Dataframe.replace` and `DataframeNaFunctions.replace` are alias.

The default values are the same now.
```
>>> df = sqlContext.createDataFrame([('Alice', 10, 80.0)])
>>> df.replace({"Alice": "a"}).first()
Row(_1=u'a', _2=10, _3=80.0)
>>> df.na.replace({"Alice": "a"}).first()
Row(_1=u'a', _2=10, _3=80.0)
```

## How was this patch tested?
Existing tests.

cc viirya

Author: byakuinss <grace.chinhanyu@gmail.com>

Closes #18895 from byakuinss/SPARK-21658.
2017-08-15 00:41:01 +09:00
bravo-zhang 84454d7d33 [SPARK-14932][SQL] Allow DataFrame.replace() to replace values with None
## What changes were proposed in this pull request?

Currently `df.na.replace("*", Map[String, String]("NULL" -> null))` will produce exception.
This PR enables passing null/None as value in the replacement map in DataFrame.replace().
Note that the replacement map keys and values should still be the same type, while the values can have a mix of null/None and that type.
This PR enables following operations for example:
`df.na.replace("*", Map[String, String]("NULL" -> null))`(scala)
`df.na.replace("*", Map[Any, Any](60 -> null, 70 -> 80))`(scala)
`df.na.replace('Alice', None)`(python)
`df.na.replace([10, 20])`(python, replacing with None is by default)
One use case could be: I want to replace all the empty strings with null/None because they were incorrectly generated and then drop all null/None data
`df.na.replace("*", Map("" -> null)).na.drop()`(scala)
`df.replace(u'', None).dropna()`(python)

## How was this patch tested?

Scala unit test.
Python doctest and unit test.

Author: bravo-zhang <mzhang1230@gmail.com>

Closes #18820 from bravo-zhang/spark-14932.
2017-08-09 17:42:21 -07:00
Mac 4f7ec3a316 [SPARK][DOCS] Added note on meaning of position to substring function
## What changes were proposed in this pull request?

Enhanced some existing documentation

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Mac <maclockard@gmail.com>

Closes #18710 from maclockard/maclockard-patch-1.
2017-08-07 17:16:03 +01:00
hyukjinkwon b56f79cc35 [SPARK-20090][PYTHON] Add StructType.fieldNames in PySpark
## What changes were proposed in this pull request?

This PR proposes `StructType.fieldNames` that returns a copy of a field name list rather than a (undocumented) `StructType.names`.

There are two points here:

  - API consistency with Scala/Java

  - Provide a safe way to get the field names. Manipulating these might cause unexpected behaviour as below:

    ```python
    from pyspark.sql.types import *

    struct = StructType([StructField("f1", StringType(), True)])
    names = struct.names
    del names[0]
    spark.createDataFrame([{"f1": 1}], struct).show()
    ```

    ```
    ...
    java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 0 values are provided.
    	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
    	at org.apache.spark.sql.SparkSession$$anonfun$6.apply(SparkSession.scala:741)
    	at org.apache.spark.sql.SparkSession$$anonfun$6.apply(SparkSession.scala:741)
    ...
    ```

## How was this patch tested?

Added tests in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18618 from HyukjinKwon/SPARK-20090.
2017-07-28 20:59:32 -07:00
Takuya UESHIN 2ff35a057e [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add ArrayType and StructType support.
## What changes were proposed in this pull request?

This is a refactoring of `ArrowConverters` and related classes.

1. Refactor `ColumnWriter` as `ArrowWriter`.
2. Add `ArrayType` and `StructType` support.
3. Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` creation.

## How was this patch tested?

Added some tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18655 from ueshin/issues/SPARK-21440.
2017-07-27 19:19:51 +08:00
gatorsmile ebc24a9b7f [SPARK-20586][SQL] Add deterministic to ScalaUDF
### What changes were proposed in this pull request?
Like [Hive UDFType](https://hive.apache.org/javadocs/r2.0.1/api/org/apache/hadoop/hive/ql/udf/UDFType.html), we should allow users to add the extra flags for ScalaUDF and JavaUDF too. _stateful_/_impliesOrder_ are not applicable to our Scala UDF. Thus, we only add the following two flags.

- deterministic: Certain optimizations should not be applied if UDF is not deterministic. Deterministic UDF returns same result each time it is invoked with a particular input. This determinism just needs to hold within the context of a query.

When the deterministic flag is not correctly set, the results could be wrong.

For ScalaUDF in Dataset APIs, users can call the following extra APIs for `UserDefinedFunction` to make the corresponding changes.
- `nonDeterministic`: Updates UserDefinedFunction to non-deterministic.

Also fixed the Java UDF name loss issue.

Will submit a separate PR for `distinctLike`  for UDAF

### How was this patch tested?
Added test cases for both ScalaUDF

Author: gatorsmile <gatorsmile@gmail.com>
Author: Wenchen Fan <cloud0fan@gmail.com>

Closes #17848 from gatorsmile/udfRegister.
2017-07-25 17:19:44 -07:00
Xiang Gao b7a40f64e6 [SPARK-16542][SQL][PYSPARK] Fix bugs about types that result an array of null when creating DataFrame using python
## What changes were proposed in this pull request?
This is the reopen of https://github.com/apache/spark/pull/14198, with merge conflicts resolved.

ueshin Could you please take a look at my code?

Fix bugs about types that result an array of null when creating DataFrame using python.

Python's array.array have richer type than python itself, e.g. we can have `array('f',[1,2,3])` and `array('d',[1,2,3])`. Codes in spark-sql and pyspark didn't take this into consideration which might cause a problem that you get an array of null values when you have `array('f')` in your rows.

A simple code to reproduce this bug is:

```
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row,DataFrame
from array import array

sc = SparkContext()
sqlContext = SQLContext(sc)

row1 = Row(floatarray=array('f',[1,2,3]), doublearray=array('d',[1,2,3]))
rows = sc.parallelize([ row1 ])
df = sqlContext.createDataFrame(rows)
df.show()
```

which have output

```
+---------------+------------------+
|    doublearray|        floatarray|
+---------------+------------------+
|[1.0, 2.0, 3.0]|[null, null, null]|
+---------------+------------------+
```

## How was this patch tested?

New test case added

Author: Xiang Gao <qasdfgtyuiop@gmail.com>
Author: Gao, Xiang <qasdfgtyuiop@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18444 from zasdfgbnm/fix_array_infer.
2017-07-20 12:46:06 +09:00
hyukjinkwon 4ce735eed1 [SPARK-21394][SPARK-21432][PYTHON] Reviving callable object/partial function support in UDF in PySpark
## What changes were proposed in this pull request?

This PR proposes to avoid `__name__` in the tuple naming the attributes assigned directly from the wrapped function to the wrapper function, and use `self._name` (`func.__name__` or `obj.__class__.name__`).

After SPARK-19161, we happened to break callable objects as UDFs in Python as below:

```python
from pyspark.sql import functions

class F(object):
    def __call__(self, x):
        return x

foo = F()
udf = functions.udf(foo)
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/functions.py", line 2142, in udf
    return _udf(f=f, returnType=returnType)
  File ".../spark/python/pyspark/sql/functions.py", line 2133, in _udf
    return udf_obj._wrapped()
  File ".../spark/python/pyspark/sql/functions.py", line 2090, in _wrapped
    functools.wraps(self.func)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/functools.py", line 33, in update_wrapper
    setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: F instance has no attribute '__name__'
```

This worked in Spark 2.1:

```python
from pyspark.sql import functions

class F(object):
    def __call__(self, x):
        return x

foo = F()
udf = functions.udf(foo)
spark.range(1).select(udf("id")).show()
```

```
+-----+
|F(id)|
+-----+
|    0|
+-----+
```

**After**

```python
from pyspark.sql import functions

class F(object):
    def __call__(self, x):
        return x

foo = F()
udf = functions.udf(foo)
spark.range(1).select(udf("id")).show()
```

```
+-----+
|F(id)|
+-----+
|    0|
+-----+
```

_In addition, we also happened to break partial functions as below_:

```python
from pyspark.sql import functions
from functools import partial

partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/functions.py", line 2154, in udf
    return _udf(f=f, returnType=returnType)
  File ".../spark/python/pyspark/sql/functions.py", line 2145, in _udf
    return udf_obj._wrapped()
  File ".../spark/python/pyspark/sql/functions.py", line 2099, in _wrapped
    functools.wraps(self.func, assigned=assignments)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/functools.py", line 33, in update_wrapper
    setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: 'functools.partial' object has no attribute '__module__'
```

This worked in Spark 2.1:

```python
from pyspark.sql import functions
from functools import partial

partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
spark.range(1).select(udf()).show()
```

```
+---------+
|partial()|
+---------+
|        1|
+---------+
```

**After**

```python
from pyspark.sql import functions
from functools import partial

partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
spark.range(1).select(udf()).show()
```

```
+---------+
|partial()|
+---------+
|        1|
+---------+
```

## How was this patch tested?

Unit tests in `python/pyspark/sql/tests.py` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18615 from HyukjinKwon/callable-object.
2017-07-17 00:37:36 -07:00
hyukjinkwon ebc124d4c4 [SPARK-21365][PYTHON] Deduplicate logics parsing DDL type/schema definition
## What changes were proposed in this pull request?

This PR deals with four points as below:

- Reuse existing DDL parser APIs rather than reimplementing within PySpark

- Support DDL formatted string, `field type, field type`.

- Support case-insensitivity for parsing.

- Support nested data types as below:

  **Before**
  ```
  >>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
  ...
  ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
  ```

  ```
  >>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
  ...
  ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
  ```

  ```
  >>> spark.createDataFrame([[1]], "a int").show()
  ...
  ValueError: Could not parse datatype: a int
  ```

  **After**
  ```
  >>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
  +---+
  |  a|
  +---+
  |[1]|
  +---+
  ```

  ```
  >>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
  +---+
  |  a|
  +---+
  |[1]|
  +---+
  ```

  ```
  >>> spark.createDataFrame([[1]], "a int").show()
  +---+
  |  a|
  +---+
  |  1|
  +---+
  ```

## How was this patch tested?

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18590 from HyukjinKwon/deduplicate-python-ddl.
2017-07-11 22:03:10 +08:00
hyukjinkwon d4d9e17b31 [SPARK-20456][PYTHON][FOLLOWUP] Fix timezone-dependent doctests in unix_timestamp and from_unixtime
## What changes were proposed in this pull request?

This PR proposes to simply ignore the results in examples that are timezone-dependent in `unix_timestamp` and `from_unixtime`.

```
Failed example:
    time_df.select(unix_timestamp('dt', 'yyyy-MM-dd').alias('unix_time')).collect()
Expected:
    [Row(unix_time=1428476400)]
Got:unix_timestamp
    [Row(unix_time=1428418800)]
```

```
Failed example:
    time_df.select(from_unixtime('unix_time').alias('ts')).collect()
Expected:
    [Row(ts=u'2015-04-08 00:00:00')]
Got:
    [Row(ts=u'2015-04-08 16:00:00')]
```

## How was this patch tested?

Manually tested and `./run-tests --modules pyspark-sql`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18597 from HyukjinKwon/SPARK-20456.
2017-07-11 15:23:03 +09:00
Bryan Cutler d03aebbe65 [SPARK-13534][PYSPARK] Using Apache Arrow to increase performance of DataFrame.toPandas
## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`.  This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process.  The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame.  Data types except complex, date, timestamp, and decimal  are currently supported, otherwise an `UnsupportedOperation` exception is thrown.

Additions to Spark include a Scala package private method `Dataset.toArrowPayload` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served.  A package private class/object `ArrowConverters` that provide data type mappings and conversion routines.  In Python, a private method `DataFrame._collectAsArrow` is added to collect Arrow payloads and a SQLConf "spark.sql.execution.arrow.enable" can be used in `toPandas()` to enable using Arrow (uses the old conversion by default).

## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types.  The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data.  This will ensure that the schema and data has been converted correctly.

Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow.  A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Li Jin <li.jin@twosigma.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #18459 from BryanCutler/toPandas_with_arrow-SPARK-13534.
2017-07-10 15:21:03 -07:00
hyukjinkwon 2bfd5accdc [SPARK-21266][R][PYTHON] Support schema a DDL-formatted string in dapply/gapply/from_json
## What changes were proposed in this pull request?

This PR supports schema in a DDL formatted string for `from_json` in R/Python and `dapply` and `gapply` in R, which are commonly used and/or consistent with Scala APIs.

Additionally, this PR exposes `structType` in R to allow working around in other possible corner cases.

**Python**

`from_json`

```python
from pyspark.sql.functions import from_json

data = [(1, '''{"a": 1}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.select(from_json(df.value, "a INT").alias("json")).show()
```

**R**

`from_json`

```R
df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))
head(select(df, from_json(df$people_json, "name STRING")))
```

`structType.character`

```R
structType("a STRING, b INT")
```

`dapply`

```R
dapply(createDataFrame(list(list(1.0)), "a"), function(x) {x}, "a DOUBLE")
```

`gapply`

```R
gapply(createDataFrame(list(list(1.0)), "a"), "a", function(key, x) { x }, "a DOUBLE")
```

## How was this patch tested?

Doc tests for `from_json` in Python and unit tests `test_sparkSQL.R` in R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18498 from HyukjinKwon/SPARK-21266.
2017-07-10 10:40:03 -07:00
Michael Patterson f5f02d213d [SPARK-20456][DOCS] Add examples for functions collection for pyspark
## What changes were proposed in this pull request?

This adds documentation to many functions in pyspark.sql.functions.py:
`upper`, `lower`, `reverse`, `unix_timestamp`, `from_unixtime`, `rand`, `randn`, `collect_list`, `collect_set`, `lit`
Add units to the trigonometry functions.
Renames columns in datetime examples to be more informative.
Adds links between some functions.

## How was this patch tested?

`./dev/lint-python`
`python python/pyspark/sql/functions.py`
`./python/run-tests.py --module pyspark-sql`

Author: Michael Patterson <map222@gmail.com>

Closes #17865 from map222/spark-20456.
2017-07-07 23:59:34 -07:00
Takuya UESHIN 53c2eb59b2 [SPARK-21327][SQL][PYSPARK] ArrayConstructor should handle an array of typecode 'l' as long rather than int in Python 2.
## What changes were proposed in this pull request?

Currently `ArrayConstructor` handles an array of typecode `'l'` as `int` when converting Python object in Python 2 into Java object, so if the value is larger than `Integer.MAX_VALUE` or smaller than `Integer.MIN_VALUE` then the overflow occurs.

```python
import array
data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
df = spark.createDataFrame(data)
df.show(truncate=False)
```

```
+----------+
|longarray |
+----------+
|[0, 0, -1]|
+----------+
```

This should be:

```
+----------------------------------------------+
|longarray                                     |
+----------------------------------------------+
|[-9223372036854775808, 0, 9223372036854775807]|
+----------------------------------------------+
```

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18553 from ueshin/issues/SPARK-21327.
2017-07-07 14:05:22 +09:00
Jeff Zhang 742da08685 [SPARK-19439][PYSPARK][SQL] PySpark's registerJavaFunction Should Support UDAFs
## What changes were proposed in this pull request?

Support register Java UDAFs in PySpark so that user can use Java UDAF in PySpark. Besides that I also add api in `UDFRegistration`

## How was this patch tested?

Unit test is added

Author: Jeff Zhang <zjffdu@apache.org>

Closes #17222 from zjffdu/SPARK-19439.
2017-07-05 10:59:10 -07:00
hyukjinkwon d492cc5a21 [SPARK-19507][SPARK-21296][PYTHON] Avoid per-record type dispatch in schema verification and improve exception message
## What changes were proposed in this pull request?
**Context**

While reviewing https://github.com/apache/spark/pull/17227, I realised here we type-dispatch per record. The PR itself is fine in terms of performance as is but this prints a prefix, `"obj"` in exception message as below:

```
from pyspark.sql.types import *
schema = StructType([StructField('s', IntegerType(), nullable=False)])
spark.createDataFrame([["1"]], schema)
...
TypeError: obj.s: IntegerType can not accept object '1' in type <type 'str'>
```

I suggested to get rid of this but during investigating this, I realised my approach might bring a performance regression as it is a hot path.

Only for SPARK-19507 and https://github.com/apache/spark/pull/17227, It needs more changes to cleanly get rid of the prefix and I rather decided to fix both issues together.

**Propersal**

This PR tried to

  - get rid of per-record type dispatch as we do in many code paths in Scala  so that it improves the performance (roughly ~25% improvement) - SPARK-21296

    This was tested with a simple code `spark.createDataFrame(range(1000000), "int")`. However, I am quite sure the actual improvement in practice is larger than this, in particular, when the schema is complicated.

   - improve error message in exception describing field information as prose - SPARK-19507

## How was this patch tested?

Manually tested and unit tests were added in `python/pyspark/sql/tests.py`.

Benchmark - codes: https://gist.github.com/HyukjinKwon/c3397469c56cb26c2d7dd521ed0bc5a3
Error message - codes: https://gist.github.com/HyukjinKwon/b1b2c7f65865444c4a8836435100e398

**Before**

Benchmark:
  - Results: https://gist.github.com/HyukjinKwon/4a291dab45542106301a0c1abcdca924

Error message
  - Results: https://gist.github.com/HyukjinKwon/57b1916395794ce924faa32b14a3fe19

**After**

Benchmark
  - Results: https://gist.github.com/HyukjinKwon/21496feecc4a920e50c4e455f836266e

Error message
  - Results: https://gist.github.com/HyukjinKwon/7a494e4557fe32a652ce1236e504a395

Closes #17227

Author: hyukjinkwon <gurwls223@gmail.com>
Author: David Gingrich <david@textio.com>

Closes #18521 from HyukjinKwon/python-type-dispatch.
2017-07-04 20:45:58 +08:00
hyukjinkwon a848d552ef [SPARK-21264][PYTHON] Call cross join path in join without 'on' and with 'how'
## What changes were proposed in this pull request?

Currently, it throws a NPE when missing columns but join type is speicified in join at PySpark as below:

```python
spark.conf.set("spark.sql.crossJoin.enabled", "false")
spark.range(1).join(spark.range(1), how="inner").show()
```

```
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o66.join.
: java.lang.NullPointerException
	at org.apache.spark.sql.Dataset.join(Dataset.scala:931)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
```

```python
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.range(1).join(spark.range(1), how="inner").show()
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling o84.join.
: java.lang.NullPointerException
	at org.apache.spark.sql.Dataset.join(Dataset.scala:931)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
```

This PR suggests to follow Scala's one as below:

```scala
scala> spark.conf.set("spark.sql.crossJoin.enabled", "false")

scala> spark.range(1).join(spark.range(1), Seq.empty[String], "inner").show()
```

```
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Range (0, 1, step=1, splits=Some(8))
and
Range (0, 1, step=1, splits=Some(8))
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
...
```

```scala
scala> spark.conf.set("spark.sql.crossJoin.enabled", "true")

scala> spark.range(1).join(spark.range(1), Seq.empty[String], "inner").show()
```
```
+---+---+
| id| id|
+---+---+
|  0|  0|
+---+---+
```

**After**

```python
spark.conf.set("spark.sql.crossJoin.enabled", "false")
spark.range(1).join(spark.range(1), how="inner").show()
```

```
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: u'Detected cartesian product for INNER join between logical plans\nRange (0, 1, step=1, splits=Some(8))\nand\nRange (0, 1, step=1, splits=Some(8))\nJoin condition is missing or trivial.\nUse the CROSS JOIN syntax to allow cartesian products between these relations.;'
```

```python
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.range(1).join(spark.range(1), how="inner").show()
```
```
+---+---+
| id| id|
+---+---+
|  0|  0|
+---+---+
```

## How was this patch tested?

Added tests in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18484 from HyukjinKwon/SPARK-21264.
2017-07-04 11:35:08 +09:00
Wenchen Fan 838effb98a Revert "[SPARK-13534][PYSPARK] Using Apache Arrow to increase performance of DataFrame.toPandas"
This reverts commit e44697606f.
2017-06-28 14:28:40 +08:00
hyukjinkwon 7525ce98b4 [SPARK-20431][SS][FOLLOWUP] Specify a schema by using a DDL-formatted string in DataStreamReader
## What changes were proposed in this pull request?

This pr supported a DDL-formatted string in `DataStreamReader.schema`.
This fix could make users easily define a schema without importing the type classes.

For example,

```scala
scala> spark.readStream.schema("col0 INT, col1 DOUBLE").load("/tmp/abc").printSchema()
root
 |-- col0: integer (nullable = true)
 |-- col1: double (nullable = true)
```

## How was this patch tested?

Added tests in `DataStreamReaderWriterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18373 from HyukjinKwon/SPARK-20431.
2017-06-24 11:39:41 +08:00
Bryan Cutler e44697606f [SPARK-13534][PYSPARK] Using Apache Arrow to increase performance of DataFrame.toPandas
## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`.  This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process.  The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame.  All non-complex data types are currently supported, otherwise an `UnsupportedOperation` exception is thrown.

Additions to Spark include a Scala package private method `Dataset.toArrowPayloadBytes` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served.  A package private class/object `ArrowConverters` that provide data type mappings and conversion routines.  In Python, a public method `DataFrame.collectAsArrow` is added to collect Arrow payloads and an optional flag in `toPandas(useArrow=False)` to enable using Arrow (uses the old conversion by default).

## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types.  The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data.  This will ensure that the schema and data has been converted correctly.

Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow.  A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Li Jin <li.jin@twosigma.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #15821 from BryanCutler/wip-toPandas_with_arrow-SPARK-13534.
2017-06-23 09:01:13 +08:00
hyukjinkwon 67c75021c5 [SPARK-21163][SQL] DataFrame.toPandas should respect the data type
## What changes were proposed in this pull request?

Currently we convert a spark DataFrame to Pandas Dataframe by `pd.DataFrame.from_records`. It infers the data type from the data and doesn't respect the spark DataFrame Schema. This PR fixes it.

## How was this patch tested?

a new regression test

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@gmail.com>

Closes #18378 from cloud-fan/to_pandas.
2017-06-22 16:22:02 +08:00
zero323 215281d88e [SPARK-20830][PYSPARK][SQL] Add posexplode and posexplode_outer
## What changes were proposed in this pull request?

Add Python wrappers for `o.a.s.sql.functions.explode_outer` and `o.a.s.sql.functions.posexplode_outer`.

## How was this patch tested?

Unit tests, doctests.

Author: zero323 <zero323@users.noreply.github.com>

Closes #18049 from zero323/SPARK-20830.
2017-06-21 14:59:52 -07:00
Yong Tang e5387018e7 [SPARK-19975][PYTHON][SQL] Add map_keys and map_values functions to Python
## What changes were proposed in this pull request?

This fix tries to address the issue in SPARK-19975 where we
have `map_keys` and `map_values` functions in SQL yet there
is no Python equivalent functions.

This fix adds `map_keys` and `map_values` functions to Python.

## How was this patch tested?

This fix is tested manually (See Python docs for examples).

Author: Yong Tang <yong.tang.github@outlook.com>

Closes #17328 from yongtang/SPARK-19975.
2017-06-19 11:40:07 -07:00
Xiao Li 2051428173 [SPARK-20980][SQL] Rename wholeFile to multiLine for both CSV and JSON
### What changes were proposed in this pull request?
The current option name `wholeFile` is misleading for CSV users. Currently, it is not representing a record per file. Actually, one file could have multiple records. Thus, we should rename it. Now, the proposal is `multiLine`.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18202 from gatorsmile/renameCVSOption.
2017-06-15 13:18:19 +08:00
Reynold Xin b78e3849b2 [SPARK-21042][SQL] Document Dataset.union is resolution by position
## What changes were proposed in this pull request?
Document Dataset.union is resolution by position, not by name, since this has been a confusing point for a lot of users.

## How was this patch tested?
N/A - doc only change.

Author: Reynold Xin <rxin@databricks.com>

Closes #18256 from rxin/SPARK-21042.
2017-06-09 18:29:33 -07:00
Ruben Berenguel Montoro 6cbc61d107 [SPARK-19732][SQL][PYSPARK] Add fill functions for nulls in bool fields of datasets
## What changes were proposed in this pull request?

Allow fill/replace of NAs with booleans, both in Python and Scala

## How was this patch tested?

Unit tests, doctests

This PR is original work from me and I license this work to the Spark project

Author: Ruben Berenguel Montoro <ruben@mostlymaths.net>
Author: Ruben Berenguel <ruben@mostlymaths.net>

Closes #18164 from rberenguel/SPARK-19732-fillna-bools.
2017-06-03 14:56:42 +09:00
gatorsmile de934e6718 [SPARK-19236][SQL][FOLLOW-UP] Added createOrReplaceGlobalTempView method
### What changes were proposed in this pull request?
This PR does the following tasks:
- Added  since
- Added the Python API
- Added test cases

### How was this patch tested?
Added test cases to both Scala and Python

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18147 from gatorsmile/createOrReplaceGlobalTempView.
2017-05-31 11:38:43 -07:00
Michael Armbrust d935e0a9d9 [SPARK-20844] Remove experimental from Structured Streaming APIs
Now that Structured Streaming has been out for several Spark release and has large production use cases, the `Experimental` label is no longer appropriate.  I've left `InterfaceStability.Evolving` however, as I think we may make a few changes to the pluggable Source & Sink API in Spark 2.3.

Author: Michael Armbrust <michael@databricks.com>

Closes #18065 from marmbrus/streamingGA.
2017-05-26 13:33:23 -07:00
hyukjinkwon 720708ccdd [SPARK-20639][SQL] Add single argument support for to_timestamp in SQL with documentation improvement
## What changes were proposed in this pull request?

This PR proposes three things as below:

- Use casting rules to a timestamp in `to_timestamp` by default (it was `yyyy-MM-dd HH:mm:ss`).

- Support single argument for `to_timestamp` similarly with APIs in other languages.

  For example, the one below works

  ```
  import org.apache.spark.sql.functions._
  Seq("2016-12-31 00:12:00.00").toDF("a").select(to_timestamp(col("a"))).show()
  ```

  prints

  ```
  +----------------------------------------+
  |to_timestamp(`a`, 'yyyy-MM-dd HH:mm:ss')|
  +----------------------------------------+
  |                     2016-12-31 00:12:00|
  +----------------------------------------+
  ```

  whereas this does not work in SQL.

  **Before**

  ```
  spark-sql> SELECT to_timestamp('2016-12-31 00:12:00');
  Error in query: Invalid number of arguments for function to_timestamp; line 1 pos 7
  ```

  **After**

  ```
  spark-sql> SELECT to_timestamp('2016-12-31 00:12:00');
  2016-12-31 00:12:00
  ```

- Related document improvement for SQL function descriptions and other API descriptions accordingly.

  **Before**

  ```
  spark-sql> DESCRIBE FUNCTION extended to_date;
  ...
  Usage: to_date(date_str, fmt) - Parses the `left` expression with the `fmt` expression. Returns null with invalid input.
  Extended Usage:
      Examples:
        > SELECT to_date('2016-12-31', 'yyyy-MM-dd');
         2016-12-31
  ```

  ```
  spark-sql> DESCRIBE FUNCTION extended to_timestamp;
  ...
  Usage: to_timestamp(timestamp, fmt) - Parses the `left` expression with the `format` expression to a timestamp. Returns null with invalid input.
  Extended Usage:
      Examples:
        > SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd');
         2016-12-31 00:00:00.0
  ```

  **After**

  ```
  spark-sql> DESCRIBE FUNCTION extended to_date;
  ...
  Usage:
      to_date(date_str[, fmt]) - Parses the `date_str` expression with the `fmt` expression to
        a date. Returns null with invalid input. By default, it follows casting rules to a date if
        the `fmt` is omitted.

  Extended Usage:
      Examples:
        > SELECT to_date('2009-07-30 04:17:52');
         2009-07-30
        > SELECT to_date('2016-12-31', 'yyyy-MM-dd');
         2016-12-31
  ```

  ```
  spark-sql> DESCRIBE FUNCTION extended to_timestamp;
  ...
   Usage:
      to_timestamp(timestamp[, fmt]) - Parses the `timestamp` expression with the `fmt` expression to
        a timestamp. Returns null with invalid input. By default, it follows casting rules to
        a timestamp if the `fmt` is omitted.

  Extended Usage:
      Examples:
        > SELECT to_timestamp('2016-12-31 00:12:00');
         2016-12-31 00:12:00
        > SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd');
         2016-12-31 00:00:00
  ```

## How was this patch tested?

Added tests in `datetime.sql`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17901 from HyukjinKwon/to_timestamp_arg.
2017-05-12 16:42:58 +08:00
Takeshi Yamamuro 04901dd03a [SPARK-20431][SQL] Specify a schema by using a DDL-formatted string
## What changes were proposed in this pull request?
This pr supported a DDL-formatted string in `DataFrameReader.schema`.
This fix could make users easily define a schema without importing  `o.a.spark.sql.types._`.

## How was this patch tested?
Added tests in `DataFrameReaderWriterSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17719 from maropu/SPARK-20431.
2017-05-11 11:06:29 -07:00
Josh Rosen 8ddbc431d8 [SPARK-20685] Fix BatchPythonEvaluation bug in case of single UDF w/ repeated arg.
## What changes were proposed in this pull request?

There's a latent corner-case bug in PySpark UDF evaluation where executing a `BatchPythonEvaluation` with a single multi-argument UDF where _at least one argument value is repeated_ will crash at execution with a confusing error.

This problem was introduced in #12057: the code there has a fast path for handling a "batch UDF evaluation consisting of a single Python UDF", but that branch incorrectly assumes that a single UDF won't have repeated arguments and therefore skips the code for unpacking arguments from the input row (whose schema may not necessarily match the UDF inputs due to de-duplication of repeated arguments which occurred in the JVM before sending UDF inputs to Python).

This fix here is simply to remove this special-casing: it turns out that the code in the "multiple UDFs" branch just so happens to work for the single-UDF case because Python treats `(x)` as equivalent to `x`, not as a single-argument tuple.

## How was this patch tested?

New regression test in `pyspark.python.sql.tests` module (tested and confirmed that it fails before my fix).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #17927 from JoshRosen/SPARK-20685.
2017-05-10 16:50:57 -07:00
Felix Cheung af8b6cc823 [SPARK-20689][PYSPARK] python doctest leaking bucketed table
## What changes were proposed in this pull request?

It turns out pyspark doctest is calling saveAsTable without ever dropping them. Since we have separate python tests for bucketed table, and there is no checking of results, there is really no need to run the doctest, other than leaving it as an example in the generated doc

## How was this patch tested?

Jenkins

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17932 from felixcheung/pytablecleanup.
2017-05-10 09:33:49 -07:00
zero323 f53a820721 [SPARK-16931][PYTHON][SQL] Add Python wrapper for bucketBy
## What changes were proposed in this pull request?

Adds Python wrappers for `DataFrameWriter.bucketBy` and `DataFrameWriter.sortBy` ([SPARK-16931](https://issues.apache.org/jira/browse/SPARK-16931))

## How was this patch tested?

Unit tests covering new feature.

__Note__: Based on work of GregBowyer (f49b9a23468f7af32cb53d2b654272757c151725)

CC HyukjinKwon

Author: zero323 <zero323@users.noreply.github.com>
Author: Greg Bowyer <gbowyer@fastmail.co.uk>

Closes #17077 from zero323/SPARK-16931.
2017-05-08 10:58:27 +08:00
zero323 63d90e7da4 [SPARK-18777][PYTHON][SQL] Return UDF from udf.register
## What changes were proposed in this pull request?

- Move udf wrapping code from `functions.udf` to `functions.UserDefinedFunction`.
- Return wrapped udf from `catalog.registerFunction` and dependent methods.
- Update docstrings in `catalog.registerFunction` and `SQLContext.registerFunction`.
- Unit tests.

## How was this patch tested?

- Existing unit tests and docstests.
- Additional tests covering new feature.

Author: zero323 <zero323@users.noreply.github.com>

Closes #17831 from zero323/SPARK-18777.
2017-05-06 22:28:42 -07:00