[SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE

### What changes were proposed in this pull request?

This reverts SPARK-31475, as there are always more concurrent jobs running in AQE mode, especially when multiple queries run at the same time. Currently, the broadcast timeout does not measure only the materialization of the BroadcastQueryStageExec itself; it also includes the time the stage spends waiting to be scheduled. If all resources are occupied materializing other stages, the broadcast stage times out without ever getting a chance to actually run.
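The reverted mechanism raced the broadcast future against a scheduled failure, so the clock started at submission rather than at execution. A minimal standalone sketch of that race pattern (plain Scala, no Spark; the names `withTimeout`, `work`, and `timeoutMs` are illustrative, not from the Spark codebase):

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object TimeoutRaceSketch extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Race `work` against a scheduled failure, mirroring the removed
  // materializeWithTimeout: the timer starts when the future is submitted,
  // so any time spent queued behind other work counts toward the timeout.
  def withTimeout[T](work: Future[T], timeoutMs: Long): Future[T] = {
    val promise = Promise[T]()
    val fail = scheduler.schedule(new Runnable {
      override def run(): Unit =
        promise.tryFailure(new TimeoutException(s"timed out after ${timeoutMs}ms"))
    }, timeoutMs, TimeUnit.MILLISECONDS)
    work.onComplete(_ => fail.cancel(false))
    Future.firstCompletedOf(Seq(work, promise.future))
  }

  // The work takes 200ms but the budget is 50ms, so the failure wins the race.
  val slow = Future { Thread.sleep(200); 42 }
  val result = scala.util.Try(Await.result(withTimeout(slow, 50), 1.second))
  println(result.isFailure) // true: the scheduled timeout fired first
  scheduler.shutdown()
}
```

In AQE mode the "queued behind other work" case is the norm, which is why removing the race entirely is simpler than trying to start the timer only once the stage begins running.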

 

![image](https://user-images.githubusercontent.com/8326978/128169612-4c96c8f6-6f8e-48ed-8eaf-450f87982c3b.png)

 

The default value is 300s, and it is hard to pick a suitable timeout for AQE mode; real-world cases usually need an extremely large value. In the example above, the timeout was set to 1800s, and it still needed roughly 3x more.
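Before this change, the only workaround in AQE mode was to raise the timeout to a very large value via the `spark.sql.broadcastTimeout` configuration. A sketch of that workaround (the value 3600 is illustrative, not a recommendation):

```scala
// Illustrative pre-change workaround: raise the broadcast timeout globally,
// since time spent waiting to be scheduled also counted against it in AQE mode.
spark.conf.set("spark.sql.broadcastTimeout", "3600")

// or equivalently at submit time:
//   spark-submit --conf spark.sql.broadcastTimeout=3600 ...
```

Because the required value depends on cluster load rather than on the broadcast itself, no single number works reliably, which motivates disabling the timeout for AQE instead.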

 

### Why are the changes needed?

AQE is enabled by default now; this PR makes it more stable.

### Does this PR introduce _any_ user-facing change?

Yes, the broadcast timeout is no longer applied in AQE mode.
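Concretely, whether the timeout applies now follows the AQE switch; a sketch of the two configurations involved (per the PR description, AQE is the default):

```scala
// With AQE on (the default), spark.sql.broadcastTimeout is no longer
// enforced for broadcast query stages after this change.
spark.conf.set("spark.sql.adaptive.enabled", "true")

// With AQE off, the timeout still applies as before (default 300s).
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.broadcastTimeout", "300")
```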

### How was this patch tested?

Modified an existing test.

Closes #33636 from yaooqinn/SPARK-36414.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 0c94e47aec (parent 095f9ff75b)
Authored by Kent Yao on 2021-08-05 21:15:35 +08:00; committed by Wenchen Fan
2 changed files with 8 additions and 28 deletions


```diff
@@ -17,10 +17,9 @@
 package org.apache.spark.sql.execution.adaptive

 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference

-import scala.concurrent.{Future, Promise}
+import scala.concurrent.Future

 import org.apache.spark.{FutureAction, MapOutputStatistics}
 import org.apache.spark.broadcast.Broadcast
@@ -29,11 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.ThreadUtils

 /**
  * A query stage is an independent subgraph of the query plan. Query stage materializes its output
@@ -221,22 +218,8 @@ case class BroadcastQueryStageExec(
       throw new IllegalStateException(s"wrong plan for broadcast stage:\n ${plan.treeString}")
   }

-  @transient private lazy val materializeWithTimeout = {
-    val broadcastFuture = broadcast.submitBroadcastJob
-    val timeout = conf.broadcastTimeout
-    val promise = Promise[Any]()
-    val fail = BroadcastQueryStageExec.scheduledExecutor.schedule(new Runnable() {
-      override def run(): Unit = {
-        promise.tryFailure(QueryExecutionErrors.executeBroadcastTimeoutError(timeout, None))
-      }
-    }, timeout, TimeUnit.SECONDS)
-    broadcastFuture.onComplete(_ => fail.cancel(false))(AdaptiveSparkPlanExec.executionContext)
-    Future.firstCompletedOf(
-      Seq(broadcastFuture, promise.future))(AdaptiveSparkPlanExec.executionContext)
-  }
-
   override def doMaterialize(): Future[Any] = {
-    materializeWithTimeout
+    broadcast.submitBroadcastJob
   }

   override def newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]): QueryStageExec = {
@@ -257,8 +240,3 @@ case class BroadcastQueryStageExec(

   override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics
 }
-
-object BroadcastQueryStageExec {
-  private val scheduledExecutor =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("BroadcastStageTimeout")
-}
```


```diff
@@ -415,17 +415,19 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
   test("Broadcast timeout") {
     val timeout = 5
-    val slowUDF = udf({ x: Int => Thread.sleep(timeout * 10 * 1000); x })
+    val slowUDF = udf({ x: Int => Thread.sleep(timeout * 1000); x })
     val df1 = spark.range(10).select($"id" as 'a)
     val df2 = spark.range(5).select(slowUDF($"id") as 'a)
     val testDf = df1.join(broadcast(df2), "a")
     withSQLConf(SQLConf.BROADCAST_TIMEOUT.key -> timeout.toString) {
+      if (!conf.adaptiveExecutionEnabled) {
         val e = intercept[Exception] {
           testDf.collect()
         }
         assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs."))
+      }
     }
   }

   test("broadcast join where streamed side's output partitioning is HashPartitioning") {
     withTable("t1", "t3") {
```