Commit graph

223 commits

Author SHA1 Message Date
Takuya UESHIN 568055da93 [SPARK-23054][SQL][PYSPARK][FOLLOWUP] Use sqlType casting when casting PythonUserDefinedType to String.
## What changes were proposed in this pull request?

This is a follow-up of #20246.

If a UDT in Python doesn't have its corresponding Scala UDT, cast to string will be the raw string of the internal value, e.g. `"org.apache.spark.sql.catalyst.expressions.UnsafeArrayDataxxxxxxxx"` if the internal type is `ArrayType`.

This pr fixes it by using its `sqlType` casting.

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20306 from ueshin/issues/SPARK-23054/fup1.
2018-01-19 11:37:08 +08:00
Tathagata Das 2d41f040a3 [SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger
## What changes were proposed in this pull request?
Self-explanatory.

## How was this patch tested?
New python tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20309 from tdas/SPARK-23143.
2018-01-18 12:25:52 -08:00
hyukjinkwon 39d244d921 [SPARK-23122][PYTHON][SQL] Deprecate register* for UDFs in SQLContext and Catalog in PySpark
## What changes were proposed in this pull request?

This PR proposes to deprecate `register*` for UDFs in `SQLContext` and `Catalog` in Spark 2.3.0.

These are inconsistent with Scala / Java APIs and also these basically do the same things with `spark.udf.register*`.

Also, this PR moves the logcis from `[sqlContext|spark.catalog].register*` to `spark.udf.register*` and reuse the docstring.

This PR also handles minor doc corrections. It also includes https://github.com/apache/spark/pull/20158

## How was this patch tested?

Manually tested, manually checked the API documentation and tests added to check if deprecated APIs call the aliases correctly.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20288 from HyukjinKwon/deprecate-udf.
2018-01-18 14:51:05 +09:00
gatorsmile b85eb946ac [SPARK-22978][PYSPARK] Register Vectorized UDFs for SQL Statement
## What changes were proposed in this pull request?
Register Vectorized UDFs for SQL Statement. For example,

```Python
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> pandas_udf("integer", PandasUDFType.SCALAR)
... def add_one(x):
...     return x + 1
...
>>> _ = spark.udf.register("add_one", add_one)
>>> spark.sql("SELECT add_one(id) FROM range(3)").collect()
[Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
```

## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20171 from gatorsmile/supportVectorizedUDF.
2018-01-16 20:20:33 +09:00
Bryan Cutler e599837248 [SPARK-23009][PYTHON] Fix for non-str col names to createDataFrame from Pandas
## What changes were proposed in this pull request?

This the case when calling `SparkSession.createDataFrame` using a Pandas DataFrame that has non-str column labels.

The column name conversion logic to handle non-string or unicode in python2 is:
```
if column is not any type of string:
    name = str(column)
else if column is unicode in Python 2:
    name = column.encode('utf-8')
```

## How was this patch tested?

Added a new test with a Pandas DataFrame that has int column labels

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20210 from BryanCutler/python-createDataFrame-int-col-error-SPARK-23009.
2018-01-10 14:55:24 +09:00
Guilherme Berger 3e40eb3f1f [SPARK-22566][PYTHON] Better error message for _merge_type in Pandas to Spark DF conversion
## What changes were proposed in this pull request?

It provides a better error message when doing `spark_session.createDataFrame(pandas_df)` with no schema and an error occurs in the schema inference due to incompatible types.

The Pandas column names are propagated down and the error message mentions which column had the merging error.

https://issues.apache.org/jira/browse/SPARK-22566

## How was this patch tested?

Manually in the `./bin/pyspark` console, and with new tests: `./python/run-tests`

<img width="873" alt="screen shot 2017-11-21 at 13 29 49" src="https://user-images.githubusercontent.com/3977115/33080121-382274e0-cecf-11e7-808f-057a65bb7b00.png">

I state that the contribution is my original work and that I license the work to the Apache Spark project under the project’s open source license.

Author: Guilherme Berger <gberger@palantir.com>

Closes #19792 from gberger/master.
2018-01-08 14:32:05 +09:00
hyukjinkwon 993f21567a [SPARK-22901][PYTHON][FOLLOWUP] Adds the doc for asNondeterministic for wrapped UDF function
## What changes were proposed in this pull request?

This PR wraps the `asNondeterministic` attribute in the wrapped UDF function to set the docstring properly.

```python
from pyspark.sql.functions import udf
help(udf(lambda x: x).asNondeterministic)
```

Before:

```
Help on function <lambda> in module pyspark.sql.udf:

<lambda> lambda
(END
```

After:

```
Help on function asNondeterministic in module pyspark.sql.udf:

asNondeterministic()
    Updates UserDefinedFunction to nondeterministic.

    .. versionadded:: 2.3
(END)
```

## How was this patch tested?

Manually tested and a simple test was added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20173 from HyukjinKwon/SPARK-22901-followup.
2018-01-06 23:08:26 +08:00
Li Jin f2dd8b9237 [SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs for non-deterministic cases
## What changes were proposed in this pull request?

Add tests for using non deterministic UDFs in aggregate.

Update pandas_udf docstring w.r.t to determinism.

## How was this patch tested?
test_nondeterministic_udf_in_aggregate

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
2018-01-06 16:11:20 +08:00
gatorsmile 5aadbc929c [SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
## What changes were proposed in this pull request?
```Python
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
spark.catalog.registerFunction("random_udf", random_udf, StringType())
spark.sql("SELECT random_udf()").collect()
```

We will get the following error.
```
Py4JError: An error occurred while calling o29.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
```

This PR is to support it.

## How was this patch tested?
WIP

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20137 from gatorsmile/registerFunction.
2018-01-04 21:07:31 +08:00
Bryan Cutler 1c9f95cb77 [SPARK-22530][PYTHON][SQL] Adding Arrow support for ArrayType
## What changes were proposed in this pull request?

This change adds `ArrayType` support for working with Arrow in pyspark when creating a DataFrame, calling `toPandas()`, and using vectorized `pandas_udf`.

## How was this patch tested?

Added new Python unit tests using Array data.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20114 from BryanCutler/arrow-ArrayType-support-SPARK-22530.
2018-01-02 07:13:27 +09:00
Takuya UESHIN 11a849b3a7 [SPARK-22370][SQL][PYSPARK][FOLLOW-UP] Fix a test failure when xmlrunner is installed.
## What changes were proposed in this pull request?

This is a follow-up pr of #19587.

If `xmlrunner` is installed, `VectorizedUDFTests.test_vectorized_udf_check_config` fails by the following error because the `self` which is a subclass of `unittest.TestCase` in the UDF `check_records_per_batch` can't be pickled anymore.

```
PicklingError: Cannot pickle files that are not opened for reading: w
```

This changes the UDF not to refer the `self`.

## How was this patch tested?

Tested locally.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20115 from ueshin/issues/SPARK-22370_fup1.
2017-12-29 23:04:28 +09:00
Marco Gaido ff48b1b338 [SPARK-22901][PYTHON] Add deterministic flag to pyspark UDF
## What changes were proposed in this pull request?

In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for python UDF. This flag is useful for cases when the UDF's code can return different result with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.

This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.

## How was this patch tested?

Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col =  udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten =  udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello|   3|           13|
+-----+----+-------------+

```

Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19929 from mgaido91/SPARK-22629.
2017-12-26 06:39:40 -08:00
Takuya UESHIN eb386be1ed [SPARK-21552][SQL] Add DecimalType support to ArrowWriter.
## What changes were proposed in this pull request?

Decimal type is not yet supported in `ArrowWriter`.
This is adding the decimal type support.

## How was this patch tested?

Added a test to `ArrowConvertersSuite`.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18754 from ueshin/issues/SPARK-21552.
2017-12-26 21:37:25 +09:00
Takuya UESHIN 13190a4f60 [SPARK-22874][PYSPARK][SQL] Modify checking pandas version to use LooseVersion.
## What changes were proposed in this pull request?

Currently we check pandas version by capturing if `ImportError` for the specific imports is raised or not but we can compare `LooseVersion` of the version strings as the same as we're checking pyarrow version.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20054 from ueshin/issues/SPARK-22874.
2017-12-22 20:09:51 +09:00
Bryan Cutler 59d52631eb [SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
## What changes were proposed in this pull request?

Upgrade Spark to Arrow 0.8.0 for Java and Python.  Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.

The highlights that pertain to Spark for the update from Arrow versoin 0.4.1 to 0.8.0 include:

* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python

## How was this patch tested?

Existing tests

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
2017-12-21 20:43:56 +09:00
Takuya UESHIN 64817c423c [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone
## What changes were proposed in this pull request?

When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values behave to respect Python system timezone instead of session timezone.

For example, let's say we use `"America/Los_Angeles"` as session timezone and have a timestamp value `"1970-01-01 00:00:01"` in the timezone. Btw, I'm in Japan so Python timezone would be `"Asia/Tokyo"`.

The timestamp value from current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects Python timezone.
As we discussed in #18664, we consider this behavior is a bug and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19607 from ueshin/issues/SPARK-22395.
2017-11-28 16:45:22 +08:00
Li Jin 7d039e0c0a [SPARK-22409] Introduce function type argument in pandas_udf
## What changes were proposed in this pull request?

* Add a "function type" argument to pandas_udf.
* Add a new public enum class `PandasUdfType` in pyspark.sql.functions
* Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
* Merge "PythonUdfType" and "PythonEvalType" into a single enum class "PythonEvalType"

Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType

pandas_udf('double', PandasUDFType.SCALAR):
def plus_one(v):
    return v + 1
```

## Design doc
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit

## How was this patch tested?

Added PandasUDFTests

## TODO:
* [x] Implement proper enum type for `PandasUDFType`
* [x] Update documentation
* [x] Add more tests in PandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #19630 from icexelloss/spark-22409-pandas-udf-type.
2017-11-17 16:43:08 +01:00
Bryan Cutler 8f0e88df03 [SPARK-20791][PYTHON][FOLLOWUP] Check for unicode column names in createDataFrame with Arrow
## What changes were proposed in this pull request?

If schema is passed as a list of unicode strings for column names, they should be re-encoded to 'utf-8' to be consistent.  This is similar to the #13097 but for creation of DataFrame using Arrow.

## How was this patch tested?

Added new test of using unicode names for schema.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19738 from BryanCutler/arrow-createDataFrame-followup-unicode-SPARK-20791.
2017-11-15 23:35:13 +09:00
Bryan Cutler 209b9361ac [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas
## What changes were proposed in this pull request?

This change uses Arrow to optimize the creation of a Spark DataFrame from a Pandas DataFrame. The input df is sliced according to the default parallelism. The optimization is enabled with the existing conf "spark.sql.execution.arrow.enabled" and is disabled by default.

## How was this patch tested?

Added new unit test to create DataFrame with and without the optimization enabled, then compare results.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19459 from BryanCutler/arrow-createDataFrame-from_pandas-SPARK-20791.
2017-11-13 13:16:01 +09:00
ptkool d01044233c [SPARK-22456][SQL] Add support for dayofweek function
## What changes were proposed in this pull request?
This PR adds support for a new function called `dayofweek` that returns the day of the week of the given argument as an integer value in the range 1-7, where 1 represents Sunday.

## How was this patch tested?
Unit tests and manual tests.

Author: ptkool <michael.styles@shopify.com>

Closes #19672 from ptkool/day_of_week_function.
2017-11-09 14:44:39 +09:00
Bryan Cutler 1d341042d6 [SPARK-22417][PYTHON] Fix for createDataFrame from pandas.DataFrame with timestamp
## What changes were proposed in this pull request?

Currently, a pandas.DataFrame that contains a timestamp of type 'datetime64[ns]' when converted to a Spark DataFrame with `createDataFrame` will interpret the values as LongType. This fix will check for a timestamp type and convert it to microseconds which will allow Spark to read as TimestampType.

## How was this patch tested?

Added unit test to verify Spark schema is expected for TimestampType and DateType when created from pandas

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19646 from BryanCutler/pyspark-non-arrow-createDataFrame-ts-fix-SPARK-22417.
2017-11-07 21:32:37 +01:00
hyukjinkwon 188b47e683 [SPARK-22379][PYTHON] Reduce duplication setUpClass and tearDownClass in PySpark SQL tests
## What changes were proposed in this pull request?

This PR propose to add `ReusedSQLTestCase` which deduplicate `setUpClass` and  `tearDownClass` in `sql/tests.py`.

## How was this patch tested?

Jenkins tests and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19595 from HyukjinKwon/reduce-dupe.
2017-10-30 11:50:22 +09:00
Takuya UESHIN 4c5269f1aa [SPARK-22370][SQL][PYSPARK] Config values should be captured in Driver.
## What changes were proposed in this pull request?

`ArrowEvalPythonExec` and `FlatMapGroupsInPandasExec` are refering config values of `SQLConf` in function for `mapPartitions`/`mapPartitionsInternal`, but we should capture them in Driver.

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19587 from ueshin/issues/SPARK-22370.
2017-10-28 18:33:09 +01:00
Bryan Cutler 17af727e38 [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp support to ArrowConverters for toPandas() Conversion
## What changes were proposed in this pull request?

Adding date and timestamp support with Arrow for `toPandas()` and `pandas_udf`s.  Timestamps are stored in Arrow as UTC and manifested to the user as timezone-naive localized to the Python system timezone.

## How was this patch tested?

Added Scala tests for date and timestamp types under ArrowConverters, ArrowUtils, and ArrowWriter suites.  Added Python tests for `toPandas()` and `pandas_udf`s with date and timestamp types.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18664 from BryanCutler/arrow-date-timestamp-SPARK-21375.
2017-10-26 23:02:46 -07:00
Takuya UESHIN b8624b06e5 [SPARK-20396][SQL][PYSPARK][FOLLOW-UP] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.

## How was this patch tested?

Exisiting tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19517 from ueshin/issues/SPARK-20396/fup2.
2017-10-20 12:44:30 -07:00
Li Jin bfc7e1fe1a [SPARK-20396][SQL][PYSPARK] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.

Static schema
-------------------
```
schema = df.schema

pandas_udf(schema)
def normalize(df):
    df = df.assign(v1 = (df.v1 - df.v1.mean()) / df.v1.std()
    return df

df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**

Another example to use pd.DataFrame dtypes as output schema of the udf:

```
sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
      ret = # Some transformation on the input pd.DataFrame
      return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)

df.groupBy('id').apply(foo_udf)
```
In interactive use case, user usually have a sample pd.DataFrame to test function `foo` in their notebook. Having been able to use `foo(sample_df).dtypes` frees user from specifying the output schema of `foo`.

Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

## How was this patch tested?
* Added GroupbyApplyTest

Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>

Closes #18732 from icexelloss/groupby-apply-SPARK-20396.
2017-10-11 07:32:01 +09:00
Takuya UESHIN af8a34c787 [SPARK-22159][SQL][FOLLOW-UP] Make config names consistently end with "enabled".
## What changes were proposed in this pull request?

This is a follow-up of #19384.

In the previous pr, only definitions of the config names were modified, but we also need to modify the names in runtime or tests specified as string literal.

## How was this patch tested?

Existing tests but modified the config names.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19462 from ueshin/issues/SPARK-22159/fup1.
2017-10-09 22:35:34 -07:00
Bryan Cutler d8e825e3bc [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_udf and add doctests
## What changes were proposed in this pull request?

This change disables the use of 0-parameter pandas_udfs due to the API being overly complex and awkward, and can easily be worked around by using an index column as an input argument.  Also added doctests for pandas_udfs which revealed bugs for handling empty partitions and using the pandas_udf decorator.

## How was this patch tested?

Reworked existing 0-parameter test to verify error is raised, added doctest for pandas_udf, added new tests for empty partition and decorator usage.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19325 from BryanCutler/arrow-pandas_udf-0-param-remove-SPARK-22106.
2017-09-26 10:54:00 +09:00
Liang-Chi Hsieh 3e6a714c9e [SPARK-21766][PYSPARK][SQL] DataFrame toPandas() raises ValueError with nullable int columns
## What changes were proposed in this pull request?

When calling `DataFrame.toPandas()` (without Arrow enabled), if there is a `IntegralType` column (`IntegerType`, `ShortType`, `ByteType`) that has null values the following exception is thrown:

    ValueError: Cannot convert non-finite values (NA or inf) to integer

This is because the null values first get converted to float NaN during the construction of the Pandas DataFrame in `from_records`, and then it is attempted to be converted back to to an integer where it fails.

The fix is going to check if the Pandas DataFrame can cause such failure when converting, if so, we don't do the conversion and use the inferred type by Pandas.

Closes #18945

## How was this patch tested?

Added pyspark test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19319 from viirya/SPARK-21766.
2017-09-22 22:39:47 +09:00
Bryan Cutler 27fc536d9a [SPARK-21190][PYSPARK] Python Vectorized UDFs
This PR adds vectorized UDFs to the Python API

**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:

```
pandas_udf(DoubleType())
def plus(a, b)
    return a + b
```
or

```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs

0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output.  For example:

```
pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```

Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.

- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
2017-09-22 16:17:50 +08:00
Peter Szalai 520d92a191 [SPARK-20098][PYSPARK] dataType's typeName fix
## What changes were proposed in this pull request?
`typeName`  classmethod has been fixed by using type -> typeName map.

## How was this patch tested?
local build

Author: Peter Szalai <szalaipeti.vagyok@gmail.com>

Closes #17435 from szalai1/datatype-gettype-fix.
2017-09-10 17:47:45 +09:00
hyukjinkwon 8598d03a00 [SPARK-15243][ML][SQL][PYTHON] Add missing support for unicode in Param methods & functions in dataframe
## What changes were proposed in this pull request?

This PR proposes to support unicodes in Param methods in ML, other missed functions in DataFrame.

For example, this causes a `ValueError` in Python 2.x when param is a unicode string:

```python
>>> from pyspark.ml.classification import LogisticRegression
>>> lr = LogisticRegression()
>>> lr.hasParam("threshold")
True
>>> lr.hasParam(u"threshold")
Traceback (most recent call last):
 ...
    raise TypeError("hasParam(): paramName must be a string")
TypeError: hasParam(): paramName must be a string
```

This PR is based on https://github.com/apache/spark/pull/13036

## How was this patch tested?

Unit tests in `python/pyspark/ml/tests.py` and `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: sethah <seth.hendrickson16@gmail.com>

Closes #17096 from HyukjinKwon/SPARK-15243.
2017-09-08 11:57:33 -07:00
Takuya UESHIN 57bc1e9eb4 [SPARK-21950][SQL][PYTHON][TEST] pyspark.sql.tests.SQLTests2 should stop SparkContext.
## What changes were proposed in this pull request?

`pyspark.sql.tests.SQLTests2` doesn't stop newly created spark context in the test and it might affect the following tests.
This pr makes `pyspark.sql.tests.SQLTests2` stop `SparkContext`.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19158 from ueshin/issues/SPARK-21950.
2017-09-08 14:26:07 +09:00
hyukjinkwon 648a8626b8 [SPARK-21789][PYTHON] Remove obsolete codes for parsing abstract schema strings
## What changes were proposed in this pull request?

This PR proposes to remove private functions that look not used in the main codes, `_split_schema_abstract`, `_parse_field_abstract`, `_parse_schema_abstract` and `_infer_schema_type`.

## How was this patch tested?

Existing tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18647 from HyukjinKwon/remove-abstract.
2017-09-01 13:09:24 +09:00
hyukjinkwon 5cd8ea99f0 [SPARK-21779][PYTHON] Simpler DataFrame.sample API in Python
## What changes were proposed in this pull request?

This PR make `DataFrame.sample(...)` can omit `withReplacement` defaulting `False`, consistently with equivalent Scala / Java API.

In short, the following examples are allowed:

```python
>>> df = spark.range(10)
>>> df.sample(0.5).count()
7
>>> df.sample(fraction=0.5).count()
3
>>> df.sample(0.5, seed=42).count()
5
>>> df.sample(fraction=0.5, seed=42).count()
5
```

In addition, this PR also adds some type checking logics as below:

```python
>>> df = spark.range(10)
>>> df.sample().count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [].
>>> df.sample(True).count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'bool'>].
>>> df.sample(42).count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'int'>].
>>> df.sample(fraction=False, seed="a").count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'bool'>, <type 'str'>].
>>> df.sample(seed=[1]).count()
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'list'>].
>>> df.sample(withReplacement="a", fraction=0.5, seed=1)
...
TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'str'>, <type 'float'>, <type 'int'>].
```

## How was this patch tested?

Manually tested, unit tests added in doc tests and manually checked the built documentation for Python.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18999 from HyukjinKwon/SPARK-21779.
2017-09-01 13:01:23 +09:00
Liang-Chi Hsieh ecf437a648 [SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray
## What changes were proposed in this pull request?

`PickleException` is thrown when creating dataframe from python row with empty bytearray

    spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx})).show()

    net.razorvine.pickle.PickleException: invalid pickle data for bytearray; expected 1 or 2 args, got 0
    	at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java
        ...

`ByteArrayConstructor` doesn't deal with empty byte array pickled by Python3.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19085 from viirya/SPARK-21534.
2017-08-31 12:55:38 +09:00
hyukjinkwon dc5d34d8dc [SPARK-19165][PYTHON][SQL] PySpark APIs using columns as arguments should validate input types for column
## What changes were proposed in this pull request?

While preparing to take over https://github.com/apache/spark/pull/16537, I realised a (I think) better approach to make the exception handling in one point.

This PR proposes to fix `_to_java_column` in `pyspark.sql.column`, which most of functions in `functions.py` and some other APIs use. This `_to_java_column` basically looks not working with other types than `pyspark.sql.column.Column` or string (`str` and `unicode`).

If this is not `Column`, then it calls `_create_column_from_name` which calls `functions.col` within JVM:

42b9eda80e/sql/core/src/main/scala/org/apache/spark/sql/functions.scala (L76)

And it looks we only have `String` one with `col`.

So, these should work:

```python
>>> from pyspark.sql.column import _to_java_column, Column
>>> _to_java_column("a")
JavaObject id=o28
>>> _to_java_column(u"a")
JavaObject id=o29
>>> _to_java_column(spark.range(1).id)
JavaObject id=o33
```

whereas these do not:

```python
>>> _to_java_column(1)
```
```
...
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
    ...
```

```python
>>> _to_java_column([])
```
```
...
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    ...
```

```python
>>> class A(): pass
>>> _to_java_column(A())
```
```
...
AttributeError: 'A' object has no attribute '_get_object_id'
```

Meaning most of functions using `_to_java_column` such as `udf` or `to_json` or some other APIs throw an exception as below:

```python
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)(None)
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.col.
: java.lang.NullPointerException
    ...
```

```python
>>> from pyspark.sql.functions import to_json
>>> to_json(None)
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.col.
: java.lang.NullPointerException
    ...
```

**After this PR**:

```python
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)(None)
...
```

```
TypeError: Invalid argument, not a string or column: None of type <type 'NoneType'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' functions.
```

```python
>>> from pyspark.sql.functions import to_json
>>> to_json(None)
```

```
...
TypeError: Invalid argument, not a string or column: None of type <type 'NoneType'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' functions.
```

## How was this patch tested?

Unit tests added in `python/pyspark/sql/tests.py` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: zero323 <zero323@users.noreply.github.com>

Closes #19027 from HyukjinKwon/SPARK-19165.
2017-08-24 20:29:03 +09:00
Nicholas Chammas 9660831050 [SPARK-21712][PYSPARK] Clarify type error for Column.substr()
Proposed changes:
* Clarify the type error that `Column.substr()` gives.

Test plan:
* Tested this manually.
* Test code:
    ```python
    from pyspark.sql.functions import col, lit
    spark.createDataFrame([['nick']], schema=['name']).select(col('name').substr(0, lit(1)))
    ```
* Before:
    ```
    TypeError: Can not mix the type
    ```
* After:
    ```
    TypeError: startPos and length must be the same type. Got <class 'int'> and
    <class 'pyspark.sql.column.Column'>, respectively.
    ```

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #18926 from nchammas/SPARK-21712-substr-type-error.
2017-08-16 11:19:15 +09:00
bravo-zhang 84454d7d33 [SPARK-14932][SQL] Allow DataFrame.replace() to replace values with None
## What changes were proposed in this pull request?

Currently `df.na.replace("*", Map[String, String]("NULL" -> null))` will produce exception.
This PR enables passing null/None as value in the replacement map in DataFrame.replace().
Note that the replacement map keys and values should still be the same type, while the values can have a mix of null/None and that type.
This PR enables following operations for example:
`df.na.replace("*", Map[String, String]("NULL" -> null))`(scala)
`df.na.replace("*", Map[Any, Any](60 -> null, 70 -> 80))`(scala)
`df.na.replace('Alice', None)`(python)
`df.na.replace([10, 20])`(python, replacing with None is by default)
One use case could be: I want to replace all the empty strings with null/None because they were incorrectly generated and then drop all null/None data
`df.na.replace("*", Map("" -> null)).na.drop()`(scala)
`df.replace(u'', None).dropna()`(python)

## How was this patch tested?

Scala unit test.
Python doctest and unit test.

Author: bravo-zhang <mzhang1230@gmail.com>

Closes #18820 from bravo-zhang/spark-14932.
2017-08-09 17:42:21 -07:00
hyukjinkwon b56f79cc35 [SPARK-20090][PYTHON] Add StructType.fieldNames in PySpark
## What changes were proposed in this pull request?

This PR proposes `StructType.fieldNames` that returns a copy of a field name list rather than a (undocumented) `StructType.names`.

There are two points here:

  - API consistency with Scala/Java

  - Provide a safe way to get the field names. Manipulating these might cause unexpected behaviour as below:

    ```python
    from pyspark.sql.types import *

    struct = StructType([StructField("f1", StringType(), True)])
    names = struct.names
    del names[0]
    spark.createDataFrame([{"f1": 1}], struct).show()
    ```

    ```
    ...
    java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 0 values are provided.
    	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
    	at org.apache.spark.sql.SparkSession$$anonfun$6.apply(SparkSession.scala:741)
    	at org.apache.spark.sql.SparkSession$$anonfun$6.apply(SparkSession.scala:741)
    ...
    ```

## How was this patch tested?

Added tests in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18618 from HyukjinKwon/SPARK-20090.
2017-07-28 20:59:32 -07:00
Takuya UESHIN 2ff35a057e [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add ArrayType and StructType support.
## What changes were proposed in this pull request?

This is a refactoring of `ArrowConverters` and related classes.

1. Refactor `ColumnWriter` as `ArrowWriter`.
2. Add `ArrayType` and `StructType` support.
3. Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` creation.

## How was this patch tested?

Added some tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18655 from ueshin/issues/SPARK-21440.
2017-07-27 19:19:51 +08:00
Xiang Gao b7a40f64e6 [SPARK-16542][SQL][PYSPARK] Fix bugs about types that result an array of null when creating DataFrame using python
## What changes were proposed in this pull request?
This is the reopen of https://github.com/apache/spark/pull/14198, with merge conflicts resolved.

ueshin Could you please take a look at my code?

Fix bugs about types that result an array of null when creating DataFrame using python.

Python's array.array have richer type than python itself, e.g. we can have `array('f',[1,2,3])` and `array('d',[1,2,3])`. Codes in spark-sql and pyspark didn't take this into consideration which might cause a problem that you get an array of null values when you have `array('f')` in your rows.

A simple code to reproduce this bug is:

```
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row,DataFrame
from array import array

sc = SparkContext()
sqlContext = SQLContext(sc)

row1 = Row(floatarray=array('f',[1,2,3]), doublearray=array('d',[1,2,3]))
rows = sc.parallelize([ row1 ])
df = sqlContext.createDataFrame(rows)
df.show()
```

which have output

```
+---------------+------------------+
|    doublearray|        floatarray|
+---------------+------------------+
|[1.0, 2.0, 3.0]|[null, null, null]|
+---------------+------------------+
```

## How was this patch tested?

New test case added

Author: Xiang Gao <qasdfgtyuiop@gmail.com>
Author: Gao, Xiang <qasdfgtyuiop@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18444 from zasdfgbnm/fix_array_infer.
2017-07-20 12:46:06 +09:00
hyukjinkwon 4ce735eed1 [SPARK-21394][SPARK-21432][PYTHON] Reviving callable object/partial function support in UDF in PySpark
## What changes were proposed in this pull request?

This PR proposes to avoid `__name__` in the tuple naming the attributes assigned directly from the wrapped function to the wrapper function, and use `self._name` (`func.__name__` or `obj.__class__.name__`).

After SPARK-19161, we happened to break callable objects as UDFs in Python as below:

```python
from pyspark.sql import functions

class F(object):
    def __call__(self, x):
        return x

foo = F()
udf = functions.udf(foo)
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/functions.py", line 2142, in udf
    return _udf(f=f, returnType=returnType)
  File ".../spark/python/pyspark/sql/functions.py", line 2133, in _udf
    return udf_obj._wrapped()
  File ".../spark/python/pyspark/sql/functions.py", line 2090, in _wrapped
    functools.wraps(self.func)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/functools.py", line 33, in update_wrapper
    setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: F instance has no attribute '__name__'
```

This worked in Spark 2.1:

```python
from pyspark.sql import functions

class F(object):
    def __call__(self, x):
        return x

foo = F()
udf = functions.udf(foo)
spark.range(1).select(udf("id")).show()
```

```
+-----+
|F(id)|
+-----+
|    0|
+-----+
```

**After**

```python
from pyspark.sql import functions

class F(object):
    def __call__(self, x):
        return x

foo = F()
udf = functions.udf(foo)
spark.range(1).select(udf("id")).show()
```

```
+-----+
|F(id)|
+-----+
|    0|
+-----+
```

_In addition, we also happened to break partial functions as below_:

```python
from pyspark.sql import functions
from functools import partial

partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/functions.py", line 2154, in udf
    return _udf(f=f, returnType=returnType)
  File ".../spark/python/pyspark/sql/functions.py", line 2145, in _udf
    return udf_obj._wrapped()
  File ".../spark/python/pyspark/sql/functions.py", line 2099, in _wrapped
    functools.wraps(self.func, assigned=assignments)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/functools.py", line 33, in update_wrapper
    setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: 'functools.partial' object has no attribute '__module__'
```

This worked in Spark 2.1:

```python
from pyspark.sql import functions
from functools import partial

partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
spark.range(1).select(udf()).show()
```

```
+---------+
|partial()|
+---------+
|        1|
+---------+
```

**After**

```python
from pyspark.sql import functions
from functools import partial

partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
spark.range(1).select(udf()).show()
```

```
+---------+
|partial()|
+---------+
|        1|
+---------+
```

## How was this patch tested?

Unit tests in `python/pyspark/sql/tests.py` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18615 from HyukjinKwon/callable-object.
2017-07-17 00:37:36 -07:00
hyukjinkwon ebc124d4c4 [SPARK-21365][PYTHON] Deduplicate logics parsing DDL type/schema definition
## What changes were proposed in this pull request?

This PR deals with four points as below:

- Reuse existing DDL parser APIs rather than reimplementing within PySpark

- Support DDL formatted string, `field type, field type`.

- Support case-insensitivity for parsing.

- Support nested data types as below:

  **Before**
  ```
  >>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
  ...
  ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
  ```

  ```
  >>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
  ...
  ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
  ```

  ```
  >>> spark.createDataFrame([[1]], "a int").show()
  ...
  ValueError: Could not parse datatype: a int
  ```

  **After**
  ```
  >>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
  +---+
  |  a|
  +---+
  |[1]|
  +---+
  ```

  ```
  >>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
  +---+
  |  a|
  +---+
  |[1]|
  +---+
  ```

  ```
  >>> spark.createDataFrame([[1]], "a int").show()
  +---+
  |  a|
  +---+
  |  1|
  +---+
  ```

## How was this patch tested?

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18590 from HyukjinKwon/deduplicate-python-ddl.
2017-07-11 22:03:10 +08:00
Bryan Cutler d03aebbe65 [SPARK-13534][PYSPARK] Using Apache Arrow to increase performance of DataFrame.toPandas
## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`.  This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process.  The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame.  Data types except complex, date, timestamp, and decimal  are currently supported, otherwise an `UnsupportedOperation` exception is thrown.

Additions to Spark include a Scala package private method `Dataset.toArrowPayload` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served.  A package private class/object `ArrowConverters` that provide data type mappings and conversion routines.  In Python, a private method `DataFrame._collectAsArrow` is added to collect Arrow payloads and a SQLConf "spark.sql.execution.arrow.enable" can be used in `toPandas()` to enable using Arrow (uses the old conversion by default).

## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types.  The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data.  This will ensure that the schema and data has been converted correctly.

Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow.  A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Li Jin <li.jin@twosigma.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #18459 from BryanCutler/toPandas_with_arrow-SPARK-13534.
2017-07-10 15:21:03 -07:00
Takuya UESHIN 53c2eb59b2 [SPARK-21327][SQL][PYSPARK] ArrayConstructor should handle an array of typecode 'l' as long rather than int in Python 2.
## What changes were proposed in this pull request?

Currently `ArrayConstructor` handles an array of typecode `'l'` as `int` when converting Python object in Python 2 into Java object, so if the value is larger than `Integer.MAX_VALUE` or smaller than `Integer.MIN_VALUE` then the overflow occurs.

```python
import array
data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
df = spark.createDataFrame(data)
df.show(truncate=False)
```

```
+----------+
|longarray |
+----------+
|[0, 0, -1]|
+----------+
```

This should be:

```
+----------------------------------------------+
|longarray                                     |
+----------------------------------------------+
|[-9223372036854775808, 0, 9223372036854775807]|
+----------------------------------------------+
```

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18553 from ueshin/issues/SPARK-21327.
2017-07-07 14:05:22 +09:00
Jeff Zhang 742da08685 [SPARK-19439][PYSPARK][SQL] PySpark's registerJavaFunction Should Support UDAFs
## What changes were proposed in this pull request?

Support register Java UDAFs in PySpark so that user can use Java UDAF in PySpark. Besides that I also add api in `UDFRegistration`

## How was this patch tested?

Unit test is added

Author: Jeff Zhang <zjffdu@apache.org>

Closes #17222 from zjffdu/SPARK-19439.
2017-07-05 10:59:10 -07:00
hyukjinkwon d492cc5a21 [SPARK-19507][SPARK-21296][PYTHON] Avoid per-record type dispatch in schema verification and improve exception message
## What changes were proposed in this pull request?
**Context**

While reviewing https://github.com/apache/spark/pull/17227, I realised here we type-dispatch per record. The PR itself is fine in terms of performance as is but this prints a prefix, `"obj"` in exception message as below:

```
from pyspark.sql.types import *
schema = StructType([StructField('s', IntegerType(), nullable=False)])
spark.createDataFrame([["1"]], schema)
...
TypeError: obj.s: IntegerType can not accept object '1' in type <type 'str'>
```

I suggested to get rid of this but during investigating this, I realised my approach might bring a performance regression as it is a hot path.

Only for SPARK-19507 and https://github.com/apache/spark/pull/17227, It needs more changes to cleanly get rid of the prefix and I rather decided to fix both issues together.

**Propersal**

This PR tried to

  - get rid of per-record type dispatch as we do in many code paths in Scala  so that it improves the performance (roughly ~25% improvement) - SPARK-21296

    This was tested with a simple code `spark.createDataFrame(range(1000000), "int")`. However, I am quite sure the actual improvement in practice is larger than this, in particular, when the schema is complicated.

   - improve error message in exception describing field information as prose - SPARK-19507

## How was this patch tested?

Manually tested and unit tests were added in `python/pyspark/sql/tests.py`.

Benchmark - codes: https://gist.github.com/HyukjinKwon/c3397469c56cb26c2d7dd521ed0bc5a3
Error message - codes: https://gist.github.com/HyukjinKwon/b1b2c7f65865444c4a8836435100e398

**Before**

Benchmark:
  - Results: https://gist.github.com/HyukjinKwon/4a291dab45542106301a0c1abcdca924

Error message
  - Results: https://gist.github.com/HyukjinKwon/57b1916395794ce924faa32b14a3fe19

**After**

Benchmark
  - Results: https://gist.github.com/HyukjinKwon/21496feecc4a920e50c4e455f836266e

Error message
  - Results: https://gist.github.com/HyukjinKwon/7a494e4557fe32a652ce1236e504a395

Closes #17227

Author: hyukjinkwon <gurwls223@gmail.com>
Author: David Gingrich <david@textio.com>

Closes #18521 from HyukjinKwon/python-type-dispatch.
2017-07-04 20:45:58 +08:00
hyukjinkwon a848d552ef [SPARK-21264][PYTHON] Call cross join path in join without 'on' and with 'how'
## What changes were proposed in this pull request?

Currently, it throws a NPE when missing columns but join type is speicified in join at PySpark as below:

```python
spark.conf.set("spark.sql.crossJoin.enabled", "false")
spark.range(1).join(spark.range(1), how="inner").show()
```

```
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o66.join.
: java.lang.NullPointerException
	at org.apache.spark.sql.Dataset.join(Dataset.scala:931)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
```

```python
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.range(1).join(spark.range(1), how="inner").show()
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling o84.join.
: java.lang.NullPointerException
	at org.apache.spark.sql.Dataset.join(Dataset.scala:931)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
```

This PR suggests to follow Scala's one as below:

```scala
scala> spark.conf.set("spark.sql.crossJoin.enabled", "false")

scala> spark.range(1).join(spark.range(1), Seq.empty[String], "inner").show()
```

```
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Range (0, 1, step=1, splits=Some(8))
and
Range (0, 1, step=1, splits=Some(8))
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
...
```

```scala
scala> spark.conf.set("spark.sql.crossJoin.enabled", "true")

scala> spark.range(1).join(spark.range(1), Seq.empty[String], "inner").show()
```
```
+---+---+
| id| id|
+---+---+
|  0|  0|
+---+---+
```

**After**

```python
spark.conf.set("spark.sql.crossJoin.enabled", "false")
spark.range(1).join(spark.range(1), how="inner").show()
```

```
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: u'Detected cartesian product for INNER join between logical plans\nRange (0, 1, step=1, splits=Some(8))\nand\nRange (0, 1, step=1, splits=Some(8))\nJoin condition is missing or trivial.\nUse the CROSS JOIN syntax to allow cartesian products between these relations.;'
```

```python
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.range(1).join(spark.range(1), how="inner").show()
```
```
+---+---+
| id| id|
+---+---+
|  0|  0|
+---+---+
```

## How was this patch tested?

Added tests in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18484 from HyukjinKwon/SPARK-21264.
2017-07-04 11:35:08 +09:00
Wenchen Fan 838effb98a Revert "[SPARK-13534][PYSPARK] Using Apache Arrow to increase performance of DataFrame.toPandas"
This reverts commit e44697606f.
2017-06-28 14:28:40 +08:00