[SPARK-32057][SQL][TEST-HIVE1.2][TEST-HADOOP2.7] ExecuteStatement: cancel and close should not transiently ERROR

### What changes were proposed in this pull request?
#28671 changed the order of operations when a SparkExecuteStatementOperation is canceled: `cleanup()`, which kills the running jobs, was called before the state was set to CANCELED. The killed jobs cause an exception to be thrown inside `execute()`, so the state transiently became ERROR before being set to CANCELED. This PR restores the correct order: set the terminal state first, then call `cleanup()`.
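To make the race concrete, here is a minimal, self-contained sketch of the two orderings (hypothetical `State` and `Operation` types, not the Spark sources). `onExecutionFailure()` mirrors the guard in `execute()`'s catch block, which only reports ERROR when no terminal state has been set yet:

```scala
// Sketch only: hypothetical types modeling the SparkExecuteStatementOperation race.
object CancelOrderSketch {
  sealed trait State { def isTerminal: Boolean = this != Running }
  case object Running extends State
  case object Error extends State
  case object Canceled extends State

  final class Operation {
    private var state: State = Running
    val observed = scala.collection.mutable.ListBuffer[State](Running)

    def setState(s: State): Unit = synchronized { state = s; observed += s }

    // Mirrors execute()'s catch block: report ERROR only if not already terminal.
    def onExecutionFailure(): Unit = synchronized {
      if (!state.isTerminal) setState(Error)
    }
  }

  def main(args: Array[String]): Unit = {
    val buggy = new Operation
    buggy.onExecutionFailure() // cleanup() killed the jobs before CANCELED was set
    buggy.setState(Canceled)
    println(s"buggy order: ${buggy.observed.mkString(" -> ")}") // Running -> Error -> Canceled

    val fixed = new Operation
    fixed.setState(Canceled)   // terminal state first...
    fixed.onExecutionFailure() // ...so the later failure is ignored
    println(s"fixed order: ${fixed.observed.mkString(" -> ")}") // Running -> Canceled
  }
}
```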

### Why are the changes needed?
Bug fix: a canceled or closed operation transiently reported the ERROR state before reaching its terminal state.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test in SparkExecuteStatementOperationSuite.scala.

Closes #28912 from alismess-db/execute-statement-operation-cleanup-order.

Authored-by: Ali Smesseim <ali.smesseim@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
commit 8b0a54e6ff (parent 1261fac674)
Ali Smesseim authored on 2020-07-08 09:28:16 +09:00, committed by HyukjinKwon
4 changed files with 83 additions and 3 deletions

project/SparkBuild.scala

@@ -476,6 +476,7 @@ object SparkParallelTestGrouping {
     "org.apache.spark.ml.classification.LogisticRegressionSuite",
     "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite",
+    "org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperationSuite",
     "org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite",
     "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
     "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

@@ -333,8 +333,8 @@ private[hive] class SparkExecuteStatementOperation(
     synchronized {
       if (!getStatus.getState.isTerminal) {
         logInfo(s"Cancel query with $statementId")
-        cleanup()
         setState(OperationState.CANCELED)
+        cleanup()
         HiveThriftServer2.eventManager.onStatementCanceled(statementId)
       }
     }
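For context on why `cleanup()` makes `execute()` throw: it cancels the statement's job group, which fails any action still running under it. The sketch below is a runnable, standalone illustration (hypothetical group name and driver setup, not Spark's ThriftServer code):

```scala
import org.apache.spark.sql.SparkSession

// Standalone sketch: cancelling a job group (as cleanup() does via
// sparkContext.cancelJobGroup(statementId)) fails the in-flight collect().
object CancelJobGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    val worker = new Thread(() => {
      // Models execute(): run the statement's jobs under a cancellable group.
      sc.setJobGroup("stmt-group", "models a statementId job group", interruptOnCancel = true)
      try {
        sc.parallelize(1 to 10).map { i => Thread.sleep(10000); i }.collect()
      } catch {
        case e: Exception =>
          // This is the exception that, before this PR, flipped the state to ERROR.
          println(s"execute() saw: ${e.getClass.getSimpleName}")
      }
    })
    worker.start()
    Thread.sleep(2000)               // let the job start
    sc.cancelJobGroup("stmt-group")  // models cleanup(): kill the running jobs
    worker.join()
    spark.stop()
  }
}
```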

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala

@@ -46,8 +46,8 @@ private[hive] trait SparkOperation extends Operation with Logging {
   }

   abstract override def close(): Unit = {
-    cleanup()
     super.close()
+    cleanup()
     logInfo(s"Close statement with $statementId")
     HiveThriftServer2.eventManager.onOperationClosed(statementId)
   }
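`SparkOperation` is a stackable trait, so `super.close()` reaches Hive's `Operation.close()`, which moves the operation into the terminal CLOSED state before `cleanup()` kills the jobs, mirroring the `cancel()` fix above. A minimal sketch of the `abstract override` pattern, with hypothetical `Closeable`/`BaseOperation` stand-ins:

```scala
// Sketch only: hypothetical types showing the stackable-trait close() ordering.
trait Closeable {
  def close(): Unit
}

class BaseOperation extends Closeable {
  // Stands in for Hive's Operation.close(), which sets the CLOSED state.
  override def close(): Unit = println("state -> CLOSED")
}

trait CleanupOnClose extends Closeable {
  abstract override def close(): Unit = {
    super.close()          // terminal state is set first...
    println("cleanup()")   // ...so a failure during cleanup cannot become ERROR
  }
}

object CloseOrderSketch extends App {
  val op = new BaseOperation with CleanupOnClose
  op.close() // prints "state -> CLOSED" then "cleanup()"
}
```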

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala

@@ -17,10 +17,25 @@
 package org.apache.spark.sql.hive.thriftserver

 import java.util
 import java.util.concurrent.Semaphore

 import scala.concurrent.duration._

 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hive.service.cli.OperationState
 import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl}
 import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS}
 import org.mockito.invocation.InvocationOnMock

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2EventManager
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, NullType, StringType, StructField, StructType}

-class SparkExecuteStatementOperationSuite extends SparkFunSuite {
+class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSparkSession {

   test("SPARK-17112 `select null` via JDBC triggers IllegalArgumentException in ThriftServer") {
     val field1 = StructField("NULL", NullType)
     val field2 = StructField("(IF(true, NULL, NULL))", NullType)
@@ -42,4 +57,68 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite {
     assert(columns.get(1).getType().getName == "INT")
     assert(columns.get(1).getComment() == "")
   }
+
+  Seq(
+    (OperationState.CANCELED, (_: SparkExecuteStatementOperation).cancel()),
+    (OperationState.CLOSED, (_: SparkExecuteStatementOperation).close())
+  ).foreach { case (finalState, transition) =>
+    test("SPARK-32057 SparkExecuteStatementOperation should not transiently become ERROR " +
+      s"before being set to $finalState") {
+      val hiveSession = new HiveSessionImpl(ThriftserverShimUtils.testedProtocolVersions.head,
+        "username", "password", new HiveConf, "ip address")
+      hiveSession.open(new util.HashMap)
+
+      HiveThriftServer2.eventManager = mock(classOf[HiveThriftServer2EventManager])
+
+      val spySqlContext = spy(sqlContext)
+
+      // When cancel() is called on the operation, cleanup causes an exception to be thrown inside
+      // of execute(). This should not cause the state to become ERROR. The exception here will be
+      // triggered in our custom cleanup().
+      val signal = new Semaphore(0)
+      val dataFrame = mock(classOf[DataFrame], RETURNS_DEEP_STUBS)
+      when(dataFrame.collect()).thenAnswer((_: InvocationOnMock) => {
+        signal.acquire()
+        throw new RuntimeException("Operation was cancelled by test cleanup.")
+      })
+      val statement = "stmt"
+      doReturn(dataFrame, Nil: _*).when(spySqlContext).sql(statement)
+
+      val executeStatementOperation = new MySparkExecuteStatementOperation(spySqlContext,
+        hiveSession, statement, signal, finalState)
+
+      val run = new Thread() {
+        override def run(): Unit = executeStatementOperation.runInternal()
+      }
+      assert(executeStatementOperation.getStatus.getState === OperationState.INITIALIZED)
+      run.start()
+      eventually(timeout(5.seconds)) {
+        assert(executeStatementOperation.getStatus.getState === OperationState.RUNNING)
+      }
+      transition(executeStatementOperation)
+      run.join()
+      assert(executeStatementOperation.getStatus.getState === finalState)
+    }
+  }
+
+  private class MySparkExecuteStatementOperation(
+      sqlContext: SQLContext,
+      hiveSession: HiveSession,
+      statement: String,
+      signal: Semaphore,
+      finalState: OperationState)
+    extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement,
+      new util.HashMap, false) {
+
+    override def cleanup(): Unit = {
+      super.cleanup()
+      signal.release()
+      // At this point, operation should already be in finalState (set by either close() or
+      // cancel()). We want to check if it stays in finalState after the exception thrown by
+      // releasing the semaphore propagates. We hence need to sleep for a short while.
+      Thread.sleep(1000)
+      // State should not be ERROR
+      assert(getStatus.getState === finalState)
+    }
+  }
 }