### What changes were proposed in this pull request?
As I have comment in [SPARK-29516](https://github.com/apache/spark/pull/26172#issuecomment-544364977)
SparkSession.sql() method parse process not under current sparksession's conf, so some configuration about parser is not valid in multi-thread situation.
In this pr, we add a SQLConf parameter to AbstractSqlParser and initial it with SessionState's conf.
Then for each SparkSession's parser process. It will use's it's own SessionState's SQLConf and to be thread safe
### Why are the changes needed?
Fix bug
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
NO
Closes#26187 from AngersZhuuuu/SPARK-29530.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Only invoke `checkAndGlobPathIfNecessary()` when we have to use `InMemoryFileIndex`.
### Why are the changes needed?
Avoid unnecessary function invocation.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass Jenkins.
Closes#26196 from Ngone51/dev-avoid-unnecessary-invocation-on-globpath.
Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
I extended `ExtractBenchmark` to support the `INTERVAL` type of the `source` parameter of the `date_part` function.
### Why are the changes needed?
- To detect performance issues while changing implementation of the `date_part` function in the future.
- To find out current performance bottlenecks in `date_part` for the `INTERVAL` type
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
By running the benchmark and print out produced values per each `field` value.
Closes#26175 from MaxGekk/extract-interval-benchmark.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Added new benchmark `IntervalBenchmark` to measure performance of interval related functions. In the PR, I added benchmarks for casting strings to interval. In particular, interval strings with `interval` prefix and without it because there is special code for this da576a737c/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java (L100-L103) . And also I added benchmarks for different number of units in interval strings, for example 1 unit is `interval 10 years`, 2 units w/o interval is `10 years 5 months`, and etc.
### Why are the changes needed?
- To find out current performance issues in casting to intervals
- The benchmark can be used while refactoring/re-implementing `CalendarInterval.fromString()` or `CalendarInterval.fromCaseInsensitiveString()`.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
By running the benchmark via the command:
```shell
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.IntervalBenchmark"
```
Closes#26189 from MaxGekk/interval-from-string-benchmark.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This pr refine the code in ThriftServerQueryTestSuite.blackList to reuse the black list of SQLQueryTestSuite instead of duplicating all test cases from SQLQueryTestSuite.blackList.
### Why are the changes needed?
To reduce code duplication.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
N/A
Closes#26188 from fuwhu/SPARK-TBD.
Authored-by: fuwhu <bestwwg@163.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
### What changes were proposed in this pull request?
This PR upgrades `scala-maven-plugin` to `4.2.4` for Scala `2.13.1`.
### Why are the changes needed?
Scala 2.13.1 seems to break the binary compatibility.
We need to upgrade `scala-maven-plugin` to bring the the following fixes for the latest Scala 2.13.1.
- https://github.com/davidB/scala-maven-plugin/issues/363
- https://github.com/sbt/zinc/issues/698
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
For now, we don't support Scala-2.13. This PR at least needs to pass the existing Jenkins with Maven to get prepared for Scala-2.13.
Closes#26185 from dongjoon-hyun/SPARK-29528.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
`CatalogTable` to `HiveTable` will change the table's ownership. How to reproduce:
```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{LongType, StructType}
val identifier = TableIdentifier("spark_29498", None)
val owner = "SPARK-29498"
val newTable = CatalogTable(
identifier,
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
properties = Map.empty),
owner = owner,
schema = new StructType().add("i", LongType, false),
provider = Some("hive"))
spark.sessionState.catalog.createTable(newTable, false)
// The owner is not SPARK-29498
println(spark.sessionState.catalog.getTableMetadata(identifier).owner)
```
This PR makes it set the `HiveTable`'s owner to `CatalogTable`'s owner if it's owner is not empty when converting `CatalogTable` to `HiveTable`.
### Why are the changes needed?
We should not change the ownership of the table when converting `CatalogTable` to `HiveTable`.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
unit test
Closes#26160 from wangyum/SPARK-29498.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
```
bit_and(expression) -- The bitwise AND of all non-null input values, or null if none
bit_or(expression) -- The bitwise OR of all non-null input values, or null if none
```
More details:
https://www.postgresql.org/docs/9.3/functions-aggregate.html
### Why are the changes needed?
Postgres, Mysql and many other popular db support them.
### Does this PR introduce any user-facing change?
add two bit agg
### How was this patch tested?
add ut
Closes#26155 from yaooqinn/SPARK-27879.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Instead of using GZIP for compressing the serialized `MapStatuses`, ZStd provides better compression rate and faster compression time.
The original approach is serializing and writing data directly into `GZIPOutputStream` as one step; however, the compression time is faster if a bigger chuck of the data is processed by the codec at once. As a result, in this PR, the serialized data is written into an uncompressed byte array first, and then the data is compressed. For smaller `MapStatues`, we find it's 2x faster.
Here is the benchmark result.
#### 20k map outputs, and each has 500 blocks
1. ZStd two steps in this PR: 0.402 ops/ms, 89,066 bytes
2. ZStd one step as the original approach: 0.370 ops/ms, 89,069 bytes
3. GZip: 0.092 ops/ms, 217,345 bytes
#### 20k map outputs, and each has 5 blocks
1. ZStd two steps in this PR: 0.9 ops/ms, 75,449 bytes
2. ZStd one step as the original approach: 0.38 ops/ms, 75,452 bytes
3. GZip: 0.21 ops/ms, 160,094 bytes
### Why are the changes needed?
Decrease the time for serializing the `MapStatuses` in large scale job.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#26085 from dbtsai/mapStatus.
Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR fix Fix the associated location already exists in `SQLQueryTestSuite`:
```
build/sbt "~sql/test-only *SQLQueryTestSuite -- -z postgreSQL/join.sql"
...
[info] - postgreSQL/join.sql *** FAILED *** (35 seconds, 420 milliseconds)
[info] postgreSQL/join.sql
[info] Expected "[]", but got "[org.apache.spark.sql.AnalysisException
[info] Can not create the managed table('`default`.`tt3`'). The associated location('file:/root/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQueryTestSuite/tt3') already exists.;]" Result did not match for query #108
```
### Why are the changes needed?
Fix bug.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
N/A
Closes#26181 from wangyum/TestError.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Currently pyspark doesn't write/read `avgMetrics` in `CrossValidatorModel`, whereas scala supports it.
### Why are the changes needed?
Test step to reproduce it:
```
dataset = spark.createDataFrame([(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,parallelism=2)
cvModel = cv.fit(dataset)
cvModel.write().save("/tmp/model")
cvModel2 = CrossValidatorModel.read().load("/tmp/model")
print(cvModel.avgMetrics) # prints non empty result as expected
print(cvModel2.avgMetrics) # Bug: prints an empty result.
```
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Manually tested
Before patch:
```
>>> cvModel.write().save("/tmp/model_0")
>>> cvModel2 = CrossValidatorModel.read().load("/tmp/model_0")
>>> print(cvModel2.avgMetrics)
[]
```
After patch:
```
>>> cvModel2 = CrossValidatorModel.read().load("/tmp/model_2")
>>> print(cvModel2.avgMetrics[0])
0.5
```
Closes#26038 from shahidki31/avgMetrics.
Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
Add RepairTableStatement and make REPAIR TABLE go through the same catalog/table resolution framework of v2 commands.
### Why are the changes needed?
It's important to make all the commands have the same table resolution behavior, to avoid confusing end-users. e.g.
```
USE my_catalog
DESC t // success and describe the table t from my_catalog
MSCK REPAIR TABLE t // report table not found as there is no table t in the session catalog
```
### Does this PR introduce any user-facing change?
yes. When running MSCK REPAIR TABLE, Spark fails the command if the current catalog is set to a v2 catalog, or the table name specified a v2 catalog.
### How was this patch tested?
New unit tests
Closes#26168 from imback82/repair_table.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/25241 .
The typed interval expression should fail for invalid format.
### Why are the changes needed?
Te be consistent with the typed timestamp/date expression
### Does this PR introduce any user-facing change?
Yes. But this feature is not released yet.
### How was this patch tested?
updated test
Closes#26151 from cloud-fan/bug.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
### What changes were proposed in this pull request?
This PR aims to add a new column `Duration` for running drivers in Apache Spark `Standalone` master web UI in order to improve UX. This help users like the other `Duration` columns in the `Running` and `Completed` application tables.
### Why are the changes needed?
When we use `--supervise`, the drivers can survive longer.
Technically, the `Duration` column is not the same. (Please see the image below.)
### Does this PR introduce any user-facing change?
Yes. The red box is added newly.
<img width="1312" alt="Screen Shot 2019-10-14 at 12 53 43 PM" src="https://user-images.githubusercontent.com/9700541/66779127-50301b80-ee82-11e9-853f-72222cd011ac.png">
### How was this patch tested?
Manual since this is a UI column. After starting standalone cluster and jobs, kill the `DriverWrapper` and see the UI.
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://$(hostname):7077
$ bin/spark-submit --master spark://(hostname):7077 --deploy-mode cluster --supervise --class org.apache.spark.examples.JavaSparkPi examples/target/scala-2.12/jars/spark-examples_2.12-3.0.0-SNAPSHOT.jar 1000
$ jps
41521 DriverWrapper
...
$ kill -9 41521 // kill the `DriverWrapper`.
```
Closes#26113 from dongjoon-hyun/SPARK-29466.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
* Adding an additional check in `stringToTimestamp` to handle cases where the input has trailing ':'
* Added a test to make sure this works.
### Why are the changes needed?
In a couple of scenarios while converting from String to Timestamp `DateTimeUtils.stringToTimestamp` throws an array out of bounds exception if there is trailing ':'. The behavior of this method requires it to return `None` in case the format of the string is incorrect.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added a test in the `DateTimeTestUtils` suite to test if my fix works.
Closes#26143 from rahulsmahadev/SPARK-29494.
Lead-authored-by: Rahul Mahadev <rahul.mahadev@databricks.com>
Co-authored-by: Rahul Shivu Mahadev <51690557+rahulsmahadev@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
4. Be sure to keep the PR description updated to reflect all changes.
5. Please write your PR title to summarize what this PR proposes.
6. If possible, provide a concise example to reproduce the issue for a faster review.
-->
### What changes were proposed in this pull request?
Add benchmark code for MapStatuses serialization & deserialization performance.
### Why are the changes needed?
For comparing the performance differences against optimization.
### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->
No
### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
No test is required.
Closes#26169 from dbtsai/benchmark.
Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: DB Tsai <dbtsai@dbtsai.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
### What changes were proposed in this pull request?
Current Spark SQL `SHOW FUNCTIONS` don't show `!=`, `<>`, `between`, `case`
But these expressions is truly functions. We should show it in SQL `SHOW FUNCTIONS`
### Why are the changes needed?
SHOW FUNCTIONS show '!=', '<>' , 'between', 'case'
### Does this PR introduce any user-facing change?
SHOW FUNCTIONS show '!=', '<>' , 'between', 'case'
### How was this patch tested?
UT
Closes#26053 from AngersZhuuuu/SPARK-29379.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR fixes the incorrect `EqualNullSafe` symbol in `sql-migration-guide.md`.
### Why are the changes needed?
Fix documentation error.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
N/A
Closes#26163 from wangyum/EqualNullSafe-symbol.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
The `date_part()` function can accept the `source` parameter of the `INTERVAL` type (`CalendarIntervalType`). The following values of the `field` parameter are supported:
- `"MILLENNIUM"` (`"MILLENNIA"`, `"MIL"`, `"MILS"`) - number of millenniums in the given interval. It is `YEAR / 1000`.
- `"CENTURY"` (`"CENTURIES"`, `"C"`, `"CENT"`) - number of centuries in the interval calculated as `YEAR / 100`.
- `"DECADE"` (`"DECADES"`, `"DEC"`, `"DECS"`) - decades in the `YEAR` part of the interval calculated as `YEAR / 10`.
- `"YEAR"` (`"Y"`, `"YEARS"`, `"YR"`, `"YRS"`) - years in a values of `CalendarIntervalType`. It is `MONTHS / 12`.
- `"QUARTER"` (`"QTR"`) - a quarter of year calculated as `MONTHS / 3 + 1`
- `"MONTH"` (`"MON"`, `"MONS"`, `"MONTHS"`) - the months part of the interval calculated as `CalendarInterval.months % 12`
- `"DAY"` (`"D"`, `"DAYS"`) - total number of days in `CalendarInterval.microseconds`
- `"HOUR"` (`"H"`, `"HOURS"`, `"HR"`, `"HRS"`) - the hour part of the interval.
- `"MINUTE"` (`"M"`, `"MIN"`, `"MINS"`, `"MINUTES"`) - the minute part of the interval.
- `"SECOND"` (`"S"`, `"SEC"`, `"SECONDS"`, `"SECS"`) - the seconds part with fractional microsecond part.
- `"MILLISECONDS"` (`"MSEC"`, `"MSECS"`, `"MILLISECON"`, `"MSECONDS"`, `"MS"`) - the millisecond part of the interval with fractional microsecond part.
- `"MICROSECONDS"` (`"USEC"`, `"USECS"`, `"USECONDS"`, `"MICROSECON"`, `"US"`) - the total number of microseconds in the `second`, `millisecond` and `microsecond` parts of the given interval.
- `"EPOCH"` - the total number of seconds in the interval including the fractional part with microsecond precision. Here we assume 365.25 days per year (leap year every four years).
For example:
```sql
> SELECT date_part('days', interval 1 year 10 months 5 days);
5
> SELECT date_part('seconds', interval 30 seconds 1 milliseconds 1 microseconds);
30.001001
```
### Why are the changes needed?
To maintain feature parity with PostgreSQL (https://www.postgresql.org/docs/11/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT)
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
- Added new test suite `IntervalExpressionsSuite`
- Add new test cases to `date_part.sql`
Closes#25981 from MaxGekk/extract-from-intervals.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
A followup of [#25295](https://github.com/apache/spark/pull/25295).
1) change the logWarning to logDebug in `OptimizeLocalShuffleReader`.
2) update the test to check whether query stage reuse can work well with local shuffle reader.
### Why are the changes needed?
make code robust
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
existing tests
Closes#26157 from JkSelf/followup-25295.
Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The handling of the catalog across plans should be as follows ([SPARK-29014](https://issues.apache.org/jira/browse/SPARK-29014)):
* The *current* catalog should be used when no catalog is specified
* The default catalog is the catalog *current* is initialized to
* If the *default* catalog is not set, then *current* catalog is the built-in Spark session catalog.
This PR addresses the issue where *current* catalog usage is not followed as describe above.
### Why are the changes needed?
It is a bug as described in the previous section.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Unit tests added.
Closes#26120 from imback82/cleanup_catalog.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add `AnalyzeTableStatement` and `AnalyzeColumnStatement`, and make ANALYZE TABLE go through the same catalog/table resolution framework of v2 commands.
### Why are the changes needed?
It's important to make all the commands have the same table resolution behavior, to avoid confusing end-users. e.g.
```
USE my_catalog
DESC t // success and describe the table t from my_catalog
ANALYZE TABLE t // report table not found as there is no table t in the session catalog
```
### Does this PR introduce any user-facing change?
yes. When running ANALYZE TABLE, Spark fails the command if the current catalog is set to a v2 catalog, or the table name specified a v2 catalog.
### How was this patch tested?
new tests
Closes#26129 from cloud-fan/analyze-table.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
`ml.MulticlassClassificationEvaluator` & `mllib.MulticlassMetrics` support log-loss
### Why are the changes needed?
log-loss is an important classification metric and is widely used in practice
### Does this PR introduce any user-facing change?
Yes, add new option ("logloss") and a related param `eps`
### How was this patch tested?
added testsuites & local tests refering to sklearn
Closes#26135 from zhengruifeng/logloss.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
Add private _XXXParams classes for classification & regression
### Why are the changes needed?
To keep parity between scala and python
### Does this PR introduce any user-facing change?
Yes. Add gettters/setters for the following Model classes
```
LinearSVCModel:
get/setRegParam
get/setMaxIte
get/setFitIntercept
get/setTol
get/setStandardization
get/setWeightCol
get/setAggregationDepth
get/setThreshold
LogisticRegressionModel:
get/setRegParam
get/setElasticNetParam
get/setMaxIter
get/setFitIntercept
get/setTol
get/setStandardization
get/setWeightCol
get/setAggregationDepth
get/setThreshold
NaiveBayesModel:
get/setWeightCol
LinearRegressionModel:
get/setRegParam
get/setElasticNetParam
get/setMaxIter
get/setTol
get/setFitIntercept
get/setStandardization
get/setWeight
get/setSolver
get/setAggregationDepth
get/setLoss
GeneralizedLinearRegressionModel:
get/setFitIntercept
get/setMaxIter
get/setTol
get/setRegParam
get/setWeightCol
get/setSolver
```
### How was this patch tested?
Add a few doctest
Closes#26142 from huaxingao/spark-29381.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
This patch proposes to delete old Hive external partition directory even the partition does not exist in Hive, when insert overwrite Hive external table partition.
### Why are the changes needed?
When insert overwrite to a Hive external table partition, if the partition does not exist, Hive will not check if the external partition directory exists or not before copying files. So if users drop the partition, and then do insert overwrite to the same partition, the partition will have both old and new data.
For example:
```scala
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
// test is an external Hive table.
sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
sql("ALTER TABLE test DROP PARTITION(name='n1')")
sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id") // Got both 1 and 2.
}
```
### Does this PR introduce any user-facing change?
Yes. This fix a correctness issue when users drop partition on a Hive external table partition and then insert overwrite it.
### How was this patch tested?
Added test.
Closes#25979 from viirya/SPARK-29295.
Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
In this change, we give preference to the original table's owner if it is not empty.
### Why are the changes needed?
When executing 'insert into/overwrite ...' DML, or 'alter table set tblproperties ...' DDL, spark would change the ownership of the table the one who runs the spark application.
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
Compare with the behavior of Apache Hive
Closes#26068 from yaooqinn/SPARK-29405.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### Why are the changes needed?
As mentioned in jira, sometimes we need to be able to support the retention of null columns when writing JSON.
For example, sparkmagic(used widely in jupyter with livy) will generate sql query results based on DataSet.toJSON and parse JSON to pandas DataFrame to display. If there is a null column, it is easy to have some column missing or even the query result is empty. The loss of the null column in the first row, may cause parsing exceptions or loss of entire column data.
### Does this PR introduce any user-facing change?
Example in spark-shell.
scala> spark.sql("select null as a, 1 as b").toJSON.collect.foreach(println)
{"b":1}
scala> spark.sql("set spark.sql.jsonGenerator.struct.ignore.null=false")
res2: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> spark.sql("select null as a, 1 as b").toJSON.collect.foreach(println)
{"a":null,"b":1}
### How was this patch tested?
Add new test to JacksonGeneratorSuite
Closes#26098 from stczwd/json.
Lead-authored-by: stczwd <qcsd2011@163.com>
Co-authored-by: Jackey Lee <qcsd2011@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
1. Regularize all the shuffle configurations related to adaptive execution.
2. Add default value for `BlockStoreShuffleReader.shouldBatchFetch`.
### Why are the changes needed?
It's a follow-up PR for #26040.
Regularize the existing `spark.sql.adaptive.shuffle` namespace in SQLConf.
### Does this PR introduce any user-facing change?
Rename one released user config `spark.sql.adaptive.minNumPostShufflePartitions` to `spark.sql.adaptive.shuffle.minNumPostShufflePartitions`, other changed configs is not released yet.
### How was this patch tested?
Existing UT.
Closes#26147 from xuanyuanking/SPARK-9853.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
change PySpark ml ```Params._clear``` to ```Params.clear```
### Why are the changes needed?
PySpark ML currently has a private _clear() method that will unset a param. This should be made public to match the Scala API and give users a way to unset a user supplied param.
### Does this PR introduce any user-facing change?
Yes. PySpark ml ```Params._clear``` ---> ```Params.clear```
### How was this patch tested?
Add test.
Closes#26130 from huaxingao/spark-29464.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
### What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-28762
TL;DR: Automatically read the `Main-Class` from a JAR's manifest even if the JAR isn't in the local file system (i.e. in S3 or HDFS).
### Why are the changes needed?
When deploying a fat JAR (e.g. using `sbt-assembly`) to S3/HDFS, users might choose to include the main class for the JAR in its manifest. This change allows the user to `spark-submit` the JAR without having to specify the main class again via the `--class` argument.
### Does this PR introduce any user-facing change?
Yes. Previously, if the primary resource is a JAR and isn't located in the local file system, it will fail with the error:
```
$ spark-submit s3a://nonexistent.jar
Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR s3a://nonexistent.jar with URI s3a. Please specify a class through --class.
...
```
With this PR, the main class will be read from the manifest, assuming the classpath contains the appropriate JAR to read the file system.
### How was this patch tested?
Added some tests in `core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala`.
Closes#25910 from igozali/SPARK-28762.
Authored-by: Ivan Gozali <gozaliivan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
Updated kubernetes client.
### Why are the changes needed?
https://issues.apache.org/jira/browse/SPARK-27812https://issues.apache.org/jira/browse/SPARK-27927
We need this fix https://github.com/fabric8io/kubernetes-client/pull/1768 that was released on version 4.6 of the client. The root cause of the problem is better explained in https://github.com/apache/spark/pull/25785
### Does this PR introduce any user-facing change?
Nope, it should be transparent to users
### How was this patch tested?
This patch was tested manually using a simple pyspark job
```python
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
```
The expected behaviour of this "job" is that both python's and jvm's process exit automatically after the main runs. This is the case for spark versions <= 2.4. On version 2.4.3, the jvm process hangs because there's a non daemon thread running
```
"OkHttp WebSocket https://10.96.0.1/..." #121 prio=5 os_prio=0 tid=0x00007fb27c005800 nid=0x24b waiting on condition [0x00007fb300847000]
"OkHttp WebSocket https://10.96.0.1/..." #117 prio=5 os_prio=0 tid=0x00007fb28c004000 nid=0x247 waiting on condition [0x00007fb300e4b000]
```
This is caused by a bug on `kubernetes-client` library, which is fixed on the version that we are upgrading to.
When the mentioned job is run with this patch applied, the behaviour from spark <= 2.4.3 is restored and both processes terminate successfully
Closes#26093 from igorcalabria/k8s-client-update.
Authored-by: igor.calabria <igor.calabria@ubee.in>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This patch is a part of [SPARK-28594](https://issues.apache.org/jira/browse/SPARK-28594) and design doc for SPARK-28594 is linked here: https://docs.google.com/document/d/12bdCC4nA58uveRxpeo8k7kGOI2NRTXmXyBOweSi4YcY/edit?usp=sharing
This patch proposes adding new feature to event logging, rolling event log files via configured file size.
Previously event logging is done with single file and related codebase (`EventLoggingListener`/`FsHistoryProvider`) is tightly coupled with it. This patch adds layer on both reader (`EventLogFileReader`) and writer (`EventLogFileWriter`) to decouple implementation details between "handling events" and "how to read/write events from/to file".
This patch adds two properties, `spark.eventLog.rollLog` and `spark.eventLog.rollLog.maxFileSize` which provides configurable behavior of rolling log. The feature is disabled by default, as we only expect huge event log for huge/long-running application. For other cases single event log file would be sufficient and still simpler.
### Why are the changes needed?
This is a part of SPARK-28594 which addresses event log growing infinitely for long-running application.
This patch itself also provides some option for the situation where event log file gets huge and consume their storage. End users may give up replaying their events and want to delete the event log file, but given application is still running and writing the file, it's not safe to delete the file. End users will be able to delete some of old files after applying rolling over event log.
### Does this PR introduce any user-facing change?
No, as the new feature is turned off by default.
### How was this patch tested?
Added unit tests, as well as basic manual tests.
Basic manual tests - ran SHS, ran structured streaming query with roll event log enabled, verified split files are generated as well as SHS can load these files, with handling app status as incomplete/complete.
Closes#25670 from HeartSaVioR/SPARK-28869.
Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
The current RPC backend in Spark supports single- and multi-threaded
message delivery to endpoints, but they all share the same underlying
thread pool. So an RPC endpoint that blocks a dispatcher thread can
negatively affect other endpoints.
This can be more pronounced with configurations that limit the number
of RPC dispatch threads based on configuration and / or running
environment. And exposing the RPC layer to other code (for example
with something like SPARK-29396) could make it easy to affect normal
Spark operation with a badly written RPC handler.
This change adds a new RPC endpoint type that tells the RPC env to
create dedicated dispatch threads, so that those effects are minimised.
Other endpoints will still need CPU to process their messages, but
they won't be able to actively block the dispatch thread of these
isolated endpoints.
As part of the change, I've changed the most important Spark endpoints
(the driver, executor and block manager endpoints) to be isolated from
others. This means a couple of extra threads are created on the driver
and executor for these endpoints.
Tested with existing unit tests, which hammer the RPC system extensively,
and also by running applications on a cluster (with a prototype of
SPARK-29396).
Closes#26059 from vanzin/SPARK-29398.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
### What changes were proposed in this pull request?
Support executor for selecting scheduler through scheduler name in the case of k8s multi-scheduler scenario.
### Why are the changes needed?
If there is no such function, spark can not support the case of k8s multi-scheduler scenario.
### Does this PR introduce any user-facing change?
Yes, users can add scheduler name through configuration.
### How was this patch tested?
Manually tested with spark + k8s cluster
Closes#26088 from merrily01/SPARK-29436.
Authored-by: maruilei <maruilei@jd.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
This PR proposes a few typos:
1. Sparks => Spark's
2. parallize => parallelize
3. doesnt => doesn't
Closes#26140 from plusplusjiajia/fix-typos.
Authored-by: Jiajia Li <jiajia.li@intel.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
BIT_COUNT(N) - Returns the number of bits that are set in the argument N as an unsigned 64-bit integer, or NULL if the argument is NULL
### Why are the changes needed?
Supported by MySQL,Microsoft SQL Server ,etc.
### Does this PR introduce any user-facing change?
add a built-in function
### How was this patch tested?
add uts
Closes#26139 from yaooqinn/SPARK-29491.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This PR takes over #19788. After we split the shuffle fetch protocol from `OpenBlock` in #24565, this optimization can be extended in the new shuffle protocol. Credit to yucai, closes#19788.
### What changes were proposed in this pull request?
This PR adds the support for continuous shuffle block fetching in batch:
- Shuffle client changes:
- Add new feature tag `spark.shuffle.fetchContinuousBlocksInBatch`, implement the decision logic in `BlockStoreShuffleReader`.
- Merge the continuous shuffle block ids in batch if needed in ShuffleBlockFetcherIterator.
- Shuffle server changes:
- Add support in `ExternalBlockHandler` for the external shuffle service side.
- Make `ShuffleBlockResolver.getBlockData` accept getting block data by range.
- Protocol changes:
- Add new block id type `ShuffleBlockBatchId` represent continuous shuffle block ids.
- Extend `FetchShuffleBlocks` and `OneForOneBlockFetcher`.
- After the new shuffle fetch protocol completed in #24565, the backward compatibility for external shuffle service can be controlled by `spark.shuffle.useOldFetchProtocol`.
### Why are the changes needed?
In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. However, as the original approach, each reducer needs to fetch those 10 reducer blocks one by one. This way needs many IO and impacts performance. This PR is to support fetching those continuous shuffle blocks in one IO (batch way). See below example:
The shuffle block is stored like below:
![image](https://user-images.githubusercontent.com/2989575/51654634-c37fbd80-1fd3-11e9-935e-5652863676c3.png)
The ShuffleId format is s"shuffle_$shuffleId_$mapId_$reduceId", referring to BlockId.scala.
In adaptive execution, one reducer may want to read output for reducer 5 to 14, whose block Ids are from shuffle_0_x_5 to shuffle_0_x_14.
Before this PR, Spark needs 10 disk IOs + 10 network IOs for each output file.
After this PR, Spark only needs 1 disk IO and 1 network IO. This way can reduce IO dramatically.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Add new UT.
Integrate test with setting `spark.sql.adaptive.enabled=true`.
Closes#26040 from xuanyuanking/SPARK-9853.
Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: yucai <yyu1@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
When adaptive execution is enabled, the Spark users who connected from JDBC always get adaptive execution error whatever the under root cause is. It's very confused. We have to check the driver log to find out why.
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
For example, a job queried from JDBC failed due to HDFS missing block. User still get the error message `Adaptive execution failed due to stage materialization failures`.
The easiest way to reproduce is changing the code of `AdaptiveSparkPlanExec`, to let it throws out an exception when it faces `StageSuccess`.
```scala
case class AdaptiveSparkPlanExec(
events.drainTo(rem)
(Seq(nextMsg) ++ rem.asScala).foreach {
case StageSuccess(stage, res) =>
// stage.resultOption = Some(res)
val ex = new SparkException("Wrapper Exception",
new IllegalArgumentException("Root cause is IllegalArgumentException for Test"))
errors.append(
new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
case StageFailure(stage, ex) =>
errors.append(
new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
```
### Why are the changes needed?
To make the error message more user-friend and more useful for query from JDBC.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually test query:
```shell
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (0.225 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (0.043 seconds)
```
Before:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
After:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: java.lang.IllegalArgumentException: Root cause is IllegalArgumentException for Test (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
Closes#25960 from LantaoJin/SPARK-29283.
Authored-by: lajin <lajin@ebay.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
### What changes were proposed in this pull request?
Release blog: https://medium.com/cowtowncoder/jackson-2-10-features-cd880674d8a2
Fixes the following CVE's:
https://www.cvedetails.com/cve/CVE-2019-16942/https://www.cvedetails.com/cve/CVE-2019-16943/
Looking back, there were 3 major goals for this minor release:
- Resolve the growing problem of “endless CVE patches”, a stream of fixes for reported CVEs related to “Polymorphic Deserialization” problem (described in “On Jackson CVEs… ”) that resulted in security tools forcing Jackson upgrades. 2.10 now includes “Safe Default Typing” that is hoped to resolve this problem.
- Evolve 2.x API towards 3.0, based on changes that were done in master, within limits of 2.x API backwards-compatibility requirements.
- Add JDK support for versions beyond Java 8: specifically add“module-info.class” for JDK9+, defining proper module definitions for Jackson components
Full changelog: https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.10
Improved Scala 2.13 support: https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.10#scala
### Why are the changes needed?
Patches CVE's reported by the vulnerability scanner.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Ran `mvn clean install -DskipTests` locally.
Closes#26131 from Fokko/SPARK-29483.
Authored-by: Fokko Driesprong <fokko@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
bool_or(x) <=> any/some(x) <=> max(x)
bool_and(x) <=> every(x) <=> min(x)
Args:
x: boolean
### Why are the changes needed?
PostgreSQL, Presto and Vertica, etc also support this feature:
### Does this PR introduce any user-facing change?
add new functions support
### How was this patch tested?
add ut
Closes#26126 from yaooqinn/SPARK-27880.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Proposed new expression `SubtractDates` which is used in `date1` - `date2`. It has the `INTERVAL` type, and returns the interval from `date1` (inclusive) and `date2` (exclusive). For example:
```sql
> select date'tomorrow' - date'yesterday';
interval 2 days
```
Closes#26034
### Why are the changes needed?
- To conform the SQL standard which states the result type of `date operand 1` - `date operand 2` must be the interval type. See [4.5.3 Operations involving datetimes and intervals](http://www.contrib.andrew.cmu.edu/~shadow/sql/sql1992.txt).
- Improve Spark SQL UX and allow mixing date and timestamp in subtractions. For example: `select timestamp'now' + (date'2019-10-01' - date'2019-09-15')`
### Does this PR introduce any user-facing change?
Before the query below returns number of days:
```sql
spark-sql> select date'2019-10-05' - date'2018-09-01';
399
```
After it returns an interval:
```sql
spark-sql> select date'2019-10-05' - date'2018-09-01';
interval 1 years 1 months 4 days
```
### How was this patch tested?
- by new tests in `DateExpressionsSuite` and `TypeCoercionSuite`.
- by existing tests in `date.sql`
Closes#26112 from MaxGekk/date-subtract.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
### What changes were proposed in this pull request?
Change Literal.sql to output CAST('fpValue' AS FLOAT) instead of CAST(fpValue AS FLOAT) as the SQL for a floating point literal.
### Why are the changes needed?
The old version doesn't work for very small floating point numbers; the value will fail to parse if it doesn't fit in a DECIMAL(38).
This doesn't apply to doubles because they have special literal syntax.
### Does this PR introduce any user-facing change?
Not really.
### How was this patch tested?
New unit tests.
Closes#26114 from jose-torres/fpliteral.
Authored-by: Jose Torres <joseph.torres@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Binarizer support multi-column by extending `HasInputCols`/`HasOutputCols`/`HasThreshold`/`HasThresholds`
### Why are the changes needed?
similar algs in `ml.feature` already support multi-column, like `Bucketizer`/`StringIndexer`/`QuantileDiscretizer`
### Does this PR introduce any user-facing change?
yes, add setter/getter of `thresholds`/`inputCols`/`outputCols`
### How was this patch tested?
added suites
Closes#26064 from zhengruifeng/binarizer_multicols.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
Support FETCH_PRIOR fetching in Thriftserver, and report correct fetch start offset it TFetchResultsResp.results.startRowOffset
The semantics of FETCH_PRIOR are as follow: Assuming the previous fetch returned a block of rows from offsets [10, 20)
* calling FETCH_PRIOR(maxRows=5) will scroll back and return rows [5, 10)
* calling FETCH_PRIOR(maxRows=10) again, will scroll back, but can't go earlier than 0. It will nevertheless return 10 rows, returning rows [0, 10) (overlapping with the previous fetch)
* calling FETCH_PRIOR(maxRows=4) again will again return rows starting from offset 0 - [0, 4)
* calling FETCH_NEXT(maxRows=6) after that will move the cursor forward and return rows [4, 10)
##### Client/server backwards/forwards compatibility:
Old driver with new server:
* Drivers that don't support FETCH_PRIOR will not attempt to use it
* Field TFetchResultsResp.results.startRowOffset was not set, old drivers don't depend on it.
New driver with old server
* Using an older thriftserver with FETCH_PRIOR will make the thriftserver return unsupported operation error. The driver can then recognize that it's an old server.
* Older thriftserver will return TFetchResultsResp.results.startRowOffset=0. If the client driver receives 0, it can know that it can not rely on it as correct offset. If the client driver intentionally wants to fetch from 0, it can use FETCH_FIRST.
### Why are the changes needed?
It's intended to be used to recover after connection errors. If a client lost connection during fetching (e.g. of rows [10, 20)), and wants to reconnect and continue, it could not know whether the request got lost before reaching the server, or on the response back. When it issued another FETCH_NEXT(10) request after reconnecting, because TFetchResultsResp.results.startRowOffset was not set, it could not know if the server will return rows [10,20) (because the previous request didn't reach it) or rows [20, 30) (because it returned data from the previous request but the connection got broken on the way back). Now, with TFetchResultsResp.results.startRowOffset the client can know after reconnecting which rows it is getting, and use FETCH_PRIOR to scroll back if a fetch block was lost in transmission.
Driver should always use FETCH_PRIOR after a broken connection.
* If the Thriftserver returns unsuported operation error, the driver knows that it's an old server that doesn't support it. The driver then must error the query, as it will also not support returning the correct startRowOffset, so the driver cannot reliably guarantee if it hadn't lost any rows on the fetch cursor.
* If the driver gets a response to FETCH_PRIOR, it should also have a correctly set startRowOffset, which the driver can use to position itself back where it left off before the connection broke.
* If FETCH_NEXT was used after a broken connection on the first fetch, and returned with an startRowOffset=0, then the client driver can't know if it's 0 because it's the older server version, or if it's genuinely 0. Better to call FETCH_PRIOR, as scrolling back may anyway be possibly required after a broken connection.
This way it is implemented in a backwards/forwards compatible way, and doesn't require bumping the protocol version. FETCH_ABSOLUTE might have been better, but that would require a bigger protocol change, as there is currently no field to specify the requested absolute offset.
### Does this PR introduce any user-facing change?
ODBC/JDBC drivers connecting to Thriftserver may now implement using the FETCH_PRIOR fetch order to scroll back in query results, and check TFetchResultsResp.results.startRowOffset if their cursor position is consistent after connection errors.
### How was this patch tested?
Added tests to HiveThriftServer2Suites
Closes#26014 from juliuszsompolski/SPARK-29349.
Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
### What changes were proposed in this pull request?
This PR aims to update the validation check on `length` from `length >= 0` to `length >= -1` in order to allow set `-1` to keep the default value.
### Why are the changes needed?
At Apache Spark 2.2.0, [SPARK-18702](https://github.com/apache/spark/pull/16133/files#diff-2c5519b1cf4308d77d6f12212971544fR27-R38) adds `class FileBlock` with the default `length` value, `-1`, initially.
There is no way to set `filePath` only while keeping `length` is `-1`.
```scala
def set(filePath: String, startOffset: Long, length: Long): Unit = {
require(filePath != null, "filePath cannot be null")
require(startOffset >= 0, s"startOffset ($startOffset) cannot be negative")
require(length >= 0, s"length ($length) cannot be negative")
inputBlock.set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
}
```
For compressed files (like GZ), the size of split can be set to -1. This was allowed till Spark 2.1 but regressed starting with spark 2.2.x. Please note that split length of -1 also means the length was unknown - a valid scenario. Thus, split length of -1 should be acceptable like pre Spark 2.2.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
This is updating the corner case on the requirement check. Manually check the code.
Closes#26123 from praneetsharma/fix-SPARK-27259.
Authored-by: prasha2 <prasha22@mail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
When ExternalBlockStoreClient was closed, retries from RetryingBlockFetcher will cause NPE. This proposes to skip retries by RetryingBlockFetcher when ExternalBlockStoreClient is closed.
### Why are the changes needed?
When ExternalBlockStoreClient was closed, retries from RetryingBlockFetcher will cause NPE:
```
2019-10-14 20:06:16 ERROR RetryingBlockFetcher:143 - Exception while beginning fetch of 2 outstanding blocks (after 3 retries)
java.lang.NullPointerException
at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.lambda$initiateRetry$0(RetryingBlockFetcher.java:169)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
```
It was happened after BlockManager and ExternalBlockStoreClient was closed due to previous errors. In this cases, RetryingBlockFetcher does not need to retry. This NPE is harmless for job execution, but is a source of misleading when looking at log. Especially for end-users.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests.
Closes#26115 from viirya/SPARK-29469.
Lead-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
A followup of https://github.com/apache/spark/pull/25295
This PR proposes a few code cleanups:
1. rename the special `getMapSizesByExecutorId` to `getMapSizesByMapIndex`
2. rename the parameter `mapId` to `mapIndex` as that's really a mapper index.
3. `BlockStoreShuffleReader` should take `blocksByAddress` directly instead of a map id.
4. rename `getMapReader` to `getReaderForOneMapper` to be more clearer.
### Why are the changes needed?
make code easier to understand
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
existing tests
Closes#26128 from cloud-fan/followup.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>