From 8b0a54e6ffadd6326fd254f67ebeb1673eb8f33b Mon Sep 17 00:00:00 2001
From: Ali Smesseim
Date: Wed, 8 Jul 2020 09:28:16 +0900
Subject: [PATCH] [SPARK-32057][SQL][TEST-HIVE1.2][TEST-HADOOP2.7]
 ExecuteStatement: cancel and close should not transiently ERROR

### What changes were proposed in this pull request?

#28671 changed the order in which SparkExecuteStatementOperation sets the CANCELED state: `cleanup()`, which kills the running jobs, was called before the state was set to CANCELED. The killed jobs cause an exception to be thrown inside `execute()`, so the state transiently becomes ERROR before finally being set to CANCELED. This PR fixes the order: the terminal state is set first, and `cleanup()` is called afterwards. A minimal sketch illustrating the ordering issue is appended after the patch.

### Why are the changes needed?

Bug fix: the wrong operation state (a transient ERROR) was reported before the final CANCELED/CLOSED state was set.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new unit test in SparkExecuteStatementOperationSuite.scala.

Closes #28912 from alismess-db/execute-statement-operation-cleanup-order.

Authored-by: Ali Smesseim
Signed-off-by: HyukjinKwon
---
 project/SparkBuild.scala                      |  1 +
 .../SparkExecuteStatementOperation.scala      |  2 +-
 .../hive/thriftserver/SparkOperation.scala    |  2 +-
 .../SparkExecuteStatementOperationSuite.scala | 81 ++++++++++++++++++-
 4 files changed, 83 insertions(+), 3 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 04a3fc4b63..60c54dfc98 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -476,6 +476,7 @@ object SparkParallelTestGrouping {
     "org.apache.spark.ml.classification.LogisticRegressionSuite",
     "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite",
+    "org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperationSuite",
     "org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite",
     "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
     "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index eae5d5d4bc..57ed15a76a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -333,8 +333,8 @@ private[hive] class SparkExecuteStatementOperation(
     synchronized {
       if (!getStatus.getState.isTerminal) {
         logInfo(s"Cancel query with $statementId")
-        cleanup()
         setState(OperationState.CANCELED)
+        cleanup()
         HiveThriftServer2.eventManager.onStatementCanceled(statementId)
       }
     }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
index 0acd1b3e98..446669d08e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -46,8 +46,8 @@ private[hive] trait SparkOperation extends Operation with Logging {
   }
 
   abstract override def close(): Unit = {
-    cleanup()
     super.close()
+    cleanup()
     logInfo(s"Close statement with $statementId")
     HiveThriftServer2.eventManager.onOperationClosed(statementId)
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
index 13df3fabc4..4c2f29e0bf 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -17,10 +17,25 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.util
+import java.util.concurrent.Semaphore
+
+import scala.concurrent.duration._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.service.cli.OperationState
+import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl}
+import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS}
+import org.mockito.invocation.InvocationOnMock
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2EventManager
+import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, NullType, StringType, StructField, StructType}
 
-class SparkExecuteStatementOperationSuite extends SparkFunSuite {
+class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSparkSession {
+
   test("SPARK-17112 `select null` via JDBC triggers IllegalArgumentException in ThriftServer") {
     val field1 = StructField("NULL", NullType)
     val field2 = StructField("(IF(true, NULL, NULL))", NullType)
@@ -42,4 +57,68 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite
     assert(columns.get(1).getType().getName == "INT")
     assert(columns.get(1).getComment() == "")
   }
+
+  Seq(
+    (OperationState.CANCELED, (_: SparkExecuteStatementOperation).cancel()),
+    (OperationState.CLOSED, (_: SparkExecuteStatementOperation).close())
+  ).foreach { case (finalState, transition) =>
+    test("SPARK-32057 SparkExecuteStatementOperation should not transiently become ERROR " +
+      s"before being set to $finalState") {
+      val hiveSession = new HiveSessionImpl(ThriftserverShimUtils.testedProtocolVersions.head,
+        "username", "password", new HiveConf, "ip address")
+      hiveSession.open(new util.HashMap)
+
+      HiveThriftServer2.eventManager = mock(classOf[HiveThriftServer2EventManager])
+
+      val spySqlContext = spy(sqlContext)
+
+      // When cancel() is called on the operation, cleanup causes an exception to be thrown inside
+      // of execute(). This should not cause the state to become ERROR. The exception here will be
+      // triggered in our custom cleanup().
+      val signal = new Semaphore(0)
+      val dataFrame = mock(classOf[DataFrame], RETURNS_DEEP_STUBS)
+      when(dataFrame.collect()).thenAnswer((_: InvocationOnMock) => {
+        signal.acquire()
+        throw new RuntimeException("Operation was cancelled by test cleanup.")
+      })
+      val statement = "stmt"
+      doReturn(dataFrame, Nil: _*).when(spySqlContext).sql(statement)
+
+      val executeStatementOperation = new MySparkExecuteStatementOperation(spySqlContext,
+        hiveSession, statement, signal, finalState)
+
+      val run = new Thread() {
+        override def run(): Unit = executeStatementOperation.runInternal()
+      }
+      assert(executeStatementOperation.getStatus.getState === OperationState.INITIALIZED)
+      run.start()
+      eventually(timeout(5.seconds)) {
+        assert(executeStatementOperation.getStatus.getState === OperationState.RUNNING)
+      }
+      transition(executeStatementOperation)
+      run.join()
+      assert(executeStatementOperation.getStatus.getState === finalState)
+    }
+  }
+
+  private class MySparkExecuteStatementOperation(
+      sqlContext: SQLContext,
+      hiveSession: HiveSession,
+      statement: String,
+      signal: Semaphore,
+      finalState: OperationState)
+    extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement,
+      new util.HashMap, false) {
+
+    override def cleanup(): Unit = {
+      super.cleanup()
+      signal.release()
+      // At this point, operation should already be in finalState (set by either close() or
+      // cancel()). We want to check if it stays in finalState after the exception thrown by
+      // releasing the semaphore propagates. We hence need to sleep for a short while.
+      Thread.sleep(1000)
+      // State should not be ERROR
+      assert(getStatus.getState === finalState)
+    }
+  }
 }
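
Note (not part of the patch): the ordering argument in the commit message is compact, so the sketch below spells it out. It is a minimal, self-contained Scala model rather than Spark's code; all names (`OpState`, `StatementOp`, `onExecutionFailure`, `cancelOldOrder`, `cancelNewOrder`) are illustrative stand-ins, and the only behavior assumed from the real code is the one described above: the execution path's error handler does not overwrite a state that is already terminal.

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified lifecycle: only the canceled-like state counts as terminal here.
sealed trait OpState { def isTerminal: Boolean = false }
case object Running  extends OpState
case object Canceled extends OpState { override def isTerminal: Boolean = true }
case object Error    extends OpState

final class StatementOp {
  private val state = new AtomicReference[OpState](Running)
  def currentState: OpState = state.get()
  private def setState(s: OpState): Unit = state.set(s)

  // Stand-in for execute()'s error handler: an exception surfaces as ERROR
  // only if no terminal state has been recorded yet.
  private def onExecutionFailure(): Unit =
    if (!currentState.isTerminal) setState(Error)

  // Stand-in for cleanup(): killing the jobs makes execute() throw, which
  // (synchronously, in this toy model) runs the error handler.
  private def cleanup(): Unit = onExecutionFailure()

  // Pre-patch order: cleanup() ran before the terminal state was set, so
  // the operation passed through ERROR on its way to CANCELED.
  def cancelOldOrder(): Unit = {
    cleanup()
    println(s"  transient state after cleanup(): $currentState") // Error
    setState(Canceled)
  }

  // Patched order: the terminal state is set first, so the error handler's
  // terminal check turns the would-be ERROR transition into a no-op.
  def cancelNewOrder(): Unit = {
    setState(Canceled)
    cleanup()
  }
}

object OrderingDemo extends App {
  val old = new StatementOp
  old.cancelOldOrder()
  println(s"old order ends in:   ${old.currentState}") // Canceled, but ERROR was observable

  val fixed = new StatementOp
  fixed.cancelNewOrder()
  println(s"fixed order ends in: ${fixed.currentState}") // Canceled throughout
}
```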
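
Two follow-up notes on the test. First, its synchronization design: the mocked `DataFrame.collect()` blocks on a semaphore that only the overridden `cleanup()` releases, so the exception inside `execute()` is guaranteed to fire after `cancel()`/`close()` has set the terminal state, which is exactly the window the bug lived in. Second, to run just this suite locally, something along these lines should work (the `hive-thriftserver` sbt project id and the `-Phive-thriftserver` profile are assumptions based on the module layout, not something this patch specifies):

    build/sbt -Phive-thriftserver "hive-thriftserver/testOnly org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperationSuite"

The SparkBuild.scala hunk also registers the suite in SparkParallelTestGrouping, which makes CI run it in its own dedicated JVM.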