The following code should type-check:
```python3
import uuid
import pyspark.sql.functions as F
my_udf = F.udf(lambda: str(uuid.uuid4())).asNondeterministic()
```
### What changes were proposed in this pull request?
The `udf` function should return a more specific type.
### Why are the changes needed?
Right now, `mypy` will throw spurious errors, such as for the code given above.
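As a minimal illustration of why a richer return type satisfies `mypy` (the protocol below is hypothetical, not PySpark's actual stub):
```py
from typing import Any, Callable, Protocol

class _UDFLike(Protocol):
    # The returned object is callable *and* declares asNondeterministic,
    # so mypy accepts chaining it as in the example above.
    def __call__(self, *cols: Any) -> Any: ...
    def asNondeterministic(self) -> "_UDFLike": ...

def udf_sketch(f: Callable[..., Any]) -> _UDFLike: ...  # illustrative only
```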
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This was not tested. Sorry, I am not very familiar with this repo -- are there any typing tests?
Closes #33399 from luranhe/patch-1.
Lead-authored-by: Luran He <luranjhe@gmail.com>
Co-authored-by: Luran He <luran.he@compass.com>
Signed-off-by: zero323 <mszymkiewicz@gmail.com>
### What changes were proposed in this pull request?
Update API usage examples in the PySpark pandas API documents.
### Why are the changes needed?
If users follow the pandas API on Spark examples in the documentation, they will see some API deprecation warnings.
Updating those documents helps users avoid confusion.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
make html
```
Closes #33519 from yoda-mon/update-pyspark-configurations.
Authored-by: Leona <yodal@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Clean up `CategoricalAccessor` and `CategoricalIndex`.
- Clean up the classes
- Add deprecation warnings
- Clean up the docs
### Why are the changes needed?
To finalize the series of PRs for `CategoricalAccessor` and `CategoricalIndex`, we should clean up the classes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #33528 from ueshin/issues/SPARK-36267/cleanup.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Set the result to 1 when the exponent is 0 (or False).
### Why are the changes needed?
Currently, exponentiation between fractional series and bools is not consistent with pandas' behavior.
```
>>> pser = pd.Series([1, 2, np.nan], dtype=float)
>>> psser = ps.from_pandas(pser)
>>> pser ** False
0 1.0
1 1.0
2 1.0
dtype: float64
>>> psser ** False
0 1.0
1 1.0
2 NaN
dtype: float64
```
We ought to adjust that.
See more in [SPARK-36142](https://issues.apache.org/jira/browse/SPARK-36142)
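A rough sketch of the special case involved (helper name hypothetical; not the actual implementation):
```py
from pyspark.sql import Column, functions as F

def pow_bool_exponent(left: Column, exponent: bool) -> Column:
    # x ** False == x ** 0 == 1, even when x is NaN/missing,
    # which is the pandas behavior being matched here.
    if not exponent:
        return F.lit(1.0)
    return left.cast("double")  # x ** True == x ** 1 == x
```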
### Does this PR introduce _any_ user-facing change?
Yes. `pow` between a fractional Series with missing values and a bool literal now returns a different result, following pandas' behavior.
### How was this patch tested?
- Added the `test_pow_with_float_nan` unit test
- Existing tests in `test_pow`
Closes #33521 from Yikun/SPARK-36142.
Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add set_categories to CategoricalAccessor and CategoricalIndex.
### Why are the changes needed?
set_categories is supported in pandas CategoricalAccessor and CategoricalIndex. We ought to follow pandas.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `set_categories`.
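A small usage sketch, mirroring pandas' semantics (data illustrative):
```py
import pyspark.pandas as ps

psser = ps.Series(["a", "b", "c"], dtype="category")
# Values not present in the new categories become missing:
# "a" -> NaN, and the categories are now ['b', 'c'].
psser.cat.set_categories(["b", "c"])
```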
### How was this patch tested?
Unit tests.
Closes #33506 from xinrong-databricks/set_categories.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Changing references to Dataset in python docstrings to DataFrame
### Why are the changes needed?
There is no `Dataset` class in PySpark.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Doc change only
Closes #33438 from dominikgehl/feature/SPARK-36225.
Lead-authored-by: Dominik Gehl <dog@open.ch>
Co-authored-by: Dominik Gehl <gehl@fastmail.fm>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix `lint-python` to pick `PYTHON_EXECUTABLE` from the environment variable first to switch the Python and explicitly specify `PYTHON_EXECUTABLE` to use `python3.9` in CI.
### Why are the changes needed?
Currently `lint-python` uses `python3`, but it's not the one we expect in CI.
As a result, the `black` check is not working.
```
The python3 -m black command was not found. Skipping black checks for now.
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The `black` check in `lint-python` should work.
Closes #33507 from ueshin/issues/SPARK-36279/lint-python.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix equality comparison of unordered Categoricals.
### Why are the changes needed?
Codes of a Categorical Series are used for Series equality comparison. However, that doesn't apply to unordered Categoricals: the same value can have different codes when the same set of categories is listed in a different order.
So we should map codes back to their values and compare the values for equality.
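A plain-pandas illustration of how the codes diverge for the Series used below:
```py
import pandas as pd

c1 = pd.Categorical(list("abca"))                          # categories: ['a', 'b', 'c']
c2 = pd.Categorical(list("bcaa"), categories=list("bca"))  # categories: ['b', 'c', 'a']
print(c1.codes)  # [0 1 2 0]; here 0 means 'a', 1 means 'b', 2 means 'c'
print(c2.codes)  # [0 1 2 2]; but here 0 means 'b', 1 means 'c', 2 means 'a'
# Equal codes at positions 0-2 despite different values, hence the wrong result.
```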
### Does this PR introduce _any_ user-facing change?
Yes.
From:
```py
>>> psser1 = ps.Series(pd.Categorical(list("abca")))
>>> psser2 = ps.Series(pd.Categorical(list("bcaa"), categories=list("bca")))
>>> with ps.option_context("compute.ops_on_diff_frames", True):
... (psser1 == psser2).sort_index()
...
0 True
1 True
2 True
3 False
dtype: bool
```
To:
```py
>>> psser1 = ps.Series(pd.Categorical(list("abca")))
>>> psser2 = ps.Series(pd.Categorical(list("bcaa"), categories=list("bca")))
>>> with ps.option_context("compute.ops_on_diff_frames", True):
... (psser1 == psser2).sort_index()
...
0 False
1 False
2 False
3 True
dtype: bool
```
### How was this patch tested?
Unit tests.
Closes #33497 from xinrong-databricks/cat_bug.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Add `reorder_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `reorder_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `reorder_categories`.
### How was this patch tested?
Added some tests.
Closes #33499 from ueshin/issues/SPARK-36264/reorder_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Additional links to other classes in the Python documentation.
### Why are the changes needed?
Python docstring syntax wasn't fully used everywhere.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Documentation change only
Closes #33440 from dominikgehl/feature/python-docstrings.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Exposing functionExists in pyspark.sql.catalog.
### Why are the changes needed?
The method was available in Scala but not in PySpark.
### Does this PR introduce _any_ user-facing change?
Additional method
### How was this patch tested?
Unit tests
Closes #33481 from dominikgehl/SPARK-36258.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add `remove_unused_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `remove_unused_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `remove_unused_categories`.
### How was this patch tested?
Added some tests.
Closes #33485 from ueshin/issues/SPARK-36261/remove_unused_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Improve the bool, string, and numeric DataTypeOps tests by avoiding joins.
Previously, these tests operated on two different Series.
After this PR, they operate on a single DataFrame.
### Why are the changes needed?
A considerable number of DataTypeOps tests operate on different Series, which requires a join and takes a long time.
We should avoid joins for a shorter test duration.
The majority of joins happen in the bool, string, and numeric DataTypeOps tests, so we improve them first, as sketched below.
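A sketch of the pattern (column names illustrative):
```py
import pyspark.pandas as ps

# Both operands live in one DataFrame, so the arithmetic below stays
# within a single Spark DataFrame and triggers no join.
psdf = ps.DataFrame({"this": [1, 2, 3], "that": [4.0, 5.0, 6.0]})
result = psdf["this"] + psdf["that"]
```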
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes #33402 from xinrong-databricks/datatypeops_diffframe.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Use `Column.__getitem__` instead of `Column.getItem` to suppress warnings.
### Why are the changes needed?
In the pandas API on Spark code base, there are some places using `Column.getItem` with a `Column` object, which shows a deprecation warning.
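For illustration, the two spellings side by side (assuming an active SparkSession):
```py
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(["x", "y"], 1)], ["arr", "idx"])

df.select(F.col("arr").getItem(F.col("idx")))  # emits the FutureWarning below
df.select(F.col("arr")[F.col("idx")])          # equivalent, no warning
```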
### Does this PR introduce _any_ user-facing change?
Yes, users won't see the warnings anymore.
- before
```py
>>> s = ps.Series(list("abbccc"), dtype="category")
>>> s.astype(str)
/path/to/spark/python/pyspark/sql/column.py:322: FutureWarning: A column as 'key' in getItem is deprecated as of Spark 3.0, and will not be supported in the future release. Use `column[key]` or `column.key` syntax instead.
warnings.warn(
0 a
1 b
2 b
3 c
4 c
5 c
dtype: object
```
- after
```py
>>> s = ps.Series(list("abbccc"), dtype="category")
>>> s.astype(str)
0 a
1 b
2 b
3 c
4 c
5 c
dtype: object
```
### How was this patch tested?
Existing tests.
Closes #33486 from ueshin/issues/SPARK-36265/getitem.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes removing some APIs from the pandas-on-Spark documentation.
They can easily be worked around via Spark DataFrame or Column functions, so they might be removed in the future.
### Why are the changes needed?
Because we don't want to expose some functions as a public API.
### Does this PR introduce _any_ user-facing change?
The APIs `(Series|Index).spark.data_type`, `(Series|Index).spark.nullable`, `DataFrame.spark.schema`, `DataFrame.spark.print_schema`, `DataFrame.pandas_on_spark.attach_id_column`, `DataFrame.spark.checkpoint`, `DataFrame.spark.localcheckpoint` and `DataFrame.spark.explain` are removed from the documentation.
### How was this patch tested?
Manually build the documents.
Closes #33458 from itholic/SPARK-36239.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Additional tests for pyspark tableExists with regard to views and temporary views
### Why are the changes needed?
The Scala documentation indicates that tableExists works for tables/views and also temporary views. These unit tests try to verify that claim. While views seem OK, temporary views don't seem to work.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
tests
Closes #33461 from dominikgehl/bug/SPARK-36243.
Lead-authored-by: Dominik Gehl <dog@open.ch>
Co-authored-by: Dominik Gehl <gehl@fastmail.fm>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR follows up #33379 to fix a build error in Sphinx.
### Why are the changes needed?
The Sphinx build fails due to a missing newline in a docstring.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually test the Sphinx build
Closes #33479 from itholic/SPARK-35810-followup.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes adding an argument `index_col` for `ps.sql` function, to preserve the index when users want.
NOTE that `reset_index()` has to be performed before using `ps.sql` with `index_col`.
```python
>>> psdf
A B
a 1 4
b 2 5
c 3 6
>>> psdf_reset_index = psdf.reset_index()
>>> ps.sql("SELECT * from {psdf_reset_index} WHERE A > 1", index_col="index")
A B
index
b 2 5
c 3 6
```
Otherwise, the index is always lost.
```python
>>> ps.sql("SELECT * from {psdf} WHERE A > 1")
A B
0 2 5
1 3 6
```
### Why are the changes needed?
The index is one of the key objects for existing pandas users, so we should provide a way to keep the index after running `ps.sql`.
### Does this PR introduce _any_ user-facing change?
Yes, the new argument is added.
### How was this patch tested?
Added a unit test and manually checked that the build passes.
Closes #33450 from itholic/SPARK-35809.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add `remove_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `remove_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `remove_categories`.
### How was this patch tested?
Added some tests.
Closes #33474 from ueshin/issues/SPARK-36249/remove_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add `add_categories` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `add_categories` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `add_categories`.
### How was this patch tested?
Added some tests.
Closes #33470 from ueshin/issues/SPARK-36214/add_categories.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
This PR adds the version that added pandas API on Spark in PySpark documentation.
### Why are the changes needed?
To document the version added.
### Does this PR introduce _any_ user-facing change?
No to end users. Spark 3.2 is not released yet.
### How was this patch tested?
Linter and documentation build.
Closes #33473 from HyukjinKwon/SPARK-36253.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Add categories setter to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement categories setter in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use categories setter.
### How was this patch tested?
Added some tests.
Closes #33448 from ueshin/issues/SPARK-36188/categories_setter.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Add `as_ordered`/`as_unordered` to `CategoricalAccessor` and `CategoricalIndex`.
### Why are the changes needed?
We should implement `as_ordered`/`as_unordered` in `CategoricalAccessor` and `CategoricalIndex`.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use `as_ordered`/`as_unordered`.
### How was this patch tested?
Added some tests.
Closes #33400 from ueshin/issues/SPARK-36186/as_ordered_unordered.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Expose databaseExists in pyspark.sql.catalog
### Why are the changes needed?
The method was available in Scala, but not in PySpark.
### Does this PR introduce _any_ user-facing change?
New method databaseExists
### How was this patch tested?
Unit tests in codebase
Closes #33416 from dominikgehl/feature/SPARK-36207.
Lead-authored-by: Dominik Gehl <dog@open.ch>
Co-authored-by: Dominik Gehl <gehl@fastmail.fm>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Test is flaky (https://github.com/apache/spark/runs/3109815586):
```
Traceback (most recent call last):
File "/__w/spark/spark/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 391, in test_parameter_convergence
eventually(condition, catch_assertions=True)
File "/__w/spark/spark/python/pyspark/testing/utils.py", line 91, in eventually
raise lastValue
File "/__w/spark/spark/python/pyspark/testing/utils.py", line 82, in eventually
lastValue = condition()
File "/__w/spark/spark/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 387, in condition
self.assertEqual(len(model_weights), len(batches))
AssertionError: 9 != 10
```
We should probably increase the timeout.
### Why are the changes needed?
To avoid flakiness in the test.
### Does this PR introduce _any_ user-facing change?
Nope, dev-only.
### How was this patch tested?
CI should test it out.
Closes #33427 from HyukjinKwon/SPARK-36216.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
exposing tableExists in pyspark.sql.catalog
### Why are the changes needed?
Avoids PySpark users having to go through listTables.
### Does this PR introduce _any_ user-facing change?
Yes, an additional tableExists method is available in PySpark.
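A usage sketch (table name illustrative):
```py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Before: infer existence from a full listing.
exists = any(t.name == "people" for t in spark.catalog.listTables())
# After this PR: ask directly.
exists = spark.catalog.tableExists("people")
```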
### How was this patch tested?
test added
Closes #33388 from dominikgehl/feature/SPARK-36176.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix test failures with `pandas < 1.2`.
### Why are the changes needed?
There are some test failures with `pandas < 1.2`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Fixed tests.
Closes #33398 from ueshin/issues/SPARK-36167/test.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Support comparison between a Categorical and a scalar.
There are 3 main changes:
- Modify `==` and `!=` from comparing **codes** of the Categorical to the scalar to comparing **actual values** of the Categorical to the scalar.
- Support `<`, `<=`, `>`, `>=` between a Categorical and a scalar.
- TypeError message fix.
### Why are the changes needed?
pandas supports comparison between a Categorical and a scalar, so we should follow pandas' behavior.
### Does this PR introduce _any_ user-facing change?
Yes.
Before:
```py
>>> import pyspark.pandas as ps
>>> import pandas as pd
>>> from pandas.api.types import CategoricalDtype
>>> pser = pd.Series(pd.Categorical([1, 2, 3], categories=[3, 2, 1], ordered=True))
>>> psser = ps.from_pandas(pser)
>>> psser == 2
0 True
1 False
2 False
dtype: bool
>>> psser <= 1
Traceback (most recent call last):
...
NotImplementedError: <= can not be applied to categoricals.
```
After:
```py
>>> import pyspark.pandas as ps
>>> import pandas as pd
>>> from pandas.api.types import CategoricalDtype
>>> pser = pd.Series(pd.Categorical([1, 2, 3], categories=[3, 2, 1], ordered=True))
>>> psser = ps.from_pandas(pser)
>>> psser == 2
0 False
1 True
2 False
dtype: bool
>>> psser <= 1
0 True
1 True
2 True
dtype: bool
```
### How was this patch tested?
Unit tests.
Closes #33373 from xinrong-databricks/categorical_eq.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
`DataFrame.to_csv` has a `mode` argument both in pandas and in pandas API on Spark.
However, pandas allows the strings "w", "w+", "a", "a+", whereas pandas-on-Spark allows "append", "overwrite", "ignore", "error" or "errorifexists".
We should map them, while `mode` still accepts the existing parameters ("append", "overwrite", "ignore", "error" or "errorifexists") as well.
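A hypothetical sketch of such a mapping (names illustrative, not the actual implementation):
```py
# pandas-style write modes translated to their Spark save-mode equivalents;
# Spark-style modes pass through unchanged.
_PANDAS_TO_SPARK_MODE = {"w": "overwrite", "w+": "overwrite", "a": "append", "a+": "append"}

def normalize_mode(mode: str) -> str:
    return _PANDAS_TO_SPARK_MODE.get(mode, mode)

assert normalize_mode("w") == "overwrite"
assert normalize_mode("append") == "append"
```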
### Why are the changes needed?
APIs in pandas-on-Spark should follow the behavior of pandas to prevent existing pandas code from breaking.
### Does this PR introduce _any_ user-facing change?
`DataFrame.to_csv` can now accept "w", "w+", "a", "a+" as well, the same as pandas.
### How was this patch tested?
Add the unit test and manually write the file with the new acceptable strings.
Closes #33414 from itholic/SPARK-35806.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Updating the pyspark.sql readwriter documentation to the level of detail provided by the Scala documentation.
### Why are the changes needed?
Documentation clarity
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Only documentation change
Closes #33394 from dominikgehl/feature/SPARK-36181.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
The pyspark.sql.catalog APIs were missing from the documentation. This PR fixes the omission.
### Why are the changes needed?
Documentation consistency
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Documentation change only.
Closes #33392 from dominikgehl/feature/SPARK-36178.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
The `broadcast` function in `pyspark.pandas` duplicates `DataFrame.spark.hint` with `"broadcast"`.
```python
# The below 2 lines are the same
df.spark.hint("broadcast")
ps.broadcast(df)
```
So, we should remove `broadcast` in the future, and show a deprecation warning for now.
### Why are the changes needed?
For deduplication of functions
### Does this PR introduce _any_ user-facing change?
Yes, users see a deprecation warning when using `broadcast` in `pyspark.pandas`.
```python
>>> ps.broadcast(df)
FutureWarning: `broadcast` has been deprecated and will be removed in a future version. use `DataFrame.spark.hint` with 'broadcast' for `name` parameter instead.
warnings.warn(
```
### How was this patch tested?
Manually checked the warning message and saw the build pass.
Closes #33379 from itholic/SPARK-35810.
Lead-authored-by: itholic <haejoon.lee@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Co-authored-by: Haejoon Lee <44108233+itholic@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Adapting the documentation of `between`, `getField`, `dropFields` and `cast` to the corresponding Scala doc.
### Why are the changes needed?
Documentation clarity
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Only documentation change
Closes #33369 from dominikgehl/feature/SPARK-36160.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Introduction: this PR is the last part of SPARK-10816 (EventTime based sessionization (session window)). Please refer to #31937 for an overall view of the code change. (Note that the code diff could have diverged a bit.)
### What changes were proposed in this pull request?
This PR proposes to support native session windows. Please refer to the comments/design doc in SPARK-10816 for more details on the rationale and design (it could be a bit outdated compared to this PR).
The boundary of a "session window" is defined as [the timestamp of the start event ~ the timestamp of the last event + gap duration). That said, unlike a time window, a session window is a dynamic window which can expand if a new input row is added to the session. To handle expansion of session windows, Spark defines a session window per input row, and "merges" windows if they can be merged (their boundaries overlap).
This PR leverages two different approaches on merging session windows:
1. merging session windows with Spark's aggregation logic (a variant of sort aggregation)
2. updating session window for all rows bound to the same session, and applying aggregation logic afterwards
The first is preferable as it outperforms the second, though it can only be used if merging session windows can be applied together with aggregation. That is not applicable in all cases, so the second approach covers the remaining ones.
This PR also optimizes merging input rows and existing sessions while retaining the order (group keys + start timestamp of the session window), leveraging the fact that the number of existing sessions per group key won't be huge.
The state format is versioned, so that we can bring a new state format if we find a better one.
### Why are the changes needed?
For now, to deal with sessionization, Spark requires end users to work with (flat)MapGroupsWithState directly, which has a couple of major drawbacks:
1. (flat)MapGroupsWithState is a lower-level API and end users have to code everything in detail to define session windows and merge them
2. built-in aggregate functions cannot be used and end users have to deal with aggregation by themselves
3. (flat)MapGroupsWithState is only available in Scala/Java.
With native support for session windows, end users simply use "session_window" like they use "window" for tumbling/sliding windows, and leverage built-in aggregate functions as well as UDAFs to define aggregations.
Quoting the query example from test suite:
```scala
val inputData = MemoryStream[(String, Long)]
// Split the lines into words, treat words as sessionId of events
val events = inputData.toDF()
.select($"_1".as("value"), $"_2".as("timestamp"))
.withColumn("eventTime", $"timestamp".cast("timestamp"))
.selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
.withWatermark("eventTime", "30 seconds")
val sessionUpdates = events
.groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
.agg(count("*").as("numEvents"))
.selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
"CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
"numEvents")
```
which is the same as StructuredSessionization (the native session window version is shorter and clearer, even ignoring the model classes).
39542bb81f/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala (L66-L105)
(Worth noting that the code in StructuredSessionization only works with processing time. It doesn't consider that an old event can update the start time of an old session.)
### Does this PR introduce _any_ user-facing change?
Yes. This PR brings support for session windows to both batch and streaming queries, adding a new function "session_window" whose usage is similar to "window".
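A rough PySpark equivalent of the Scala query above (the `events` DataFrame and its columns are assumed from that example):
```py
from pyspark.sql import functions as F

session_updates = (
    events
    .withWatermark("eventTime", "30 seconds")
    .groupBy(F.session_window(F.col("eventTime"), "10 seconds"), F.col("sessionId"))
    .agg(F.count("*").alias("numEvents"))
)
```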
### How was this patch tested?
New test suites. Also tested with benchmark code.
Closes #33081 from HeartSaVioR/SPARK-34893-SPARK-10816-PR-31570-part-5.
Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Revisit and manage `InternalField` in more places.
### Why are the changes needed?
There are other places where we can manage `InternalField`, so that extension dtypes or `CategoricalDtype` can be kept.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added some tests.
Closes #33377 from ueshin/issues/SPARK-36167/internal_field.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Updating the pyspark months_between documentation to match the precision of the Scala documentation.
### Why are the changes needed?
Documentation clarity
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Only documentation change
Closes #33366 from dominikgehl/feature/SPARK-36158.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
This PR proposes to use Python 3.9 in the documentation and linter jobs at GitHub Actions. It also contains fixes for the mypy check failures introduced by the Python 3.9 upgrade:
```
python/pyspark/sql/pandas/_typing/protocols/frame.pyi:64: error: Name "np.ndarray" is not defined
python/pyspark/sql/pandas/_typing/protocols/frame.pyi:91: error: Name "np.recarray" is not defined
python/pyspark/sql/pandas/_typing/protocols/frame.pyi:165: error: Name "np.ndarray" is not defined
python/pyspark/pandas/categorical.py:82: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/categorical.py:109: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered"
python/pyspark/ml/linalg/__init__.pyi:184: error: Return type "ndarray[Any, Any]" of "toArray" incompatible with return type "NoReturn" in supertype "Matrix"
python/pyspark/ml/linalg/__init__.pyi:217: error: Return type "ndarray[Any, Any]" of "toArray" incompatible with return type "NoReturn" in supertype "Matrix"
python/pyspark/pandas/typedef/typehints.py:163: error: Module has no attribute "bool"; maybe "bool_" or "bool8"?
python/pyspark/pandas/typedef/typehints.py:174: error: Module has no attribute "float"; maybe "float_", "cfloat", or "float96"?
python/pyspark/pandas/typedef/typehints.py:180: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"?
python/pyspark/pandas/ml.py:81: error: Value of type variable "_DTypeScalar_co" of "dtype" cannot be "object"
python/pyspark/pandas/indexing.py:1649: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"?
python/pyspark/pandas/indexing.py:1656: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"?
python/pyspark/pandas/frame.py:4969: error: Function "numpy.array" is not valid as a type
python/pyspark/pandas/frame.py:4969: note: Perhaps you need "Callable[...]" or a callback protocol?
python/pyspark/pandas/frame.py:4970: error: Function "numpy.array" is not valid as a type
python/pyspark/pandas/frame.py:4970: note: Perhaps you need "Callable[...]" or a callback protocol?
python/pyspark/pandas/frame.py:7402: error: "List[Any]" has no attribute "tolist"
python/pyspark/pandas/series.py:1030: error: Module has no attribute "_NoValue"
python/pyspark/pandas/series.py:1031: error: Module has no attribute "_NoValue"
python/pyspark/pandas/indexes/category.py:159: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/indexes/category.py:180: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered"
python/pyspark/pandas/namespace.py:2036: error: Argument 1 to "column_name" has incompatible type "float"; expected "str"
python/pyspark/pandas/mlflow.py:59: error: Incompatible types in assignment (expression has type "Type[floating[Any]]", variable has type "str")
python/pyspark/pandas/data_type_ops/categorical_ops.py:43: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/data_type_ops/categorical_ops.py:43: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered"
python/pyspark/pandas/data_type_ops/categorical_ops.py:56: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories"
python/pyspark/pandas/tests/test_typedef.py:70: error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py:77: error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py:85: error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py:100: error: Name "np.float" is not defined
python/pyspark/pandas/tests/test_typedef.py:108: error: Name "np.float" is not defined
python/pyspark/mllib/clustering.pyi:152: error: Incompatible types in assignment (expression has type "ndarray[Any, Any]", base class "KMeansModel" defined the type as "List[ndarray[Any, Any]]")
python/pyspark/mllib/classification.pyi:93: error: Signature of "predict" incompatible with supertype "LinearClassificationModel"
Found 32 errors in 15 files (checked 315 source files)
1
```
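For example, several of the errors above come from NumPy's deprecated scalar aliases; the fix is to use the builtins instead:
```py
import numpy as np

np.dtype(bool)   # instead of the deprecated np.bool
np.dtype(float)  # instead of the deprecated np.float
np.dtype(int)    # instead of the deprecated np.int
```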
### Why are the changes needed?
Python 3.6 was deprecated in SPARK-35938.
### Does this PR introduce _any_ user-facing change?
No. Static analysis results may change slightly because of the type hints, but the changes are non-breaking.
### How was this patch tested?
I manually checked via the GitHub Actions build in a forked repository.
Closes #33356 from HyukjinKwon/SPARK-36146.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Added missing documentation of week and quarter as valid formats to pyspark sql/functions trunc
### Why are the changes needed?
The PySpark and Scala documentation didn't mention the same supported formats.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Only documentation change
Closes #33359 from dominikgehl/feature/SPARK-36154.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Clearly state which weekday corresponds to which integer
### Why are the changes needed?
Documentation clarity
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Only documentation change
Closes #33345 from dominikgehl/doc/pyspark-dayofweek.
Authored-by: Dominik Gehl <dog@open.ch>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Implement non-equality comparison operators between two Categoricals.
Non-goal: supporting Scalar input will be a follow-up task.
### Why are the changes needed?
pandas supports non-equality comparisons between two Categoricals. We should follow that.
### Does this PR introduce _any_ user-facing change?
Yes. No `NotImplementedError` for the `<`, `<=`, `>`, `>=` operators between two Categoricals. An example is shown below:
From:
```py
>>> import pyspark.pandas as ps
>>> from pandas.api.types import CategoricalDtype
>>> psser = ps.Series([1, 2, 3]).astype(CategoricalDtype([3, 2, 1], ordered=True))
>>> other_psser = ps.Series([2, 1, 3]).astype(CategoricalDtype([3, 2, 1], ordered=True))
>>> with ps.option_context("compute.ops_on_diff_frames", True):
... psser <= other_psser
...
Traceback (most recent call last):
...
NotImplementedError: <= can not be applied to categoricals.
```
To:
```py
>>> import pyspark.pandas as ps
>>> from pandas.api.types import CategoricalDtype
>>> psser = ps.Series([1, 2, 3]).astype(CategoricalDtype([3, 2, 1], ordered=True))
>>> other_psser = ps.Series([2, 1, 3]).astype(CategoricalDtype([3, 2, 1], ordered=True))
>>> with ps.option_context("compute.ops_on_diff_frames", True):
... psser <= other_psser
...
0 False
1 True
2 True
dtype: bool
```
### How was this patch tested?
Unit tests.
Closes #33331 from xinrong-databricks/categorical_compare.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
This is a follow-up PR for SPARK-36104 (#33307) and removes the unused import `typing.cast`.
After that change, the Python linter fails:
```
./dev/lint-python
shell: sh -e {0}
env:
LC_ALL: C.UTF-8
LANG: C.UTF-8
pythonLocation: /__t/Python/3.6.13/x64
LD_LIBRARY_PATH: /__t/Python/3.6.13/x64/lib
starting python compilation test...
python compilation succeeded.
starting black test...
black checks passed.
starting pycodestyle test...
pycodestyle checks passed.
starting flake8 test...
flake8 checks failed:
./python/pyspark/pandas/data_type_ops/num_ops.py:19:1: F401 'typing.cast' imported but unused
from typing import cast, Any, Union
^
1 F401 'typing.cast' imported but unused
```
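The fix is simply dropping the unused name from the import in `num_ops.py`:
```py
# before: from typing import cast, Any, Union
from typing import Any, Union
```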
### Why are the changes needed?
To recover CI.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #33315 from sarutak/followup-SPARK-36104.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Manage InternalField for DataTypeOps.neg/abs.
### Why are the changes needed?
The Spark data type and nullability must be the same as the original when applying DataTypeOps.neg/abs.
We should manage InternalField for this case.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes #33307 from xinrong-databricks/internalField.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Properly set `InternalField` for `DataTypeOps.invert`.
### Why are the changes needed?
The Spark data type and nullability must be the same as the original when applying `DataTypeOps.invert`.
We should manage `InternalField` for this case.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #33306 from ueshin/issues/SPARK-36103/invert.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Implement unary operator `invert` of integral ps.Series/Index.
### Why are the changes needed?
Currently, the unary operator `invert` of integral ps.Series/Index is not supported. We ought to implement it following pandas' behavior.
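For reference, the pandas behavior being matched: integer bitwise NOT satisfies `~x == -(x + 1)`.
```py
import pandas as pd

pser = pd.Series([1, 2, 3])
print((~pser).tolist())          # [-2, -3, -4]
print([-(x + 1) for x in pser])  # [-2, -3, -4], the same values
```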
### Does this PR introduce _any_ user-facing change?
Yes.
Before:
```py
>>> import pyspark.pandas as ps
>>> psser = ps.Series([1, 2, 3])
>>> ~psser
Traceback (most recent call last):
...
NotImplementedError: Unary ~ can not be applied to integrals.
```
After:
```py
>>> import pyspark.pandas as ps
>>> psser = ps.Series([1, 2, 3])
>>> ~psser
0 -2
1 -3
2 -4
dtype: int64
```
### How was this patch tested?
Unit tests.
Closes #33285 from xinrong-databricks/numeric_invert.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Properly set `InternalField` more in `DataTypeOps`.
### Why are the changes needed?
There are more places in `DataTypeOps` where we can manage `InternalField`.
We should manage `InternalField` for these cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #33275 from ueshin/issues/SPARK-36064/fields.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Adjust `test_astype`, `test_neg` for old pandas versions.
### Why are the changes needed?
There are issues in old pandas versions that fail tests in pandas API on Spark. We ought to adjust `test_astype` and `test_neg` for old pandas versions.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests. Please refer to https://github.com/apache/spark/pull/33272 for test results with pandas 1.0.1.
Closes #33250 from xinrong-databricks/SPARK-36035.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Merge test_decimal_ops into test_num_ops
- merge test_isnull() into test_num_ops.test_isnull()
- remove test_datatype_ops(), which is already covered in 11fcbc73cb/python/pyspark/pandas/tests/data_type_ops/test_base.py (L58-L59)
### Why are the changes needed?
Tests for data-type-based operations of decimal Series are in two places:
- python/pyspark/pandas/tests/data_type_ops/test_decimal_ops.py
- python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
We'd better merge test_decimal_ops into test_num_ops.
See also [SPARK-36002](https://issues.apache.org/jira/browse/SPARK-36002) .
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests passed.
Closes #33206 from Yikun/SPARK-36002.
Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
For tests with operations on different Series, sort the index of the results before comparing them with pandas.
### Why are the changes needed?
We have many tests with operations on different Series in `spark/python/pyspark/pandas/tests/data_type_ops/` that assume the result's index is sorted and then compare to pandas' behavior.
That assumption about the result's index ordering is wrong, since a Spark DataFrame join is used internally and the order is not preserved when the data is in different partitions.
So we should assume the result is unordered and sort the index of such results before comparing them with pandas.
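A sketch of the adjusted pattern (data illustrative):
```py
import pandas as pd
import pyspark.pandas as ps

pser1, pser2 = pd.Series([1, 2, 3]), pd.Series([4, 5, 6])
psser1, psser2 = ps.from_pandas(pser1), ps.from_pandas(pser2)
with ps.option_context("compute.ops_on_diff_frames", True):
    # The internal join may reorder rows, so sort before comparing.
    result = (psser1 + psser2).sort_index()
    assert result.to_pandas().equals(pser1 + pser2)
```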
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes #33274 from xinrong-databricks/datatypeops_testdiffframe.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Try to capture the error message from the `faulthandler` when the Python worker crashes.
### Why are the changes needed?
Currently, we just see an error message saying `"exited unexpectedly (crashed)"` when a UDF causes the Python worker to crash, e.g., by a segmentation fault.
We should take advantage of [`faulthandler`](https://docs.python.org/3/library/faulthandler.html) and try to capture the error message from it.
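For reference, a minimal sketch of what the stdlib module provides (the PR wires this up behind the new `spark.python.worker.faulthandler.enabled` config):
```py
import faulthandler
import sys

# Once enabled, a fatal signal (e.g. SIGSEGV) makes the interpreter dump
# the Python traceback to the given file before the process dies.
faulthandler.enable(file=sys.stderr)
```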
### Does this PR introduce _any_ user-facing change?
Yes, when a Spark config `spark.python.worker.faulthandler.enabled` is `true`, the stack trace will be seen in the error message when the Python worker crashes.
```py
>>> def f():
... import ctypes
... ctypes.string_at(0)
...
>>> sc.parallelize([1]).map(lambda x: f()).count()
```
```
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault
Current thread 0x000000010965b5c0 (most recent call first):
File "/.../ctypes/__init__.py", line 525 in string_at
File "<stdin>", line 3 in f
File "<stdin>", line 1 in <lambda>
...
```
### How was this patch tested?
Added some tests, and manually.
Closes #33273 from ueshin/issues/SPARK-36062/faulthandler.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>