What changes were proposed in this pull request?
1.Add class info output in org.apache.spark.ml.util.SchemaUtils#checkColumnType to distinct Vectors in ml and mllib
2.Add unit test
Why are the changes needed?
the catalogString doesn't distinguish Vectors in ml and mllib when mllib vector misused in ml
https://issues.apache.org/jira/browse/SPARK-31400
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test is added
Closes#28347 from TJX2014/master-catalogString-distinguish-Vectors-in-ml-and-mllib.
Authored-by: TJX2014 <xiaoxingstack@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
1. Remove console.log(), which seems unnecessary in the releases.
2. Replace the double equals to triple equals
3. Reuse jquery selector.
### Why are the changes needed?
For better code quality.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests + manual test.
Closes#28333 from gengliangwang/removeLog.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
Fix flakiness by checking `1970/01/01` instead of `1970`.
The test was added by SPARK-27125 for 3.0.0.
### Why are the changes needed?
the `org.apache.spark.sql.execution.ui.AllExecutionsPageSuite.SPARK-27019:correctly display SQL page when event reordering happens` test is flaky for just checking the `html` content not containing 1970. I will add a ticket to check and fix that.
In the specific failure https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121799/testReport, it failed because the `html`
```
...
<td sorttable_customkey="1587806019707">
...
```
contained `1970`.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
passing jenkins
Closes#28344 from yaooqinn/SPARK-31564.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to fix the `InSet.sql` method for the cases when input collection contains values of internal Catalyst's types, for instance `UTF8String`. Elements of the input set `hset` are converted to Scala types, and wrapped by `Literal` to properly form SQL view of the input collection.
### Why are the changes needed?
The changes fixed the bug in `InSet.sql` that makes wrong assumption about types of collection elements. See more details in SPARK-31563.
### Does this PR introduce any user-facing change?
Highly likely, not.
### How was this patch tested?
Added a test to `ColumnExpressionSuite`
Closes#28343 from MaxGekk/fix-InSet-sql.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Fix the wrong fetch size.
### Why are the changes needed?
The fetch size should be the sum of the size of merged block and the total size of those merging blocks. But we missed the size of merged block.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added a regression test.
Closes#28301 from Ngone51/fix_merged_block_size.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR proposes to remove the non-existed `hiveClientCalls.count` metric documentation of `CodeGenerator` of the Spark metrics system in the monitoring guide.
There is a duplicated `hiveClientCalls.count` metric in both `namespace=HiveExternalCatalog` and `namespace=CodeGenerator` bullet lists, but there is only one defined inside object `HiveCatalogMetrics`.
Closes#28292 from wezhang/monitoringdoc.
Authored-by: Wei Zhang <wezhang@outlook.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Add V1/V2 tests for TextSuite and WholeTextFileSuite
### Why are the changes needed?
This is missing part since #24207. We should have these tests for test coverage.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit tests.
Closes#28335 from gengliangwang/testV2Suite.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
the 2 method `arrayClassFor` and `dataTypeFor` in `ScalaReflection` call each other circularly, the cases in `dataTypeFor` are not fully handled in `arrayClassFor`
For example:
```scala
scala> implicit def newArrayEncoder[T <: Array[_] : TypeTag]: Encoder[T] = ExpressionEncoder()
newArrayEncoder: [T <: Array[_]](implicit evidence$1: reflect.runtime.universe.TypeTag[T])org.apache.spark.sql.Encoder[T]
scala> val decOne = Decimal(1, 38, 18)
decOne: org.apache.spark.sql.types.Decimal = 1E-18
scala> val decTwo = Decimal(2, 38, 18)
decTwo: org.apache.spark.sql.types.Decimal = 2E-18
scala> val decSpark = Array(decOne, decTwo)
decSpark: Array[org.apache.spark.sql.types.Decimal] = Array(1E-18, 2E-18)
scala> Seq(decSpark).toDF()
java.lang.ClassCastException: org.apache.spark.sql.types.DecimalType cannot be cast to org.apache.spark.sql.types.ObjectType
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$arrayClassFor$1(ScalaReflection.scala:131)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:879)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:878)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.arrayClassFor(ScalaReflection.scala:120)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$dataTypeFor$1(ScalaReflection.scala:105)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:879)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:878)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:88)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:399)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:879)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:878)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:393)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:57)
at newArrayEncoder(<console>:57)
... 53 elided
scala>
```
In this PR, we add the missing cases to `arrayClassFor`
### Why are the changes needed?
bugfix as described above
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
add a test for array encoders
Closes#28324 from yaooqinn/SPARK-31552.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This is a followup PR discussed [here](https://github.com/apache/spark/pull/28215#discussion_r410748547).
### Why are the changes needed?
It would be good to re-enable `DB2IntegrationSuite` and upgrade the docker image inside to use the latest.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing docker integration tests.
Closes#28325 from gaborgsomogyi/SPARK-31533.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
SparkSessionBuilder shoud not propagate static sql configurations to the existing active/default SparkSession
This seems a long-standing bug.
```scala
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+--------------------+
| key| value|
+--------------------+--------------------+
|spark.sql.warehou...|file:/Users/kenty...|
+--------------------+--------------------+
scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir;
at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)
at org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100)
at org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
... 47 elided
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get
getClass getOrCreate
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession6403d574
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
| key|value|
+--------------------+-----+
|spark.sql.warehou...| xyz|
+--------------------+-----+
scala>
OptionsAttachments
```
### Why are the changes needed?
bugfix as shown in the previous section
### Does this PR introduce any user-facing change?
Yes, static SQL configurations with SparkSession.builder.config do not propagate to any existing or new SparkSession instances.
### How was this patch tested?
new ut.
Closes#28316 from yaooqinn/SPARK-31532.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR aims to add a benchmark suite for nested predicate pushdown with parquet file:
Performance comparison: Nested predicate pushdown disabled vs enabled, with the following queries scenarios:
1. When predicate pushed down, parquet reader are able to filter out all the row groups without loading them.
2. When predicate pushed down, parquet reader only loads one of the row groups.
3. When predicate pushed down, parquet reader can't filter out any row group in order to see if we introduce too much overhead or not when enabling nested predicate push down.
### Why are the changes needed?
No benchmark exists today for nested fields predicate pushdown performance evaluation.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Benchmark runs and reporting result.
Closes#28319 from JiJiTang/SPARK-31364.
Authored-by: Jian Tang <jian_tang@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
### What changes were proposed in this pull request?
After changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks on it. We should also decommission the corresponding blockmanagers in the same way. i.e. Move the cached RDD blocks from those executors to other active executors.
### Why are the changes needed?
We need to gracefully decommission the block managers so that the underlying RDD cache blocks are not lost in case the executors are taken away forcefully after some timeout (because of spotloss/pre-emptible VM etc). Its good to save as much cache data as possible.
Also In future once the decommissioning signal comes from Cluster Manager (say YARN/Mesos etc), dynamic allocation + this change gives us opportunity to downscale the executors faster by making the executors free of cache data.
Note that this is a best effort approach. We try to move cache blocks from decommissioning executors to active executors. If the active executors don't have free resources available on them for caching, then the decommissioning executors will keep the cache block which it was not able to move and it will still be able to serve them.
Current overall Flow:
1. CoarseGrainedSchedulerBackend receives a signal to decommissionExecutor. On receiving the signal, it do 2 things - Stop assigning new tasks (SPARK-20628), Send another message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.
2. BlockManagerMasterEndpoint receives "DecommissionBlockManagers" message. On receiving this, it moves the corresponding block managers to "decommissioning" state. All decommissioning BMs are excluded from the getPeers RPC call which is used for replication. All these decommissioning BMs are also sent message from BlockManagerMasterEndpoint to start decommissioning process on themselves.
3. BlockManager on worker (say BM-x) receives the "DecommissionBlockManager" message. Now it will start BlockManagerDecommissionManager thread to offload all the RDD cached blocks. This thread can make multiple reattempts to decommission the existing cache blocks (multiple reattempts might be needed as there might not be sufficient space in other active BMs initially).
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
Added UTs.
Closes#27864 from prakharjain09/SPARK-20732-rddcache-1.
Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
apply Lemma 1 in [Using the Triangle Inequality to Accelerate K-Means](https://www.aaai.org/Papers/ICML/2003/ICML03-022.pdf):
> Let x be a point, and let b and c be centers. If d(b,c)>=2d(x,b) then d(x,c) >= d(x,b);
It can be directly applied in EuclideanDistance, but not in CosineDistance.
However, for CosineDistance we can luckily get a variant in the space of radian/angle.
### Why are the changes needed?
It help improving the performance of prediction and training (mostly)
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
existing testsuites
Closes#27758 from zhengruifeng/km_triangle.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
`LIKE ANY/SOME` and `LIKE ALL` operators are mostly used when we are matching a text field with numbers of patterns. For example:
Teradata / Hive 3.0 / Snowflake:
```sql
--like any
select 'foo' LIKE ANY ('%foo%','%bar%');
--like all
select 'foo' LIKE ALL ('%foo%','%bar%');
```
PostgreSQL:
```sql
-- like any
select 'foo' LIKE ANY (array['%foo%','%bar%']);
-- like all
select 'foo' LIKE ALL (array['%foo%','%bar%']);
```
This PR add support these two operators.
More details:
https://docs.teradata.com/reader/756LNiPSFdY~4JcCCcR5Cw/4~AyrPNmDN0Xk4SALLo6aQhttps://issues.apache.org/jira/browse/HIVE-15229https://docs.snowflake.net/manuals/sql-reference/functions/like_any.html
### Why are the changes needed?
To smoothly migrate SQLs to Spark SQL.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Closes#27477 from wangyum/SPARK-30724.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Give more friendly warning message/migration guide of deprecated scala udf to users.
### Why are the changes needed?
User can not distinguish function signature between typed and untyped scala udf. Instead, we shall tell user what to do directly.
### Does this PR introduce any user-facing change?
No, it's newly added in Spark 3.0.
### How was this patch tested?
Pass Jenkins.
Closes#28311 from Ngone51/update_udf_doc.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Use `dagScheduler.taskSetFailed` to abort a barrier stage instead of throwing exception within `resourceOffers`.
### Why are the changes needed?
Any non fatal exception thrown within Spark RPC framework can be swallowed:
100fc58da5/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala (L202-L211)
The method `TaskSchedulerImpl.resourceOffers` is also within the scope of Spark RPC framework. Thus, throw exception inside `resourceOffers` won't fail the application.
As a result, if a barrier stage fail the require check at `require(addressesWithDescs.size == taskSet.numTasks, ...)`, the barrier stage will fail the check again and again util all tasks from `TaskSetManager` dequeued. But since the barrier stage isn't really executed, the application will hang.
The issue can be reproduced by the following test:
```scala
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"),Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
BarrierTaskContext.get().barrier()
iter
}.collect()
```
### Does this PR introduce any user-facing change?
Yes, application hang previously but fail-fast after this fix.
### How was this patch tested?
Added a regression test.
Closes#28257 from Ngone51/fix_barrier_abort.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch adds some log messages to log elapsed time for "compact" operation in FileStreamSourceLog and FileStreamSinkLog (added in CompactibleFileStreamLog) to help investigating the mysterious latency spike during the batch run.
### Why are the changes needed?
Tracking latency is a critical aspect of streaming query. While "compact" operation may bring nontrivial latency (it's even synchronous, adding all the latency to the batch run), it's not measured and end users have to guess.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
N/A for UT. Manual test with streaming query using file source & file sink.
> grep "for compact batch" <driver log>
```
...
20/02/20 19:27:36 WARN FileStreamSinkLog: Compacting took 24473 ms (load: 14185 ms, write: 10288 ms) for compact batch 21359
20/02/20 19:27:39 WARN FileStreamSinkLog: Loaded 1068000 entries (397985432 bytes in memory), and wrote 1068000 entries for compact batch 21359
20/02/20 19:29:52 WARN FileStreamSourceLog: Compacting took 3777 ms (load: 1524 ms, write: 2253 ms) for compact batch 21369
20/02/20 19:29:52 WARN FileStreamSourceLog: Loaded 229477 entries (68970112 bytes in memory), and wrote 229477 entries for compact batch 21369
20/02/20 19:30:17 WARN FileStreamSinkLog: Compacting took 24183 ms (load: 12992 ms, write: 11191 ms) for compact batch 21369
20/02/20 19:30:20 WARN FileStreamSinkLog: Loaded 1068500 entries (398171880 bytes in memory), and wrote 1068500 entries for compact batch 21369
...
```
![Screen Shot 2020-02-21 at 12 34 22 PM](https://user-images.githubusercontent.com/1317309/75002142-c6830100-54a6-11ea-8da6-17afb056653b.png)
This messages are explaining why the operation duration peaks per every 10 batches which is compact interval. Latency from addBatch heavily increases in each peak which DOES NOT mean it takes more time to write outputs, but we have no idea if such message is not presented.
NOTE: The output may be a bit different from the code, as it may be changed a bit during review phase.
Closes#27557 from HeartSaVioR/SPARK-30804.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade Genjavadoc to 0.16.
### Why are the changes needed?
Although we skipped Scala 2.12.11, this brings 2.12.11 official support and better 2.12.12 compatibility.
- https://github.com/lightbend/genjavadoc/commits/v0.16
### Does this PR introduce any user-facing change?
No. (The generated doc is the same)
### How was this patch tested?
Build with 0.15 and 0.16.
```
$ SKIP_PYTHONDOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 jekyll build
```
Compare the result. The generated doc is identical.
```
$ diff -r _site_0.15 _site_0.16 | grep -v '^diff -r' | grep -v 'Generated by javadoc' | sort | uniq
---
5c5
```
Closes#28321 from dongjoon-hyun/SPARK-31547.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
1. Modified `ParquetFilters.valueCanMakeFilterOn()` to accept filters with `java.time.LocalDate` attributes.
2. Modified `ParquetFilters.dateToDays()` to support both types `java.sql.Date` and `java.time.LocalDate` in conversions to days.
3. Add implicit conversion from `LocalDate` to `Expression` (`Literal`).
### Why are the changes needed?
To support pushed down filters with `java.time.LocalDate` attributes. Before the changes, date filters are not pushed down to Parquet datasource when `spark.sql.datetime.java8API.enabled` is `true`.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added a test to `ParquetFilterSuite`
Closes#28259 from MaxGekk/parquet-filter-java8-date-time.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR intends to add a new test suite for `ExpressionInfo`. Major changes are as follows;
- Added a new test suite named `ExpressionInfoSuite`
- To improve test coverage, added a test for error handling in `ExpressionInfoSuite`
- Moved the `ExpressionInfo`-related tests from `UDFSuite` to `ExpressionInfoSuite`
- Moved the related tests from `SQLQuerySuite` to `ExpressionInfoSuite`
- Added a comment in `ExpressionInfoSuite` (followup of https://github.com/apache/spark/pull/28224)
### Why are the changes needed?
To improve test suites/coverage.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added tests.
Closes#28308 from maropu/SPARK-31526.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
HiveClient instance is cross-session, the following configurations which are defined in HiveUtils and used to create it should be considered static:
1. spark.sql.hive.metastore.version - used to determine the hive version in Spark
2. spark.sql.hive.metastore.jars - hive metastore related jars location which is used by spark to create hive client
3. spark.sql.hive.metastore.sharedPrefixes and spark.sql.hive.metastore.barrierPrefixes - package names of classes that are shared or separated between SparkContextLoader and hive client class loader
Those are used only once when creating the hive metastore client. They should be static in SQLConf for retrieving them correctly. We should avoid them being changed by users with SET/RESET command.
Speaking of spark.sql.hive.version - the fake of the spark.sql.hive.metastore.version, it is used by jdbc/thrift client for backward compatibility.
### Why are the changes needed?
bugfix, these configurations should not be changed.
### Does this PR introduce any user-facing change?
Yes, the following set of configs are not allowed to change.
```
Seq("spark.sql.hive.metastore.version ",
"spark.sql.hive.metastore.jars",
"spark.sql.hive.metastore.sharedPrefixes",
"spark.sql.hive.metastore.barrierPrefixes")
```
### How was this patch tested?
add unit test
Closes#28302 from yaooqinn/SPARK-31522.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add migration guide for removed accumulator v1 APIs.
### Why are the changes needed?
Provide better guidance for users' migration.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass Jenkins.
Closes#28309 from Ngone51/SPARK-16775-migration-guide.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Need to address a few more comments
### Why are the changes needed?
Fix a few problems
### Does this PR introduce any user-facing change?
Yes
### How was this patch tested?
Manually build and check
Closes#28306 from huaxingao/literal-folllowup.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Override the canonicalized fields with respect to the result of `needsTimeZone`.
### Why are the changes needed?
The current approach breaks sematic equal of two cast expressions that don't relate with datetime type. If we don't need to use `timeZone` information casting `from` type to `to` type, then the timeZoneId should not influence the canonicalize result.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
New UT added.
Closes#28288 from xuanyuanking/SPARK-31515.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
SPARK-31476 has supported `extract('field', source)` as side-effect, so this PR intends to add some tests for the function in `SQLQueryTestSuite`.
### Why are the changes needed?
For better test coverage.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added tests.
Closes#28276 from maropu/SPARK-31476-FOLLOWUP.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
\_FUNC\_ is used in note() of `ExpressionDescription` since https://github.com/apache/spark/pull/28248, it can be more cases later, we should replace it with function name for documentation
### Why are the changes needed?
doc fix
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
pass Jenkins, and verify locally with Jekyll serve
Closes#28305 from yaooqinn/SPARK-31474-F.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
- Document row field values of `DATE` and `TIMESTAMP` type returned by `Row.get()` and `Row.apply`.
- Refer to `Row.get()` from the description of filter values
### Why are the changes needed?
Reflect current behaviour of Row's method `apply()` and `get()` in comments to inform users about different return types that are depended on the SQL config settings `spark.sql.datetime.java8API.enabled`.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Run `$ ./dev/scalastyle`
Closes#28300 from MaxGekk/doc-filter-date-time.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This exposes the `filterByRange` method from `OrderedRDDFunctions` in the Java API (as a method of JavaPairRDD).
This is the only method of `OrderedRDDFunctions` which is not exposed in the Java API so far.
### Why are the changes needed?
This improves the consistency between the Scala and Java APIs. Calling the Scala method manually from a Java context is cumbersome as it requires passing many ClassTags.
### Does this PR introduce any user-facing change?
Yes, a new method in the Java API.
### How was this patch tested?
With unit tests. The implementation of the Scala method is already tested independently and it was not touched in this PR.
Suggesting srowen as a reviewer.
Closes#28293 from wetneb/SPARK-31518.
Authored-by: Antonin Delpeuch <antonin@delpeuch.eu>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Seems like in certain environment, it requires to set `setwd` as below:
```
> library(devtools); devtools::document(pkg="./pkg", roclets=c("rd"))
Loading required package: usethis
Error: Could not find package root, is your working directory inside a package?
```
see also https://stackoverflow.com/questions/52670051/how-to-troubleshoot-error-could-not-find-package-root and https://groups.google.com/forum/#!topic/rdevtools/79jjjdc_wjg
We can make up another story too. For example, if you set a specific directory in your `~/.Rprofile`, then R documentation build will fail as below:
```
echo 'setwd("~")' > ~/.Rprofile
sh R/create-rd.sh
```
```
Using R_SCRIPT_PATH = /usr/local/bin
Loading required package: usethis
Error: Can't find './pkg'.
Execution halted
```
This PR proposes to set the `setwd` explicitly so it does not get affected on the global environment.
To make R dev env more independent.
No, dev only.
Manually tested:
```bash
echo 'setwd("~")' > ~/.Rprofile
sh R/create-rd.sh
```
Before:
```
Using R_SCRIPT_PATH = /usr/local/bin
Loading required package: usethis
Error: Can't find './pkg'.
Execution halted
```
After:
```
Using R_SCRIPT_PATH = /usr/local/bin
Loading required package: usethis
Updating SparkR documentation
Loading SparkR
Creating a new generic function for ‘as.data.frame’ in package ‘SparkR’
Creating a new generic function for ‘colnames’ in package ‘SparkR’
Creating a new generic function for ‘colnames<-’ in package ‘SparkR’
Creating a new generic function for ‘cov’ in package ‘SparkR’
Creating a new generic function for ‘drop’ in package ‘SparkR’
Creating a new generic function for ‘na.omit’ in package ‘SparkR’
Creating a new generic function for ‘filter’ in package ‘SparkR’
Creating a new generic function for ‘intersect’ in package ‘SparkR’
...
```
Closes#28285
### What changes were proposed in this pull request?
As part of the Stage level scheduling features, add the Python api's to set resource profiles.
This also adds the functionality to properly apply the pyspark memory configuration when specified in the ResourceProfile. The pyspark memory configuration is being passed in the task local properties. This was an easy way to get it to the PythonRunner that needs it. I modeled this off how the barrier task scheduling is passing the addresses. As part of this I added in the JavaRDD api's because those are needed by python.
### Why are the changes needed?
python api for this feature
### Does this PR introduce any user-facing change?
Yes adds the java and python apis for user to specify a ResourceProfile to use stage level scheduling.
### How was this patch tested?
unit tests and manually tested on yarn. Tests also run to verify it errors properly on standalone and local mode where its not yet supported.
Closes#28085 from tgravescs/SPARK-29641-pr-base.
Lead-authored-by: Thomas Graves <tgraves@nvidia.com>
Co-authored-by: Thomas Graves <tgraves@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
When loading DataFrames from JDBC datasource with Kerberos authentication, remote executors (yarn-client/cluster etc. modes) fail to establish a connection due to lack of Kerberos ticket or ability to generate it.
This is a real issue when trying to ingest data from kerberized data sources (SQL Server, Oracle) in enterprise environment where exposing simple authentication access is not an option due to IT policy issues.
In this PR I've added DB2 support (other supported databases will come in later PRs).
What this PR contains:
* Added `DB2ConnectionProvider`
* Added `DB2ConnectionProviderSuite`
* Added `DB2KrbIntegrationSuite` docker integration test
* Changed DB2 JDBC driver to use the latest (test scope only)
* Changed test table data type to a type which is supported by all the databases
* Removed double connection creation on test side
* Increased connection timeout in docker tests because DB2 docker takes quite a time to start
### Why are the changes needed?
Missing JDBC kerberos support.
### Does this PR introduce any user-facing change?
Yes, now user is able to connect to DB2 using kerberos.
### How was this patch tested?
* Additional + existing unit tests
* Additional + existing integration tests
* Test on cluster manually
Closes#28215 from gaborgsomogyi/SPARK-31272.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@apache.org>
### What changes were proposed in this pull request?
This PR fixes a typo in deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala file.
### Why are the changes needed?
To deliver correct explanation about how the placement policy works.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
UT as specified, although shouldn't influence any functionality since it's in the comment.
Closes#28267 from asclepiusaka/master.
Authored-by: Cong Du <asclepius1993@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Remove the requirement to launch a task in order to reset locality wait timer.
### Why are the changes needed?
Recently https://github.com/apache/spark/pull/27207 was merged, but contained a bug which leads to undesirable behavior.
The crux of the issue is that single resource offers couldn't reset the timer, if there had been a previous reject followed by an allResourceOffer with no available resources.
This lead to a problem where once locality level reached ANY, single resource offers are all accepted, leading allResourceOffers to be left with no resources to utilize (hence no task being launched on an all resource offer -> no timer reset). The task manager would be stuck in ANY locality level.
Noting down here the downsides of using below reset conditions, in case we want to follow up.
As this is quite complex, I could easily be missing something, so please comment/respond if you have more bad behavior scenarios or find something wrong here:
The format is:
> **Reset condition**
> - the unwanted side effect
> - the cause/use case
Below references to locality increase/decrease mean:
```
PROCESS_LOCAL, NODE_LOCAL ... .. ANY
------ locality decrease --->
<----- locality increase -----
```
**Task launch:**
- locality decrease:
- Blacklisting, FAIR/FIFO scheduling, or task resource requirements can minimize tasks launched
- locality increase:
- single task launch decreases locality despite many tasks remaining
**No delay schedule reject since last allFreeResource offer**
- locality decrease:
- locality wait less than allFreeResource offer frequency, which occurs at least 1 per second
- locality increase:
- single resource (or none) not rejected despite many tasks remaining (other lower priority tasks utilizing resources)
**Current impl - No delay schedule reject since last (allFreeResource offer + task launch)**
- locality decrease:
- all from above
- locality increase:
- single resource accepted and task launched despite many tasks remaining
The current impl is an improvement on the legacy (task launch) in that unintended locality decrease case is similar and the unintended locality increase case only occurs when the cluster is fully utilized.
For the locality increase cases, perhaps a config which specifies a certain % of tasks in a taskset to finish before resetting locality levels would be helpful.
**If** that was considered a good approach then perhaps removing the task launch as a requirement would eliminate most of downsides listed above.
Lemme know if you have more ideas for eliminating locality increase downside of **No delay schedule reject since last allFreeResource offer**
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
TaskSchedulerImplSuite
Also manually tested similar to how I tested in https://github.com/apache/spark/pull/27207 using [this simple app](https://github.com/bmarcott/spark-test-apps/blob/master/src/main/scala/TestLocalityWait.scala).
With the new changes, given locality wait of 10s the behavior is generally:
10 seconds of locality being respected, followed by a single full utilization of resources using ANY locality level, followed by 10 seconds of locality being respected, and so on
If the legacy flag is enabled (spark.locality.wait.legacyResetOnTaskLaunch=true), the behavior is only scheduling PROCESS_LOCAL tasks (only utilizing a single executor)
cloud-fan
tgravescs
Closes#28188 from bmarcott/nmarcott-locality-fix.
Authored-by: Nicholas Marcott <481161+bmarcott@users.noreply.github.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
As suggested by https://github.com/apache/spark/pull/28255#discussion_r412619438, this patch proposes to use taskAttemptId in checkpoint filename, instead of stageAttemptNumber + attemptNumber.
### Why are the changes needed?
To simplify checkpoint simplified and unique.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests.
Closes#28289 from viirya/SPARK-31484-followup.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Extracting millennium, century, decade, millisecond, microsecond and epoch from datetime is neither ANSI standard nor quite common in modern SQL platforms. Most of the systems listing below does not support these except PostgreSQL and redshift.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDFhttps://docs.oracle.com/cd/B19306_01/server.102/b14200/functions050.htmhttps://prestodb.io/docs/current/functions/datetime.htmlhttps://docs.cloudera.com/documentation/enterprise/5-8-x/topics/impala_datetime_functions.htmlhttps://docs.snowflake.com/en/sql-reference/functions-date-time.html#label-supported-date-time-partshttps://www.postgresql.org/docs/9.1/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT
This PR removes these extract fields support from extract function for date and timestamp values
`isoyear` is PostgreSQL specific but `yearofweek` is more commonly used across platforms
`isodow` is PostgreSQL specific but `iso` as a suffix is more commonly used across platforms so, `dow_iso` and `dayofweek_iso` is used to replace it.
For historical reasons, we have [`dayofweek`, `dow`] implemented for representing a non-ISO day-of-week and a newly added `isodow` from PostgreSQL for ISO day-of-week. Many other systems only have one week-numbering system support and use either full names or abbreviations. Things in spark become a little bit complicated.
1. because of the existence of `isodow`, so we need to add iso-prefix to `dayofweek` to make a pair for it too. [`dayofweek`, `isodayofweek`, `dow` and `isodow`]
2. because there are rare `iso`-prefixed systems and more systems choose `iso`-suffixed way, so we may result in [`dayofweek`, `dayofweekiso`, `dow`, `dowiso`]
3. `dayofweekiso` looks nice and has use cases in the platforms listed above, e.g. snowflake, but `dowiso` looks weird and no use cases found.
4. with a discussion the community,we have agreed with an underscore before `iso` may look much better because `isodow` is new and there is no standard for `iso` kind of things, so this may be good for us to make it simple and clear for end-users if they are well documented too.
Thus, we finally result in [`dayofweek`, `dow`] for Non-ISO day-of-week system and [`dayofweek_iso`, `dow_iso`] for ISO system
### Why are the changes needed?
Remove some nonstandard and uncommon features as we can add them back if necessary
### Does this PR introduce any user-facing change?
NO, we should target this to 3.0.0 and these are added during 3.0.0
### How was this patch tested?
Remove unused tests
Closes#28284 from yaooqinn/SPARK-31507.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently, only the non-static public SQL configurations are dump to public doc, we'd better also add those static public ones as the command `set -v`
This PR force call StaticSQLConf to buildStaticConf.
### Why are the changes needed?
Fix missing SQL configurations in doc
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
add unit test and verify locally to see if public static SQL conf is in `docs/sql-config.html`
Closes#28274 from yaooqinn/SPARK-31498.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR increases the thread safety of the `BytesToBytesMap`:
- It makes the `iterator()` and `destructiveIterator()` methods used their own `Location` object. This used to be shared, and this was causing issues when the map was being iterated over in two threads by two different iterators.
- Removes the `safeIterator()` function. This is not needed anymore.
- Improves the documentation of a couple of methods w.r.t. thread-safety.
### Why are the changes needed?
It is unexpected an iterator shares the object it is returning with all other iterators. This is a violation of the iterator contract, and it causes issues with iterators over a map that are consumed in different threads.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests.
Closes#28286 from hvanhovell/SPARK-31511.
Authored-by: herman <herman@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
askAbortable method throw TimeoutException while it does no complete in time. Currently, the error message contains null as remoteAddr when receiver is in client mode.
This change is to print out correct rpcAddress instead of null in the error message.
### Why are the changes needed?
It provides the address of an endpoint which does not reply in time. It helps users to find slow executors when timeout happens.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Add a unit test.
Closes#28002 from Huang-yi-3456/SPARK-31233-enhance-rpctimeoutexception-log.
Authored-by: Huang-Yi <huang.yi.3456@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
override the `sql` method of `StringTrim`, `StringTrimLeft` and `StringTrimRight`, to use the standard SQL syntax.
### Why are the changes needed?
The current implementation is wrong. It gives you a SQL string that returns different result.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
new tests
Closes#28281 from cloud-fan/sql.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR adds a new parquet/avro file metadata: `org.apache.spark.legacyDatetime`. It indicates that the file was written with the "rebaseInWrite" config enabled, and spark need to do rebase when reading it.
This makes Spark be able to do rebase more smartly:
1. If we don't know which Spark version writes the file, do rebase if the "rebaseInRead" config is true.
2. If the file was written by Spark 2.4 and earlier, then do rebase.
3. If the file was written by Spark 3.0 and later, do rebase if the `org.apache.spark.legacyDatetime` exists in file metadata.
### Why are the changes needed?
It's very easy to have mixed-calendar parquet/avro files: e.g. A user upgrades to Spark 3.0 and writes some parquet files to an existing directory. Then he realizes that the directory contains legacy datetime values before 1582. However, it's too late and he has to find out all the legacy files manually and read them separately.
To support mixed-calendar parquet/avro files, we need to decide to rebase or not based on the file metadata.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Updated test
Closes#28137 from cloud-fan/datetime.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In `verboseStringWithOperatorId`, use `output` (it's `Seq[Attribute]`) instead of `producedAttributes` (it's `AttributeSet`) to generates `"Output"` for the leaf node in order to make `"Output"` determined.
### Why are the changes needed?
Currently, Formatted Explain use `producedAttributes`, the `AttributeSet`, to generate `"Output"`. As a result, the fields order within `"Output"` can be different from time to time. It's That means, for the same plan, it could have different explain outputs.
### Does this PR introduce any user-facing change?
Yes, user see the determined fields order within formatted explain now.
### How was this patch tested?
Added a regression test.
Closes#28282 from Ngone51/fix_output.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
```sql
spark-sql> SELECT extract(dayofweek from '2009-07-26');
1
spark-sql> SELECT extract(dow from '2009-07-26');
0
spark-sql> SELECT extract(isodow from '2009-07-26');
7
spark-sql> SELECT dayofweek('2009-07-26');
1
spark-sql> SELECT weekday('2009-07-26');
6
```
Currently, there are 4 types of day-of-week range:
1. the function `dayofweek`(2.3.0) and extracting `dayofweek`(2.4.0) result as of Sunday(1) to Saturday(7)
2. extracting `dow`(3.0.0) results as of Sunday(0) to Saturday(6)
3. extracting` isodow` (3.0.0) results as of Monday(1) to Sunday(7)
4. the function `weekday`(2.4.0) results as of Monday(0) to Sunday(6)
Actually, extracting `dayofweek` and `dow` are both derived from PostgreSQL but have different meanings.
https://issues.apache.org/jira/browse/SPARK-23903https://issues.apache.org/jira/browse/SPARK-28623
In this PR, we make extracting `dow` as same as extracting `dayofweek` and the `dayofweek` function for historical reason and not breaking anything.
Also, add more documentation to the extracting function to make extract field more clear to understand.
### Why are the changes needed?
Consistency insurance
### Does this PR introduce any user-facing change?
yes, doc updated and extract `dow` is as same as `dayofweek`
### How was this patch tested?
1. modified ut
2. local SQL doc verification
#### before
![image](https://user-images.githubusercontent.com/8326978/79601949-3535b100-811c-11ea-957b-a33d68641181.png)
#### after
![image](https://user-images.githubusercontent.com/8326978/79601847-12a39800-811c-11ea-8ff6-aa329255d099.png)
Closes#28248 from yaooqinn/SPARK-31474.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Rewrite the periodically check logic of `abortableRpcFuture` to make sure that barrier task would always return either desired messages or expected exception.
This PR also simplify a bit around `AbortableRpcFuture`.
### Why are the changes needed?
Currently, the periodically check logic of `abortableRpcFuture` is done by following:
```scala
...
var messages: Array[String] = null
while (!abortableRpcFuture.toFuture.isCompleted) {
messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 1.second)
...
}
return messages
```
It's possible that `abortableRpcFuture` complete before next invocation on `messages = ...`. In this case, the task may return null messages or execute successfully while it should throw exception(e.g. `SparkException` from `BarrierCoordinator`).
And here's a flaky test which caused by this bug:
```
[info] BarrierTaskContextSuite:
[info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 milliseconds)
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 2) finished unsuccessfully.
[info] java.lang.NullPointerException
[info] at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
[info] at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
[info] at scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285)
[info] at scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284)
[info] at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198)
[info] at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68)
...
```
The test exception can be reproduced by changing the line `messages = ...` to the following:
```scala
messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 10.micros)
Thread.sleep(5000)
```
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually test and update some unit tests.
Closes#28245 from Ngone51/fix_barrier.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>