From 0c94e47aecab0a8c346e1a004686d1496a9f2b07 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 5 Aug 2021 21:15:35 +0800 Subject: [PATCH] [SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? This reverts SPARK-31475, as there are always more concurrent jobs running in AQE mode, especially when running multiple queries at the same time. Currently, the broadcast timeout does not record accurately for the BroadcastQueryStageExec only, but also including the time waiting for being scheduled. If all the resources are currently being occupied for materializing other stages, it timeouts without a chance to run actually.   ![image](https://user-images.githubusercontent.com/8326978/128169612-4c96c8f6-6f8e-48ed-8eaf-450f87982c3b.png)   The default value is 300s, and it's hard to adjust the timeout for AQE mode. Usually, you need an extremely large number for real-world cases. As you can see in the example, above, the timeout we used for it was 1800s, and obviously, it needed 3x more or something   ### Why are the changes needed? AQE is default now, we can make it more stable with this PR ### Does this PR introduce _any_ user-facing change? yes, broadcast timeout now is not used for AQE ### How was this patch tested? modified test Closes #33636 from yaooqinn/SPARK-36414. Authored-by: Kent Yao Signed-off-by: Wenchen Fan --- .../execution/adaptive/QueryStageExec.scala | 26 ++----------------- .../execution/joins/BroadcastJoinSuite.scala | 10 ++++--- 2 files changed, 8 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index f308829f66..e2f763eb71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.execution.adaptive -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.{Future, Promise} +import scala.concurrent.Future import org.apache.spark.{FutureAction, MapOutputStatistics} import org.apache.spark.broadcast.Broadcast @@ -29,11 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.ThreadUtils /** * A query stage is an independent subgraph of the query plan. Query stage materializes its output @@ -221,22 +218,8 @@ case class BroadcastQueryStageExec( throw new IllegalStateException(s"wrong plan for broadcast stage:\n ${plan.treeString}") } - @transient private lazy val materializeWithTimeout = { - val broadcastFuture = broadcast.submitBroadcastJob - val timeout = conf.broadcastTimeout - val promise = Promise[Any]() - val fail = BroadcastQueryStageExec.scheduledExecutor.schedule(new Runnable() { - override def run(): Unit = { - promise.tryFailure(QueryExecutionErrors.executeBroadcastTimeoutError(timeout, None)) - } - }, timeout, TimeUnit.SECONDS) - broadcastFuture.onComplete(_ => fail.cancel(false))(AdaptiveSparkPlanExec.executionContext) - Future.firstCompletedOf( - Seq(broadcastFuture, promise.future))(AdaptiveSparkPlanExec.executionContext) - } - override def doMaterialize(): Future[Any] = { - materializeWithTimeout + broadcast.submitBroadcastJob } override def newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]): QueryStageExec = { @@ -257,8 +240,3 @@ case class BroadcastQueryStageExec( override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics } - -object BroadcastQueryStageExec { - private val scheduledExecutor = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("BroadcastStageTimeout") -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index 83163cfb42..dd6a41219d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -415,15 +415,17 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils test("Broadcast timeout") { val timeout = 5 - val slowUDF = udf({ x: Int => Thread.sleep(timeout * 10 * 1000); x }) + val slowUDF = udf({ x: Int => Thread.sleep(timeout * 1000); x }) val df1 = spark.range(10).select($"id" as 'a) val df2 = spark.range(5).select(slowUDF($"id") as 'a) val testDf = df1.join(broadcast(df2), "a") withSQLConf(SQLConf.BROADCAST_TIMEOUT.key -> timeout.toString) { - val e = intercept[Exception] { - testDf.collect() + if (!conf.adaptiveExecutionEnabled) { + val e = intercept[Exception] { + testDf.collect() + } + assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs.")) } - assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs.")) } }