Commit graph

27211 commits

Author SHA1 Message Date
yi.wu ab8cada1f9
[SPARK-31521][CORE] Correct the fetch size when merging blocks into a merged block
### What changes were proposed in this pull request?

Fix the wrong fetch size.

### Why are the changes needed?

The fetch size should be the sum of the size of the merged block and the total size of the merging blocks. But we missed the size of the merged block.
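A minimal sketch of the corrected accounting (hypothetical names only, not the actual shuffle-fetch code):

```scala
// Hypothetical illustration of the fix described above; names are not Spark internals.
case class BlockInfo(size: Long)

def totalFetchSize(mergedBlock: BlockInfo, mergingBlocks: Seq[BlockInfo]): Long =
  mergedBlock.size + mergingBlocks.map(_.size).sum  // before the fix, mergedBlock.size was not counted
```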

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a regression test.

Closes #28301 from Ngone51/fix_merged_block_size.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-24 22:11:35 -07:00
Wei Zhang 3e83ccc5d8
[SPARK-31516][DOC] Fix non-existed metric hiveClientCalls.count of CodeGenerator in DOC
### What changes were proposed in this pull request?
This PR proposes to remove the non-existed `hiveClientCalls.count` metric documentation of `CodeGenerator` of the Spark metrics system in the monitoring guide.

There is a duplicated `hiveClientCalls.count` metric in both `namespace=HiveExternalCatalog` and  `namespace=CodeGenerator` bullet lists, but there is only one defined inside object `HiveCatalogMetrics`.

Closes #28292 from wezhang/monitoringdoc.

Authored-by: Wei Zhang <wezhang@outlook.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-24 21:52:50 -07:00
Gengliang Wang 16b961526d
[SPARK-31560][SQL][TESTS] Add V1/V2 tests for TextSuite and WholeTextFileSuite
### What changes were proposed in this pull request?

 Add V1/V2 tests for TextSuite and WholeTextFileSuite

### Why are the changes needed?

This is a missing part since #24207. We should have these tests for test coverage.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit tests.

Closes #28335 from gengliangwang/testV2Suite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-24 18:59:15 -07:00
Holden Karau 9faad07ce7 HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks
HOTFIX test issue introduced in SPARK-20732

Closes #28337 from holdenk/revert-SPARK-20732.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-04-24 18:51:25 -07:00
Kent Yao f92652d0b5
[SPARK-31528][SQL] Remove millennium, century, decade from trunc/date_trunc functions
### What changes were proposed in this pull request?

Similar to https://jira.apache.org/jira/browse/SPARK-31507, millennium, century, and decade are not commonly used in most modern platforms.

For example
Negative:
https://docs.snowflake.com/en/sql-reference/functions-date-time.html#supported-date-and-time-parts
https://prestodb.io/docs/current/functions/datetime.html#date_trunc
https://teradata.github.io/presto/docs/148t/functions/datetime.html#date_trunc
https://www.oracletutorial.com/oracle-date-functions/oracle-trunc/

Positive:
https://docs.aws.amazon.com/redshift/latest/dg/r_Dateparts_for_datetime_functions.html
https://www.postgresql.org/docs/9.1/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC

This PR removes support for these `fmt` values in the trunc and date_trunc functions.
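For reference, a hedged spark-shell sketch of units that remain supported (`spark` is the active SparkSession):

```scala
spark.sql("SELECT date_trunc('YEAR', timestamp'2020-04-24 12:34:56')").show()  // 2020-01-01 00:00:00
spark.sql("SELECT trunc(date'2020-04-24', 'MM')").show()                       // 2020-04-01
// 'MILLENNIUM', 'CENTURY' and 'DECADE' are no longer accepted fmt values after this change.
```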

### Why are the changes needed?

Clean up uncommon datetime units for easier maintenance; we can add them back later if they turn out to be very useful.

### Does this PR introduce any user-facing change?
No; this targets 3.0.0, and these units were newly added in 3.0.0.

### How was this patch tested?

Removed and modified existing unit tests.

Closes #28313 from yaooqinn/SPARK-31528.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-24 18:28:41 -07:00
Kent Yao caf3ab8411
[SPARK-31552][SQL] Fix ClassCastException in ScalaReflection arrayClassFor
### What changes were proposed in this pull request?

The two methods `arrayClassFor` and `dataTypeFor` in `ScalaReflection` call each other circularly, but the cases handled in `dataTypeFor` are not fully handled in `arrayClassFor`.

For example:
```scala
scala> implicit def newArrayEncoder[T <: Array[_] : TypeTag]: Encoder[T] = ExpressionEncoder()
newArrayEncoder: [T <: Array[_]](implicit evidence$1: reflect.runtime.universe.TypeTag[T])org.apache.spark.sql.Encoder[T]

scala> val decOne = Decimal(1, 38, 18)
decOne: org.apache.spark.sql.types.Decimal = 1E-18

scala> val decTwo = Decimal(2, 38, 18)
decTwo: org.apache.spark.sql.types.Decimal = 2E-18

scala> val decSpark = Array(decOne, decTwo)
decSpark: Array[org.apache.spark.sql.types.Decimal] = Array(1E-18, 2E-18)

scala> Seq(decSpark).toDF()
java.lang.ClassCastException: org.apache.spark.sql.types.DecimalType cannot be cast to org.apache.spark.sql.types.ObjectType
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$arrayClassFor$1(ScalaReflection.scala:131)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:879)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:878)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
  at org.apache.spark.sql.catalyst.ScalaReflection$.arrayClassFor(ScalaReflection.scala:120)
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$dataTypeFor$1(ScalaReflection.scala:105)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:879)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:878)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
  at org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:88)
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:399)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:879)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:878)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:393)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:57)
  at newArrayEncoder(<console>:57)
  ... 53 elided

scala>
```

In this PR, we add the missing cases to `arrayClassFor`

### Why are the changes needed?

bugfix as described above

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

add a test for array encoders

Closes #28324 from yaooqinn/SPARK-31552.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-24 18:04:26 -07:00
Gabor Somogyi 0ca3605d3d
[SPARK-31533][SQL][TESTS] Enable DB2IntegrationSuite test and upgrade the DB2 docker inside
### What changes were proposed in this pull request?
This is a followup PR discussed [here](https://github.com/apache/spark/pull/28215#discussion_r410748547).

### Why are the changes needed?
It would be good to re-enable `DB2IntegrationSuite` and upgrade the docker image inside to use the latest.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing docker integration tests.

Closes #28325 from gaborgsomogyi/SPARK-31533.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-24 17:56:58 -07:00
Huaxin Gao 054bef94ca [SPARK-31491][SQL][DOCS] Re-arrange Data Types page to document Floating Point Special Values
### What changes were proposed in this pull request?
Re-arrange Data Types page to document Floating Point Special Values

### Why are the changes needed?
To complete SQL Reference

### Does this PR introduce any user-facing change?
Yes

- add Floating Point Special Values in Data Types page
- move NaN Semantics to Data Types page
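
For context, a hedged spark-shell sketch of the special values and NaN semantics the new sections cover:

```scala
spark.sql("SELECT CAST('Infinity' AS DOUBLE), CAST('-Infinity' AS DOUBLE), CAST('NaN' AS DOUBLE)").show()
// Infinity, -Infinity, NaN
spark.sql("SELECT CAST('NaN' AS DOUBLE) = CAST('NaN' AS DOUBLE)").show()  // true: Spark treats NaN as equal to NaN
```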

<img width="1050" alt="Screen Shot 2020-04-24 at 9 14 57 AM" src="https://user-images.githubusercontent.com/13592258/80233996-3da25600-860c-11ea-8285-538efc16e431.png">

<img width="1050" alt="Screen Shot 2020-04-24 at 9 15 22 AM" src="https://user-images.githubusercontent.com/13592258/80234001-4004b000-860c-11ea-8954-72f63c92d50d.png">

<img width="1049" alt="Screen Shot 2020-04-24 at 9 15 44 AM" src="https://user-images.githubusercontent.com/13592258/80234006-41ce7380-860c-11ea-96bf-15e1aa2102ff.png">

### How was this patch tested?
Manually build and check

Closes #28264 from huaxingao/datatypes.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-04-25 09:02:16 +09:00
Kent Yao 8424f55229 [SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession
### What changes were proposed in this pull request?

SparkSessionBuilder should not propagate static SQL configurations to the existing active/default SparkSession.
This seems to be a long-standing bug.

```scala
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|spark.sql.warehou...|file:/Users/kenty...|
+--------------------+--------------------+

scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir;
  at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
  at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)
  at org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100)
  at org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
  ... 47 elided

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get
getClass   getOrCreate

scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession6403d574

scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.sql.warehou...|  xyz|
+--------------------+-----+

scala>
```

### Why are the changes needed?
bugfix as shown in the previous section

### Does this PR introduce any user-facing change?

Yes, static SQL configurations with SparkSession.builder.config do not propagate to any existing or new SparkSession instances.

### How was this patch tested?

new ut.

Closes #28316 from yaooqinn/SPARK-31532.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-04-25 08:53:00 +09:00
Jian Tang 6a576161ae [SPARK-31364][SQL][TESTS] Benchmark Parquet Nested Field Predicate Pushdown
### What changes were proposed in this pull request?

This PR aims to add a benchmark suite for nested predicate pushdown with parquet file:

Performance comparison of nested predicate pushdown disabled vs. enabled, with the following query scenarios (a query-shape sketch follows the list):

1. When the predicate is pushed down, the Parquet reader is able to filter out all the row groups without loading them.

2. When the predicate is pushed down, the Parquet reader only loads one of the row groups.

3. When the predicate is pushed down, the Parquet reader can't filter out any row group; this checks whether we introduce too much overhead when enabling nested predicate pushdown.
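
For illustration, a hedged sketch of the query shape being benchmarked (path and nested schema are made up; the actual benchmark code is in the PR):

```scala
// Assumes a spark-shell session; the path and schema are illustrative only.
import spark.implicits._

val df = spark.read.parquet("/tmp/nested_data")   // e.g. schema: id bigint, nested struct<col: bigint>
df.filter($"nested.col" === 42L).count()          // predicate on a nested field; pushed down to Parquet
                                                  // only when nested predicate pushdown is enabled
```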

### Why are the changes needed?

No benchmark exists today for nested fields predicate pushdown performance evaluation.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Benchmark runs and reported results.

Closes #28319 from JiJiTang/SPARK-31364.

Authored-by: Jian Tang <jian_tang@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2020-04-24 22:10:58 +00:00
Prakhar Jain 249b214590 [SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned
### What changes were proposed in this pull request?
After the changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks to it. We should also decommission the corresponding block managers in the same way, i.e., move the cached RDD blocks from those executors to other active executors.

### Why are the changes needed?
We need to gracefully decommission the block managers so that the underlying RDD cache blocks are not lost in case the executors are taken away forcefully after some timeout (because of spot loss, preemptible VMs, etc.). It's good to save as much cached data as possible.

Also, in the future, once the decommissioning signal comes from the cluster manager (say YARN/Mesos, etc.), dynamic allocation plus this change gives us the opportunity to downscale executors faster by making them free of cached data.

Note that this is a best-effort approach. We try to move cache blocks from decommissioning executors to active executors. If the active executors don't have free resources available for caching, then the decommissioning executors will keep the cache blocks they were not able to move and will still be able to serve them.

Current overall Flow:

1. CoarseGrainedSchedulerBackend receives a signal to decommissionExecutor. On receiving the signal, it does two things: stop assigning new tasks (SPARK-20628), and send another message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.

2. BlockManagerMasterEndpoint receives the "DecommissionBlockManagers" message. On receiving this, it moves the corresponding block managers to the "decommissioning" state. All decommissioning BMs are excluded from the getPeers RPC call which is used for replication. All these decommissioning BMs are also sent a message from BlockManagerMasterEndpoint to start the decommissioning process on themselves.

3. The BlockManager on a worker (say BM-x) receives the "DecommissionBlockManager" message. It will then start a BlockManagerDecommissionManager thread to offload all the cached RDD blocks. This thread can make multiple reattempts to decommission the existing cache blocks (multiple reattempts might be needed as there might not be sufficient space in other active BMs initially).

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
Added UTs.

Closes #27864 from prakharjain09/SPARK-20732-rddcache-1.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-04-24 11:22:08 -07:00
zhengruifeng 0ede08bcb2 [SPARK-31007][ML] KMeans optimization based on triangle-inequality
### What changes were proposed in this pull request?
apply Lemma 1 in [Using the Triangle Inequality to Accelerate K-Means](https://www.aaai.org/Papers/ICML/2003/ICML03-022.pdf):

> Let x be a point, and let b and c be centers. If d(b,c)>=2d(x,b) then d(x,c) >= d(x,b);

It can be directly applied in EuclideanDistance, but not in CosineDistance.
However, for CosineDistance we can luckily get a variant in the space of radians/angles.
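
A hedged illustration of how the lemma prunes distance computations (not the MLlib implementation itself):

```scala
// If d(b, c) >= 2 * d(x, b), the triangle inequality gives d(x, c) >= d(x, b),
// so center c cannot beat the current best center b and its distance need not be computed.
def canSkipCenter(distToBest: Double, distBetweenCenters: Double): Boolean =
  distBetweenCenters >= 2.0 * distToBest

println(canSkipCenter(1.0, 2.5))  // true: a center 2.5 away from b cannot be closer to x than b is
```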

### Why are the changes needed?
It helps improve the performance of prediction and training (mostly).

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
existing testsuites

Closes #27758 from zhengruifeng/km_triangle.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-04-24 11:24:15 -05:00
Yuming Wang b10263b8e5 [SPARK-30724][SQL] Support 'LIKE ANY' and 'LIKE ALL' operators
### What changes were proposed in this pull request?

`LIKE ANY/SOME` and `LIKE ALL` operators are mostly used when we are matching a text field with a number of patterns. For example:

Teradata / Hive 3.0 / Snowflake:
```sql
--like any
select 'foo' LIKE ANY ('%foo%','%bar%');

--like all
select 'foo' LIKE ALL ('%foo%','%bar%');
```
PostgreSQL:
```sql
-- like any
select 'foo' LIKE ANY (array['%foo%','%bar%']);

-- like all
select 'foo' LIKE ALL (array['%foo%','%bar%']);
```

This PR adds support for these two operators.

More details:
https://docs.teradata.com/reader/756LNiPSFdY~4JcCCcR5Cw/4~AyrPNmDN0Xk4SALLo6aQ
https://issues.apache.org/jira/browse/HIVE-15229
https://docs.snowflake.net/manuals/sql-reference/functions/like_any.html

### Why are the changes needed?

To smoothly migrate SQL workloads to Spark SQL.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Unit test.

Closes #27477 from wangyum/SPARK-30724.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-04-24 22:20:32 +09:00
yi.wu 463c54419b [SPARK-31010][SQL][DOC][FOLLOW-UP] Improve deprecated warning message for untyped scala udf
### What changes were proposed in this pull request?

Give users a friendlier warning message/migration guide for the deprecated untyped Scala udf.

### Why are the changes needed?

Users cannot distinguish the function signatures of the typed and untyped Scala udf. Instead, we should tell users directly what to do.
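
For context, a hedged sketch of the two signatures the warning distinguishes (not the PR's exact wording):

```scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

// Typed Scala UDF: input/output types are inferred from the function signature.
val plusOneTyped = udf((x: Int) => x + 1)

// Untyped Scala UDF (deprecated in 3.0): an AnyRef function plus an explicit return DataType.
val plusOneUntyped = udf((x: Any) => x.asInstanceOf[Int] + 1, IntegerType)
```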

### Does this PR introduce any user-facing change?

No, it's newly added in Spark 3.0.

### How was this patch tested?

Pass Jenkins.

Closes #28311 from Ngone51/update_udf_doc.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-04-24 19:10:18 +09:00
Huaxin Gao b14b980ab8 [SPARK-31502][SQL][DOCS] Document identifier in SQL Reference
### What changes were proposed in this pull request?
Document identifier in SQL Reference

### Why are the changes needed?
make SQL Reference complete

### Does this PR introduce any user-facing change?
Yes
<img width="1049" alt="Screen Shot 2020-04-23 at 11 14 10 PM" src="https://user-images.githubusercontent.com/13592258/80180695-2f2a4f00-85b8-11ea-819b-f96872956d05.png">

<img width="1050" alt="Screen Shot 2020-04-23 at 11 32 32 PM" src="https://user-images.githubusercontent.com/13592258/80182062-e6c06080-85ba-11ea-9502-1c38358c97c9.png">

### How was this patch tested?
Manually build and check

Closes #28277 from huaxingao/identifier.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-24 08:05:27 +00:00
yi.wu 263f04db86 [SPARK-31485][CORE] Avoid application hang if only partial barrier tasks launched
### What changes were proposed in this pull request?

Use `dagScheduler.taskSetFailed` to abort a barrier stage instead of throwing exception within `resourceOffers`.

### Why are the changes needed?

Any non-fatal exception thrown within the Spark RPC framework can be swallowed:

100fc58da5/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala (L202-L211)

The method `TaskSchedulerImpl.resourceOffers` is also within the scope of the Spark RPC framework. Thus, throwing an exception inside `resourceOffers` won't fail the application.

As a result, if a barrier stage fails the require check at `require(addressesWithDescs.size == taskSet.numTasks, ...)`, it will fail the check again and again until all tasks from the `TaskSetManager` are dequeued. But since the barrier stage isn't really executed, the application will hang.

The issue can be reproduced by the following test:

```scala
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"),Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
  BarrierTaskContext.get().barrier()
  iter
}.collect()
```

### Does this PR introduce any user-facing change?

Yes, the application would previously hang but fails fast after this fix.

### How was this patch tested?

Added a regression test.

Closes #28257 from Ngone51/fix_barrier_abort.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-24 04:17:06 +00:00
Jungtaek Lim (HeartSaVioR) 39bc50dbf8 [SPARK-30804][SS] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog
### What changes were proposed in this pull request?

This patch adds some log messages to record the elapsed time of the "compact" operation in FileStreamSourceLog and FileStreamSinkLog (added in CompactibleFileStreamLog) to help investigate mysterious latency spikes during batch runs.

### Why are the changes needed?

Tracking latency is a critical aspect of a streaming query. While the "compact" operation may add nontrivial latency (it's even synchronous, adding all of its latency to the batch run), it's not measured and end users have to guess.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A for UT. Manual test with streaming query using file source & file sink.

> grep "for compact batch" <driver log>

```
...
20/02/20 19:27:36 WARN FileStreamSinkLog: Compacting took 24473 ms (load: 14185 ms, write: 10288 ms) for compact batch 21359
20/02/20 19:27:39 WARN FileStreamSinkLog: Loaded 1068000 entries (397985432 bytes in memory), and wrote 1068000 entries for compact batch 21359
20/02/20 19:29:52 WARN FileStreamSourceLog: Compacting took 3777 ms (load: 1524 ms, write: 2253 ms) for compact batch 21369
20/02/20 19:29:52 WARN FileStreamSourceLog: Loaded 229477 entries (68970112 bytes in memory), and wrote 229477 entries for compact batch 21369
20/02/20 19:30:17 WARN FileStreamSinkLog: Compacting took 24183 ms (load: 12992 ms, write: 11191 ms) for compact batch 21369
20/02/20 19:30:20 WARN FileStreamSinkLog: Loaded 1068500 entries (398171880 bytes in memory), and wrote 1068500 entries for compact batch 21369
...
```

![Screen Shot 2020-02-21 at 12 34 22 PM](https://user-images.githubusercontent.com/1317309/75002142-c6830100-54a6-11ea-8da6-17afb056653b.png)

These messages explain why the operation duration peaks every 10 batches, which is the compact interval. Latency from addBatch increases heavily in each peak, which does NOT mean it takes more time to write outputs, but we would have no idea without such messages.

NOTE: The output may be a bit different from the code, as it may be changed a bit during review phase.

Closes #27557 from HeartSaVioR/SPARK-30804.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-04-24 12:34:44 +09:00
Dongjoon Hyun 6180028a37 [SPARK-31547][BUILD] Upgrade Genjavadoc to 0.16
### What changes were proposed in this pull request?

This PR aims to upgrade Genjavadoc to 0.16.

### Why are the changes needed?

Although we skipped Scala 2.12.11, this brings 2.12.11 official support and better 2.12.12 compatibility.

- https://github.com/lightbend/genjavadoc/commits/v0.16

### Does this PR introduce any user-facing change?

No. (The generated doc is the same)

### How was this patch tested?

Build with 0.15 and 0.16.
```
$ SKIP_PYTHONDOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 jekyll build
```

Compare the result. The generated doc is identical.
```
$ diff -r _site_0.15 _site_0.16 | grep -v '^diff -r' | grep -v 'Generated by javadoc' | sort | uniq
---
5c5
```

Closes #28321 from dongjoon-hyun/SPARK-31547.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2020-04-24 12:13:10 +09:00
Max Gekk 26165427c7 [SPARK-31488][SQL] Support java.time.LocalDate in Parquet filter pushdown
### What changes were proposed in this pull request?
1. Modified `ParquetFilters.valueCanMakeFilterOn()` to accept filters with `java.time.LocalDate` attributes.
2. Modified `ParquetFilters.dateToDays()` to support both types `java.sql.Date` and `java.time.LocalDate` in conversions to days.
3. Add implicit conversion from `LocalDate` to `Expression` (`Literal`).

### Why are the changes needed?
To support pushed down filters with `java.time.LocalDate` attributes. Before the changes, date filters are not pushed down to Parquet datasource when `spark.sql.datetime.java8API.enabled` is `true`.
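
A hedged usage sketch of the scenario this enables (path and column name are illustrative):

```scala
// Assumes a spark-shell session; `d` is a DATE column in the illustrative dataset.
import spark.implicits._

spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
val df = spark.read.parquet("/tmp/dates")
df.filter($"d" === java.time.LocalDate.of(2020, 1, 1)).explain()
// After this change, the date filter shows up in PushedFilters for the Parquet scan.
```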

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added a test to `ParquetFilterSuite`

Closes #28259 from MaxGekk/parquet-filter-java8-date-time.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-24 02:21:53 +00:00
Takeshi Yamamuro 42f496f6ac [SPARK-31526][SQL][TESTS] Add a new test suite for ExpressionInfo
### What changes were proposed in this pull request?

This PR intends to add a new test suite for `ExpressionInfo`. Major changes are as follows;

 - Added a new test suite named `ExpressionInfoSuite`
 - To improve test coverage, added a test for error handling in `ExpressionInfoSuite`
 - Moved the `ExpressionInfo`-related tests from `UDFSuite` to `ExpressionInfoSuite`
 - Moved the related tests from `SQLQuerySuite` to `ExpressionInfoSuite`
 - Added a comment in `ExpressionInfoSuite` (followup of https://github.com/apache/spark/pull/28224)

### Why are the changes needed?

To improve test suites/coverage.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #28308 from maropu/SPARK-31526.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-04-24 11:19:20 +09:00
bmarcott f093480af9
fix version for config spark.locality.wait.legacyResetOnTaskLaunch (#28307)
fix method return type doc
2020-04-23 14:38:15 -05:00
Kent Yao 8dc2c0247b [SPARK-31522][SQL] Hive metastore client initialization related configurations should be static
### What changes were proposed in this pull request?
The HiveClient instance is cross-session; the following configurations, which are defined in HiveUtils and used to create it, should be considered static:

1. spark.sql.hive.metastore.version - used to determine the hive version in Spark
2. spark.sql.hive.metastore.jars - hive metastore related jars location which is used by spark to create hive client
3. spark.sql.hive.metastore.sharedPrefixes and spark.sql.hive.metastore.barrierPrefixes -  package names of classes that are shared or separated between SparkContextLoader and hive client class loader

These are used only once, when creating the Hive metastore client. They should be static in SQLConf so that they are retrieved correctly. We should avoid them being changed by users with the SET/RESET commands.

Speaking of spark.sql.hive.version, the fake counterpart of spark.sql.hive.metastore.version, it is used by the JDBC/Thrift client for backward compatibility.

### Why are the changes needed?

Bug fix; these configurations should not be changeable.

### Does this PR introduce any user-facing change?

Yes, the following set of configs is no longer allowed to be changed:
```
Seq("spark.sql.hive.metastore.version ",
      "spark.sql.hive.metastore.jars",
      "spark.sql.hive.metastore.sharedPrefixes",
      "spark.sql.hive.metastore.barrierPrefixes")
```
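
For illustration, attempting to change one of these now fails the same way as other static configs (a hedged sketch; the exact error wording may differ):

```scala
spark.sql("SET spark.sql.hive.metastore.jars=maven")
// org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.hive.metastore.jars
```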
### How was this patch tested?

add unit test

Closes #28302 from yaooqinn/SPARK-31522.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-23 15:07:44 +00:00
yi.wu 6c018b31e2 [SPARK-16775][DOC][FOLLOW-UP] Add migration guide for removed accumulator v1 APIs
### What changes were proposed in this pull request?

Add migration guide for removed accumulator v1 APIs.

### Why are the changes needed?

Provide better guidance for users' migration.
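
For context, the kind of migration the guide covers (a hedged sketch; the guide's exact wording is in the PR):

```scala
// Accumulator v1 (removed): sc.accumulator(0)
// AccumulatorV2 replacement, assuming a spark-shell session (`sc` is the SparkContext):
val acc = sc.longAccumulator("myCounter")
sc.parallelize(1 to 10).foreach(x => acc.add(x))
println(acc.value)  // 55
```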

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass Jenkins.

Closes #28309 from Ngone51/SPARK-16775-migration-guide.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-23 10:59:35 +00:00
Huaxin Gao f543d6a1ee [SPARK-31465][SQL][DOCS][FOLLOW-UP] Document Literal in SQL Reference
### What changes were proposed in this pull request?
Need to address a few more comments

### Why are the changes needed?
Fix a few problems

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
Manually build and check

Closes #28306 from huaxingao/literal-folllowup.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-04-23 15:03:20 +09:00
Yuanjian Li ca90e1932d [SPARK-31515][SQL] Canonicalize Cast should consider the value of needTimeZone
### What changes were proposed in this pull request?
Override the canonicalized fields with respect to the result of `needsTimeZone`.

### Why are the changes needed?
The current approach breaks semantic equality of two cast expressions that are unrelated to datetime types. If we don't need `timeZone` information to cast the `from` type to the `to` type, then the timeZoneId should not influence the canonicalized result.
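
A hedged illustration using Catalyst internals (the class names exist in Spark, but this is a sketch of the intent rather than the new test):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.LongType

// Casting an int to long never needs a time zone, so different timeZoneIds should not
// break semantic equality once canonicalization ignores the irrelevant timeZoneId.
val c1 = Cast(Literal(1), LongType, Some("UTC"))
val c2 = Cast(Literal(1), LongType, Some("America/Los_Angeles"))
println(c1.semanticEquals(c2))  // true after this patch; false before
```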

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
New UT added.

Closes #28288 from xuanyuanking/SPARK-31515.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-04-23 14:32:10 +09:00
Huaxin Gao 03fe9ee428 [SPARK-31465][SQL][DOCS] Document Literal in SQL Reference
### What changes were proposed in this pull request?
Document Literal in SQL Reference

### Why are the changes needed?
Make SQL Reference complete

### Does this PR introduce any user-facing change?
Yes
<img width="1049" alt="Screen Shot 2020-04-22 at 8 50 04 PM" src="https://user-images.githubusercontent.com/13592258/80057912-9ecb0c00-84dc-11ea-881e-1415108d674f.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 50 29 PM" src="https://user-images.githubusercontent.com/13592258/80057917-a12d6600-84dc-11ea-8884-81f2a94644d5.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 50 54 PM" src="https://user-images.githubusercontent.com/13592258/80057922-a4c0ed00-84dc-11ea-9857-75db50f7b054.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 51 15 PM" src="https://user-images.githubusercontent.com/13592258/80057927-a7234700-84dc-11ea-9124-45ae1f6143fd.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 51 44 PM" src="https://user-images.githubusercontent.com/13592258/80057932-ab4f6480-84dc-11ea-8393-cf005af13ce9.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 52 03 PM" src="https://user-images.githubusercontent.com/13592258/80057936-ad192800-84dc-11ea-8d78-9f071a82f1df.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 52 28 PM" src="https://user-images.githubusercontent.com/13592258/80057940-b0141880-84dc-11ea-97a7-f787cad0ee03.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 53 14 PM" src="https://user-images.githubusercontent.com/13592258/80057945-b30f0900-84dc-11ea-985f-c070609e2329.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 53 34 PM" src="https://user-images.githubusercontent.com/13592258/80057949-b5716300-84dc-11ea-9452-3f51137fe03d.png">

<img width="1050" alt="Screen Shot 2020-04-22 at 8 53 56 PM" src="https://user-images.githubusercontent.com/13592258/80057957-b904ea00-84dc-11ea-8b12-a6f00362aa55.png">

<img width="1049" alt="Screen Shot 2020-04-22 at 8 54 12 PM" src="https://user-images.githubusercontent.com/13592258/80057962-bacead80-84dc-11ea-94da-916b1d1c1756.png">

### How was this patch tested?
Manually build and check

Closes #28237 from huaxingao/literal.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-04-23 14:12:10 +09:00
Takeshi Yamamuro 820733aee2 [SPARK-31476][SQL][FOLLOWUP] Add tests for extract('field', source)
### What changes were proposed in this pull request?

SPARK-31476 added support for `extract('field', source)` as a side effect, so this PR intends to add some tests for the function in `SQLQueryTestSuite`.

### Why are the changes needed?

For better test coverage.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #28276 from maropu/SPARK-31476-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-23 04:59:59 +00:00
Kent Yao 3b5792114a [SPARK-31474][SQL][FOLLOWUP] Replace _FUNC_ placeholder with functionname in the note field of expression info
### What changes were proposed in this pull request?

\_FUNC\_ has been used in the note() of `ExpressionDescription` since https://github.com/apache/spark/pull/28248, and there may be more cases later; we should replace it with the function name in the documentation.

### Why are the changes needed?

doc fix

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

pass Jenkins, and verify locally with Jekyll serve

Closes #28305 from yaooqinn/SPARK-31474-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-04-23 13:33:04 +09:00
Max Gekk e7856a7902 [MINOR][SQL] Add comments for filters values and return values of Row.get()/apply()
### What changes were proposed in this pull request?
- Document row field values of `DATE` and `TIMESTAMP` type returned by `Row.get()` and `Row.apply`.
- Refer to `Row.get()` from the description of filter values

### Why are the changes needed?
Reflect the current behaviour of Row's `apply()` and `get()` methods in comments, to inform users about the different return types that depend on the SQL config setting `spark.sql.datetime.java8API.enabled`.
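
A hedged spark-shell sketch of the behavior the comments now describe:

```scala
// Default: DATE values come back as java.sql.Date.
spark.conf.set("spark.sql.datetime.java8API.enabled", "false")
spark.sql("SELECT DATE'2020-04-23' AS d").first().get(0)   // java.sql.Date

// With the Java 8 API enabled, the same field is a java.time.LocalDate.
spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
spark.sql("SELECT DATE'2020-04-23' AS d").first().get(0)   // java.time.LocalDate
```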

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Run `$ ./dev/scalastyle`

Closes #28300 from MaxGekk/doc-filter-date-time.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-23 04:23:33 +00:00
Antonin Delpeuch 497024956a
[SPARK-31518][CORE] Expose filterByRange in JavaPairRDD
### What changes were proposed in this pull request?

This exposes the `filterByRange` method from `OrderedRDDFunctions` in the Java API (as a method of JavaPairRDD).

This is the only method of `OrderedRDDFunctions` which is not exposed in the Java API so far.

### Why are the changes needed?

This improves the consistency between the Scala and Java APIs. Calling the Scala method manually from a Java context is cumbersome as it requires passing many ClassTags.

### Does this PR introduce any user-facing change?

Yes, a new method in the Java API.

### How was this patch tested?

With unit tests. The implementation of the Scala method is already tested independently and it was not touched in this PR.

Suggesting srowen as a reviewer.

Closes #28293 from wetneb/SPARK-31518.

Authored-by: Antonin Delpeuch <antonin@delpeuch.eu>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-22 20:04:17 -07:00
HyukjinKwon da3c6c4e35 [SPARK-31510][R][BUILD] Set setwd in R documentation build
Seems like in certain environments, it is required to set `setwd` as below:

```
> library(devtools); devtools::document(pkg="./pkg", roclets=c("rd"))
Loading required package: usethis
Error: Could not find package root, is your working directory inside a package?
```

see also https://stackoverflow.com/questions/52670051/how-to-troubleshoot-error-could-not-find-package-root and https://groups.google.com/forum/#!topic/rdevtools/79jjjdc_wjg

We can make up another story too. For example, if you set a specific directory in your `~/.Rprofile`, then R documentation build will fail as below:

```
echo 'setwd("~")' > ~/.Rprofile
sh R/create-rd.sh
```

```
Using R_SCRIPT_PATH = /usr/local/bin
Loading required package: usethis
Error: Can't find './pkg'.
Execution halted
```

This PR proposes to set `setwd` explicitly so it is not affected by the global environment.

To make R dev env more independent.

No, dev only.

Manually tested:

```bash
echo 'setwd("~")' > ~/.Rprofile
sh R/create-rd.sh
```

Before:

```
Using R_SCRIPT_PATH = /usr/local/bin
Loading required package: usethis
Error: Can't find './pkg'.
Execution halted
```

After:

```
Using R_SCRIPT_PATH = /usr/local/bin
Loading required package: usethis
Updating SparkR documentation
Loading SparkR
Creating a new generic function for ‘as.data.frame’ in package ‘SparkR’
Creating a new generic function for ‘colnames’ in package ‘SparkR’
Creating a new generic function for ‘colnames<-’ in package ‘SparkR’
Creating a new generic function for ‘cov’ in package ‘SparkR’
Creating a new generic function for ‘drop’ in package ‘SparkR’
Creating a new generic function for ‘na.omit’ in package ‘SparkR’
Creating a new generic function for ‘filter’ in package ‘SparkR’
Creating a new generic function for ‘intersect’ in package ‘SparkR’
...
```

Closes #28285
2020-04-23 10:23:01 +09:00
Thomas Graves 95aec091e4 [SPARK-29641][PYTHON][CORE] Stage Level Sched: Add python api's and tests
### What changes were proposed in this pull request?

As part of the stage-level scheduling features, add the Python APIs to set resource profiles.
This also adds the functionality to properly apply the pyspark memory configuration when it is specified in the ResourceProfile. The pyspark memory configuration is passed in the task local properties; this was an easy way to get it to the PythonRunner that needs it. I modeled this on how barrier task scheduling passes the addresses. As part of this I added the JavaRDD APIs because those are needed by Python.

### Why are the changes needed?

python api for this feature

### Does this PR introduce any user-facing change?

Yes, this adds the Java and Python APIs for users to specify a ResourceProfile to use stage-level scheduling.

### How was this patch tested?

Unit tests and manual testing on YARN. Tests were also run to verify that it errors properly in standalone and local mode, where it's not yet supported.

Closes #28085 from tgravescs/SPARK-29641-pr-base.

Lead-authored-by: Thomas Graves <tgraves@nvidia.com>
Co-authored-by: Thomas Graves <tgraves@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-04-23 10:20:39 +09:00
Gabor Somogyi c619990c1d [SPARK-31272][SQL] Support DB2 Kerberos login in JDBC connector
### What changes were proposed in this pull request?
When loading DataFrames from JDBC datasource with Kerberos authentication, remote executors (yarn-client/cluster etc. modes) fail to establish a connection due to lack of Kerberos ticket or ability to generate it.

This is a real issue when trying to ingest data from kerberized data sources (SQL Server, Oracle) in enterprise environment where exposing simple authentication access is not an option due to IT policy issues.

In this PR I've added DB2 support (other supported databases will come in later PRs).

What this PR contains:
* Added `DB2ConnectionProvider`
* Added `DB2ConnectionProviderSuite`
* Added `DB2KrbIntegrationSuite` docker integration test
* Changed DB2 JDBC driver to use the latest (test scope only)
* Changed test table data type to a type which is supported by all the databases
* Removed double connection creation on test side
* Increased connection timeout in docker tests because DB2 docker takes quite a time to start

### Why are the changes needed?
Missing JDBC kerberos support.

### Does this PR introduce any user-facing change?
Yes, users are now able to connect to DB2 using Kerberos.

### How was this patch tested?
* Additional + existing unit tests
* Additional + existing integration tests
* Test on cluster manually

Closes #28215 from gaborgsomogyi/SPARK-31272.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@apache.org>
2020-04-22 17:10:30 -07:00
Cong Du 54b97b2e14 [MINOR][DOCS] Fix a typo in ContainerPlacementStrategy's class comment
### What changes were proposed in this pull request?
This PR fixes a typo in deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala file.

### Why are the changes needed?
To deliver a correct explanation of how the placement policy works.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
UTs as specified, although this shouldn't influence any functionality since the change is only in a comment.

Closes #28267 from asclepiusaka/master.

Authored-by: Cong Du <asclepius1993@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-04-22 09:44:43 -05:00
Nicholas Marcott 8b77b31835 [SPARK-18886][CORE][FOLLOWUP] allow follow up locality resets even if no task was launched
### What changes were proposed in this pull request?
Remove the requirement to launch a task in order to reset locality wait timer.

### Why are the changes needed?
Recently https://github.com/apache/spark/pull/27207 was merged, but contained a bug which leads to undesirable behavior.

The crux of the issue is that single resource offers couldn't reset the timer, if there had been a previous reject followed by an allResourceOffer with no available resources.
This led to a problem where, once the locality level reached ANY, single resource offers were all accepted, leaving allResourceOffers with no resources to utilize (hence no task being launched on an all-resource offer -> no timer reset). The task manager would be stuck at the ANY locality level.

Noting down here the downsides of using the reset conditions below, in case we want to follow up.
As this is quite complex, I could easily be missing something, so please comment/respond if you have more bad behavior scenarios or find something wrong here:
The format is:

> **Reset condition**
>  - the unwanted side effect
>      - the cause/use case

Below references to locality increase/decrease mean:
```
PROCESS_LOCAL, NODE_LOCAL ... .. ANY
    ------ locality decrease --->
   <----- locality increase -----
```

**Task launch:**
- locality decrease:
   - Blacklisting, FAIR/FIFO scheduling, or task resource requirements can minimize tasks launched
 - locality increase:
   - single task launch decreases locality despite many tasks remaining

**No delay schedule reject since last allFreeResource offer**
- locality decrease:
   - locality wait is less than the allFreeResource offer frequency, which occurs at least once per second
- locality increase:
   - single resource (or none) not rejected despite many tasks remaining (other lower priority tasks utilizing resources)

**Current impl - No delay schedule reject since last (allFreeResource offer + task launch)**
- locality decrease:
  - all from above
- locality increase:
   - single resource accepted and task launched despite many tasks remaining

The current impl is an improvement on the legacy one (task launch) in that the unintended locality decrease case is similar and the unintended locality increase case only occurs when the cluster is fully utilized.

For the locality increase cases, perhaps a config which specifies a certain % of tasks in a taskset to finish before resetting locality levels would be helpful.

**If** that was considered a good approach, then perhaps removing the task launch as a requirement would eliminate most of the downsides listed above.
Let me know if you have more ideas for eliminating the locality increase downside of **No delay schedule reject since last allFreeResource offer**.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
TaskSchedulerImplSuite

Also manually tested similar to how I tested in https://github.com/apache/spark/pull/27207 using [this simple app](https://github.com/bmarcott/spark-test-apps/blob/master/src/main/scala/TestLocalityWait.scala).

With the new changes, given locality wait of 10s the behavior is generally:
10 seconds of locality being respected, followed by a single full utilization of resources using ANY locality level, followed by 10 seconds of locality being respected, and so on

If the legacy flag is enabled (spark.locality.wait.legacyResetOnTaskLaunch=true), the behavior is only scheduling PROCESS_LOCAL tasks (only utilizing a single executor)

cloud-fan
tgravescs

Closes #28188 from bmarcott/nmarcott-locality-fix.

Authored-by: Nicholas Marcott <481161+bmarcott@users.noreply.github.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2020-04-22 08:25:24 -05:00
yi.wu 8fbfdb38c0 [SPARK-31495][SQL] Support formatted explain for AQE
### What changes were proposed in this pull request?

To support formatted explain for AQE.

### Why are the changes needed?

AQE does not support formatted explain yet. It's good to support it for better user experience, debugging, etc.
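
For reference, formatted explain is requested through the Dataset API (a usage sketch assuming an AQE-enabled session and an illustrative query, not the query shown below):

```scala
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.adaptive.enabled", "true")
val df = spark.range(100).groupBy((col("id") % 10).as("k")).count()
df.explain("formatted")
```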

Before:
```
== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)

(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession104ab57b), [PlanAdaptiveSubqueries(Map())], false
```

After:
```
== Physical Plan ==
 AdaptiveSparkPlan (14)
 +- * HashAggregate (13)
    +- CustomShuffleReader (12)
       +- ShuffleQueryStage (11)
          +- Exchange (10)
             +- * HashAggregate (9)
                +- * Project (8)
                   +- * BroadcastHashJoin Inner BuildRight (7)
                      :- * Project (2)
                      :  +- * LocalTableScan (1)
                      +- BroadcastQueryStage (6)
                         +- BroadcastExchange (5)
                            +- * Project (4)
                               +- * LocalTableScan (3)

 (1) LocalTableScan [codegen id : 2]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (2) Project [codegen id : 2]
 Output [2]: [_1#x AS k#x, _2#x AS v1#x]
 Input [2]: [_1#x, _2#x]

 (3) LocalTableScan [codegen id : 1]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (4) Project [codegen id : 1]
 Output [2]: [_1#x AS k#x, _2#x AS v2#x]
 Input [2]: [_1#x, _2#x]

 (5) BroadcastExchange
 Input [2]: [k#x, v2#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#x]

 (6) BroadcastQueryStage
 Output [2]: [k#x, v2#x]
 Arguments: 0

 (7) BroadcastHashJoin [codegen id : 2]
 Left keys [1]: [k#x]
 Right keys [1]: [k#x]
 Join condition: None

 (8) Project [codegen id : 2]
 Output [3]: [k#x, v1#x, v2#x]
 Input [4]: [k#x, v1#x, k#x, v2#x]

 (9) HashAggregate [codegen id : 2]
 Input [3]: [k#x, v1#x, v2#x]
 Keys [1]: [k#x]
 Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), partial_avg(cast(v2#x as bigint))]
 Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
 Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]

 (10) Exchange
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: hashpartitioning(k#x, 5), true, [id=#x]

 (11) ShuffleQueryStage
 Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
 Arguments: 1

 (12) CustomShuffleReader
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: coalesced

 (13) HashAggregate [codegen id : 3]
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Keys [1]: [k#x]
 Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
 Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
 Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

 (14) AdaptiveSparkPlan
 Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
 Arguments: isFinalPlan=true
```

### Does this PR introduce any user-facing change?

No, this should be a new feature along with AQE in Spark 3.0.

### How was this patch tested?

Added a query file: `explain-aqe.sql` and a unit test.

Closes #28271 from Ngone51/support_formatted_explain_for_aqe.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-22 12:44:06 +00:00
Liang-Chi Hsieh 1d30884963 [SPARK-31484][CORE][FOLLOWUP] Use taskAttemptId in checkpoint filename
### What changes were proposed in this pull request?

As suggested by https://github.com/apache/spark/pull/28255#discussion_r412619438, this patch proposes to use taskAttemptId in checkpoint filename, instead of stageAttemptNumber + attemptNumber.

### Why are the changes needed?

To make the checkpoint filename simpler and unique.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #28289 from viirya/SPARK-31484-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-04-22 21:23:48 +09:00
Kent Yao 37d2e037ed [SPARK-31507][SQL] Remove uncommon fields support and update some fields with meaningful names for extract function
### What changes were proposed in this pull request?

Extracting millennium, century, decade, millisecond, microsecond and epoch from datetime is neither ANSI standard nor particularly common in modern SQL platforms. Most of the systems listed below do not support these, except PostgreSQL and Redshift.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

https://docs.oracle.com/cd/B19306_01/server.102/b14200/functions050.htm

https://prestodb.io/docs/current/functions/datetime.html

https://docs.cloudera.com/documentation/enterprise/5-8-x/topics/impala_datetime_functions.html

https://docs.snowflake.com/en/sql-reference/functions-date-time.html#label-supported-date-time-parts

https://www.postgresql.org/docs/9.1/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT

This PR removes support for these extract fields from the extract function for date and timestamp values.

`isoyear` is PostgreSQL-specific, but `yearofweek` is more commonly used across platforms.
`isodow` is PostgreSQL-specific, but `iso` as a suffix is more commonly used across platforms, so `dow_iso` and `dayofweek_iso` are used to replace it.

For historical reasons, we have [`dayofweek`, `dow`] implemented for representing a non-ISO day-of-week and a newly added `isodow` from PostgreSQL for the ISO day-of-week. Many other systems only support one week-numbering system and use either full names or abbreviations. Things in Spark become a little bit complicated:
1. Because of the existence of `isodow`, we need to add an iso-prefix to `dayofweek` to make a pair for it too: [`dayofweek`, `isodayofweek`, `dow` and `isodow`].
2. Because `iso`-prefixed systems are rare and more systems choose the `iso`-suffixed way, we may end up with [`dayofweek`, `dayofweekiso`, `dow`, `dowiso`].
3. `dayofweekiso` looks nice and has use cases in the platforms listed above, e.g. Snowflake, but `dowiso` looks weird and no use cases were found.
4. After a discussion in the community, we agreed that an underscore before `iso` looks much better, because `isodow` is new and there is no standard for `iso`-style names; this keeps things simple and clear for end users as long as they are well documented.

Thus, we finally end up with [`dayofweek`, `dow`] for the non-ISO day-of-week system and [`dayofweek_iso`, `dow_iso`] for the ISO system.
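
A hedged spark-shell sketch of the resulting field names (2009-07-26 is a Sunday, per the examples elsewhere in this log):

```scala
spark.sql("SELECT extract(dayofweek FROM DATE'2009-07-26')").show()      // 1  (Sunday=1 .. Saturday=7)
spark.sql("SELECT extract(dayofweek_iso FROM DATE'2009-07-26')").show()  // 7  (ISO: Monday=1 .. Sunday=7)
```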

### Why are the changes needed?

Remove some nonstandard and uncommon features; we can add them back if necessary.

### Does this PR introduce any user-facing change?

No; we should target this at 3.0.0, and these were added during 3.0.0.

### How was this patch tested?

Remove unused tests

Closes #28284 from yaooqinn/SPARK-31507.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-22 10:24:49 +00:00
Kent Yao 2c2062ea7c [SPARK-31498][SQL][DOCS] Dump public static sql configurations through doc generation
### What changes were proposed in this pull request?

Currently, only the non-static public SQL configurations are dumped to the public doc; we'd better also add the static public ones, as the `set -v` command does.

This PR forces a call to `StaticSQLConf` to build the static configs.

### Why are the changes needed?

Fix missing SQL configurations in doc

### Does this PR introduce any user-facing change?

NO

### How was this patch tested?

Added a unit test and verified locally that the public static SQL configs are in `docs/sql-config.html`.

Closes #28274 from yaooqinn/SPARK-31498.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-22 10:16:39 +00:00
herman cf6038499d
[SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe
### What changes were proposed in this pull request?
This PR increases the thread safety of the `BytesToBytesMap`:
- It makes the `iterator()` and `destructiveIterator()` methods use their own `Location` object. This used to be shared, which was causing issues when the map was being iterated over in two threads by two different iterators.
- Removes the `safeIterator()` function. This is not needed anymore.
- Improves the documentation of a couple of methods w.r.t. thread-safety.

### Why are the changes needed?
It is unexpected that an iterator shares the object it returns with all other iterators. This is a violation of the iterator contract, and it causes issues with iterators over a map that are consumed in different threads.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #28286 from hvanhovell/SPARK-31511.

Authored-by: herman <herman@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-21 18:17:19 -07:00
Huang-Yi a5c16cbf05
[SPARK-31233][CORE] Enhance RpcTimeoutException Log Message
### What changes were proposed in this pull request?

The askAbortable method throws a TimeoutException when it does not complete in time. Currently, the error message contains null as the remoteAddr when the receiver is in client mode.
This change prints out the correct rpcAddress instead of null in the error message.

### Why are the changes needed?

It provides the address of an endpoint which does not reply in time, helping users find slow executors when a timeout happens.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Add a unit test.

Closes #28002 from Huang-yi-3456/SPARK-31233-enhance-rpctimeoutexception-log.

Authored-by: Huang-Yi <huang.yi.3456@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-21 14:08:37 -07:00
Wenchen Fan b209b5f406
[SPARK-31503][SQL] fix the SQL string of the TRIM functions
### What changes were proposed in this pull request?

override the `sql` method of `StringTrim`, `StringTrimLeft` and `StringTrimRight`, to use the standard SQL syntax.

### Why are the changes needed?

The current implementation is wrong. It gives you a SQL string that returns a different result.
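
For reference, a hedged spark-shell sketch of the standard syntax the generated SQL string should follow:

```scala
spark.sql("SELECT trim(BOTH 'x' FROM 'xxSparkxx')").show()      // Spark
spark.sql("SELECT trim(LEADING 'x' FROM 'xxSparkxx')").show()   // Sparkxx
spark.sql("SELECT trim(TRAILING 'x' FROM 'xxSparkxx')").show()  // xxSpark
```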

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

new tests

Closes #28281 from cloud-fan/sql.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-21 11:22:18 -07:00
Wenchen Fan a5ebbacf53 [SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata
### What changes were proposed in this pull request?

This PR adds a new parquet/avro file metadata key: `org.apache.spark.legacyDatetime`. It indicates that the file was written with the "rebaseInWrite" config enabled, and Spark needs to rebase the datetime values when reading it.

This makes Spark able to decide on rebasing more smartly (see the sketch after this list):
1. If we don't know which Spark version wrote the file, rebase if the "rebaseInRead" config is true.
2. If the file was written by Spark 2.4 or earlier, always rebase.
3. If the file was written by Spark 3.0 or later, rebase only if the `org.apache.spark.legacyDatetime` key exists in the file metadata.
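
A minimal sketch of that decision; the parameter names are hypothetical and the version check is simplified, so this is illustrative only, not the actual reader code:

```scala
def shouldRebase(
    writerVersion: Option[String],   // Spark version recorded in the file metadata, if known
    hasLegacyDatetimeKey: Boolean,   // whether "org.apache.spark.legacyDatetime" is present
    rebaseInRead: Boolean): Boolean = writerVersion match {
  case None => rebaseInRead                     // unknown writer: honor the read-side config
  case Some(v) if !v.startsWith("3") => true    // written by Spark 2.4 or earlier: always rebase
  case _ => hasLegacyDatetimeKey                // Spark 3.0 or later: rebase only if the marker exists
}
```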

### Why are the changes needed?

It's very easy to end up with mixed-calendar parquet/avro files: e.g. a user upgrades to Spark 3.0 and writes some parquet files to an existing directory, and then realizes that the directory contains legacy datetime values before 1582. At that point it's too late, and they have to find all the legacy files manually and read them separately.

To support mixed-calendar parquet/avro files, we need to decide to rebase or not based on the file metadata.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Updated test

Closes #28137 from cloud-fan/datetime.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-04-22 00:26:23 +09:00
yi.wu 55b026a783 [SPARK-31504][SQL] Formatted Explain should have determined order of Output fields
### What changes were proposed in this pull request?

In `verboseStringWithOperatorId`, use `output` (a `Seq[Attribute]`) instead of `producedAttributes` (an `AttributeSet`) to generate `"Output"` for the leaf node, so that `"Output"` is deterministic.

### Why are the changes needed?

Currently, Formatted Explain uses `producedAttributes`, an `AttributeSet`, to generate `"Output"`. As a result, the field order within `"Output"` can differ from run to run, which means the same plan could produce different explain outputs.
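
A plain-Scala illustration of the difference (not Spark internals): a `Seq` keeps the declared order, while a hash-based set iterates in a hash-driven order, which for `AttributeSet` depends on expression IDs and can therefore vary between otherwise identical plans.

```scala
import scala.collection.immutable.HashSet

val columns = Seq("id", "name", "event_time", "value")
println(columns.mkString(", "))              // always: id, name, event_time, value
println(HashSet(columns: _*).mkString(", ")) // hash-driven order, not guaranteed to match the schema
```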

### Does this PR introduce any user-facing change?

Yes, users now see a deterministic field order within the formatted explain output.

### How was this patch tested?

Added a regression test.

Closes #28282 from Ngone51/fix_output.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-21 12:33:58 +00:00
Kent Yao 1985437110 [SPARK-31474][SQL] Consistency between dayofweek/dow in extract exprsession and dayofweek function
### What changes were proposed in this pull request?
```sql
spark-sql> SELECT extract(dayofweek from '2009-07-26');
1
spark-sql> SELECT extract(dow from '2009-07-26');
0
spark-sql> SELECT extract(isodow from '2009-07-26');
7
spark-sql> SELECT dayofweek('2009-07-26');
1
spark-sql> SELECT weekday('2009-07-26');
6
```
Currently, there are 4 types of day-of-week range:
1. the function `dayofweek` (2.3.0) and extracting `dayofweek` (2.4.0) range from Sunday(1) to Saturday(7)
2. extracting `dow` (3.0.0) ranges from Sunday(0) to Saturday(6)
3. extracting `isodow` (3.0.0) ranges from Monday(1) to Sunday(7)
4. the function `weekday` (2.4.0) ranges from Monday(0) to Sunday(6)

Actually, extracting `dayofweek` and `dow` are both derived from PostgreSQL but have different meanings.
https://issues.apache.org/jira/browse/SPARK-23903
https://issues.apache.org/jira/browse/SPARK-28623

In this PR, we make extracting `dow` the same as extracting `dayofweek` and the `dayofweek` function, for historical reasons and to avoid breaking anything.

Also, add more documentation to the extract function to make the extractable fields easier to understand.
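
After this change, extracting `dow` agrees with the `dayofweek` function (a sketch, assuming a running `SparkSession` named `spark`; expected results in the comments):

```scala
spark.sql("SELECT extract(dow FROM '2009-07-26')").show()     // 1 (was 0 before this change)
spark.sql("SELECT dayofweek('2009-07-26')").show()            // 1
spark.sql("SELECT extract(isodow FROM '2009-07-26')").show()  // 7 (unchanged: Monday=1 .. Sunday=7)
```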

### Why are the changes needed?

Ensures consistency between these day-of-week functions.

### Does this PR introduce any user-facing change?

Yes, the docs are updated and extracting `dow` now behaves the same as `dayofweek`.

### How was this patch tested?

1. modified unit tests
2. local SQL doc verification
#### before
![image](https://user-images.githubusercontent.com/8326978/79601949-3535b100-811c-11ea-957b-a33d68641181.png)

#### after
![image](https://user-images.githubusercontent.com/8326978/79601847-12a39800-811c-11ea-8ff6-aa329255d099.png)

Closes #28248 from yaooqinn/SPARK-31474.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-21 11:55:33 +00:00
yi.wu 7103f19fea [SPARK-31472][CORE] Make sure Barrier Task always return messages or exception with abortableRpcFuture check
### What changes were proposed in this pull request?

Rewrite the periodic check logic of `abortableRpcFuture` to make sure that a barrier task always returns either the desired messages or the expected exception.

This PR also simplifies things a bit around `AbortableRpcFuture`.

### Why are the changes needed?

Currently, the periodic check logic of `abortableRpcFuture` works as follows:

```scala
...
var messages: Array[String] = null

while (!abortableRpcFuture.toFuture.isCompleted) {
   messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 1.second)
   ...
}
return messages
```
It's possible that `abortableRpcFuture` completes before the next assignment to `messages = ...` happens. In this case, the task may return null messages or finish successfully when it should have thrown an exception (e.g. a `SparkException` from `BarrierCoordinator`).
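
A minimal sketch of a loop without that race (illustrative only, not the actual `BarrierTaskContext` code): it can only exit by returning the result of a completed future or by propagating its failure.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._

def waitForMessages(f: Future[Array[String]]): Array[String] = {
  while (true) {
    try {
      // Either returns the messages of a completed future or rethrows its failure.
      return Await.result(f, 1.second)
    } catch {
      case _: TimeoutException => // not completed yet: keep polling (periodic checks go here)
    }
  }
  throw new IllegalStateException("unreachable")
}
```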

And here's a flaky test which caused by this bug:

```
[info] BarrierTaskContextSuite:
[info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 2) finished unsuccessfully.
[info] java.lang.NullPointerException
[info] 	at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
[info] 	at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
[info] 	at scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285)
[info] 	at scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284)
[info] 	at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198)
[info] 	at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68)
...
```

The test exception can be reproduced by changing the line `messages = ...` to the following:

```scala
messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 10.micros)
Thread.sleep(5000)
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually tested and updated some unit tests.

Closes #28245 from Ngone51/fix_barrier.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-21 10:12:56 +00:00
zhengruifeng e7bc6f38b9 [SPARK-31494][ML] flatten the result dataframe of ANOVATest
### What changes were proposed in this pull request?
add a new method `def test(dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame`
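
A sketch of calling the new overload described above (assuming a DataFrame `df` with a vector `features` column and a numeric `label` column):

```scala
import org.apache.spark.ml.stat.ANOVATest

// flatten = true returns one row per feature instead of a single aggregated row,
// so further DataFrame operations can be applied directly.
val perFeature = ANOVATest.test(df, "features", "label", flatten = true)
perFeature.show(truncate = false)
```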

### Why are the changes needed?
Similar to the new `test` method in `ChiSquareTest`, it will:
1. support DataFrame operations on the returned DataFrame;
2. keep the driver from becoming a bottleneck with a large `numFeatures`

### Does this PR introduce any user-facing change?
Yes, new method added

### How was this patch tested?
existing testsuites

Closes #28270 from zhengruifeng/flatten_anova.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
2020-04-21 12:43:14 +08:00
Onur Satici ad965103a5
[SPARK-30949][K8S][CORE] Decouple requests and parallelism on drivers in K8s
### What changes were proposed in this pull request?
`spark.driver.cores` configuration is used to set the amount of parallelism in kubernetes cluster mode drivers. Previously, the amount of parallelism in the drivers was the number of cores in the host when running on JDK 8u120 or older, or the maximum of the driver container's resource requests and limits when running on [JDK 8u121 or newer](https://bugs.openjdk.java.net/browse/JDK-8173345). This enables users to specify `spark.driver.cores` to set parallelism, and `spark.kubernetes.driver.requests.cores` to limit the resource requests of the driver container, effectively decoupling the two.
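
A sketch of the decoupled settings (values are illustrative; for cluster-mode drivers these are usually passed as `--conf` flags to `spark-submit`):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.cores", "4")                      // parallelism for RpcEnv, MemoryManager, BlockManager
  .set("spark.kubernetes.driver.requests.cores", "2")  // CPU the driver pod requests from Kubernetes
```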

### Why are the changes needed?
Drivers submitted in kubernetes cluster mode set the parallelism of various components like `RpcEnv`, `MemoryManager` and `BlockManager` by inferring the number of available cores with `Runtime.getRuntime().availableProcessors()`. Because of this, Spark applications running on JDK 8u120 or older incorrectly get the total number of cores in the host, [ignoring the cgroup limits set by kubernetes](https://bugs.openjdk.java.net/browse/JDK-6515172). JDK 8u121 and newer runtimes do not have this problem.

Orthogonal to this, it is currently not possible to decouple resource limits on the driver container from the amount of parallelism of the various network and memory components listed above.

### Does this PR introduce any user-facing change?
Yes. Previously, the amount of parallelism in drivers submitted in kubernetes cluster mode was the number of cores in the host when running on JDK 8u120 or older, or the maximum of the driver container's resource requests and limits when running on JDK 8u121 or newer. Now the value of `spark.driver.cores` is used.

### How was this patch tested?
happy to add tests if my proposal looks reasonable

Closes #27695 from onursatici/os/decouple-requests-and-parallelism.

Authored-by: Onur Satici <onursatici@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-20 21:32:43 -07:00
Maryann Xue ae29cf24fc [SPARK-31501][SQL] AQE update UI should not cause deadlock
### What changes were proposed in this pull request?

This PR makes sure that AQE does not trigger a UI update if the current execution ID does not match the current query. It also includes a minor refactoring that moves `getOrCloneSessionWithAqeOff` from `QueryExecution` to `AdaptiveSparkPlanHelper`, since that function is not used by `QueryExecution` any more.

### Why are the changes needed?

Without this fix, there is a potential deadlock.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT.

Closes #28275 from maryannxue/aqe-ui-deadlock.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-04-21 03:56:42 +00:00
zhengruifeng 32259c9733 [SPARK-31492][ML] flatten the result dataframe of FValueTest
### What changes were proposed in this pull request?
add a new method  `def test(dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame`

### Why are the changes needed?

Similar to the new `test` method in `ChiSquareTest`, it will:
1. support DataFrame operations on the returned DataFrame;
2. keep the driver from becoming a bottleneck with a large `numFeatures`

### Does this PR introduce any user-facing change?
Yes, add a new method

### How was this patch tested?
existing testsuites

Closes #28268 from zhengruifeng/flatten_fvalue.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
2020-04-21 11:09:05 +08:00