2484 commits
Author | SHA1 | Message | Date | |
---|---|---|---|---|
HyukjinKwon | 01e9dd9050 |
[SPARK-20680][SQL][FOLLOW-UP] Revert NullType.simpleString from 'unknown' to 'null'
### What changes were proposed in this pull request? This PR proposes to partially reverts the simple string in `NullType` at https://github.com/apache/spark/pull/28833: `NullType.simpleString` back from `unknown` to `null`. ### Why are the changes needed? - Technically speaking, it's orthogonal with the issue itself, SPARK-20680. - It needs some more discussion, see https://github.com/apache/spark/pull/28833#issuecomment-655277714 ### Does this PR introduce _any_ user-facing change? It reverts back the user-facing changes at https://github.com/apache/spark/pull/28833. The simple string of `NullType` is back to `null`. ### How was this patch tested? I just logically reverted. Jenkins should test it out. Closes #29041 from HyukjinKwon/SPARK-20680. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Takuya UESHIN | cfecc2030d |
[SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors
### What changes were proposed in this pull request? This PR proposes to disallow to create `SparkContext` in executors, e.g., in UDFs. ### Why are the changes needed? Currently executors can create SparkContext, but shouldn't be able to create it. ```scala sc.range(0, 1).foreach { _ => new SparkContext(new SparkConf().setAppName("test").setMaster("local")) } ``` ### Does this PR introduce _any_ user-facing change? Yes, users won't be able to create `SparkContext` in executors. ### How was this patch tested? Addes tests. Closes #28986 from ueshin/issues/SPARK-32160/disallow_spark_context_in_executors. Authored-by: Takuya UESHIN <ueshin@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
LantaoJin | b5297c43b0 |
[SPARK-20680][SQL] Spark-sql do not support for creating table with void column datatype
### What changes were proposed in this pull request? This is the new PR which to address the close one #17953 1. support "void" primitive data type in the `AstBuilder`, point it to `NullType` 2. forbid creating tables with VOID/NULL column type ### Why are the changes needed? 1. Spark is incompatible with hive void type. When Hive table schema contains void type, DESC table will throw an exception in Spark. >hive> create table bad as select 1 x, null z from dual; >hive> describe bad; OK x int z void In Spark2.0.x, the behaviour to read this view is normal: >spark-sql> describe bad; x int NULL z void NULL Time taken: 4.431 seconds, Fetched 2 row(s) But in lastest Spark version, it failed with SparkException: Cannot recognize hive type string: void >spark-sql> describe bad; 17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad] org.apache.spark.SparkException: Cannot recognize hive type string: void Caused by: org.apache.spark.sql.catalyst.parser.ParseException: DataType void() is not supported.(line 1, pos 0) == SQL == void ^^^ ... 61 more org.apache.spark.SparkException: Cannot recognize hive type string: void 2. Hive CTAS statements throws error when select clause has NULL/VOID type column since HIVE-11217 In Spark, creating table with a VOID/NULL column should throw readable exception message, include - create data source table (using parquet, json, ...) - create hive table (with or without stored as) - CTAS ### Does this PR introduce any user-facing change? No ### How was this patch tested? Add unit tests Closes #28833 from LantaoJin/SPARK-20680_COPY. Authored-by: LantaoJin <jinlantao@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Bryan Cutler | 1d1809636b |
[SPARK-32162][PYTHON][TESTS] Improve error message of Pandas grouped map test with window
### What changes were proposed in this pull request? Improve the error message in test GroupedMapInPandasTests.test_grouped_over_window_with_key to show the incorrect values. ### Why are the changes needed? This test failure has come up often in Arrow testing because it tests a struct with timestamp values through a Pandas UDF. The current error message is not helpful as it doesn't show the incorrect values, only that it failed. This change will instead raise an assertion error with the incorrect values on a failure. Before: ``` ====================================================================== FAIL: test_grouped_over_window_with_key (pyspark.sql.tests.test_pandas_grouped_map.GroupedMapInPandasTests) ---------------------------------------------------------------------- Traceback (most recent call last): File "/spark/python/pyspark/sql/tests/test_pandas_grouped_map.py", line 588, in test_grouped_over_window_with_key self.assertTrue(all([r[0] for r in result])) AssertionError: False is not true ``` After: ``` ====================================================================== ERROR: test_grouped_over_window_with_key (pyspark.sql.tests.test_pandas_grouped_map.GroupedMapInPandasTests) ---------------------------------------------------------------------- ... AssertionError: {'start': datetime.datetime(2018, 3, 20, 0, 0), 'end': datetime.datetime(2018, 3, 25, 0, 0)}, != {'start': datetime.datetime(2020, 3, 20, 0, 0), 'end': datetime.datetime(2020, 3, 25, 0, 0)} ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Improved existing test Closes #28987 from BryanCutler/pandas-grouped-map-test-output-SPARK-32162. Authored-by: Bryan Cutler <cutlerb@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
animenon | 45fe6b62a7 |
[MINOR][DOCS] Pyspark getActiveSession docstring
### What changes were proposed in this pull request? Minor fix so that the documentation of `getActiveSession` is fixed. The sample code snippet doesn't come up formatted rightly, added spacing for this to be fixed. Also added return to docs. ### Why are the changes needed? The sample code is getting mixed up as description in the docs. [Current Doc Link](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=getactivesession#pyspark.sql.SparkSession.getActiveSession) ![image](https://user-images.githubusercontent.com/6907950/86331522-d7b6f800-bc66-11ea-998c-42085f5e5b04.png) ### Does this PR introduce _any_ user-facing change? Yes, documentation of getActiveSession is fixed. And added description about return. ### How was this patch tested? Adding a spacing between description and code seems to fix the issue. Closes #28978 from animenon/docs_minor. Authored-by: animenon <animenon@mail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Huaxin Gao | f7d9e3d162 |
[SPARK-23631][ML][PYSPARK] Add summary to RandomForestClassificationModel
### What changes were proposed in this pull request? Add summary to RandomForestClassificationModel... ### Why are the changes needed? so user can get a summary of this classification model, and retrieve common metrics such as accuracy, weightedTruePositiveRate, roc (for binary), pr curves (for binary), etc. ### Does this PR introduce _any_ user-facing change? Yes ``` RandomForestClassificationModel.summary RandomForestClassificationModel.evaluate ``` ### How was this patch tested? Add new tests Closes #28913 from huaxingao/rf_summary. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
Max Gekk | dd03c31ea5 |
[SPARK-32088][PYTHON][FOLLOWUP] Replace collect() by show() in the example for timestamp_seconds
### What changes were proposed in this pull request? Modify the example for `timestamp_seconds` and replace `collect()` by `show()`. ### Why are the changes needed? The SQL config `spark.sql.session.timeZone` doesn't influence on the `collect` in the example. The code below demonstrates that: ``` $ export TZ="UTC" ``` ```python >>> from pyspark.sql.functions import timestamp_seconds >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time']) >>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect() [Row(ts=datetime.datetime(2008, 12, 25, 15, 30))] ``` The expected time is **07:30 but we get 15:30**. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running the modified example via: ``` $ ./python/run-tests --modules=pyspark-sql ``` Closes #28959 from MaxGekk/SPARK-32088-fix-timezone-issue-followup. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
GuoPhilipse | ac3a0551d8 |
[SPARK-32088][PYTHON] Pin the timezone in timestamp_seconds doctest
### What changes were proposed in this pull request? Add American timezone during timestamp_seconds doctest ### Why are the changes needed? `timestamp_seconds` doctest in `functions.py` used default timezone to get expected result For example: ```python >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time']) >>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect() [Row(ts=datetime.datetime(2008, 12, 25, 7, 30))] ``` But when we have a non-american timezone, the test case will get different test result. For example, when we set current timezone as `Asia/Shanghai`, the test result will be ``` [Row(ts=datetime.datetime(2008, 12, 25, 23, 30))] ``` So no matter where we run the test case ,we will always get the expected permanent result if we set the timezone on one specific area. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #28932 from GuoPhilipse/SPARK-32088-fix-timezone-issue. Lead-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com> Co-authored-by: GuoPhilipse <guofei_ok@126.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Huaxin Gao | 8795133707 |
[SPARK-20249][ML][PYSPARK] Add training summary for LinearSVCModel
### What changes were proposed in this pull request? Add training summary for LinearSVCModel...... ### Why are the changes needed? so that user can get the training process status, such as loss value of each iteration and total iteration number. ### Does this PR introduce _any_ user-facing change? Yes ```LinearSVCModel.summary``` ```LinearSVCModel.evaluate``` ### How was this patch tested? new tests Closes #28884 from huaxingao/svc_summary. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
Huaxin Gao | d1255297b8 |
[SPARK-19939][ML] Add support for association rules in ML
### What changes were proposed in this pull request? Adding support to Association Rules in Spark ml.fpm. ### Why are the changes needed? Support is an indication of how frequently the itemset of an association rule appears in the database and suggests if the rules are generally applicable to the dateset. Refer to [wiki](https://en.wikipedia.org/wiki/Association_rule_learning#Support) for more details. ### Does this PR introduce _any_ user-facing change? Yes. Associate Rules now have support measure ### How was this patch tested? existing and new unit test Closes #28903 from huaxingao/fpm. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
HyukjinKwon | 1af19a7b68 |
[SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow
### What changes were proposed in this pull request? When you use floats are index of pandas, it creates a Spark DataFrame with a wrong results as below when Arrow is enabled: ```bash ./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true ``` ```python >>> import pandas as pd >>> spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show() +---+ | a| +---+ | 1| | 1| | 2| +---+ ``` This is because direct slicing uses the value as index when the index contains floats: ```python >>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])[2:] a 2.0 1 3.0 2 4.0 3 >>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.]).iloc[2:] a 4.0 3 >>> pd.DataFrame({'a': [1,2,3]}, index=[2, 3, 4])[2:] a 4 3 ``` This PR proposes to explicitly use `iloc` to positionally slide when we create a DataFrame from a pandas DataFrame with Arrow enabled. FWIW, I was trying to investigate why direct slicing refers the index value or the positional index sometimes but I stopped investigating further after reading this https://pandas.pydata.org/pandas-docs/stable/getting_started/10min.html#selection > While standard Python / Numpy expressions for selecting and setting are intuitive and come in handy for interactive work, for production code, we recommend the optimized pandas data access methods, `.at`, `.iat`, `.loc` and `.iloc`. ### Why are the changes needed? To create the correct Spark DataFrame from a pandas DataFrame without a data loss. ### Does this PR introduce _any_ user-facing change? Yes, it is a bug fix. ```bash ./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true ``` ```python import pandas as pd spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show() ``` Before: ``` +---+ | a| +---+ | 1| | 1| | 2| +---+ ``` After: ``` +---+ | a| +---+ | 1| | 2| | 3| +---+ ``` ### How was this patch tested? Manually tested and unittest were added. Closes #28928 from HyukjinKwon/SPARK-32098. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Bryan Cutler <cutlerb@gmail.com> |
||
Huaxin Gao | 297016e34e |
[SPARK-31893][ML] Add a generic ClassificationSummary trait
### What changes were proposed in this pull request? Add a generic ClassificationSummary trait ### Why are the changes needed? Add a generic ClassificationSummary trait so all the classification models can use it to implement summary. Currently in classification, we only have summary implemented in ```LogisticRegression```. There are requests to implement summary for ```LinearSVCModel``` in https://issues.apache.org/jira/browse/SPARK-20249 and to implement summary for ```RandomForestClassificationModel``` in https://issues.apache.org/jira/browse/SPARK-23631. If we add a generic ClassificationSummary trait and put all the common code there, we can easily add summary to ```LinearSVCModel``` and ```RandomForestClassificationModel```, and also add summary to all the other classification models. We can use the same approach to add a generic RegressionSummary trait to regression package and implement summary for all the regression models. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? existing tests Closes #28710 from huaxingao/summary_trait. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
HyukjinKwon | feeca63198 |
[SPARK-32011][PYTHON][CORE] Remove warnings about pin-thread modes and guide to use collectWithJobGroup
### What changes were proposed in this pull request? This PR proposes to remove the warning about multi-thread in local properties, and change the guide to use `collectWithJobGroup` for multi-threads for now because: - It is too noisy to users who don't use multiple threads - the number of this single thread case is arguably more prevailing. - There was a critical issue found about pin-thread mode SPARK-32010, which will be fixed in Spark 3.1. - To smoothly migrate, `RDD.collectWithJobGroup` was added, which will be deprecated in Spark 3.1 with SPARK-32010 fixed. I will target to deprecate `RDD.collectWithJobGroup`, and make this pin-thread mode stable in Spark 3.1. In the future releases, I plan to make this mode as a default mode, and remove `RDD.collectWithJobGroup` away. ### Why are the changes needed? To avoid guiding users a feature with a critical issue, and provide a proper workaround for now. ### Does this PR introduce _any_ user-facing change? Yes, warning message and documentation. ### How was this patch tested? Manually tested: Before: ``` >>> spark.sparkContext.setLocalProperty("a", "b") /.../spark/python/pyspark/util.py:141: UserWarning: Currently, 'setLocalProperty' (set to local properties) with multiple threads does not properly work. Internally threads on PVM and JVM are not synced, and JVM thread can be reused for multiple threads on PVM, which fails to isolate local properties for each thread on PVM. To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread. ``` After: ``` >>> spark.sparkContext.setLocalProperty("a", "b") ``` Closes #28845 from HyukjinKwon/SPARK-32011. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Liang-Chi Hsieh | 7f6a8ab166 |
[SPARK-31777][ML][PYSPARK] Add user-specified fold column to CrossValidator
### What changes were proposed in this pull request? This patch adds user-specified fold column support to `CrossValidator`. User can assign fold numbers to dataset instead of letting Spark do random splits. ### Why are the changes needed? This gives `CrossValidator` users more flexibility in splitting folds. ### Does this PR introduce _any_ user-facing change? Yes, a new `foldCol` param is added to `CrossValidator`. User can use it to specify custom fold splitting. ### How was this patch tested? Added unit tests. Closes #28704 from viirya/SPARK-31777. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com> |
||
GuoPhilipse | f0e6d0ec13 |
[SPARK-31710][SQL] Fail casting numeric to timestamp by default
## What changes were proposed in this pull request? we fail casting from numeric to timestamp by default. ## Why are the changes needed? casting from numeric to timestamp is not a non-standard,meanwhile it may generate different result between spark and other systems,for example hive ## Does this PR introduce any user-facing change? Yes,user cannot cast numeric to timestamp directly,user have to use the following function to achieve the same effect:TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS ## How was this patch tested? unit test added Closes #28593 from GuoPhilipse/31710-fix-compatibility. Lead-authored-by: GuoPhilipse <guofei_ok@126.com> Co-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Huaxin Gao | f83cb3cbb3 |
[SPARK-31925][ML] Summary.totalIterations greater than maxIters
### What changes were proposed in this pull request? In LogisticRegression and LinearRegression, if set maxIter=n, the model.summary.totalIterations returns n+1 if the training procedure does not drop out. This is because we use ```objectiveHistory.length``` as totalIterations, but ```objectiveHistory``` contains init sate, thus ```objectiveHistory.length``` is 1 larger than number of training iterations. ### Why are the changes needed? correctness ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? add new tests and also modify existing tests Closes #28786 from huaxingao/summary_iter. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
HyukjinKwon | 56d4f27cf6 |
[SPARK-31966][ML][TESTS][PYTHON] Increase the timeout for StreamingLogisticRegressionWithSGDTests.test_training_and_prediction
### What changes were proposed in this pull request?
This is similar with
|
||
HyukjinKwon | 56264fb5d3 |
[SPARK-31965][TESTS][PYTHON] Move doctests related to Java function registration to test conditionally
### What changes were proposed in this pull request? This PR proposes to move the doctests in `registerJavaUDAF` and `registerJavaFunction` to the proper unittests that run conditionally when the test classes are present. Both tests are dependent on the test classes in JVM side, `test.org.apache.spark.sql.JavaStringLength` and `test.org.apache.spark.sql.MyDoubleAvg`. So if you run the tests against the plain `sbt package`, it fails as below: ``` ********************************************************************** File "/.../spark/python/pyspark/sql/udf.py", line 366, in pyspark.sql.udf.UDFRegistration.registerJavaFunction Failed example: spark.udf.registerJavaFunction( "javaStringLength", "test.org.apache.spark.sql.JavaStringLength", IntegerType()) Exception raised: Traceback (most recent call last): ... test.org.apache.spark.sql.JavaStringLength, please make sure it is on the classpath; ... 6 of 7 in pyspark.sql.udf.UDFRegistration.registerJavaFunction 2 of 4 in pyspark.sql.udf.UDFRegistration.registerJavaUDAF ***Test Failed*** 8 failures. ``` ### Why are the changes needed? In order to support to run the tests against the plain SBT build. See also https://spark.apache.org/developer-tools.html ### Does this PR introduce _any_ user-facing change? No, it's test-only. ### How was this patch tested? Manually tested as below: ```bash ./build/sbt -DskipTests -Phive-thriftserver clean package cd python ./run-tests --python-executable=python3 --testname="pyspark.sql.udf UserDefinedFunction" ./run-tests --python-executable=python3 --testname="pyspark.sql.tests.test_udf UDFTests" ``` ```bash ./build/sbt -DskipTests -Phive-thriftserver clean test:package cd python ./run-tests --python-executable=python3 --testname="pyspark.sql.udf UserDefinedFunction" ./run-tests --python-executable=python3 --testname="pyspark.sql.tests.test_udf UDFTests" ``` Closes #28795 from HyukjinKwon/SPARK-31965. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Bryan Cutler | b7ef5294f1 |
[SPARK-31964][PYTHON] Use Pandas is_categorical on Arrow category type conversion
### What changes were proposed in this pull request? When using pyarrow to convert a Pandas categorical column, use `is_categorical` instead of trying to import `CategoricalDtype` ### Why are the changes needed? The import for `CategoricalDtype` had changed from Pandas 0.23 to 1.0 and pyspark currently tries both locations. Using `is_categorical` is a more stable API. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #28793 from BryanCutler/arrow-use-is_categorical-SPARK-31964. Authored-by: Bryan Cutler <cutlerb@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon | 00d06cad56 |
[SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs
### What changes were proposed in this pull request? This is another approach to fix the issue. See the previous try https://github.com/apache/spark/pull/28745. It was too invasive so I took more conservative approach. This PR proposes to resolve grouping attributes separately first so it can be properly referred when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved without ambiguity. Previously, ```python from pyspark.sql.functions import * df = spark.createDataFrame([[1, 1]], ["column", "Score"]) pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP) def my_pandas_udf(pdf): return pdf.assign(Score=0.5) df.groupby('COLUMN').apply(my_pandas_udf).show() ``` was failed as below: ``` pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;" ``` because the unresolved `COLUMN` in `FlatMapGroupsInPandas` doesn't know which reference to take from the child projection. After this fix, it resolves the child projection first with grouping keys and pass, to `FlatMapGroupsInPandas`, the attribute as a grouping key from the child projection that is positionally selected. ### Why are the changes needed? To resolve grouping keys correctly. ### Does this PR introduce _any_ user-facing change? Yes, ```python from pyspark.sql.functions import * df = spark.createDataFrame([[1, 1]], ["column", "Score"]) pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP) def my_pandas_udf(pdf): return pdf.assign(Score=0.5) df.groupby('COLUMN').apply(my_pandas_udf).show() ``` ```python df1 = spark.createDataFrame([(1, 1)], ("column", "value")) df2 = spark.createDataFrame([(1, 1)], ("column", "value")) df1.groupby("COLUMN").cogroup( df2.groupby("COLUMN") ).applyInPandas(lambda r, l: r + l, df1.schema).show() ``` Before: ``` pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.; ``` ``` pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];; 'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L] :- Project [COLUMN#9L, column#9L, value#10L] : +- LogicalRDD [column#9L, value#10L], false +- Project [COLUMN#13L, column#13L, value#14L] +- LogicalRDD [column#13L, value#14L], false ``` After: ``` +------+-----+ |column|Score| +------+-----+ | 1| 0.5| +------+-----+ ``` ``` +------+-----+ |column|value| +------+-----+ | 2| 2| +------+-----+ ``` ### How was this patch tested? Unittests were added and manually tested. Closes #28777 from HyukjinKwon/SPARK-31915-another. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Bryan Cutler <cutlerb@gmail.com> |
||
William Hyun | 2ab82fae57 |
[SPARK-31963][PYSPARK][SQL] Support both pandas 0.23 and 1.0 in serializers.py
### What changes were proposed in this pull request? This PR aims to support both pandas 0.23 and 1.0. ### Why are the changes needed? ``` $ pip install pandas==0.23.2 $ python -c "import pandas.CategoricalDtype" Traceback (most recent call last): File "<string>", line 1, in <module> ModuleNotFoundError: No module named 'pandas.CategoricalDtype' $ python -c "from pandas.api.types import CategoricalDtype" ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the Jenkins. ``` $ pip freeze | grep pandas pandas==0.23.2 $ python/run-tests.py --python-executables python --modules pyspark-sql ... Tests passed in 359 seconds ``` Closes #28789 from williamhyun/williamhyun-patch-2. Authored-by: William Hyun <williamhyun3@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Takuya UESHIN | 032d17933b |
[SPARK-31945][SQL][PYSPARK] Enable cache for the same Python function
### What changes were proposed in this pull request? This PR proposes to make `PythonFunction` holds `Seq[Byte]` instead of `Array[Byte]` to be able to compare if the byte array has the same values for the cache manager. ### Why are the changes needed? Currently the cache manager doesn't use the cache for `udf` if the `udf` is created again even if the functions is the same. ```py >>> func = lambda x: x >>> df = spark.range(1) >>> df.select(udf(func)("id")).cache() ``` ```py >>> df.select(udf(func)("id")).explain() == Physical Plan == *(2) Project [pythonUDF0#14 AS <lambda>(id)#12] +- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#14] +- *(1) Range (0, 1, step=1, splits=12) ``` This is because `PythonFunction` holds `Array[Byte]`, and `equals` method of array equals only when the both array is the same instance. ### Does this PR introduce _any_ user-facing change? Yes, if the user reuse the Python function for the UDF, the cache manager will detect the same function and use the cache for it. ### How was this patch tested? I added a test case and manually. ```py >>> df.select(udf(func)("id")).explain() == Physical Plan == InMemoryTableScan [<lambda>(id)#12] +- InMemoryRelation [<lambda>(id)#12], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(2) Project [pythonUDF0#5 AS <lambda>(id)#3] +- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#5] +- *(1) Range (0, 1, step=1, splits=12) ``` Closes #28774 from ueshin/issues/SPARK-31945/udf_cache. Authored-by: Takuya UESHIN <ueshin@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon | e28914095a |
[SPARK-31849][PYTHON][SQL][FOLLOW-UP] More correct error message in Python UDF exception message
### What changes were proposed in this pull request? This PR proposes to fix wordings in the Python UDF exception error message from: From: > An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace. To: > An exception was thrown from the Python worker. Please see the stack trace below. It removes "executor" because Python worker is technically a separate process, and remove the duplicated wording "Python worker" . ### Why are the changes needed? To give users better exception messages. ### Does this PR introduce _any_ user-facing change? No, it's in unreleased branches only. If RC3 passes, yes, it will change the exception message. ### How was this patch tested? Manually tested. Closes #28762 from HyukjinKwon/SPARK-31849-followup-2. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon | a42af81706 |
[SPARK-31849][PYTHON][SQL][FOLLOW-UP] Deduplicate and reuse Utils.exceptionString in Python exception handling
### What changes were proposed in this pull request? This PR proposes to use existing util `org.apache.spark.util.Utils.exceptionString` for the same codes at: ```python jwriter = jvm.java.io.StringWriter() e.printStackTrace(jvm.java.io.PrintWriter(jwriter)) stacktrace = jwriter.toString() ``` ### Why are the changes needed? To deduplicate codes. Plus, less communication between JVM and Py4j. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested. Closes #28749 from HyukjinKwon/SPARK-31849-followup. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon | 53ce58da34 |
[MINOR][PYTHON] Add one more newline between JVM and Python tracebacks
### What changes were proposed in this pull request?
This PR proposes to add one more newline to clearly separate JVM and Python tracebacks:
Before:
```
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: Reference 'column' is ambiguous, could be: column, column.;
JVM stacktrace:
org.apache.spark.sql.AnalysisException: Reference 'column' is ambiguous, could be: column, column.;
...
```
After:
```
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: Reference 'column' is ambiguous, could be: column, column.;
JVM stacktrace:
org.apache.spark.sql.AnalysisException: Reference 'column' is ambiguous, could be: column, column.;
...
```
This is kind of a followup of
|
||
HyukjinKwon | e1d5201140 |
[SPARK-31895][PYTHON][SQL] Support DataFrame.explain(extended: str) case to be consistent with Scala side
### What changes were proposed in this pull request? Scala: ```scala scala> spark.range(10).explain("cost") ``` ``` == Optimized Logical Plan == Range (0, 10, step=1, splits=Some(12)), Statistics(sizeInBytes=80.0 B) == Physical Plan == *(1) Range (0, 10, step=1, splits=12) ``` PySpark: ```python >>> spark.range(10).explain("cost") ``` ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/dataframe.py", line 333, in explain raise TypeError(err_msg) TypeError: extended (optional) should be provided as bool, got <class 'str'> ``` In addition, it is consistent with other codes too, for example, `DataFrame.sample` also can support `DataFrame.sample(1.0)` and `DataFrame.sample(False)`. ### Why are the changes needed? To provide the consistent API support across APIs. ### Does this PR introduce _any_ user-facing change? Nope, it's only changes in unreleased branches. If this lands to master only, yes, users will be able to set `mode` as `df.explain("...")` in Spark 3.1. After this PR: ```python >>> spark.range(10).explain("cost") ``` ``` == Optimized Logical Plan == Range (0, 10, step=1, splits=Some(12)), Statistics(sizeInBytes=80.0 B) == Physical Plan == *(1) Range (0, 10, step=1, splits=12) ``` ### How was this patch tested? Unittest was added and manually tested as well to make sure: ```python spark.range(10).explain(True) spark.range(10).explain(False) spark.range(10).explain("cost") spark.range(10).explain(extended="cost") spark.range(10).explain(mode="cost") spark.range(10).explain() spark.range(10).explain(True, "cost") spark.range(10).explain(1.0) ``` Closes #28711 from HyukjinKwon/SPARK-31895. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon | 64cb6f7066 |
[SPARK-29137][ML][PYTHON][TESTS] Increase the timeout for StreamingLinearRegressionWithTests.test_train_prediction
### What changes were proposed in this pull request? It increases the timeout for `StreamingLinearRegressionWithTests.test_train_prediction` ``` Traceback (most recent call last): File "/home/jenkins/workspace/SparkPullRequestBuilder3/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 503, in test_train_prediction self._eventually(condition) File "/home/jenkins/workspace/SparkPullRequestBuilder3/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 69, in _eventually lastValue = condition() File "/home/jenkins/workspace/SparkPullRequestBuilder3/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 498, in condition self.assertGreater(errors[1] - errors[-1], 2) AssertionError: 1.672640157855923 not greater than 2 ``` This could likely happen when the PySpark tests run in parallel and it become slow. ### Why are the changes needed? To make the tests less flaky. Seems it's being reported multiple times: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123144/consoleFull https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123146/testReport/ https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123141/testReport/ https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123142/testReport/ ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Jenkins will test it out. Closes #28701 from HyukjinKwon/SPARK-29137. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
HyukjinKwon | e69466056f |
[SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
### What changes were proposed in this pull request? This PR proposes to make PySpark exception more Pythonic by hiding JVM stacktrace by default. It can be enabled by turning on `spark.sql.pyspark.jvmStacktrace.enabled` configuration. ``` Traceback (most recent call last): ... pyspark.sql.utils.PythonException: An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace. Traceback (most recent call last): ... ``` If this `spark.sql.pyspark.jvmStacktrace.enabled` is enabled, it appends: ``` JVM stacktrace: org.apache.spark.Exception: ... ... ``` For example, the codes below: ```python from pyspark.sql.functions import udf udf def divide_by_zero(v): raise v / 0 spark.range(1).select(divide_by_zero("id")).show() ``` will show an error messages that looks like Python exception thrown from the local. <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show print(self._jdf.showString(n, 20, vertical)) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.PythonException: An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace. Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero ``` </details> <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show print(self._jdf.showString(n, 20, vertical)) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 137, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.PythonException: An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace. Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero JVM stacktrace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.35.193, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642) at org.apache.spark.sql.Dataset.head(Dataset.scala:2695) at org.apache.spark.sql.Dataset.take(Dataset.scala:2902) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300) at org.apache.spark.sql.Dataset.showString(Dataset.scala:337) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more ``` </details> <details> <summary>Python exception message without this change</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show print(self._jdf.showString(n, 20, vertical)) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o160.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 5.0 failed 4 times, most recent failure: Lost task 10.3 in stage 5.0 (TID 37, 192.168.35.193, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642) at org.apache.spark.sql.Dataset.head(Dataset.scala:2695) at org.apache.spark.sql.Dataset.take(Dataset.scala:2902) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300) at org.apache.spark.sql.Dataset.showString(Dataset.scala:337) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more ``` </details> <br/> Another example with Python 3.7: ```python sql("a") ``` <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 646, in sql return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ ``` </details> <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 646, in sql return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ JVM stacktrace: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) ``` </details> <details> <summary>Python exception message without this change</summary> ``` Traceback (most recent call last): File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o26.sql. : org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 646, in sql return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 102, in deco raise converted pyspark.sql.utils.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ ``` </details> ### Why are the changes needed? Currently, PySpark exceptions are very unfriendly to Python users with causing a bunch of JVM stacktrace. See "Python exception message without this change" above. ### Does this PR introduce _any_ user-facing change? Yes, it will change the exception message. See the examples above. ### How was this patch tested? Manually tested by ```bash ./bin/pyspark --conf spark.sql.pyspark.jvmStacktrace.enabled=true ``` and running the examples above. Closes #28661 from HyukjinKwon/python-debug. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon | 29c51d682b |
[SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams
### What changes were proposed in this pull request? This PR manually specifies the class for the input array being used in `(SparkContext|StreamingContext).union`. It fixes a regression introduced from SPARK-25737. ```python rdd1 = sc.parallelize([1,2,3,4,5]) rdd2 = sc.parallelize([6,7,8,9,10]) pairRDD1 = rdd1.zip(rdd2) sc.union([pairRDD1, pairRDD1]).collect() ``` in the current master and `branch-3.0`: ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/context.py", line 870, in union jrdds[i] = rdds[i]._jrdd File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 238, in __setitem__ File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 221, in __set_item File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 332, in get_return_value py4j.protocol.Py4JError: An error occurred while calling None.None. Trace: py4j.Py4JException: Cannot convert org.apache.spark.api.java.JavaPairRDD to org.apache.spark.api.java.JavaRDD at py4j.commands.ArrayCommand.convertArgument(ArrayCommand.java:166) at py4j.commands.ArrayCommand.setArray(ArrayCommand.java:144) at py4j.commands.ArrayCommand.execute(ArrayCommand.java:97) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) ``` which works in Spark 2.4.5: ``` [(1, 6), (2, 7), (3, 8), (4, 9), (5, 10), (1, 6), (2, 7), (3, 8), (4, 9), (5, 10)] ``` It assumed the class of the input array is the same `JavaRDD` or `JavaDStream`; however, that can be different such as `JavaPairRDD`. This fix is based on redsanket's initial approach, and will be co-authored. ### Why are the changes needed? To fix a regression from Spark 2.4.5. ### Does this PR introduce _any_ user-facing change? No, it's only in unreleased branches. This is to fix a regression. ### How was this patch tested? Manually tested, and a unittest was added. Closes #28648 from HyukjinKwon/SPARK-31788. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Huaxin Gao | 45cf5e9950 |
[SPARK-31840][ML] Add instance weight support in LogisticRegressionSummary
### What changes were proposed in this pull request? Add instance weight support in LogisticRegressionSummary ### Why are the changes needed? LogisticRegression, MulticlassClassificationEvaluator and BinaryClassificationEvaluator support instance weight. We should support instance weight in LogisticRegressionSummary too. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add new tests Closes #28657 from huaxingao/weighted_summary. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
Bryan Cutler | 8bbb666622 |
[SPARK-25351][PYTHON][TEST][FOLLOWUP] Fix test assertions to be consistent
### What changes were proposed in this pull request? Followup to make assertions from recent test consistent with the rest of the module ### Why are the changes needed? Better to use assertions from `unittest` and be consistent ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #28659 from BryanCutler/arrow-category-test-fix-SPARK-25351. Authored-by: Bryan Cutler <cutlerb@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
iRakson | 2f92ea0df4 |
[SPARK-31763][PYSPARK] Add inputFiles method in PySpark DataFrame Class
### What changes were proposed in this pull request? Adds `inputFiles()` method to PySpark `DataFrame`. Using this, PySpark users can list all files constituting a `DataFrame`. **Before changes:** ``` >>> spark.read.load("examples/src/main/resources/people.json", format="json").inputFiles() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/***/***/spark/python/pyspark/sql/dataframe.py", line 1388, in __getattr__ "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) AttributeError: 'DataFrame' object has no attribute 'inputFiles' ``` **After changes:** ``` >>> spark.read.load("examples/src/main/resources/people.json", format="json").inputFiles() [u'file:///***/***/spark/examples/src/main/resources/people.json'] ``` ### Why are the changes needed? This method is already supported for spark with scala and java. ### Does this PR introduce _any_ user-facing change? Yes, Now users can list all files of a DataFrame using `inputFiles()` ### How was this patch tested? UT added. Closes #28652 from iRakson/SPARK-31763. Authored-by: iRakson <raksonrakesh@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Jalpan Randeri | 339b0ecadb |
[SPARK-25351][SQL][PYTHON] Handle Pandas category type when converting from Python with Arrow
Handle Pandas category type while converting from python with Arrow enabled. The category column will be converted to whatever type the category elements are as is the case with Arrow disabled. ### Does this PR introduce any user-facing change? No ### How was this patch tested? New unit tests were added for `createDataFrame` and scalar `pandas_udf` Closes #26585 from jalpan-randeri/feature-pyarrow-dictionary-type. Authored-by: Jalpan Randeri <randerij@amazon.com> Signed-off-by: Bryan Cutler <cutlerb@gmail.com> |
||
HyukjinKwon | 7fb2275f00 |
Revert "[SPARK-31788][CORE][PYTHON] Fix UnionRDD of PairRDDs"
This reverts commit
|
||
Huaxin Gao | d4007776f2 |
[SPARK-31734][ML][PYSPARK] Add weight support in ClusteringEvaluator
### What changes were proposed in this pull request? Add weight support in ClusteringEvaluator ### Why are the changes needed? Currently, BinaryClassificationEvaluator, RegressionEvaluator, and MulticlassClassificationEvaluator support instance weight, but ClusteringEvaluator doesn't, so we will add instance weight support in ClusteringEvaluator. ### Does this PR introduce _any_ user-facing change? Yes. ClusteringEvaluator.setWeightCol ### How was this patch tested? add new unit test Closes #28553 from huaxingao/weight_evaluator. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
schintap | a61911c50c |
[SPARK-31788][CORE][PYTHON] Fix UnionRDD of PairRDDs
### What changes were proposed in this pull request? UnionRDD of PairRDDs causing a bug. The fix is to check for instance type before proceeding ### Why are the changes needed? Changes are needed to avoid users running into issues with union rdd operation with any other type other than JavaRDD. ### Does this PR introduce _any_ user-facing change? Yes Before: SparkSession available as 'spark'. >>> rdd1 = sc.parallelize([1,2,3,4,5]) >>> rdd2 = sc.parallelize([6,7,8,9,10]) >>> pairRDD1 = rdd1.zip(rdd2) >>> unionRDD1 = sc.union([pairRDD1, pairRDD1]) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/gs/spark/latest/python/pyspark/context.py", line 870, in union jrdds[i] = rdds[i]._jrdd File "/home/gs/spark/latest/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 238, in setitem File "/home/gs/spark/latest/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 221, in __set_item File "/home/gs/spark/latest/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 332, in get_return_value py4j.protocol.Py4JError: An error occurred while calling None.None. Trace: py4j.Py4JException: Cannot convert org.apache.spark.api.java.JavaPairRDD to org.apache.spark.api.java.JavaRDD at py4j.commands.ArrayCommand.convertArgument(ArrayCommand.java:166) at py4j.commands.ArrayCommand.setArray(ArrayCommand.java:144) at py4j.commands.ArrayCommand.execute(ArrayCommand.java:97) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) After: >>> rdd2 = sc.parallelize([6,7,8,9,10]) >>> pairRDD1 = rdd1.zip(rdd2) >>> unionRDD1 = sc.union([pairRDD1, pairRDD1]) >>> unionRDD1.collect() [(1, 6), (2, 7), (3, 8), (4, 9), (5, 10), (1, 6), (2, 7), (3, 8), (4, 9), (5, 10)] ### How was this patch tested? Tested with the reproduced piece of code above manually Closes #28603 from redsanket/SPARK-31788. Authored-by: schintap <schintap@verizonmedia.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon |
dc3a606fbd
|
[SPARK-31767][PYTHON][CORE] Remove ResourceInformation in pyspark module's namespace
### What changes were proposed in this pull request? This PR proposes to only allow the import of `ResourceInformation` as below: ``` pyspark.resource.ResourceInformation ``` instead of ``` pyspark.ResourceInformation pyspark.resource.ResourceInformation ``` because `pyspark.resource` is a separate module, and it is documented so. The constructor of `ResourceInformation` isn't supposed to directly call anyway. ### Why are the changes needed? To keep the code structure coherent. ### Does this PR introduce _any_ user-facing change? No, it will be in the unreleased branches. ### How was this patch tested? Manually tested via importing: Before: ```python >>> import pyspark >>> pyspark.ResourceInformation <class 'pyspark.resource.information.ResourceInformation'> >>> pyspark.resource.ResourceInformation <class 'pyspark.resource.information.ResourceInformation'> ``` After: ```python >>> import pyspark >>> pyspark.ResourceInformation Traceback (most recent call last): File "<stdin>", line 1, in <module> AttributeError: module 'pyspark' has no attribute 'ResourceInformation' >>> pyspark.resource.ResourceInformation <class 'pyspark.resource.information.ResourceInformation'> ``` Also tested via ```bash cd python ./run-tests --python-executables=python3 --modules=pyspark-core,pyspark-resource ``` Jenkins will test and existing tests should cover. Closes #28589 from HyukjinKwon/SPARK-31767. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
HyukjinKwon |
6fb22aa42d
|
[SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes
### What changes were proposed in this pull request?
This PR is kind of a followup for SPARK-29641 and SPARK-28234. This PR proposes:
1.. Document the new `pyspark.resource` module introduced at
|
||
David Toneian | acab558e55 |
[SPARK-31739][PYSPARK][DOCS][MINOR] Fix docstring syntax issues and misplaced space characters
This commit is published into the public domain. ### What changes were proposed in this pull request? Some syntax issues in docstrings have been fixed. ### Why are the changes needed? In some places, the documentation did not render as intended, e.g. parameter documentations were not formatted as such. ### Does this PR introduce any user-facing change? Slight improvements in documentation. ### How was this patch tested? Manual testing and `dev/lint-python` run. No new Sphinx warnings arise due to this change. Closes #28559 from DavidToneian/SPARK-31739. Authored-by: David Toneian <david@toneian.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Huaxin Gao | e10516ae63 |
[SPARK-31681][ML][PYSPARK] Python multiclass logistic regression evaluate should return LogisticRegressionSummary
### What changes were proposed in this pull request? Return LogisticRegressionSummary for multiclass logistic regression evaluate in PySpark ### Why are the changes needed? Currently we have ``` since("2.0.0") def evaluate(self, dataset): if not isinstance(dataset, DataFrame): raise ValueError("dataset must be a DataFrame but got %s." % type(dataset)) java_blr_summary = self._call_java("evaluate", dataset) return BinaryLogisticRegressionSummary(java_blr_summary) ``` we should return LogisticRegressionSummary for multiclass logistic regression ### Does this PR introduce _any_ user-facing change? Yes return LogisticRegressionSummary instead of BinaryLogisticRegressionSummary for multiclass logistic regression in Python ### How was this patch tested? unit test Closes #28503 from huaxingao/lr_summary. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
zhengruifeng | e7fa778dc7 |
[SPARK-30699][ML][PYSPARK] GMM blockify input vectors
### What changes were proposed in this pull request? 1, add new param blockSize; 2, if blockSize==1, keep original behavior, code path trainOnRows; 3, if blockSize>1, standardize and stack input vectors to blocks (like ALS/MLP), code path trainOnBlocks ### Why are the changes needed? performance gain on dense dataset HIGGS: 1, save about 45% RAM; 2, 3X faster with openBLAS ### Does this PR introduce any user-facing change? add a new expert param `blockSize` ### How was this patch tested? added testsuites Closes #27473 from zhengruifeng/blockify_gmm. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: zhengruifeng <ruifengz@foxmail.com> |
||
Huaxin Gao | 7a670b5a0a |
[SPARK-31667][ML][PYSPARK] Python side flatten the result dataframe of ANOVATest/ChisqTest/FValueTest
### What changes were proposed in this pull request? Add Python version of ``` Since("3.1.0") def test( dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame ``` ### Why are the changes needed? parity between scala and python ### Does this PR introduce _any_ user-facing change? yes new method ``` Since("3.1.0") def test( dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame ``` in PySpark ANOVATest/ChisqTest/FValueTest ### How was this patch tested? New doctest Closes #28483 from huaxingao/flatten_py. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
zhengruifeng | bb9b50c217 |
[SPARK-31656][ML][PYSPARK] AFT blockify input vectors
### What changes were proposed in this pull request? 1, add new param blockSize; 2, add a new class InstanceBlock; 3, if blockSize==1, keep original behavior; if blockSize>1, stack input vectors to blocks (like ALS/MLP); 4, if blockSize>1, standardize the input outside of optimization procedure; ### Why are the changes needed? it will obtain performance gain on dense datasets, such as epsilon 1, reduce RAM to persist traing dataset; (save about 40% RAM) 2, use Level-2 BLAS routines; (~10X speedup) ### Does this PR introduce _any_ user-facing change? Yes, a new param is added ### How was this patch tested? existing and added testsuites Closes #28473 from zhengruifeng/blockify_aft. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: zhengruifeng <ruifengz@foxmail.com> |
||
Huaxin Gao | 18d2ba53e4 |
[SPARK-31652][ML][PYSPARK] Add ANOVASelector and FValueSelector to PySpark
### What changes were proposed in this pull request? Add ANOVASelector and FValueSelector to PySpark ### Why are the changes needed? ANOVASelector and FValueSelector have been implemented in Scala. We need to implement these in Python as well. ### Does this PR introduce _any_ user-facing change? Yes. Add Python version of ANOVASelector and FValueSelector ### How was this patch tested? new doctest Closes #28464 from huaxingao/selector_py. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: zhengruifeng <ruifengz@foxmail.com> |
||
zhengruifeng | 97332f26bf |
[SPARK-30660][ML][PYSPARK] LinearRegression blockify input vectors
### What changes were proposed in this pull request? 1, add new param blockSize; 2, add a new class InstanceBlock; 3, if blockSize==1, keep original behavior; if blockSize>1, stack input vectors to blocks (like ALS/MLP); 4, if blockSize>1, standardize the input outside of optimization procedure; ### Why are the changes needed? it will obtain performance gain on dense datasets, such as `epsilon` 1, reduce RAM to persist traing dataset; (save about 40% RAM) 2, use Level-2 BLAS routines; (up to 6X(squaredError)~12X(huber) speedup) ### Does this PR introduce _any_ user-facing change? Yes, a new param is added ### How was this patch tested? existing and added testsuites Closes #28471 from zhengruifeng/blockify_lir_II. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: zhengruifeng <ruifengz@foxmail.com> |
||
Kent Yao | b31ae7bb0b |
[SPARK-31615][SQL] Pretty string output for sql method of RuntimeReplaceable expressions
### What changes were proposed in this pull request? The RuntimeReplaceable ones are runtime replaceable, thus, their original parameters are not going to be resolved to PrettyAttribute and remain debug style string if we directly implement their `sql` methods with their parameters' `sql` methods. This PR is raised with suggestions by maropu and cloud-fan https://github.com/apache/spark/pull/28402/files#r417656589. In this PR, we re-implement the `sql` methods of the RuntimeReplaceable ones with toPettySQL ### Why are the changes needed? Consistency of schema output between RuntimeReplaceable expressions and normal ones. For example, `date_format` vs `to_timestamp`, before this PR, they output differently #### Before ```sql select date_format(timestamp '2019-10-06', 'yyyy-MM-dd uuuu') struct<date_format(TIMESTAMP '2019-10-06 00:00:00', yyyy-MM-dd uuuu):string> select to_timestamp("2019-10-06S10:11:12.12345", "yyyy-MM-dd'S'HH:mm:ss.SSSSSS") struct<to_timestamp('2019-10-06S10:11:12.12345', 'yyyy-MM-dd\'S\'HH:mm:ss.SSSSSS'):timestamp> ``` #### After ```sql select date_format(timestamp '2019-10-06', 'yyyy-MM-dd uuuu') struct<date_format(TIMESTAMP '2019-10-06 00:00:00', yyyy-MM-dd uuuu):string> select to_timestamp("2019-10-06T10:11:12'12", "yyyy-MM-dd'T'HH:mm:ss''SSSS") struct<to_timestamp(2019-10-06T10:11:12'12, yyyy-MM-dd'T'HH:mm:ss''SSSS):timestamp> ```` ### Does this PR introduce _any_ user-facing change? Yes, the schema output style changed for the runtime replaceable expressions as shown in the above example ### How was this patch tested? regenerate all related tests Closes #28420 from yaooqinn/SPARK-31615. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org> |
||
zhengruifeng | 052ff49acd |
[SPARK-30659][ML][PYSPARK] LogisticRegression blockify input vectors
### What changes were proposed in this pull request? 1, reorg the `fit` method in LR to several blocks (`createModel`, `createBounds`, `createOptimizer`, `createInitCoefWithInterceptMatrix`); 2, add new param blockSize; 3, if blockSize==1, keep original behavior, code path `trainOnRows`; 4, if blockSize>1, standardize and stack input vectors to blocks (like ALS/MLP), code path `trainOnBlocks` ### Why are the changes needed? On dense dataset `epsilon_normalized.t`: 1, reduce RAM to persist traing dataset; (save about 40% RAM) 2, use Level-2 BLAS routines; (4x ~ 5x faster) ### Does this PR introduce _any_ user-facing change? Yes, a new param is added ### How was this patch tested? existing and added testsuites Closes #28458 from zhengruifeng/blockify_lor_II. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: zhengruifeng <ruifengz@foxmail.com> |
||
Huaxin Gao | 09ece50799 |
[SPARK-31609][ML][PYSPARK] Add VarianceThresholdSelector to PySpark
### What changes were proposed in this pull request? Add VarianceThresholdSelector to PySpark ### Why are the changes needed? parity between Scala and Python ### Does this PR introduce any user-facing change? Yes. VarianceThresholdSelector is added to PySpark ### How was this patch tested? new doctest Closes #28409 from huaxingao/variance_py. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
zhengruifeng | ebdf41dd69 |
[SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors
### What changes were proposed in this pull request? 1, add new param `blockSize`; 2, add a new class InstanceBlock; 3, **if `blockSize==1`, keep original behavior; if `blockSize>1`, stack input vectors to blocks (like ALS/MLP);** 4, if `blockSize>1`, standardize the input outside of optimization procedure; ### Why are the changes needed? 1, reduce RAM to persist traing dataset; (save about 40% RAM) 2, use Level-2 BLAS routines; (4x ~ 5x faster on dataset `epsilon`) ### Does this PR introduce any user-facing change? Yes, a new param is added ### How was this patch tested? existing and added testsuites Closes #28349 from zhengruifeng/blockify_svc_II. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: zhengruifeng <ruifengz@foxmail.com> |
||
Weichen Xu | ee1de66fe4 |
[SPARK-31549][PYSPARK] Add a develop API invoking collect on Python RDD with user-specified job group
### What changes were proposed in this pull request? I add a new API in pyspark RDD class: def collectWithJobGroup(self, groupId, description, interruptOnCancel=False) This API do the same thing with `rdd.collect`, but it can specify the job group when do collect. The purpose of adding this API is, if we use: ``` sc.setJobGroup("group-id...") rdd.collect() ``` The `setJobGroup` API in pyspark won't work correctly. This related to a bug discussed in https://issues.apache.org/jira/browse/SPARK-31549 Note: This PR is a rather temporary workaround for `PYSPARK_PIN_THREAD`, and as a step to migrate to `PYSPARK_PIN_THREAD` smoothly. It targets Spark 3.0. - `PYSPARK_PIN_THREAD` is unstable at this moment that affects whole PySpark applications. - It is impossible to make it runtime configuration as it has to be set before JVM is launched. - There is a thread leak issue between Python and JVM. We should address but it's not a release blocker for Spark 3.0 since the feature is experimental. I plan to handle this after Spark 3.0 due to stability. Once `PYSPARK_PIN_THREAD` is enabled by default, we should remove this API out ideally. I will target to deprecate this API in Spark 3.1. ### Why are the changes needed? Fix bug. ### Does this PR introduce any user-facing change? A develop API in pyspark: `pyspark.RDD. collectWithJobGroup` ### How was this patch tested? Unit test. Closes #28395 from WeichenXu123/collect_with_job_group. Authored-by: Weichen Xu <weichen.xu@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |