Commit graph

280 commits

Author SHA1 Message Date
Maxim Gekk 1d20d13149 [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
## What changes were proposed in this pull request?

In the PR, I propose to deprecate the `from_utc_timestamp()` and `to_utc_timestamp`, and disable them by default. The functions can be enabled back via the SQL config `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any calls of the functions throw an analysis exception.

One of the reason for deprecation is functions violate semantic of `TimestampType` which is number of microseconds since epoch in UTC time zone. Shifting microseconds since epoch by time zone offset doesn't make sense because the result doesn't represent microseconds since epoch in UTC time zone any more, and cannot be considered as `TimestampType`.

## How was this patch tested?

The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.

Closes #24195 from MaxGekk/conv-utc-timestamp-deprecate.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-04-03 10:55:56 +08:00
Dongjoon Hyun d575a453db Revert "[SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp"
This reverts commit c5e83ab92c.
2019-04-02 01:05:54 -07:00
Maxim Gekk c5e83ab92c [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
## What changes were proposed in this pull request?

In the PR, I propose to deprecate the `from_utc_timestamp()` and `to_utc_timestamp`, and disable them by default. The functions can be enabled back via the SQL config `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any calls of the functions throw an analysis exception.

One of the reason for deprecation is functions violate semantic of `TimestampType` which is number of microseconds since epoch in UTC time zone. Shifting microseconds since epoch by time zone offset doesn't make sense because the result doesn't represent microseconds since epoch in UTC time zone any more, and cannot be considered as `TimestampType`.

## How was this patch tested?

The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.

Closes #24195 from MaxGekk/conv-utc-timestamp-deprecate.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-04-02 10:20:06 +08:00
Maxim Gekk 027ed2d11b [SPARK-23643][CORE][SQL][ML] Shrinking the buffer in hashSeed up to size of the seed parameter
## What changes were proposed in this pull request?

The hashSeed method allocates 64 bytes instead of 8. Other bytes are always zeros (thanks to default behavior of ByteBuffer). And they could be excluded from hash calculation because they don't differentiate inputs.

## How was this patch tested?

By running the existing tests - XORShiftRandomSuite

Closes #20793 from MaxGekk/hash-buff-size.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-23 11:26:09 -05:00
Huon Wilson b67d369572 [SPARK-27099][SQL] Add 'xxhash64' for hashing arbitrary columns to Long
## What changes were proposed in this pull request?

This introduces a new SQL function 'xxhash64' for getting a 64-bit hash of an arbitrary number of columns.

This is designed to exactly mimic the 32-bit `hash`, which uses
MurmurHash3. The name is designed to be more future-proof than the
'hash', by indicating the exact algorithm used, similar to md5 and the
sha hashes.

## How was this patch tested?

The tests for the existing `hash` function were duplicated to run with `xxhash64`.

Closes #24019 from huonw/hash64.

Authored-by: Huon Wilson <Huon.Wilson@data61.csiro.au>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-20 16:34:34 +08:00
Hyukjin Kwon c99463d4cf [SPARK-26979][PYTHON][FOLLOW-UP] Make binary math/string functions take string as columns as well
## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/23882 to handle binary math/string functions. For instance, see the cases below:

**Before:**

```python
>>> from pyspark.sql.functions import lit, ascii
>>> spark.range(1).select(lit('a').alias("value")).select(ascii("value"))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/functions.py", line 51, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
  File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 332, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.ascii. Trace:
py4j.Py4JException: Method ascii([class java.lang.String]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
	at py4j.Gateway.invoke(Gateway.java:276)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
```

```python
>>> from pyspark.sql.functions import atan2
>>> spark.range(1).select(atan2("id", "id"))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/functions.py", line 78, in _
    jc = getattr(sc._jvm.functions, name)(col1._jc if isinstance(col1, Column) else float(col1),
ValueError: could not convert string to float: id
```

**After:**

```python
>>> from pyspark.sql.functions import lit, ascii
>>> spark.range(1).select(lit('a').alias("value")).select(ascii("value"))
DataFrame[ascii(value): int]
```

```python
>>> from pyspark.sql.functions import atan2
>>> spark.range(1).select(atan2("id", "id"))
DataFrame[ATAN2(id, id): double]
```

Note that,

- This PR causes a slight behaviour changes for math functions. For instance, numbers as strings (e.g., `"1"`) were supported as arguments of binary math functions before. After this PR, it recognises it as column names.

- I also intentionally didn't document this behaviour changes since we're going ahead for Spark 3.0 and I don't think numbers as strings make much sense in math functions.

- There is another exception `when`, which takes string as literal values as below. This PR doeesn't fix this ambiguity.
  ```python
  >>> spark.range(1).select(when(lit(True), col("id"))).show()
  ```

  ```
  +--------------------------+
  |CASE WHEN true THEN id END|
  +--------------------------+
  |                         0|
  +--------------------------+
  ```

  ```python
  >>> spark.range(1).select(when(lit(True), "id")).show()
  ```

  ```
  +--------------------------+
  |CASE WHEN true THEN id END|
  +--------------------------+
  |                        id|
  +--------------------------+
  ```

This PR also fixes as below:

https://github.com/apache/spark/pull/23882 fixed it to:

- Rename `_create_function` to `_create_name_function`
- Define new `_create_function` to take strings as column names.

This PR, I proposes to:

- Revert `_create_name_function` name to `_create_function`.
- Define new `_create_function_over_column` to take strings as column names.

## How was this patch tested?

Some unit tests were added for binary math / string functions.

Closes #24121 from HyukjinKwon/SPARK-26979.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-20 08:06:10 +09:00
André Sá de Mello f9180f8752 [SPARK-26979][PYTHON] Add missing string column name support for some SQL functions
## What changes were proposed in this pull request?

Most SQL functions defined in `spark.sql.functions` have two calling patterns, one with a Column object as input, and another with a string representing a column name, which is then converted into a Column object internally.

There are, however, a few notable exceptions:

- lower()
- upper()
- abs()
- bitwiseNOT()
- ltrim()
- rtrim()
- trim()
- ascii()
- base64()
- unbase64()

While this doesn't break anything, as you can easily create a Column object yourself prior to passing it to one of these functions, it has two undesirable consequences:

1. It is surprising - it breaks coder's expectations when they are first starting with Spark. Every API should be as consistent as possible, so as to make the learning curve smoother and to reduce causes for human error;

2. It gets in the way of stylistic conventions. Most of the time it makes Python code more readable to use literal names, and the API provides ample support for that, but these few exceptions prevent this pattern from being universally applicable.

This patch is meant to fix the aforementioned problem.

### Effect

This patch **enables** support for passing column names as input to those functions mentioned above.

### Side effects

This PR also **fixes** an issue with some functions being defined multiple times by using `_create_function()`.

### How it works

`_create_function()` was redefined to always convert the argument to a Column object. The old implementation has been kept under `_create_name_function()`, and is still being used to generate the following special functions:

- lit()
- col()
- column()
- asc()
- desc()
- asc_nulls_first()
- asc_nulls_last()
- desc_nulls_first()
- desc_nulls_last()

This is because these functions can only take a column name as their argument. This is not a problem, as their semantics require so.

## How was this patch tested?

Ran ./dev/run-tests and tested it manually.

Closes #23882 from asmello/col-name-support-pyspark.

Authored-by: André Sá de Mello <amello@palantir.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-17 12:58:16 -05:00
Bryan Cutler ddc2052ebd [SPARK-23836][PYTHON] Add support for StructType return in Scalar Pandas UDF
## What changes were proposed in this pull request?

This change adds support for returning StructType from a scalar Pandas UDF, where the return value of the function is a pandas.DataFrame. Nested structs are not supported and an error will be raised, child types can be any other type currently supported.

## How was this patch tested?

Added additional unit tests to `test_pandas_udf_scalar`

Closes #23900 from BryanCutler/pyspark-support-scalar_udf-StructType-SPARK-23836.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-03-07 08:52:24 -08:00
deepyaman 68496c1af3 [SPARK-26451][SQL] Change lead/lag argument name from count to offset
## What changes were proposed in this pull request?

Change aligns argument name with that in Scala version and documentation.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #23357 from deepyaman/patch-1.

Authored-by: deepyaman <deepyaman.datta@utexas.edu>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2018-12-28 00:02:41 +08:00
Maxim Gekk 7c7fccfeb5 [SPARK-26424][SQL] Use java.time API in date/timestamp expressions
## What changes were proposed in this pull request?

In the PR, I propose to switch the `DateFormatClass`, `ToUnixTimestamp`, `FromUnixTime`, `UnixTime` on java.time API for parsing/formatting dates and timestamps. The API has been already implemented by the `Timestamp`/`DateFormatter` classes. One of benefit is those classes support parsing timestamps with microsecond precision. Old behaviour can be switched on via SQL config: `spark.sql.legacy.timeParser.enabled` (`false` by default).

## How was this patch tested?

It was tested by existing test suites - `DateFunctionsSuite`, `DateExpressionsSuite`, `JsonSuite`, `CsvSuite`, `SQLQueryTestSuite` as well as PySpark tests.

Closes #23358 from MaxGekk/new-time-cast.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-12-27 11:09:50 +08:00
Li Jin 86100df54b [SPARK-24561][SQL][PYTHON] User-defined window aggregation functions with Pandas UDF (bounded window)
## What changes were proposed in this pull request?

This PR implements a new feature - window aggregation Pandas UDF for bounded window.

#### Doc:
https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#heading=h.c87w44wcj3wj

#### Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

df = spark.range(0, 10, 2).toDF('v')
w1 = Window.partitionBy().orderBy('v').rangeBetween(-2, 4)
w2 = Window.partitionBy().orderBy('v').rowsBetween(-2, 2)

pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
    return v.mean()

df.withColumn('v_mean', avg(df['v']).over(w1)).show()
# +---+------+
# |  v|v_mean|
# +---+------+
# |  0|   1.0|
# |  2|   2.0|
# |  4|   4.0|
# |  6|   6.0|
# |  8|   7.0|
# +---+------+

df.withColumn('v_mean', avg(df['v']).over(w2)).show()
# +---+------+
# |  v|v_mean|
# +---+------+
# |  0|   2.0|
# |  2|   3.0|
# |  4|   4.0|
# |  6|   5.0|
# |  8|   6.0|
# +---+------+

```

#### High level changes:

This PR modifies the existing WindowInPandasExec physical node to deal with unbounded (growing, shrinking and sliding) windows.

* `WindowInPandasExec` now share the same base class as `WindowExec` and share utility functions. See `WindowExecBase`
* `WindowFunctionFrame` now has two new functions `currentLowerBound` and `currentUpperBound` - to return the lower and upper window bound for the current output row. It is also modified to allow `AggregateProcessor` == null. Null aggregator processor is used for `WindowInPandasExec` where we don't have an aggregator and only uses lower and upper bound functions from `WindowFunctionFrame`
* The biggest change is in `WindowInPandasExec`, where it is modified to take `currentLowerBound` and `currentUpperBound` and write those values together with the input data to the python process for rolling window aggregation. See `WindowInPandasExec` for more details.

#### Discussion
In benchmarking, I found numpy variant of the rolling window UDF is much faster than the pandas version:

Spark SQL window function: 20s
Pandas variant: ~80s
Numpy variant: 10s
Numpy variant with numba: 4s

Allowing numpy variant of the vectorized UDFs is something I want to discuss because of the performance improvement, but doesn't have to be in this PR.

## How was this patch tested?

New tests

Closes #22305 from icexelloss/SPARK-24561-bounded-window-udf.

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2018-12-18 09:15:21 +08:00
Wenchen Fan fa0d4bf699 [SPARK-25829][SQL] remove duplicated map keys with last wins policy
## What changes were proposed in this pull request?

Currently duplicated map keys are not handled consistently. For example, map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc.

This PR proposes to remove duplicated map keys with last wins policy, to follow Java/Scala and Presto. It only applies to built-in functions, as users can create map with duplicated map keys via private APIs anyway.

updated functions: `CreateMap`, `MapFromArrays`, `MapFromEntries`, `StringToMap`, `MapConcat`, `TransformKeys`.

For other places:
1. data source v1 doesn't have this problem, as users need to provide a java/scala map, which can't have duplicated keys.
2. data source v2 may have this problem. I've added a note to `ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2.
3. UDF doesn't have this problem, as users need to provide a java/scala map. Same as data source v1.
4. file format. I checked all of them and only parquet does not enforce it. For backward compatibility reasons I change nothing but leave a note saying that the behavior will be undefined if users write map with duplicated keys to parquet files. Maybe we can add a config and fail by default if parquet files have map with duplicated keys. This can be done in followup.

## How was this patch tested?

updated tests and new tests

Closes #23124 from cloud-fan/map.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-11-28 23:42:13 +08:00
Takuya UESHIN 48ea64bf5b [SPARK-26112][SQL] Update since versions of new built-in functions.
## What changes were proposed in this pull request?

The following 5 functions were removed from branch-2.4:

- map_entries
- map_filter
- transform_values
- transform_keys
- map_zip_with

We should update the since version to 3.0.0.

## How was this patch tested?

Existing tests.

Closes #23082 from ueshin/issues/SPARK-26112/since.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-11-19 22:18:20 +08:00
Sean Owen 0025a8397f [SPARK-25908][CORE][SQL] Remove old deprecated items in Spark 3
## What changes were proposed in this pull request?

- Remove some AccumulableInfo .apply() methods
- Remove non-label-specific multiclass precision/recall/fScore in favor of accuracy
- Remove toDegrees/toRadians in favor of degrees/radians (SparkR: only deprecated)
- Remove approxCountDistinct in favor of approx_count_distinct (SparkR: only deprecated)
- Remove unused Python StorageLevel constants
- Remove Dataset unionAll in favor of union
- Remove unused multiclass option in libsvm parsing
- Remove references to deprecated spark configs like spark.yarn.am.port
- Remove TaskContext.isRunningLocally
- Remove ShuffleMetrics.shuffle* methods
- Remove BaseReadWrite.context in favor of session
- Remove Column.!== in favor of =!=
- Remove Dataset.explode
- Remove Dataset.registerTempTable
- Remove SQLContext.getOrCreate, setActive, clearActive, constructors

Not touched yet

- everything else in MLLib
- HiveContext
- Anything deprecated more recently than 2.0.0, generally

## How was this patch tested?

Existing tests

Closes #22921 from srowen/SPARK-25908.

Lead-authored-by: Sean Owen <sean.owen@databricks.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-11-07 22:48:50 -06:00
Maxim Gekk 39399f40b8 [SPARK-25638][SQL] Adding new function - to_csv()
## What changes were proposed in this pull request?

New functions takes a struct and converts it to a CSV strings using passed CSV options. It accepts the same CSV options as CSV data source does.

## How was this patch tested?

Added `CsvExpressionsSuite`, `CsvFunctionsSuite` as well as R, Python and SQL tests similar to tests for `to_json()`

Closes #22626 from MaxGekk/to_csv.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-11-04 14:57:38 +08:00
hyukjinkwon c9667aff4f [SPARK-25672][SQL] schema_of_csv() - schema inference from an example
## What changes were proposed in this pull request?

In the PR, I propose to add new function - *schema_of_csv()* which infers schema of CSV string literal. The result of the function is a string containing a schema in DDL format. For example:

```sql
select schema_of_csv('1|abc', map('delimiter', '|'))
```
```
struct<_c0:int,_c1:string>
```

## How was this patch tested?

Added new tests to `CsvFunctionsSuite`, `CsvExpressionsSuite` and SQL tests to `csv-functions.sql`

Closes #22666 from MaxGekk/schema_of_csv-function.

Lead-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-11-01 09:14:16 +08:00
hyukjinkwon 33e337c118 [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's input json as literal only
## What changes were proposed in this pull request?

The main purpose of `schema_of_json` is the usage of combination with `from_json` (to make up the leak of schema inference) which takes its schema only as literal; however, currently `schema_of_json` allows JSON input as non-literal expressions (e.g, column).

This was mistakenly allowed - we don't have to take other usages rather then the main purpose into account for now.

This PR makes a followup to only allow literals for `schema_of_json`'s JSON input. We can allow non literal expressions later when it's needed or there are some usecase for it.

## How was this patch tested?

Unit tests were added.

Closes #22775 from HyukjinKwon/SPARK-25447-followup.

Lead-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-10-26 22:14:43 +08:00
Reynold Xin 89d748b33c [SPARK-25842][SQL] Deprecate rangeBetween APIs introduced in SPARK-21608
## What changes were proposed in this pull request?
See the detailed information at https://issues.apache.org/jira/browse/SPARK-25841 on why these APIs should be deprecated and redesigned.

This patch also reverts 8acb51f08b which applies to 2.4.

## How was this patch tested?
Only deprecation and doc changes.

Closes #22841 from rxin/SPARK-25842.

Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-10-26 13:17:24 +08:00
hyukjinkwon 7251be0c04 [SPARK-25798][PYTHON] Internally document type conversion between Pandas data and SQL types in Pandas UDFs
## What changes were proposed in this pull request?

We are facing some problems about type conversions between Pandas data and SQL types in Pandas UDFs.
It's even difficult to identify the problems (see #20163 and #22610).

This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them.

Table can be generated via the codes below:

```python
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf

columns = [
    ('none', 'object(NoneType)'),
    ('bool', 'bool'),
    ('int8', 'int8'),
    ('int16', 'int16'),
    ('int32', 'int32'),
    ('int64', 'int64'),
    ('uint8', 'uint8'),
    ('uint16', 'uint16'),
    ('uint32', 'uint32'),
    ('uint64', 'uint64'),
    ('float64', 'float16'),
    ('float64', 'float32'),
    ('float64', 'float64'),
    ('date', 'datetime64[ns]'),
    ('tz_aware_dates', 'datetime64[ns, US/Eastern]'),
    ('string', 'object(string)'),
    ('decimal', 'object(Decimal)'),
    ('array', 'object(array[int32])'),
    ('float128', 'float128'),
    ('complex64', 'complex64'),
    ('complex128', 'complex128'),
    ('category', 'category'),
    ('tdeltas', 'timedelta64[ns]'),
]

def create_dataframe():
    import pandas as pd
    import numpy as np
    import decimal
    pdf = pd.DataFrame({
        'none': [None, None],
        'bool': [True, False],
        'int8': np.arange(1, 3).astype('int8'),
        'int16': np.arange(1, 3).astype('int16'),
        'int32': np.arange(1, 3).astype('int32'),
        'int64': np.arange(1, 3).astype('int64'),
        'uint8': np.arange(1, 3).astype('uint8'),
        'uint16': np.arange(1, 3).astype('uint16'),
        'uint32': np.arange(1, 3).astype('uint32'),
        'uint64': np.arange(1, 3).astype('uint64'),
        'float16': np.arange(1, 3).astype('float16'),
        'float32': np.arange(1, 3).astype('float32'),
        'float64': np.arange(1, 3).astype('float64'),
        'float128': np.arange(1, 3).astype('float128'),
        'complex64': np.arange(1, 3).astype('complex64'),
        'complex128': np.arange(1, 3).astype('complex128'),
        'string': list('ab'),
        'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 2, 3], dtype=np.int32)]),
        'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]),
        'date': pd.date_range('19700101', periods=2).values,
        'category': pd.Series(list("AB")).astype('category')})
    pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]]
    pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, tz='US/Eastern')
    return pdf

types =  [
    BooleanType(),
    ByteType(),
    ShortType(),
    IntegerType(),
    LongType(),
    FloatType(),
    DoubleType(),
    DateType(),
    TimestampType(),
    StringType(),
    DecimalType(10, 0),
    ArrayType(IntegerType()),
    MapType(StringType(), IntegerType()),
    StructType([StructField("_1", IntegerType())]),
    BinaryType(),
]

df = spark.range(2).repartition(1)
results = []
count = 0
total = len(types) * len(columns)
values = []
spark.sparkContext.setLogLevel("FATAL")
for t in types:
    result = []
    for column, pandas_t in columns:
        v = create_dataframe()[column][0]
        values.append(v)
        try:
            row = df.select(pandas_udf(lambda _: create_dataframe()[column], t)(df.id)).first()
            ret_str = repr(row[0])
        except Exception:
            ret_str = "X"
        result.append(ret_str)
        progress = "SQL Type: [%s]\n  Pandas Value(Type): %s(%s)]\n  Result Python Value: [%s]" % (
            t.simpleString(), v, pandas_t, ret_str)
        count += 1
        print("%s/%s:\n  %s" % (count, total, progress))
    results.append([t.simpleString()] + list(map(str, result)))

schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: "%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns)))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: "    # %s  # noqa" % line, strings.strip().split("\n"))))

```

This code is compatible with both Python 2 and 3 but the table was generated under Python 2.

## How was this patch tested?

Manually tested and lint check.

Closes #22795 from HyukjinKwon/SPARK-25798.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2018-10-24 10:04:17 -07:00
Maxim Gekk 4d6704db4d [SPARK-25243][SQL] Use FailureSafeParser in from_json
## What changes were proposed in this pull request?

In the PR, I propose to switch `from_json` on `FailureSafeParser`, and to make the function compatible to `PERMISSIVE` mode by default, and to support the `FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by `from_json`.

## How was this patch tested?

It was tested by existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite` as well as new tests for `from_json` which checks different modes.

Closes #22237 from MaxGekk/from_json-failuresafe.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-10-24 19:09:15 +08:00
Maxim Gekk e9af9460bc [SPARK-25393][SQL] Adding new function from_csv()
## What changes were proposed in this pull request?

The PR adds new function `from_csv()` similar to `from_json()` to parse columns with CSV strings. I added the following methods:
```Scala
def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column
```
and this signature to call it from Python, R and Java:
```Scala
def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column
```

## How was this patch tested?

Added new test suites `CsvExpressionsSuite`, `CsvFunctionsSuite` and sql tests.

Closes #22379 from MaxGekk/from_csv.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-17 09:32:05 +08:00
hyukjinkwon a853a80202 [SPARK-25666][PYTHON] Internally document type conversion between Python data and SQL types in normal UDFs
### What changes were proposed in this pull request?

We are facing some problems about type conversions between Python data and SQL types in UDFs (Pandas UDFs as well).
It's even difficult to identify the problems (see https://github.com/apache/spark/pull/20163 and https://github.com/apache/spark/pull/22610).

This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them.

```python
import sys
import array
import datetime
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf

if sys.version >= '3':
    long = int

data = [
    None,
    True,
    1,
    long(1),
    "a",
    u"a",
    datetime.date(1970, 1, 1),
    datetime.datetime(1970, 1, 1, 0, 0),
    1.0,
    array.array("i", [1]),
    [1],
    (1,),
    bytearray([65, 66, 67]),
    Decimal(1),
    {"a": 1},
    Row(kwargs=1),
    Row("namedtuple")(1),
]

types =  [
    BooleanType(),
    ByteType(),
    ShortType(),
    IntegerType(),
    LongType(),
    StringType(),
    DateType(),
    TimestampType(),
    FloatType(),
    DoubleType(),
    ArrayType(IntegerType()),
    BinaryType(),
    DecimalType(10, 0),
    MapType(StringType(), IntegerType()),
    StructType([StructField("_1", IntegerType())]),
]

df = spark.range(1)
results = []
count = 0
total = len(types) * len(data)
spark.sparkContext.setLogLevel("FATAL")
for t in types:
    result = []
    for v in data:
        try:
            row = df.select(udf(lambda: v, t)()).first()
            ret_str = repr(row[0])
        except Exception:
            ret_str = "X"
        result.append(ret_str)
        progress = "SQL Type: [%s]\n  Python Value: [%s(%s)]\n  Result Python Value: [%s]" % (
            t.simpleString(), str(v), type(v).__name__, ret_str)
        count += 1
        print("%s/%s:\n  %s" % (count, total, progress))
    results.append([t.simpleString()] + list(map(str, result)))

schema = ["SQL Type \\ Python Value(Type)"] + list(map(lambda v: "%s(%s)" % (str(v), type(v).__name__), data))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: "    # %s  # noqa" % line, strings.strip().split("\n"))))
```

This table was generated under Python 2 but the code above is Python 3 compatible as well.

## How was this patch tested?

Manually tested and lint check.

Closes #22655 from HyukjinKwon/SPARK-25666.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-08 15:47:15 +08:00
Liang-Chi Hsieh 3eb8429699 [SPARK-25461][PYSPARK][SQL] Add document for mismatch between return type of Pandas.Series and return type of pandas udf
## What changes were proposed in this pull request?

For Pandas UDFs, we get arrow type from defined Catalyst return data type of UDFs. We use this arrow type to do serialization of data. If the defined return data type doesn't match with actual return type of Pandas.Series returned by Pandas UDFs, it has a risk to return incorrect data from Python side.

Currently we don't have reliable approach to check if the data conversion is safe or not. We leave some document to notify this to users for now. When there is next upgrade of PyArrow available we can use to check it, we should add the option to check it.

## How was this patch tested?

Only document change.

Closes #22610 from viirya/SPARK-25461.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-07 23:18:46 +08:00
Parker Hegstrom 17781d7530 [SPARK-25202][SQL] Implements split with limit sql function
## What changes were proposed in this pull request?

Adds support for the setting limit in the sql split function

## How was this patch tested?

1. Updated unit tests
2. Tested using Scala spark shell

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22227 from phegstrom/master.

Authored-by: Parker Hegstrom <phegstrom@palantir.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-06 14:30:43 +08:00
gatorsmile 9bf397c0e4 [SPARK-25592] Setting version to 3.0.0-SNAPSHOT
## What changes were proposed in this pull request?

This patch is to bump the master branch version to 3.0.0-SNAPSHOT.

## How was this patch tested?
N/A

Closes #22606 from gatorsmile/bump3.0.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-10-02 08:48:24 -07:00
Maxim Gekk 1007cae20e [SPARK-25447][SQL] Support JSON options by schema_of_json()
## What changes were proposed in this pull request?

In the PR, I propose to extended the `schema_of_json()` function, and accept JSON options since they can impact on schema inferring. Purpose is to support the same options that `from_json` can use during schema inferring.

## How was this patch tested?

Added SQL, Python and Scala tests (`JsonExpressionsSuite` and `JsonFunctionsSuite`) that checks JSON options are used.

Closes #22442 from MaxGekk/schema_of_json-options.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-09-29 17:53:30 +08:00
Wenchen Fan ff876137fa [SPARK-23715][SQL][DOC] improve document for from/to_utc_timestamp
## What changes were proposed in this pull request?

We have an agreement that the behavior of `from/to_utc_timestamp` is corrected, although the function itself doesn't make much sense in Spark: https://issues.apache.org/jira/browse/SPARK-23715

This PR improves the document.

## How was this patch tested?

N/A

Closes #22543 from cloud-fan/doc.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-09-27 15:02:20 +08:00
Maxim Gekk 473d0d862d [SPARK-25514][SQL] Generating pretty JSON by to_json
## What changes were proposed in this pull request?

The PR introduces new JSON option `pretty` which allows to turn on `DefaultPrettyPrinter` of `Jackson`'s Json generator. New option is useful in exploring of deep nested columns and in converting of JSON columns in more readable representation (look at the added test).

## How was this patch tested?

Added rount trip test which convert an JSON string to pretty representation via `from_json()` and `to_json()`.

Closes #22534 from MaxGekk/pretty-json.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-09-26 09:52:15 +08:00
cclauss 9bb798f2e6 [SPARK-25238][PYTHON] lint-python: Upgrade pycodestyle to v2.4.0
See https://pycodestyle.readthedocs.io/en/latest/developer.html#changes for changes made in this release.

## What changes were proposed in this pull request?

Upgrade pycodestyle to v2.4.0

## How was this patch tested?

__pycodestyle__

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22231 from cclauss/patch-1.

Authored-by: cclauss <cclauss@bluewin.ch>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-09-14 20:13:07 -05:00
Sean Owen 08c76b5d39 [SPARK-25238][PYTHON] lint-python: Fix W605 warnings for pycodestyle 2.4
(This change is a subset of the changes needed for the JIRA; see https://github.com/apache/spark/pull/22231)

## What changes were proposed in this pull request?

Use raw strings and simpler regex syntax consistently in Python, which also avoids warnings from pycodestyle about accidentally relying Python's non-escaping of non-reserved chars in normal strings. Also, fix a few long lines.

## How was this patch tested?

Existing tests, and some manual double-checking of the behavior of regexes in Python 2/3 to be sure.

Closes #22400 from srowen/SPARK-25238.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-09-13 11:19:43 +08:00
Holden Karau da5685b5bb [SPARK-23672][PYTHON] Document support for nested return types in scalar with arrow udfs
## What changes were proposed in this pull request?

Clarify docstring for Scalar functions

## How was this patch tested?

Adds a unit test showing use similar to wordcount, there's existing unit test for array of floats as well.

Closes #20908 from holdenk/SPARK-23672-document-support-for-nested-return-types-in-scalar-with-arrow-udfs.

Authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2018-09-10 11:01:51 -07:00
hyukjinkwon 7ef6d1daf8 [SPARK-25328][PYTHON] Add an example for having two columns as the grouping key in group aggregate pandas UDF
## What changes were proposed in this pull request?

This PR proposes to add another example for multiple grouping key in group aggregate pandas UDF since this feature could make users still confused.

## How was this patch tested?

Manually tested and documentation built.

Closes #22329 from HyukjinKwon/SPARK-25328.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2018-09-06 08:18:49 -07:00
Maxim Gekk d749d034a8 [SPARK-25252][SQL] Support arrays of any types by to_json
## What changes were proposed in this pull request?

In the PR, I propose to extended `to_json` and support any types as element types of input arrays. It should allow converting arrays of primitive types and arrays of arrays. For example:

```
select to_json(array('1','2','3'))
> ["1","2","3"]
select to_json(array(array(1,2,3),array(4)))
> [[1,2,3],[4]]
```

## How was this patch tested?

Added a couple sql tests for arrays of primitive type and of arrays. Also I added round trip test `from_json` -> `to_json`.

Closes #22226 from MaxGekk/to_json-array.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-09-06 12:35:59 +08:00
Kevin Yu 2381953ab5 [SPARK-25105][PYSPARK][SQL] Include PandasUDFType in the import all of pyspark.sql.functions
## What changes were proposed in this pull request?

Include PandasUDFType in the import all of pyspark.sql.functions

## How was this patch tested?

Run the test case from the pyspark shell from the jira [spark-25105](https://jira.apache.org/jira/browse/SPARK-25105?jql=project%20%3D%20SPARK%20AND%20component%20in%20(ML%2C%20PySpark%2C%20SQL%2C%20%22Structured%20Streaming%22))
I manually test on pyspark-shell:
before:
`
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'PandasUDFType' is not defined
>>>
`
after:
`
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
>>>
`
Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22100 from kevinyu98/spark-25105.

Authored-by: Kevin Yu <qyu@us.ibm.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2018-08-22 10:16:47 -07:00
Maxim Gekk ab06c25350 [SPARK-24391][SQL] Support arrays of any types by from_json
## What changes were proposed in this pull request?

The PR removes a restriction for element types of array type which exists in `from_json` for the root type. Currently, the function can handle only arrays of structs. Even array of primitive types is disallowed. The PR allows arrays of any types currently supported by JSON datasource. Here is an example of an array of a primitive type:

```
scala> import org.apache.spark.sql.functions._
scala> val df = Seq("[1, 2, 3]").toDF("a")
scala> val schema = new ArrayType(IntegerType, false)
scala> val arr = df.select(from_json($"a", schema))
scala> arr.printSchema
root
 |-- jsontostructs(a): array (nullable = true)
 |    |-- element: integer (containsNull = true)
```
and result of converting of the json string to the `ArrayType`:
```
scala> arr.show
+----------------+
|jsontostructs(a)|
+----------------+
|       [1, 2, 3]|
+----------------+
```

## How was this patch tested?

I added a few positive and negative tests:
- array of primitive types
- array of arrays
- array of structs
- array of maps

Closes #21439 from MaxGekk/from_json-array.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-13 20:13:09 +08:00
Kazuaki Ishizaki 1a5e460762 [SPARK-23913][SQL] Add array_intersect function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_intersect`. The behavior of the function is based on Presto's one.

This function returns returns an array of the elements in the intersection of array1 and array2.

Note: The order of elements in the result is not defined.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21102 from kiszk/SPARK-23913.
2018-08-06 23:27:57 +09:00
Kazuaki Ishizaki 95a9d5e3a5 [SPARK-23915][SQL] Add array_except function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_except`. The behavior of the function is based on Presto's one.

This function returns returns an array of the elements in array1 but not in array2.

Note: The order of elements in the result is not defined.

## How was this patch tested?

Added UTs.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21103 from kiszk/SPARK-23915.
2018-08-02 02:52:30 +08:00
Li Jin 8141d55926 [SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide
## What changes were proposed in this pull request?

Update Pandas UDFs section in sql-programming-guide. Add section for grouped aggregate pandas UDF.

## How was this patch tested?

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21887 from icexelloss/SPARK-23633-sql-programming-guide.
2018-07-31 10:10:38 +08:00
pkuwm ef6c8395c4 [SPARK-23928][SQL] Add shuffle collection function.
## What changes were proposed in this pull request?

This PR adds a new collection function: shuffle. It generates a random permutation of the given array. This implementation uses the "inside-out" version of Fisher-Yates algorithm.

## How was this patch tested?

New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.

Author: Takuya UESHIN <ueshin@databricks.com>
Author: pkuwm <ihuizhi.lu@gmail.com>

Closes #21802 from ueshin/issues/SPARK-23928/shuffle.
2018-07-27 23:02:48 +09:00
Huaxin Gao 0ab07b357b [SPARK-24868][PYTHON] add sequence function in Python
## What changes were proposed in this pull request?

Add ```sequence``` in functions.py

## How was this patch tested?

Add doctest.

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21820 from huaxingao/spark-24868.
2018-07-20 17:53:14 +08:00
Kazuaki Ishizaki 301bff7063 [SPARK-23914][SQL] Add array_union function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_union`. The behavior of the function is based on Presto's one.

This function returns returns an array of the elements in the union of array1 and array2.

Note: The order of elements in the result is not defined.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21061 from kiszk/SPARK-23914.
2018-07-12 17:42:29 +09:00
Bruce Robbins 034913b62b [SPARK-23936][SQL] Implement map_concat
## What changes were proposed in this pull request?

Implement map_concat high order function.

This implementation does not pick a winner when the specified maps have overlapping keys. Therefore, this implementation preserves existing duplicate keys in the maps and potentially introduces new duplicates (After discussion with ueshin, we settled on option 1 from [here](https://issues.apache.org/jira/browse/SPARK-23936?focusedCommentId=16464245&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16464245)).

## How was this patch tested?

New tests
Manual tests
Run all sbt SQL tests
Run all pyspark sql tests

Author: Bruce Robbins <bersprockets@gmail.com>

Closes #21073 from bersprockets/SPARK-23936.
2018-07-09 21:21:38 +09:00
Takeshi Yamamuro a381bce728 [SPARK-24673][SQL][PYTHON][FOLLOWUP] Support Column arguments in timezone of from_utc_timestamp/to_utc_timestamp
## What changes were proposed in this pull request?
This pr supported column arguments in timezone of `from_utc_timestamp/to_utc_timestamp` (follow-up of #21693).

## How was this patch tested?
Added tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21723 from maropu/SPARK-24673-FOLLOWUP.
2018-07-06 18:28:54 +08:00
Maxim Gekk 776f299fc8 [SPARK-24709][SQL] schema_of_json() - schema inference from an example
## What changes were proposed in this pull request?

In the PR, I propose to add new function - *schema_of_json()* which infers schema of JSON string literal. The result of the function is a string containing a schema in DDL format.

One of the use cases is using of *schema_of_json()* in the combination with *from_json()*. Currently, _from_json()_ requires a schema as a mandatory argument. The *schema_of_json()* function will allow to point out an JSON string as an example which has the same schema as the first argument of _from_json()_. For instance:

```sql
select from_json(json_column, schema_of_json('{"c1": [0], "c2": [{"c3":0}]}'))
from json_table;
```

## How was this patch tested?

Added new test to `JsonFunctionsSuite`, `JsonExpressionsSuite` and SQL tests to `json-functions.sql`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21686 from MaxGekk/infer_schema_json.
2018-07-04 09:38:18 +08:00
Bryan Cutler a5849ad9a3 [SPARK-24324][PYTHON] Pandas Grouped Map UDF should assign result columns by name
## What changes were proposed in this pull request?

Currently, a `pandas_udf` of type `PandasUDFType.GROUPED_MAP` will assign the resulting columns based on index of the return pandas.DataFrame.  If a new DataFrame is returned and constructed using a dict, then the order of the columns could be arbitrary and be different than the defined schema for the UDF.  If the schema types still match, then no error will be raised and the user will see column names and column data mixed up.

This change will first try to assign columns using the return type field names.  If a KeyError occurs, then the column index is checked if it is string based. If so, then the error is raised as it is most likely a naming mistake, else it will fallback to assign columns by position and raise a TypeError if the field types do not match.

## How was this patch tested?

Added a test that returns a new DataFrame with column order different than the schema.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #21427 from BryanCutler/arrow-grouped-map-mixesup-cols-SPARK-24324.
2018-06-24 09:28:46 +08:00
Marek Novotny 92c2f00bd2 [SPARK-23934][SQL] Adding map_from_entries function
## What changes were proposed in this pull request?
The PR adds the `map_from_entries` function that returns a map created from the given array of entries.

## How was this patch tested?
New tests added into:
- `CollectionExpressionSuite`
- `DataFrameFunctionSuite`

## CodeGen Examples
### Primitive-type Keys and Values
```
val idf = Seq(
  Seq((1, 10), (2, 20), (3, 10)),
  Seq((1, 10), null, (2, 20))
).toDF("a")
idf.filter('a.isNotNull).select(map_from_entries('a)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */         MapData project_value_0 = null;
/* 044 */
/* 045 */         for (int project_idx_2 = 0; !project_isNull_0 && project_idx_2 < inputadapter_value_0.numElements(); project_idx_2++) {
/* 046 */           project_isNull_0 |= inputadapter_value_0.isNullAt(project_idx_2);
/* 047 */         }
/* 048 */         if (!project_isNull_0) {
/* 049 */           final int project_numEntries_0 = inputadapter_value_0.numElements();
/* 050 */
/* 051 */           final long project_keySectionSize_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(project_numEntries_0, 4);
/* 052 */           final long project_valueSectionSize_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(project_numEntries_0, 4);
/* 053 */           final long project_byteArraySize_0 = 8 + project_keySectionSize_0 + project_valueSectionSize_0;
/* 054 */           if (project_byteArraySize_0 > 2147483632) {
/* 055 */             final Object[] project_keys_0 = new Object[project_numEntries_0];
/* 056 */             final Object[] project_values_0 = new Object[project_numEntries_0];
/* 057 */
/* 058 */             for (int project_idx_1 = 0; project_idx_1 < project_numEntries_0; project_idx_1++) {
/* 059 */               InternalRow project_entry_1 = inputadapter_value_0.getStruct(project_idx_1, 2);
/* 060 */
/* 061 */               project_keys_0[project_idx_1] = project_entry_1.getInt(0);
/* 062 */               project_values_0[project_idx_1] = project_entry_1.getInt(1);
/* 063 */             }
/* 064 */
/* 065 */             project_value_0 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.apply(project_keys_0, project_values_0);
/* 066 */
/* 067 */           } else {
/* 068 */             final byte[] project_byteArray_0 = new byte[(int)project_byteArraySize_0];
/* 069 */             UnsafeMapData project_unsafeMapData_0 = new UnsafeMapData();
/* 070 */             Platform.putLong(project_byteArray_0, 16, project_keySectionSize_0);
/* 071 */             Platform.putLong(project_byteArray_0, 24, project_numEntries_0);
/* 072 */             Platform.putLong(project_byteArray_0, 24 + project_keySectionSize_0, project_numEntries_0);
/* 073 */             project_unsafeMapData_0.pointTo(project_byteArray_0, 16, (int)project_byteArraySize_0);
/* 074 */             ArrayData project_keyArrayData_0 = project_unsafeMapData_0.keyArray();
/* 075 */             ArrayData project_valueArrayData_0 = project_unsafeMapData_0.valueArray();
/* 076 */
/* 077 */             for (int project_idx_0 = 0; project_idx_0 < project_numEntries_0; project_idx_0++) {
/* 078 */               InternalRow project_entry_0 = inputadapter_value_0.getStruct(project_idx_0, 2);
/* 079 */
/* 080 */               project_keyArrayData_0.setInt(project_idx_0, project_entry_0.getInt(0));
/* 081 */               project_valueArrayData_0.setInt(project_idx_0, project_entry_0.getInt(1));
/* 082 */             }
/* 083 */
/* 084 */             project_value_0 = project_unsafeMapData_0;
/* 085 */           }
/* 086 */
/* 087 */         }
```
### Non-primitive-type Keys and Values
```
val sdf = Seq(
  Seq(("a", null), ("b", "bb"), ("c", "aa")),
  Seq(("a", "aa"), null, (null, "bb"))
).toDF("a")
sdf.filter('a.isNotNull).select(map_from_entries('a)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */         MapData project_value_0 = null;
/* 044 */
/* 045 */         for (int project_idx_1 = 0; !project_isNull_0 && project_idx_1 < inputadapter_value_0.numElements(); project_idx_1++) {
/* 046 */           project_isNull_0 |= inputadapter_value_0.isNullAt(project_idx_1);
/* 047 */         }
/* 048 */         if (!project_isNull_0) {
/* 049 */           final int project_numEntries_0 = inputadapter_value_0.numElements();
/* 050 */
/* 051 */           final Object[] project_keys_0 = new Object[project_numEntries_0];
/* 052 */           final Object[] project_values_0 = new Object[project_numEntries_0];
/* 053 */
/* 054 */           for (int project_idx_0 = 0; project_idx_0 < project_numEntries_0; project_idx_0++) {
/* 055 */             InternalRow project_entry_0 = inputadapter_value_0.getStruct(project_idx_0, 2);
/* 056 */
/* 057 */             if (project_entry_0.isNullAt(0)) {
/* 058 */               throw new RuntimeException("The first field from a struct (key) can't be null.");
/* 059 */             }
/* 060 */
/* 061 */             project_keys_0[project_idx_0] = project_entry_0.getUTF8String(0);
/* 062 */             project_values_0[project_idx_0] = project_entry_0.getUTF8String(1);
/* 063 */           }
/* 064 */
/* 065 */           project_value_0 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.apply(project_keys_0, project_values_0);
/* 066 */
/* 067 */         }
```

Author: Marek Novotny <mn.mikke@gmail.com>

Closes #21282 from mn-mikke/feature/array-api-map_from_entries-to-master.
2018-06-22 16:18:22 +09:00
Huaxin Gao 9de11d3f90 [SPARK-23912][SQL] add array_distinct
## What changes were proposed in this pull request?

Add array_distinct to remove duplicate value from the array.

## How was this patch tested?

Add unit tests

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21050 from huaxingao/spark-23912.
2018-06-21 12:24:53 +09:00
Maxim Gekk b8f27ae3b3 [SPARK-24543][SQL] Support any type as DDL string for from_json's schema
## What changes were proposed in this pull request?

In the PR, I propose to support any DataType represented as DDL string for the from_json function. After the changes, it will be possible to specify `MapType` in SQL like:
```sql
select from_json('{"a":1, "b":2}', 'map<string, int>')
```
and in Scala (similar in other languages)
```scala
val in = Seq("""{"a": {"b": 1}}""").toDS()
val schema = "map<string, map<string, int>>"
val out = in.select(from_json($"value", schema, Map.empty[String, String]))
```

## How was this patch tested?

Added a couple sql tests and modified existing tests for Python and Scala. The former tests were modified because it is not imported for them in which format schema for `from_json` is provided.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21550 from MaxGekk/from_json-ddl-schema.
2018-06-14 13:27:27 -07:00
Li Jin 9786ce66c5 [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames
## What changes were proposed in this pull request?
This PR enables using a grouped aggregate pandas UDFs as window functions. The semantics is the same as using SQL aggregation function as window functions.

```
       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
       >>> from pyspark.sql import Window
       >>> df = spark.createDataFrame(
       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
       ...     ("id", "v"))
       >>> pandas_udf("double", PandasUDFType.GROUPED_AGG)
       ... def mean_udf(v):
       ...     return v.mean()
       >>> w = Window.partitionBy('id')
       >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
       +---+----+------+
       | id|   v|mean_v|
       +---+----+------+
       |  1| 1.0|   1.5|
       |  1| 2.0|   1.5|
       |  2| 3.0|   6.0|
       |  2| 5.0|   6.0|
       |  2|10.0|   6.0|
       +---+----+------+
```

The scope of this PR is somewhat limited in terms of:
(1) Only supports unbounded window, which acts essentially as group by.
(2) Only supports aggregation functions, not "transform" like window functions (n -> n mapping)

Both of these are left as future work. Especially, (1) needs careful thinking w.r.t. how to pass rolling window data to python efficiently. (2) is a bit easier but does require more changes therefore I think it's better to leave it as a separate PR.

## How was this patch tested?

WindowPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21082 from icexelloss/SPARK-22239-window-udf.
2018-06-13 09:10:52 +08:00
Kazuaki Ishizaki ada28f2595 [SPARK-23933][SQL] Add map_from_arrays function
## What changes were proposed in this pull request?

The PR adds the SQL function `map_from_arrays`. The behavior of the function is based on Presto's `map`. Since SparkSQL already had a `map` function, we prepared the different name for this behavior.

This function returns returns a map from a pair of arrays for keys and values.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21258 from kiszk/SPARK-23933.
2018-06-12 12:31:22 -07:00
DylanGuedes f0ef1b311d [SPARK-23931][SQL] Adds arrays_zip function to sparksql
Signed-off-by: DylanGuedes <djmgguedesgmail.com>

## What changes were proposed in this pull request?

Addition of arrays_zip function to spark sql functions.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
Unit tests that checks if the results are correct.

Author: DylanGuedes <djmgguedes@gmail.com>

Closes #21045 from DylanGuedes/SPARK-23931.
2018-06-12 11:57:25 -07:00
Huaxin Gao 98909c398d [SPARK-23920][SQL] add array_remove to remove all elements that equal element from array
## What changes were proposed in this pull request?

add array_remove to remove all elements that equal element from array

## How was this patch tested?

add unit tests

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21069 from huaxingao/spark-23920.
2018-05-31 22:04:26 -07:00
Bryan Cutler b2d0226562 [SPARK-24444][DOCS][PYTHON] Improve Pandas UDF docs to explain column assignment
## What changes were proposed in this pull request?

Added sections to pandas_udf docs, in the grouped map section, to indicate columns are assigned by position.

## How was this patch tested?

NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #21471 from BryanCutler/arrow-doc-pandas_udf-column_by_pos-SPARK-21427.
2018-06-01 11:58:59 +08:00
Bryan Cutler fa2ae9d201 [SPARK-24392][PYTHON] Label pandas_udf as Experimental
## What changes were proposed in this pull request?

The pandas_udf functionality was introduced in 2.3.0, but is not completely stable and still evolving.  This adds a label to indicate it is still an experimental API.

## How was this patch tested?

NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #21435 from BryanCutler/arrow-pandas_udf-experimental-SPARK-24392.
2018-05-28 12:56:05 +08:00
Marek Novotny a6e883feb3 [SPARK-23935][SQL] Adding map_entries function
## What changes were proposed in this pull request?

This PR adds `map_entries` function that returns an unordered array of all entries in the given map.

## How was this patch tested?

New tests added into:
- `CollectionExpressionSuite`
- `DataFrameFunctionsSuite`

## CodeGen examples
### Primitive types
```
val df = Seq(Map(1 -> 5, 2 -> 6)).toDF("m")
df.filter('m.isNotNull).select(map_entries('m)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */
/* 044 */         ArrayData project_value_0 = null;
/* 045 */
/* 046 */         final int project_numElements_0 = inputadapter_value_0.numElements();
/* 047 */         final ArrayData project_keys_0 = inputadapter_value_0.keyArray();
/* 048 */         final ArrayData project_values_0 = inputadapter_value_0.valueArray();
/* 049 */
/* 050 */         final long project_size_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 051 */           project_numElements_0,
/* 052 */           32);
/* 053 */         if (project_size_0 > 2147483632) {
/* 054 */           final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 055 */           for (int z = 0; z < project_numElements_0; z++) {
/* 056 */             project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{project_keys_0.getInt(z), project_values_0.getInt(z)});
/* 057 */           }
/* 058 */           project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);
/* 059 */
/* 060 */         } else {
/* 061 */           final byte[] project_arrayBytes_0 = new byte[(int)project_size_0];
/* 062 */           UnsafeArrayData project_unsafeArrayData_0 = new UnsafeArrayData();
/* 063 */           Platform.putLong(project_arrayBytes_0, 16, project_numElements_0);
/* 064 */           project_unsafeArrayData_0.pointTo(project_arrayBytes_0, 16, (int)project_size_0);
/* 065 */
/* 066 */           final int project_structsOffset_0 = UnsafeArrayData.calculateHeaderPortionInBytes(project_numElements_0) + project_numElements_0 * 8;
/* 067 */           UnsafeRow project_unsafeRow_0 = new UnsafeRow(2);
/* 068 */           for (int z = 0; z < project_numElements_0; z++) {
/* 069 */             long offset = project_structsOffset_0 + z * 24L;
/* 070 */             project_unsafeArrayData_0.setLong(z, (offset << 32) + 24L);
/* 071 */             project_unsafeRow_0.pointTo(project_arrayBytes_0, 16 + offset, 24);
/* 072 */             project_unsafeRow_0.setInt(0, project_keys_0.getInt(z));
/* 073 */             project_unsafeRow_0.setInt(1, project_values_0.getInt(z));
/* 074 */           }
/* 075 */           project_value_0 = project_unsafeArrayData_0;
/* 076 */
/* 077 */         }
```
### Non-primitive types
```
val df = Seq(Map("a" -> "foo", "b" -> null)).toDF("m")
df.filter('m.isNotNull).select(map_entries('m)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */
/* 044 */         ArrayData project_value_0 = null;
/* 045 */
/* 046 */         final int project_numElements_0 = inputadapter_value_0.numElements();
/* 047 */         final ArrayData project_keys_0 = inputadapter_value_0.keyArray();
/* 048 */         final ArrayData project_values_0 = inputadapter_value_0.valueArray();
/* 049 */
/* 050 */         final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 051 */         for (int z = 0; z < project_numElements_0; z++) {
/* 052 */           project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{project_keys_0.getUTF8String(z), project_values_0.getUTF8String(z)});
/* 053 */         }
/* 054 */         project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);
```

Author: Marek Novotny <mn.mikke@gmail.com>

Closes #21236 from mn-mikke/feature/array-api-map_entries-to-master.
2018-05-21 23:14:03 +09:00
Marco Gaido 69350aa2f0 [SPARK-23922][SQL] Add arrays_overlap function
## What changes were proposed in this pull request?

The PR adds the function `arrays_overlap`. This function returns `true` if the input arrays contain a non-null common element; if not, it returns `null` if any of the arrays contains a `null` element, `false` otherwise.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21028 from mgaido91/SPARK-23922.
2018-05-17 20:45:32 +08:00
Florent Pépin 3e66350c24 [SPARK-23925][SQL] Add array_repeat collection function
## What changes were proposed in this pull request?

The PR adds a new collection function, array_repeat. As there already was a function repeat with the same signature, with the only difference being the expected return type (String instead of Array), the new function is called array_repeat to distinguish.
The behaviour of the function is based on Presto's one.

The function creates an array containing a given element repeated the requested number of times.

## How was this patch tested?

New unit tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

Author: Florent Pépin <florentpepin.92@gmail.com>
Author: Florent Pépin <florent.pepin14@imperial.ac.uk>

Closes #21208 from pepinoflo/SPARK-23925.
2018-05-17 13:31:14 +09:00
Maxim Gekk 8cd83acf40 [SPARK-24027][SQL] Support MapType with StringType for keys as the root type by from_json
## What changes were proposed in this pull request?

Currently, the from_json function support StructType or ArrayType as the root type. The PR allows to specify MapType(StringType, DataType) as the root type additionally to mentioned types. For example:

```scala
import org.apache.spark.sql.types._
val schema = MapType(StringType, IntegerType)
val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
in.select(from_json($"value", schema, Map[String, String]())).collect()
```
```
res1: Array[org.apache.spark.sql.Row] = Array([Map(a -> 1, b -> 2, c -> 3)])
```

## How was this patch tested?

It was checked by new tests for the map type with integer type and struct type as value types. Also roundtrip tests like from_json(to_json) and to_json(from_json) for MapType are added.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21108 from MaxGekk/from_json-map-type.
2018-05-14 14:05:42 -07:00
aditkumar 92f6f52ff0 [MINOR][DOCS] Documenting months_between direction
## What changes were proposed in this pull request?

It's useful to know what relationship between date1 and date2 results in a positive number.

Author: aditkumar <aditkumar@gmail.com>
Author: Adit Kumar <aditkumar@gmail.com>

Closes #20787 from aditkumar/master.
2018-05-11 14:42:23 -05:00
Maxim Gekk f4fed05121 [SPARK-24171] Adding a note for non-deterministic functions
## What changes were proposed in this pull request?

I propose to add a clear statement for functions like `collect_list()` about non-deterministic behavior of such functions. The behavior must be taken into account by user while creating and running queries.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21228 from MaxGekk/deterministic-comments.
2018-05-10 09:44:49 -07:00
Marco Gaido e35ad3cadd [SPARK-23930][SQL] Add slice function
## What changes were proposed in this pull request?

The PR add the `slice` function. The behavior of the function is based on Presto's one.

The function slices an array according to the requested start index and length.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21040 from mgaido91/SPARK-23930.
2018-05-07 16:57:37 +09:00
Kazuaki Ishizaki 7564a9a706 [SPARK-23921][SQL] Add array_sort function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_sort`. The behavior of the function is based on Presto's one.

The function sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21021 from kiszk/SPARK-23921.
2018-05-07 15:22:23 +09:00
Marco Gaido cd10f9df82 [SPARK-23916][SQL] Add array_join function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_join`. The behavior of the function is based on Presto's one.

The function accepts an `array` of `string` which is to be joined, a `string` which is the delimiter to use between the items of the first argument and optionally a `string` which is used to replace `null` values.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21011 from mgaido91/SPARK-23916.
2018-04-26 13:37:13 +09:00
Marco Gaido 58c55cb4a6 [SPARK-23902][SQL] Add roundOff flag to months_between
## What changes were proposed in this pull request?

HIVE-15511 introduced the `roundOff` flag in order to disable the rounding to 8 digits which is performed in `months_between`. Since this can be a computational intensive operation, skipping it may improve performances when the rounding is not needed.

## How was this patch tested?

modified existing UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21008 from mgaido91/SPARK-23902.
2018-04-26 12:19:20 +09:00
mn-mikke 5fea17b3be [SPARK-23821][SQL] Collection function: flatten
## What changes were proposed in this pull request?

This PR adds a new collection function that transforms an array of arrays into a single array. The PR comprises:
- An expression for flattening array structure
- Flatten function
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(Seq(1, 2), Seq(4, 5)),
  Seq(null, Seq(1))
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(flatten($"i")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 064 */               project_numElements,
/* 065 */               4);
/* 066 */             if (project_size > 2147483632) {
/* 067 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 068 */                 project_size + " bytes of data due to exceeding the limit 2147483632" +
/* 069 */                 " bytes for UnsafeArrayData.");
/* 070 */             }
/* 071 */
/* 072 */             byte[] project_array = new byte[(int)project_size];
/* 073 */             UnsafeArrayData project_tempArrayData = new UnsafeArrayData();
/* 074 */             Platform.putLong(project_array, 16, project_numElements);
/* 075 */             project_tempArrayData.pointTo(project_array, 16, (int)project_size);
/* 076 */             int project_counter = 0;
/* 077 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 078 */               ArrayData arr = inputadapter_value.getArray(k);
/* 079 */               for (int l = 0; l < arr.numElements(); l++) {
/* 080 */                 if (arr.isNullAt(l)) {
/* 081 */                   project_tempArrayData.setNullAt(project_counter);
/* 082 */                 } else {
/* 083 */                   project_tempArrayData.setInt(
/* 084 */                     project_counter,
/* 085 */                     arr.getInt(l)
/* 086 */                   );
/* 087 */                 }
/* 088 */                 project_counter++;
/* 089 */               }
/* 090 */             }
/* 091 */             project_value = project_tempArrayData;
/* 092 */
/* 093 */           }
/* 094 */
/* 095 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq(Seq("a", "b"), Seq(null, "d")),
  Seq(null, Seq("a"))
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(flatten($"s")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             Object[] project_arrayObject = new Object[(int)project_numElements];
/* 064 */             int project_counter = 0;
/* 065 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 066 */               ArrayData arr = inputadapter_value.getArray(k);
/* 067 */               for (int l = 0; l < arr.numElements(); l++) {
/* 068 */                 project_arrayObject[project_counter] = arr.getUTF8String(l);
/* 069 */                 project_counter++;
/* 070 */               }
/* 071 */             }
/* 072 */             project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObject);
/* 073 */
/* 074 */           }
/* 075 */
/* 076 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #20938 from mn-mikke/feature/array-api-flatten-to-master.
2018-04-25 11:19:08 +09:00
mn-mikke e6b466084c [SPARK-23736][SQL] Extending the concat function to support array columns
## What changes were proposed in this pull request?
The PR adds a logic for easy concatenation of multiple array columns and covers:
- Concat expression has been extended to support array columns
- A Python wrapper

## How was this patch tested?
New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite
- typeCoercion/native/concat.sql

## Codegen examples
### Primitive-type elements
```
val df = Seq(
  (Seq(1 ,2), Seq(3, 4)),
  (Seq(1, 2, 3), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 070 */               project_numElements,
/* 071 */               4);
/* 072 */             if (project_size > 2147483632) {
/* 073 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_size +
/* 074 */                 " bytes of data due to exceeding the limit 2147483632 bytes" +
/* 075 */                 " for UnsafeArrayData.");
/* 076 */             }
/* 077 */
/* 078 */             byte[] project_array = new byte[(int)project_size];
/* 079 */             UnsafeArrayData project_arrayData = new UnsafeArrayData();
/* 080 */             Platform.putLong(project_array, 16, project_numElements);
/* 081 */             project_arrayData.pointTo(project_array, 16, (int)project_size);
/* 082 */             int project_counter = 0;
/* 083 */             for (int y = 0; y < 2; y++) {
/* 084 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 085 */                 if (args[y].isNullAt(z)) {
/* 086 */                   project_arrayData.setNullAt(project_counter);
/* 087 */                 } else {
/* 088 */                   project_arrayData.setInt(
/* 089 */                     project_counter,
/* 090 */                     args[y].getInt(z)
/* 091 */                   );
/* 092 */                 }
/* 093 */                 project_counter++;
/* 094 */               }
/* 095 */             }
/* 096 */             return project_arrayData;
/* 097 */           }
/* 098 */         }.concat(project_args);
/* 099 */         boolean project_isNull = project_value == null;
```

### Non-primitive-type elements
```
val df = Seq(
  (Seq("aa" ,"bb"), Seq("ccc", "ddd")),
  (Seq("x", "y"), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             Object[] project_arrayObjects = new Object[(int)project_numElements];
/* 070 */             int project_counter = 0;
/* 071 */             for (int y = 0; y < 2; y++) {
/* 072 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 073 */                 project_arrayObjects[project_counter] = args[y].getUTF8String(z);
/* 074 */                 project_counter++;
/* 075 */               }
/* 076 */             }
/* 077 */             return new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObjects);
/* 078 */           }
/* 079 */         }.concat(project_args);
/* 080 */         boolean project_isNull = project_value == null;
```

Author: mn-mikke <mrkAha12346github>

Closes #20858 from mn-mikke/feature/array-api-concat_arrays-to-master.
2018-04-20 14:58:11 +09:00
Kazuaki Ishizaki 46bb2b5129 [SPARK-23924][SQL] Add element_at function
## What changes were proposed in this pull request?

The PR adds the SQL function `element_at`. The behavior of the function is based on Presto's one.

This function returns element of array at given index in value if column is array, or returns value for the given key in value if column is map.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21053 from kiszk/SPARK-23924.
2018-04-19 21:00:10 +09:00
Kazuaki Ishizaki d5bec48b9c [SPARK-23919][SQL] Add array_position function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one.

The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21037 from kiszk/SPARK-23919.
2018-04-19 11:59:17 +09:00
mn-mikke f81fa478ff [SPARK-23926][SQL] Extending reverse function to support ArrayType arguments
## What changes were proposed in this pull request?

This PR extends `reverse` functions to be able to operate over array columns and covers:
- Introduction of `Reverse` expression that represents logic for reversing arrays and also strings
- Removal of `StringReverse` expression
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(1, 3, 4, 2),
  null
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(reverse($"i")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = inputadapter_value.copy();
/* 051 */           for(int k = 0; k < project_length / 2; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             boolean isNullAtK = project_value.isNullAt(k);
/* 054 */             boolean isNullAtL = project_value.isNullAt(l);
/* 055 */             if(!isNullAtK) {
/* 056 */               int el = project_value.getInt(k);
/* 057 */               if(!isNullAtL) {
/* 058 */                 project_value.setInt(k, project_value.getInt(l));
/* 059 */               } else {
/* 060 */                 project_value.setNullAt(k);
/* 061 */               }
/* 062 */               project_value.setInt(l, el);
/* 063 */             } else if (!isNullAtL) {
/* 064 */               project_value.setInt(k, project_value.getInt(l));
/* 065 */               project_value.setNullAt(l);
/* 066 */             }
/* 067 */           }
/* 068 */
/* 069 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq("a", "c", "d", "b"),
  null
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(reverse($"s")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(new Object[project_length]);
/* 051 */           for(int k = 0; k < project_length; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             project_value.update(k, inputadapter_value.getUTF8String(l));
/* 054 */           }
/* 055 */
/* 056 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #21034 from mn-mikke/feature/array-api-reverse-to-master.
2018-04-18 18:41:55 +09:00
Marco Gaido 14844a62c0 [SPARK-23918][SQL] Add array_min function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_min`. It takes an array as argument and returns the minimum value in it.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21025 from mgaido91/SPARK-23918.
2018-04-17 17:55:35 +09:00
Marco Gaido 6931022031 [SPARK-23917][SQL] Add array_max function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_max`. It takes an array as argument and returns the maximum value in it.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21024 from mgaido91/SPARK-23917.
2018-04-15 21:45:55 -07:00
Huaxin Gao 2c1fe64757 [SPARK-23847][PYTHON][SQL] Add asc_nulls_first, asc_nulls_last to PySpark
## What changes were proposed in this pull request?

Column.scala and Functions.scala have asc_nulls_first, asc_nulls_last,  desc_nulls_first and desc_nulls_last. Add the corresponding python APIs in column.py and functions.py

## How was this patch tested?
Add doctest

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #20962 from huaxingao/spark-23847.
2018-04-08 12:09:06 +08:00
Michael (Stu) Stewart 087fb31420 [SPARK-23645][MINOR][DOCS][PYTHON] Add docs RE pandas_udf with keyword args
## What changes were proposed in this pull request?

Add documentation about the limitations of `pandas_udf` with keyword arguments and related concepts, like `functools.partial` fn objects.

NOTE: intermediate commits on this PR show some of the steps that can be taken to fix some (but not all) of these pain points.

### Survey of problems we face today:

(Initialize) Note: python 3.6 and spark 2.4snapshot.
```
 from pyspark.sql import SparkSession
 import inspect, functools
 from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit, udf

 spark = SparkSession.builder.getOrCreate()
 print(spark.version)

 df = spark.range(1,6).withColumn('b', col('id') * 2)

 def ok(a,b): return a+b
```

Using a keyword argument at the call site `b=...` (and yes, *full* stack trace below, haha):
```
---> 14 df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', b='id')).show() # no kwargs

TypeError: wrapper() got an unexpected keyword argument 'b'
```

Using partial with a keyword argument where the kw-arg is the first argument of the fn:
*(Aside: kind of interesting that lines 15,16 work great and then 17 explodes)*
```
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-e9f31b8799c1> in <module>()
     15 df.withColumn('ok', pandas_udf(f=functools.partial(ok, 7), returnType='bigint')('id')).show()
     16 df.withColumn('ok', pandas_udf(f=functools.partial(ok, b=7), returnType='bigint')('id')).show()
---> 17 df.withColumn('ok', pandas_udf(f=functools.partial(ok, a=7), returnType='bigint')('id')).show()

/Users/stu/ZZ/spark/python/pyspark/sql/functions.py in pandas_udf(f, returnType, functionType)
   2378         return functools.partial(_create_udf, returnType=return_type, evalType=eval_type)
   2379     else:
-> 2380         return _create_udf(f=f, returnType=return_type, evalType=eval_type)
   2381
   2382

/Users/stu/ZZ/spark/python/pyspark/sql/udf.py in _create_udf(f, returnType, evalType)
     54                 argspec.varargs is None:
     55             raise ValueError(
---> 56                 "Invalid function: 0-arg pandas_udfs are not supported. "
     57                 "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
     58             )

ValueError: Invalid function: 0-arg pandas_udfs are not supported. Instead, create a 1-arg pandas_udf and ignore the arg in your function.
```

Author: Michael (Stu) Stewart <mstewart141@gmail.com>

Closes #20900 from mstewart141/udfkw2.
2018-03-26 12:45:45 +09:00
Bryan Cutler a9350d7095 [SPARK-23700][PYTHON] Cleanup imports in pyspark.sql
## What changes were proposed in this pull request?

This cleans up unused imports, mainly from pyspark.sql module.  Added a note in function.py that imports `UserDefinedFunction` only to maintain backwards compatibility for using `from pyspark.sql.function import UserDefinedFunction`.

## How was this patch tested?

Existing tests and built docs.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20892 from BryanCutler/pyspark-cleanup-imports-SPARK-23700.
2018-03-26 12:42:32 +09:00
Benjamin Peterson 7013eea11c [SPARK-23522][PYTHON] always use sys.exit over builtin exit
The exit() builtin is only for interactive use. applications should use sys.exit().

## What changes were proposed in this pull request?

All usage of the builtin `exit()` function is replaced by `sys.exit()`.

## How was this patch tested?

I ran `python/run-tests`.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Benjamin Peterson <benjamin@python.org>

Closes #20682 from benjaminp/sys-exit.
2018-03-08 20:38:34 +09:00
Li Jin 2cb23a8f51 [SPARK-23011][SQL][PYTHON] Support alternative function form with group aggregate pandas UDF
## What changes were proposed in this pull request?

This PR proposes to support an alternative function from with group aggregate pandas UDF.

The current form:
```
def foo(pdf):
    return ...
```
Takes a single arg that is a pandas DataFrame.

With this PR, an alternative form is supported:
```
def foo(key, pdf):
    return ...
```
The alternative form takes two argument - a tuple that presents the grouping key, and a pandas DataFrame represents the data.

## How was this patch tested?

GroupbyApplyTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20295 from icexelloss/SPARK-23011-groupby-apply-key.
2018-03-08 20:29:07 +09:00
Mihaly Toth a366b950b9 [SPARK-23329][SQL] Fix documentation of trigonometric functions
## What changes were proposed in this pull request?

Provide more details in trigonometric function documentations. Referenced `java.lang.Math` for further details in the descriptions.
## How was this patch tested?

Ran full build, checked generated documentation manually

Author: Mihaly Toth <misutoth@gmail.com>

Closes #20618 from misutoth/trigonometric-doc.
2018-03-05 23:46:40 +09:00
Huaxin Gao 8acb51f08b [SPARK-23084][PYTHON] Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark
## What changes were proposed in this pull request?

Added unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark, also updated the rangeBetween API

## How was this patch tested?

did unit test on my local. Please let me know if I need to add unit test in tests.py

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #20400 from huaxingao/spark_23084.
2018-02-11 18:55:38 +09:00
gatorsmile c36fecc3b4 [SPARK-23327][SQL] Update the description and tests of three external API or functions
## What changes were proposed in this pull request?
Update the description and tests of three external API or functions `createFunction `, `length` and `repartitionByRange `

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20495 from gatorsmile/updateFunc.
2018-02-06 16:46:43 -08:00
gatorsmile 7a2ada223e [SPARK-23261][PYSPARK] Rename Pandas UDFs
## What changes were proposed in this pull request?
Rename the public APIs and names of pandas udfs.

- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
- `PANDAS GROUP AGG UDF` -> `GROUPED AGG PANDAS UDF`

## How was this patch tested?
The existing tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20428 from gatorsmile/renamePandasUDFs.
2018-01-30 21:55:55 +09:00
Li Jin b2ce17b4c9 [SPARK-22274][PYTHON][SQL] User-defined aggregation functions with pandas udf (full shuffle)
## What changes were proposed in this pull request?

Add support for using pandas UDFs with groupby().agg().

This PR introduces a new type of pandas UDF - group aggregate pandas UDF. This type of UDF defines a transformation of multiple pandas Series -> a scalar value. Group aggregate pandas UDFs can be used with groupby().agg(). Note group aggregate pandas UDF doesn't support partial aggregation, i.e., a full shuffle is required.

This PR doesn't support group aggregate pandas UDFs that return ArrayType, StructType or MapType. Support for these types is left for future PR.

## How was this patch tested?

GroupbyAggPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #19872 from icexelloss/SPARK-22274-groupby-agg.
2018-01-23 14:11:30 +09:00
Takuya UESHIN 5063b74811 [SPARK-23141][SQL][PYSPARK] Support data type string as a returnType for registerJavaFunction.
## What changes were proposed in this pull request?

Currently `UDFRegistration.registerJavaFunction` doesn't support data type string as a `returnType` whereas `UDFRegistration.register`, `udf`, or `pandas_udf` does.
We can support it for `UDFRegistration.registerJavaFunction` as well.

## How was this patch tested?

Added a doctest and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20307 from ueshin/issues/SPARK-23141.
2018-01-18 22:33:04 +09:00
hyukjinkwon 39d244d921 [SPARK-23122][PYTHON][SQL] Deprecate register* for UDFs in SQLContext and Catalog in PySpark
## What changes were proposed in this pull request?

This PR proposes to deprecate `register*` for UDFs in `SQLContext` and `Catalog` in Spark 2.3.0.

These are inconsistent with Scala / Java APIs and also these basically do the same things with `spark.udf.register*`.

Also, this PR moves the logcis from `[sqlContext|spark.catalog].register*` to `spark.udf.register*` and reuse the docstring.

This PR also handles minor doc corrections. It also includes https://github.com/apache/spark/pull/20158

## How was this patch tested?

Manually tested, manually checked the API documentation and tests added to check if deprecated APIs call the aliases correctly.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20288 from HyukjinKwon/deprecate-udf.
2018-01-18 14:51:05 +09:00
Takeshi Yamamuro b59808385c [SPARK-23023][SQL] Cast field data to strings in showString
## What changes were proposed in this pull request?
The current `Datset.showString` prints rows thru `RowEncoder` deserializers like;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------------------------------------------+
|a                                                           |
+------------------------------------------------------------+
|[WrappedArray(1, 2), WrappedArray(3), WrappedArray(4, 5, 6)]|
+------------------------------------------------------------+
```
This result is incorrect because the correct one is;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------+
|a                       |
+------------------------+
|[[1, 2], [3], [4, 5, 6]]|
+------------------------+
```
So, this pr fixed code in `showString` to cast field data to strings before printing.

## How was this patch tested?
Added tests in `DataFrameSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20214 from maropu/SPARK-23023.
2018-01-15 16:26:52 +08:00
hyukjinkwon cd9f49a2ae [SPARK-22980][PYTHON][SQL] Clarify the length of each series is of each batch within scalar Pandas UDF
## What changes were proposed in this pull request?

This PR proposes to add a note that saying the length of a scalar Pandas UDF's `Series` is not of the whole input column but of the batch.

We are fine for a group map UDF because the usage is different from our typical UDF but scalar UDFs might cause confusion with the normal UDF.

For example, please consider this example:

```python
from pyspark.sql.functions import pandas_udf, col, lit

df = spark.range(1)
f = pandas_udf(lambda x, y: len(x) + y, LongType())
df.select(f(lit('text'), col('id'))).show()
```

```
+------------------+
|<lambda>(text, id)|
+------------------+
|                 1|
+------------------+
```

```python
from pyspark.sql.functions import udf, col, lit

df = spark.range(1)
f = udf(lambda x, y: len(x) + y, "long")
df.select(f(lit('text'), col('id'))).show()
```

```
+------------------+
|<lambda>(text, id)|
+------------------+
|                 4|
+------------------+
```

## How was this patch tested?

Manually built the doc and checked the output.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20237 from HyukjinKwon/SPARK-22980.
2018-01-13 16:13:44 +09:00
Li Jin f2dd8b9237 [SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs for non-deterministic cases
## What changes were proposed in this pull request?

Add tests for using non deterministic UDFs in aggregate.

Update pandas_udf docstring w.r.t to determinism.

## How was this patch tested?
test_nondeterministic_udf_in_aggregate

Author: Li Jin <ice.xelloss@gmail.com>

Closes #20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
2018-01-06 16:11:20 +08:00
Takeshi Yamamuro f2b3525c17 [SPARK-22771][SQL] Concatenate binary inputs into a binary output
## What changes were proposed in this pull request?
This pr modified `concat` to concat binary inputs into a single binary output.
`concat` in the current master always output data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.

## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19977 from maropu/SPARK-22771.
2017-12-30 14:09:56 +08:00
Marco Gaido ff48b1b338 [SPARK-22901][PYTHON] Add deterministic flag to pyspark UDF
## What changes were proposed in this pull request?

In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for python UDF. This flag is useful for cases when the UDF's code can return different result with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.

This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.

## How was this patch tested?

Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col =  udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten =  udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello|   3|           13|
+-----+----+-------------+

```

Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19929 from mgaido91/SPARK-22629.
2017-12-26 06:39:40 -08:00
Bryan Cutler 59d52631eb [SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
## What changes were proposed in this pull request?

Upgrade Spark to Arrow 0.8.0 for Java and Python.  Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.

The highlights that pertain to Spark for the update from Arrow versoin 0.4.1 to 0.8.0 include:

* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python

## How was this patch tested?

Existing tests

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
2017-12-21 20:43:56 +09:00
Youngbin Kim 6e36d8d562 [SPARK-22829] Add new built-in function date_trunc()
## What changes were proposed in this pull request?

Adding date_trunc() as a built-in function.
`date_trunc` is common in other databases, but Spark or Hive does not have support for this. `date_trunc` is commonly used by data scientists and business intelligence application such as Superset (https://github.com/apache/incubator-superset).
We do have `trunc` but this only works with 'MONTH' and 'YEAR' level on the DateType input.

date_trunc() in other databases:
AWS Redshift: http://docs.aws.amazon.com/redshift/latest/dg/r_DATE_TRUNC.html
PostgreSQL: https://www.postgresql.org/docs/9.1/static/functions-datetime.html
Presto: https://prestodb.io/docs/current/functions/datetime.html

## How was this patch tested?

Unit tests

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Youngbin Kim <ykim828@hotmail.com>

Closes #20015 from youngbink/date_trunc.
2017-12-19 20:22:33 -08:00
Liang-Chi Hsieh 9d45e675e2 [SPARK-22541][SQL] Explicitly claim that Python udfs can't be conditionally executed with short-curcuit evaluation
## What changes were proposed in this pull request?

Besides conditional expressions such as `when` and `if`, users may want to conditionally execute python udfs by short-curcuit evaluation. We should also explicitly note that python udfs don't support this kind of conditional execution too.

## How was this patch tested?

N/A, just document change.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19787 from viirya/SPARK-22541.
2017-11-21 09:36:37 +01:00
Li Jin 7d039e0c0a [SPARK-22409] Introduce function type argument in pandas_udf
## What changes were proposed in this pull request?

* Add a "function type" argument to pandas_udf.
* Add a new public enum class `PandasUdfType` in pyspark.sql.functions
* Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
* Merge "PythonUdfType" and "PythonEvalType" into a single enum class "PythonEvalType"

Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType

pandas_udf('double', PandasUDFType.SCALAR):
def plus_one(v):
    return v + 1
```

## Design doc
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit

## How was this patch tested?

Added PandasUDFTests

## TODO:
* [x] Implement proper enum type for `PandasUDFType`
* [x] Update documentation
* [x] Add more tests in PandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #19630 from icexelloss/spark-22409-pandas-udf-type.
2017-11-17 16:43:08 +01:00
ptkool d01044233c [SPARK-22456][SQL] Add support for dayofweek function
## What changes were proposed in this pull request?
This PR adds support for a new function called `dayofweek` that returns the day of the week of the given argument as an integer value in the range 1-7, where 1 represents Sunday.

## How was this patch tested?
Unit tests and manual tests.

Author: ptkool <michael.styles@shopify.com>

Closes #19672 from ptkool/day_of_week_function.
2017-11-09 14:44:39 +09:00
Liang-Chi Hsieh 07f390a27d [SPARK-22347][PYSPARK][DOC] Add document to notice users for using udfs with conditional expressions
## What changes were proposed in this pull request?

Under the current execution mode of Python UDFs, we don't well support Python UDFs as branch values or else value in CaseWhen expression.

Since to fix it might need the change not small (e.g., #19592) and this issue has simpler workaround. We should just notice users in the document about this.

## How was this patch tested?

Only document change.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19617 from viirya/SPARK-22347-3.
2017-11-01 13:09:35 +01:00
hyukjinkwon d9798c834f [SPARK-22313][PYTHON] Mark/print deprecation warnings as DeprecationWarning for deprecated APIs
## What changes were proposed in this pull request?

This PR proposes to mark the existing warnings as `DeprecationWarning` and print out warnings for deprecated functions.

This could be actually useful for Spark app developers. I use (old) PyCharm and this IDE can detect this specific `DeprecationWarning` in some cases:

**Before**

<img src="https://user-images.githubusercontent.com/6477701/31762664-df68d9f8-b4f6-11e7-8773-f0468f70a2cc.png" height="45" />

**After**

<img src="https://user-images.githubusercontent.com/6477701/31762662-de4d6868-b4f6-11e7-98dc-3c8446a0c28a.png" height="70" />

For console usage, `DeprecationWarning` is usually disabled (see https://docs.python.org/2/library/warnings.html#warning-categories and https://docs.python.org/3/library/warnings.html#warning-categories):

```
>>> import warnings
>>> filter(lambda f: f[2] == DeprecationWarning, warnings.filters)
[('ignore', <_sre.SRE_Pattern object at 0x10ba58c00>, <type 'exceptions.DeprecationWarning'>, <_sre.SRE_Pattern object at 0x10bb04138>, 0), ('ignore', None, <type 'exceptions.DeprecationWarning'>, None, 0)]
```

so, it won't actually mess up the terminal much unless it is intended.

If this is intendedly enabled, it'd should as below:

```
>>> import warnings
>>> warnings.simplefilter('always', DeprecationWarning)
>>>
>>> from pyspark.sql import functions
>>> functions.approxCountDistinct("a")
.../spark/python/pyspark/sql/functions.py:232: DeprecationWarning: Deprecated in 2.1, use approx_count_distinct instead.
  "Deprecated in 2.1, use approx_count_distinct instead.", DeprecationWarning)
...
```

These instances were found by:

```
cd python/pyspark
grep -r "Deprecated" .
grep -r "deprecated" .
grep -r "deprecate" .
```

## How was this patch tested?

Manually tested.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19535 from HyukjinKwon/deprecated-warning.
2017-10-24 12:44:47 +09:00
Takuya UESHIN b8624b06e5 [SPARK-20396][SQL][PYSPARK][FOLLOW-UP] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.

## How was this patch tested?

Exisiting tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19517 from ueshin/issues/SPARK-20396/fup2.
2017-10-20 12:44:30 -07:00
Li Jin bfc7e1fe1a [SPARK-20396][SQL][PYSPARK] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.

Static schema
-------------------
```
schema = df.schema

pandas_udf(schema)
def normalize(df):
    df = df.assign(v1 = (df.v1 - df.v1.mean()) / df.v1.std()
    return df

df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**

Another example to use pd.DataFrame dtypes as output schema of the udf:

```
sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
      ret = # Some transformation on the input pd.DataFrame
      return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)

df.groupBy('id').apply(foo_udf)
```
In interactive use case, user usually have a sample pd.DataFrame to test function `foo` in their notebook. Having been able to use `foo(sample_df).dtypes` frees user from specifying the output schema of `foo`.

Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

## How was this patch tested?
* Added GroupbyApplyTest

Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>

Closes #18732 from icexelloss/groupby-apply-SPARK-20396.
2017-10-11 07:32:01 +09:00
Bryan Cutler 7bf4da8a33 [MINOR] Fixed up pandas_udf related docs and formatting
## What changes were proposed in this pull request?

Fixed some minor issues with pandas_udf related docs and formatting.

## How was this patch tested?

NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19375 from BryanCutler/arrow-pandas_udf-cleanup-minor.
2017-09-28 10:24:51 +09:00
Bryan Cutler d8e825e3bc [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_udf and add doctests
## What changes were proposed in this pull request?

This change disables the use of 0-parameter pandas_udfs due to the API being overly complex and awkward, and can easily be worked around by using an index column as an input argument.  Also added doctests for pandas_udfs which revealed bugs for handling empty partitions and using the pandas_udf decorator.

## How was this patch tested?

Reworked existing 0-parameter test to verify error is raised, added doctest for pandas_udf, added new tests for empty partition and decorator usage.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19325 from BryanCutler/arrow-pandas_udf-0-param-remove-SPARK-22106.
2017-09-26 10:54:00 +09:00
Bryan Cutler 27fc536d9a [SPARK-21190][PYSPARK] Python Vectorized UDFs
This PR adds vectorized UDFs to the Python API

**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:

```
pandas_udf(DoubleType())
def plus(a, b)
    return a + b
```
or

```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs

0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output.  For example:

```
pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```

Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.

- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
2017-09-22 16:17:50 +08:00