### What changes were proposed in this pull request?
Implement `Index.map`.
The PR is based on https://github.com/databricks/koalas/pull/2136. Thanks awdavidson for the prototype.
`map` of CategoricalIndex and DatetimeIndex will be implemented in separate PRs.
### Why are the changes needed?
Mapping values using input correspondence (a dict, Series, or function) is supported in pandas as [Index.map](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Index.map.html).
We shall also support hat.
### Does this PR introduce _any_ user-facing change?
Yes. `Index.map` is available now.
```py
>>> psidx = ps.Index([1, 2, 3])
>>> psidx.map({1: "one", 2: "two", 3: "three"})
Index(['one', 'two', 'three'], dtype='object')
>>> psidx.map(lambda id: "{id} + 1".format(id=id))
Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object')
>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
>>> psidx.map(pser)
Index(['one', 'two', 'three'], dtype='object')
```
### How was this patch tested?
Unit tests.
Closes#33694 from xinrong-databricks/index_map.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 4dcd746025)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
This patch supports dynamic gap duration in session window.
### Why are the changes needed?
The gap duration used in session window for now is a static value. To support more complex usage, it is better to support dynamic gap duration which determines the gap duration by looking at the current data. For example, in our usecase, we may have different gap by looking at the certain column in the input rows.
### Does this PR introduce _any_ user-facing change?
Yes, users can specify dynamic gap duration.
### How was this patch tested?
Modified existing tests and new test.
Closes#33691 from viirya/dynamic-session-window-gap.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 8b8d91cf64)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
This PR is followup for https://github.com/apache/spark/pull/32964, to improve the warning message.
### Why are the changes needed?
To improve the warning message.
### Does this PR introduce _any_ user-facing change?
The warning is changed from "Deprecated in 3.2, Use `spark.to_spark_io` instead." to "Deprecated in 3.2, Use `DataFrame.spark.to_spark_io` instead."
### How was this patch tested?
Manually run `dev/lint-python`
Closes#33631 from itholic/SPARK-35811-followup.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 3d72c20e64)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Better error messages for DataTypeOps against lists.
### Why are the changes needed?
Currently, DataTypeOps against lists throw a Py4JJavaError, we shall throw a TypeError with proper messages instead.
### Does this PR introduce _any_ user-facing change?
Yes. A TypeError message will be showed rather than a Py4JJavaError.
From:
```py
>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]) > [3, 2, 1]
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o107.gt.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [3, 2, 1]
...
```
To:
```py
>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]) > [3, 2, 1]
Traceback (most recent call last):
...
TypeError: The operation can not be applied to list.
```
### How was this patch tested?
Unit tests.
Closes#33581 from xinrong-databricks/data_type_ops_list.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 8ca11fe39f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Partially backport from #33598 to avoid unexpected error caused by pandas 1.3.
### Why are the changes needed?
If uses tries to use pandas 1.3 as the underlying pandas, it will raise unexpected errors caused by removed APIs or behavior change.
Note that pandas API on Spark 3.2 will still follow the pandas 1.2 behavior.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#33614 from ueshin/issues/SPARK-36367/3.2/partially_backport.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Change the `NullType.simpleString` to "void" to set "void" as the formal type name of `NullType`
### Why are the changes needed?
This PR is intended to address the type name discussion in PR #28833. Here are the reasons:
1. The type name of NullType is displayed everywhere, e.g. schema string, error message, document. Hence it's not possible to hide it from users, we have to choose a proper name
2. The "void" is widely used as the type name of "NULL", e.g. Hive, pgSQL
3. Changing to "void" can enable the round trip of `toDDL`/`fromDDL` for NullType. (i.e. make `from_json(col, schema.toDDL)`) work
### Does this PR introduce _any_ user-facing change?
Yes, the type name of "NULL" is changed from "null" to "void". for example:
```
scala> sql("select null as a, 1 as b").schema.catalogString
res5: String = struct<a:void,b:int>
```
### How was this patch tested?
existing test cases
Closes#33437 from linhongliu-db/SPARK-36224-void-type-name.
Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2f700773c2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch set value to `<NA>` (pd.NA) in BooleanExtensionOps and StringExtensionOps.
### Why are the changes needed?
The pandas behavior:
```python
>>> pd.Series([True, False, None], dtype="boolean").astype(str).tolist()
['True', 'False', '<NA>']
>>> pd.Series(['s1', 's2', None], dtype="string").astype(str).tolist()
['1', '2', '<NA>']
```
pandas on spark
```python
>>> import pandas as pd
>>> from pyspark import pandas as ps
# Before
>>> ps.from_pandas(pd.Series([True, False, None], dtype="boolean")).astype(str).tolist()
['True', 'False', 'None']
>>> ps.from_pandas(pd.Series(['s1', 's2', None], dtype="string")).astype(str).tolist()
['True', 'False', 'None']
# After
>>> ps.from_pandas(pd.Series([True, False, None], dtype="boolean")).astype(str).tolist()
['True', 'False', '<NA>']
>>> ps.from_pandas(pd.Series(['s1', 's2', None], dtype="string")).astype(str).tolist()
['s1', 's2', '<NA>']
```
See more in [SPARK-35976](https://issues.apache.org/jira/browse/SPARK-35976)
### Does this PR introduce _any_ user-facing change?
Yes, return `<NA>` when None to follow the pandas behavior
### How was this patch tested?
Change the ut to cover this scenario.
Closes#33585 from Yikun/SPARK-35976.
Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit f04e991e6a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Remove old workarounds related to null ordering.
### Why are the changes needed?
In pandas-on-Spark, there are still some remaining places to call `Column._jc.(asc|desc)_nulls_(first|last)` as a workaround from Koalas to support Spark 2.3.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified a couple of tests and existing tests.
Closes#33597 from ueshin/issues/SPARK-36365/nulls_first_last.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 90d31dfcb7)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR is a followup of https://github.com/apache/spark/pull/33570, which mistakenly changed the default value of the default index
### Why are the changes needed?
It was mistakenly changed. It was changed to check if the tests actually pass but I forgot to change it back.
### Does this PR introduce _any_ user-facing change?
No, it's not related yet. It fixes up the mistake of the default value mistakenly changed.
(Changed default value makes the test flaky because of the order affected by extra shuffle)
### How was this patch tested?
Manually tested.
Closes#33596 from HyukjinKwon/SPARK-36338-followup.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 74a6b9d23b)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Move some logic related to `F.nanvl` to `DataTypeOps`.
### Why are the changes needed?
There are several places to branch by `FloatType` or `DoubleType` to use `F.nanvl` but `DataTypeOps` should handle it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#33582 from ueshin/issues/SPARK-36350/nan_to_null.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 895e3f5e2a)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
This PR proposes to implement `distributed-sequence` index in Scala side.
### Why are the changes needed?
- Avoid unnecessary (de)serialization
- Keep the nullability in the input DataFrame when `distributed-sequence` is enabled. During the serialization, all fields are being nullable for now (see https://github.com/apache/spark/pull/32775#discussion_r645882104)
### Does this PR introduce _any_ user-facing change?
No to end users since pandas API on Spark is not released yet.
```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(1).spark.print_schema()
```
Before:
```
root
|-- id: long (nullable = true)
```
After:
```
root
|-- id: long (nullable = false)
```
### How was this patch tested?
Manually tested, and existing tests should cover them.
Closes#33570 from HyukjinKwon/SPARK-36338.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit c6140d4d0a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR is a partial revert of https://github.com/apache/spark/pull/33567 that keeps the logic to skip mlflow related tests if that's not installed.
### Why are the changes needed?
It's consistent with other libraries, e.g) PyArrow.
It also fixes up the potential dev breakage (see also https://github.com/apache/spark/pull/33567#issuecomment-889841829)
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
This is a partial revert. CI should test it out too.
Closes#33589 from HyukjinKwon/SPARK-36254.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit dd2ca0aee2)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes adding a Python package, `mlflow` and `sklearn` to enable the MLflow test in pandas API on Spark.
### Why are the changes needed?
To enable the MLflow test in pandas API on Spark.
### Does this PR introduce _any_ user-facing change?
No, it's test-only
### How was this patch tested?
Manually test on local, with `python/run-tests --testnames pyspark.pandas.mlflow`.
Closes#33567 from itholic/SPARK-36254.
Lead-authored-by: itholic <haejoon.lee@databricks.com>
Co-authored-by: Haejoon Lee <44108233+itholic@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit abce61f3fd)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR is follow-up for https://github.com/apache/spark/pull/33414 to support the more options for `mode` argument for all APIs that has `mode` argument, not only `DataFrame.to_csv`.
### Why are the changes needed?
To keep the usage consistency for the arguments that have same name.
### Does this PR introduce _any_ user-facing change?
More options is available for all APIs that has `mode` argument, same as `DataFrame.to_csv`
### How was this patch tested?
Manually test on local
Closes#33569 from itholic/SPARK-35085-followup.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 94cb2bbbc2)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Reuse `IndexOpsMixin.isnull()` where the null check is needed.
### Why are the changes needed?
There are some places where we can reuse `IndexOpsMixin.isnull()` instead of directly using Spark `Column`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#33562 from ueshin/issues/SPARK-36333/reuse_isnull.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 07ed82be0b)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Improve the error message for wrong type when calling dropDuplicates in pyspark.
### Why are the changes needed?
The current error message is cryptic and can be unclear to less experienced users.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a type error for when a user gives the wrong type to dropDuplicates
### How was this patch tested?
There is currently no testing for error messages in pyspark dataframe functions
Closes#33364 from sammyjmoseley/sm/add-type-checking-for-drop-duplicates.
Lead-authored-by: Samuel Moseley <smoseley@palantir.com>
Co-authored-by: Sammy Moseley <moseley.sammy@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a07df1acc6)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Improve the rest of DataTypeOps tests by avoiding joins.
### Why are the changes needed?
bool, string, numeric DataTypeOps tests have been improved by avoiding joins.
We should improve the rest of the DataTypeOps tests in the same way.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes#33546 from xinrong-databricks/test_no_join.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 9c5cb99d6e)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Adjust `astype` of fractional Series with missing values to follow pandas.
Non-goal: Adjust the issue of `astype` of Decimal Series with missing values to follow pandas.
### Why are the changes needed?
`astype` of fractional Series with missing values doesn't behave the same as pandas, for example, float Series returns itself when `astype` integer, while a ValueError is raised in pandas.
We ought to follow pandas.
### Does this PR introduce _any_ user-facing change?
Yes.
From:
```py
>>> import numpy as np
>>> import pyspark.pandas as ps
>>> psser = ps.Series([1, 2, np.nan])
>>> psser.astype(int)
0 1.0
1 2.0
2 NaN
dtype: float64
```
To:
```py
>>> import numpy as np
>>> import pyspark.pandas as ps
>>> psser = ps.Series([1, 2, np.nan])
>>> psser.astype(int)
Traceback (most recent call last):
...
ValueError: Cannot convert fractions with missing values to integer
```
### How was this patch tested?
Unit tests.
Closes#33466 from xinrong-databricks/extension_astype.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 01213095e2)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Fix `Series`/`Index.copy()` to drop extra columns.
### Why are the changes needed?
Currently `Series`/`Index.copy()` keeps the copy of the anchor DataFrame which holds unnecessary columns.
We can drop those when `Series`/`Index.copy()`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#33549 from ueshin/issues/SPARK-36320/index_ops_copy.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 3c76a924ce)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix `IndexOpsMixin.hasnans` to use `IndexOpsMixin.isnull().any()`.
### Why are the changes needed?
`IndexOpsMixin.hasnans` has a potential issue to cause `a window function inside an aggregate function` error.
Also it returns a wrong value when the `Series`/`Index` is empty.
```py
>>> ps.Series([]).hasnans
None
```
whereas:
```py
>>> pd.Series([]).hasnans
False
```
`IndexOpsMixin.any()` is safe for both cases.
### Does this PR introduce _any_ user-facing change?
`IndexOpsMixin.hasnans` will return `False` when empty.
### How was this patch tested?
Added some tests.
Closes#33547 from ueshin/issues/SPARK-36310/hasnan.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit bcc595c112)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The following code should type-check:
```python3
import uuid
import pyspark.sql.functions as F
my_udf = F.udf(lambda: str(uuid.uuid4())).asNondeterministic()
```
### What changes were proposed in this pull request?
The `udf` function should return a more specific type.
### Why are the changes needed?
Right now, `mypy` will throw spurious errors, such as for the code given above.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This was not tested. Sorry, I am not very familiar with this repo -- are there any typing tests?
Closes#33399 from luranhe/patch-1.
Lead-authored-by: Luran He <luranjhe@gmail.com>
Co-authored-by: Luran He <luran.he@compass.com>
Signed-off-by: zero323 <mszymkiewicz@gmail.com>
(cherry picked from commit ede1bc6b51)
Signed-off-by: zero323 <mszymkiewicz@gmail.com>
### What changes were proposed in this pull request?
Clean up `CategoricalAccessor` and `CategoricalIndex`.
- Clean up the classes
- Add deprecation warnings
- Clean up the docs
### Why are the changes needed?
To finalize the series of PRs for `CategoricalAccessor` and `CategoricalIndex`, we should clean up the classes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#33528 from ueshin/issues/SPARK-36267/cleanup.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit c40d9d46f1)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Set the result to 1 when the exp with 0(or False).
### Why are the changes needed?
Currently, exponentiation between fractional series and bools is not consistent with pandas' behavior.
```
>>> pser = pd.Series([1, 2, np.nan], dtype=float)
>>> psser = ps.from_pandas(pser)
>>> pser ** False
0 1.0
1 1.0
2 1.0
dtype: float64
>>> psser ** False
0 1.0
1 1.0
2 NaN
dtype: float64
```
We ought to adjust that.
See more in [SPARK-36142](https://issues.apache.org/jira/browse/SPARK-36142)
### Does this PR introduce _any_ user-facing change?
Yes, it introduces a user-facing change, resulting in a different result for pow between fractional Series with missing values and bool literal, the results follow pandas behavior.
### How was this patch tested?
- Add test_pow_with_float_nan ut
- Exsiting test in test_pow
Closes#33521 from Yikun/SPARK-36142.
Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit d52c2de08b)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add set_categories to CategoricalAccessor and CategoricalIndex.
### Why are the changes needed?
set_categories is supported in pandas CategoricalAccessor and CategoricalIndex. We ought to follow pandas.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `set_categories`.
### How was this patch tested?
Unit tests.
Closes#33506 from xinrong-databricks/set_categories.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 55971b70fe)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Changing references to Dataset in python docstrings to DataFrame
### Why are the changes needed?
no Dataset class in pyspark
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Doc change only
Closes#33438 from dominikgehl/feature/SPARK-36225.
Lead-authored-by: Dominik Gehl <dog@open.ch>
Co-authored-by: Dominik Gehl <gehl@fastmail.fm>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit ae1c20ee0d)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix `lint-python` to pick `PYTHON_EXECUTABLE` from the environment variable first to switch the Python and explicitly specify `PYTHON_EXECUTABLE` to use `python3.9` in CI.
### Why are the changes needed?
Currently `lint-python` uses `python3`, but it's not the one we expect in CI.
As a result, `black` check is not working.
```
The python3 -m black command was not found. Skipping black checks for now.
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The `black` check in `lint-python` should work.
Closes#33507 from ueshin/issues/SPARK-36279/lint-python.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 663cbdfbe5)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix equality comparison of unordered Categoricals.
### Why are the changes needed?
Codes of a Categorical Series are used for Series equality comparison. However, that doesn't apply to unordered Categoricals, where the same value can have different codes in two same categories in a different order.
So we should map codes to value respectively and then compare the equality of value.
### Does this PR introduce _any_ user-facing change?
Yes.
From:
```py
>>> psser1 = ps.Series(pd.Categorical(list("abca")))
>>> psser2 = ps.Series(pd.Categorical(list("bcaa"), categories=list("bca")))
>>> with ps.option_context("compute.ops_on_diff_frames", True):
... (psser1 == psser2).sort_index()
...
0 True
1 True
2 True
3 False
dtype: bool
```
To:
```py
>>> psser1 = ps.Series(pd.Categorical(list("abca")))
>>> psser2 = ps.Series(pd.Categorical(list("bcaa"), categories=list("bca")))
>>> with ps.option_context("compute.ops_on_diff_frames", True):
... (psser1 == psser2).sort_index()
...
0 False
1 False
2 False
3 True
dtype: bool
```
### How was this patch tested?
Unit tests.
Closes#33497 from xinrong-databricks/cat_bug.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 85adc2ff60)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Add `reorder_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `reorder_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `reorder_categories`.
### How was this patch tested?
Added some tests.
Closes#33499 from ueshin/issues/SPARK-36264/reorder_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit e12bc4d31d)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
additional links to other classes in python documentation
### Why are the changes needed?
python docstring syntax wasn't fully used everywhere
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Documentation change only
Closes#33440 from dominikgehl/feature/python-docstrings.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 701756ac95)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add `remove_unused_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `remove_unused_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `remove_unused_categories`.
### How was this patch tested?
Added some tests.
Closes#33485 from ueshin/issues/SPARK-36261/remove_unused_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 2fe12a7520)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Improve bool, string, numeric DataTypeOps tests by avoiding joins.
Previously, bool, string, numeric DataTypeOps tests are conducted between two different Series.
After the PR, bool, string, numeric DataTypeOps tests should perform on a single DataFrame.
### Why are the changes needed?
A considerable number of DataTypeOps tests have operations on different Series, so joining is needed, which takes a long time.
We shall avoid joins for a shorter test duration.
The majority of joins happen in bool, string, numeric DataTypeOps tests, so we improve them first.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes#33402 from xinrong-databricks/datatypeops_diffframe.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 75fd1f5b82)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Use `Column.__getitem__` instead of `Column.getItem` to suppress warnings.
### Why are the changes needed?
In pandas API on Spark code base, there are some places using `Column.getItem` with `Column` object, but it shows a deprecation warning.
### Does this PR introduce _any_ user-facing change?
Yes, users won't see the warnings anymore.
- before
```py
>>> s = ps.Series(list("abbccc"), dtype="category")
>>> s.astype(str)
/path/to/spark/python/pyspark/sql/column.py:322: FutureWarning: A column as 'key' in getItem is deprecated as of Spark 3.0, and will not be supported in the future release. Use `column[key]` or `column.key` syntax instead.
warnings.warn(
0 a
1 b
2 b
3 c
4 c
5 c
dtype: object
```
- after
```py
>>> s = ps.Series(list("abbccc"), dtype="category")
>>> s.astype(str)
0 a
1 b
2 b
3 c
4 c
5 c
dtype: object
```
### How was this patch tested?
Existing tests.
Closes#33486 from ueshin/issues/SPARK-36265/getitem.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a76a087f7f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR follows up #33379 to fix build error in Sphinx
### Why are the changes needed?
The Sphinx build is failed with missing newline in docstring
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually test the Sphinx build
Closes#33479 from itholic/SPARK-35810-followup.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit d1a037a27c)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes adding an argument `index_col` for `ps.sql` function, to preserve the index when users want.
NOTE that the `reset_index()` have to be performed before using `ps.sql` with `index_col`.
```python
>>> psdf
A B
a 1 4
b 2 5
c 3 6
>>> psdf_reset_index = psdf.reset_index()
>>> ps.sql("SELECT * from {psdf_reset_index} WHERE A > 1", index_col="index")
A B
index
b 2 5
c 3 6
```
Otherwise, the index is always lost.
```python
>>> ps.sql("SELECT * from {psdf} WHERE A > 1")
A B
0 2 5
1 3 6
```
### Why are the changes needed?
Index is one of the key object for the existing pandas users, so we should provide the way to keep the index after computing the `ps.sql`.
### Does this PR introduce _any_ user-facing change?
Yes, the new argument is added.
### How was this patch tested?
Add a unit test and manually check the build pass.
Closes#33450 from itholic/SPARK-35809.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 6578f0b135)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add `remove_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `remove_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `remove_categories`.
### How was this patch tested?
Added some tests.
Closes#33474 from ueshin/issues/SPARK-36249/remove_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a3c7ae18e2)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add `add_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `add_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `add_categories`.
### How was this patch tested?
Added some tests.
Closes#33470 from ueshin/issues/SPARK-36214/add_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit dcc0aaa3ef)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
This PR adds the version that added pandas API on Spark in PySpark documentation.
### Why are the changes needed?
To document the version added.
### Does this PR introduce _any_ user-facing change?
No to end user. Spark 3.2 is not released yet.
### How was this patch tested?
Linter and documentation build.
Closes#33473 from HyukjinKwon/SPARK-36253.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit f3e29574d9)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add categories setter to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement categories setter in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use categories setter.
### How was this patch tested?
Added some tests.
Closes#33448 from ueshin/issues/SPARK-36188/categories_setter.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit d506815a92)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Add `as_ordered`/`as_unordered` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `as_ordered`/`as_unordered` in `CategoricalAccessor` and `CategoricalIndex` yet.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `as_ordered`/`as_unordered`.
### How was this patch tested?
Added some tests.
Closes#33400 from ueshin/issues/SPARK-36186/as_ordered_unordered.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 376fadc89c)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Test is flaky (https://github.com/apache/spark/runs/3109815586):
```
Traceback (most recent call last):
File "/__w/spark/spark/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 391, in test_parameter_convergence
eventually(condition, catch_assertions=True)
File "/__w/spark/spark/python/pyspark/testing/utils.py", line 91, in eventually
raise lastValue
File "/__w/spark/spark/python/pyspark/testing/utils.py", line 82, in eventually
lastValue = condition()
File "/__w/spark/spark/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 387, in condition
self.assertEqual(len(model_weights), len(batches))
AssertionError: 9 != 10
```
Should probably increase timeout
### Why are the changes needed?
To avoid flakiness in the test.
### Does this PR introduce _any_ user-facing change?
Nope, dev-only.
### How was this patch tested?
CI should test it out.
Closes#33427 from HyukjinKwon/SPARK-36216.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit d6b974f8ce)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This is a backport of #33377.
Revisit and manage `InternalField` in more places.
### Why are the changes needed?
There are other places we can manage `InternalField`, and we can keep extension dtypes or `CategoricalDtype`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added some tests.
Closes#33384 from ueshin/issues/SPARK-36167/3.2/internal_field.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Support comparison between a Categorical and a scalar.
There are 3 main changes:
- Modify `==` and `!=` from comparing **codes** of the Categorical to the scalar to comparing **actual values** of the Categorical to the scalar.
- Support `<`, `<=`, `>`, `>=` between a Categorical and a scalar.
- TypeError message fix.
### Why are the changes needed?
pandas supports comparison between a Categorical and a scalar, we should follow pandas' behaviors.
### Does this PR introduce _any_ user-facing change?
Yes.
Before:
```py
>>> import pyspark.pandas as ps
>>> import pandas as pd
>>> from pandas.api.types import CategoricalDtype
>>> pser = pd.Series(pd.Categorical([1, 2, 3], categories=[3, 2, 1], ordered=True))
>>> psser = ps.from_pandas(pser)
>>> psser == 2
0 True
1 False
2 False
dtype: bool
>>> psser <= 1
Traceback (most recent call last):
...
NotImplementedError: <= can not be applied to categoricals.
```
After:
```py
>>> import pyspark.pandas as ps
>>> import pandas as pd
>>> from pandas.api.types import CategoricalDtype
>>> pser = pd.Series(pd.Categorical([1, 2, 3], categories=[3, 2, 1], ordered=True))
>>> psser = ps.from_pandas(pser)
>>> psser == 2
0 False
1 True
2 False
dtype: bool
>>> psser <= 1
0 True
1 True
2 True
dtype: bool
```
### How was this patch tested?
Unit tests.
Closes#33373 from xinrong-databricks/categorical_eq.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 8dd43351d5)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
The `DataFrame.to_csv` has `mode` arguments both in pandas and pandas API on Spark.
However, pandas allows the string "w", "w+", "a", "a+" where as pandas-on-Spark allows "append", "overwrite", "ignore", "error" or "errorifexists".
We should map them while `mode` can still accept the existing parameters("append", "overwrite", "ignore", "error" or "errorifexists") as well.
### Why are the changes needed?
APIs in pandas-on-Spark should follows the behavior of pandas for preventing the existing pandas code break.
### Does this PR introduce _any_ user-facing change?
`DataFrame.to_csv` now can accept "w", "w+", "a", "a+" as well, same as pandas.
### How was this patch tested?
Add the unit test and manually write the file with the new acceptable strings.
Closes#33414 from itholic/SPARK-35806.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 2f42afc53a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Updating the pyspark sql readwriter documentation to the level of detail provided by the scala documentation
### Why are the changes needed?
documentation clarity
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Only documentation change
Closes#33394 from dominikgehl/feature/SPARK-36181.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 2ef8ced27a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
The `broadcast` functions in `pyspark.pandas` is duplicated to `DataFrame.spark.hint` with `"broadcast"`.
```python
# The below 2 lines are the same
df.spark.hint("broadcast")
ps.broadcast(df)
```
So, we should remove `broadcast` in the future, and show deprecation warning for now.
### Why are the changes needed?
For deduplication of functions
### Does this PR introduce _any_ user-facing change?
They see the deprecation warning when using `broadcast` in `pyspark.pandas`.
```python
>>> ps.broadcast(df)
FutureWarning: `broadcast` has been deprecated and will be removed in a future version. use `DataFrame.spark.hint` with 'broadcast' for `name` parameter instead.
warnings.warn(
```
### How was this patch tested?
Manually check the warning message and see the build passed.
Closes#33379 from itholic/SPARK-35810.
Lead-authored-by: itholic <haejoon.lee@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Co-authored-by: Haejoon Lee <44108233+itholic@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 67e6120a85)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Introduction: this PR is the last part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)
### What changes were proposed in this pull request?
This PR proposes to support native session window. Please refer the comments/design doc in SPARK-10816 for more details on the rationalization and design (could be outdated a bit compared to the PR).
The definition of the boundary of "session window" is [the timestamp of start event ~ the timestamp of last event + gap duration). That said, unlike time window, session window is a dynamic window which can expand if new input row is added to the session. To handle expansion of session window, Spark defines session window per input row, and "merge" windows if they can be merged (boundaries are overlapped).
This PR leverages two different approaches on merging session windows:
1. merging session windows with Spark's aggregation logic (a variant of sort aggregation)
2. updating session window for all rows bound to the same session, and applying aggregation logic afterwards
First one is preferable as it outperforms compared to the second one, though it can be only used if merging session window can be applied altogether with aggregation. It is not applicable on all the cases, so second one is used to cover the remaining cases.
This PR also applies the optimization on merging input rows and existing sessions with retaining the order (group keys + start timestamp of session window), leveraging the fact the number of existing sessions per group key won't be huge.
The state format is versioned, so that we can bring a new state format if we find a better one.
### Why are the changes needed?
For now, to deal with sessionization, Spark requires end users to play with (flat)MapGroupsWithState directly which has a couple of major drawbacks:
1. (flat)MapGroupsWithState is lower level API and end users have to code everything in details for defining session window and merging windows
2. built-in aggregate functions cannot be used and end users have to deal with aggregation by themselves
3. (flat)MapGroupsWithState is only available in Scala/Java.
With native support of session window, end users simply use "session_window" like they use "window" for tumbling/sliding window, and leverage built-in aggregate functions as well as UDAFs to simply define aggregations.
Quoting the query example from test suite:
```
val inputData = MemoryStream[(String, Long)]
// Split the lines into words, treat words as sessionId of events
val events = inputData.toDF()
.select($"_1".as("value"), $"_2".as("timestamp"))
.withColumn("eventTime", $"timestamp".cast("timestamp"))
.selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
.withWatermark("eventTime", "30 seconds")
val sessionUpdates = events
.groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
.agg(count("*").as("numEvents"))
.selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
"CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
"numEvents")
```
which is same as StructuredSessionization (native session window is shorter and clearer even ignoring model classes).
39542bb81f/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala (L66-L105)
(Worth noting that the code in StructuredSessionization only works with processing time. The code doesn't consider old event can update the start time of old session.)
### Does this PR introduce _any_ user-facing change?
Yes. This PR brings the new feature to support session window on both batch and streaming query, which adds a new function "session_window" which usage is similar with "window".
### How was this patch tested?
New test suites. Also tested with benchmark code.
Closes#33081 from HeartSaVioR/SPARK-34893-SPARK-10816-PR-31570-part-5.
Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit f2bf8b051b)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
This PR proposes to use Python 3.9 in documentation and linter at GitHub Actions. This PR also contains the fixes for mypy check (introduced by Python 3.9 upgrade)
```
python/pyspark/sql/pandas/_typing/protocols/frame.pyi:64: error: Name "np.ndarray" is not defined
python/pyspark/sql/pandas/_typing/protocols/frame.pyi:91: error: Name "np.recarray" is not defined
python/pyspark/sql/pandas/_typing/protocols/frame.pyi:165: error: Name "np.ndarray" is not defined
python/pyspark/pandas/categorical.py:82: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/categorical.py:109: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered"
python/pyspark/ml/linalg/__init__.pyi:184: error: Return type "ndarray[Any, Any]" of "toArray" incompatible with return type "NoReturn" in supertype "Matrix"
python/pyspark/ml/linalg/__init__.pyi:217: error: Return type "ndarray[Any, Any]" of "toArray" incompatible with return type "NoReturn" in supertype "Matrix"
python/pyspark/pandas/typedef/typehints.py:163: error: Module has no attribute "bool"; maybe "bool_" or "bool8"?
python/pyspark/pandas/typedef/typehints.py:174: error: Module has no attribute "float"; maybe "float_", "cfloat", or "float96"?
python/pyspark/pandas/typedef/typehints.py:180: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"?
python/pyspark/pandas/ml.py:81: error: Value of type variable "_DTypeScalar_co" of "dtype" cannot be "object"
python/pyspark/pandas/indexing.py:1649: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"?
python/pyspark/pandas/indexing.py:1656: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"?
python/pyspark/pandas/frame.py:4969: error: Function "numpy.array" is not valid as a type
python/pyspark/pandas/frame.py:4969: note: Perhaps you need "Callable[...]" or a callback protocol?
python/pyspark/pandas/frame.py:4970: error: Function "numpy.array" is not valid as a type
python/pyspark/pandas/frame.py:4970: note: Perhaps you need "Callable[...]" or a callback protocol?
python/pyspark/pandas/frame.py:7402: error: "List[Any]" has no attribute "tolist"
python/pyspark/pandas/series.py:1030: error: Module has no attribute "_NoValue"
python/pyspark/pandas/series.py:1031: error: Module has no attribute "_NoValue"
python/pyspark/pandas/indexes/category.py:159: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/indexes/category.py:180: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered"
python/pyspark/pandas/namespace.py:2036: error: Argument 1 to "column_name" has incompatible type "float"; expected "str"
python/pyspark/pandas/mlflow.py:59: error: Incompatible types in assignment (expression has type "Type[floating[Any]]", variable has type "str")
python/pyspark/pandas/data_type_ops/categorical_ops.py:43: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/data_type_ops/categorical_ops.py:43: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered"
python/pyspark/pandas/data_type_ops/categorical_ops.py:56: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/tests/test_typedef.py:70: error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py:77: error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py:85: error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py💯 error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py:108: error: Name "np.float" is not defined
python/pyspark/mllib/clustering.pyi:152: error: Incompatible types in assignment (expression has type "ndarray[Any, Any]", base class "KMeansModel" defined the type as "List[ndarray[Any, Any]]")
python/pyspark/mllib/classification.pyi:93: error: Signature of "predict" incompatible with supertype "LinearClassificationModel"
Found 32 errors in 15 files (checked 315 source files)
1
```
Python 3.6 is deprecated at SPARK-35938
No. Maybe static analysis, etc. by some type hints but they are really non-breaking..
I manually checked by GitHub Actions build in forked repository.
Closes#33356 from HyukjinKwon/SPARK-36146.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit a71dd6af2f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>