[SPARK-31926][SQL][TESTS][FOLLOWUP][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue for ThriftCLIService to getPortNumber

### What changes were proposed in this pull request?

This PR brings https://github.com/apache/spark/pull/28751 back

- It was once reverted by 4a25200 because of an inevitable Maven test failure
    - See the related updates in the followup a0187cd6b5

- It was then reverted again because of the flakiness of the newly added unit tests
   - In this PR, the flakiness was found to be caused by the Hive metastore connection that SparkSQLCLIService tries to create, which turns out to be entirely unnecessary: that metastore client only points to a dummy metastore server (see the sketch after this list)
   - Also, add some cleanup to the SharedThriftServer trait in beforeAll and afterAll to prevent its configuration from being polluted or from polluting others
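
The fix works around a language constraint: Scala, like Java, has no `super.super` call, so `SparkSQLCLIService` cannot skip its direct parent's `start()` (the one that opens the unused metastore client) and invoke its grandparent's directly. Hence the reflection-based `startCompositeService()` in the diff below. A minimal, hypothetical sketch of that constraint (class names are illustrative, not from the patch):

```scala
class CompositeServiceLike {
  def start(): Unit = println("start all child services")
}

class CLIServiceLike extends CompositeServiceLike {
  override def start(): Unit = {
    println("open (unused) metastore client") // the step the patch wants to skip
    super.start()
  }
}

class SparkSQLCLIServiceLike extends CLIServiceLike {
  // Only `super.start()` is available here; `super.super.start()` does not
  // compile, so the real patch reaches CompositeService's start logic via
  // reflection instead.
  override def start(): Unit = super.start()
}
```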

### Why are the changes needed?

Fix flaky tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passing SBT and Maven tests.

Closes #28835 from yaooqinn/SPARK-31926-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

@@ -480,7 +480,6 @@ object SparkParallelTestGrouping {
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
)


@@ -22,13 +22,14 @@ import java.util.{List => JList}
import javax.security.auth.login.LoginException
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.Utils
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.hive.service.{AbstractService, CompositeService, Service, ServiceException}
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
@@ -94,6 +95,12 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
initCompositeService(hiveConf)
}
/**
* the super class [[CLIService#start]] starts a useless dummy metastore client, skip it and call
* the ancestor [[CompositeService#start]] directly.
*/
override def start(): Unit = startCompositeService()
override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
getInfoType match {
case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
@@ -105,6 +112,19 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
}
private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
private val logInfo = (msg: String) => if (HiveUtils.isHive23) {
getAncestorField[Logger](this, 3, "LOG").info(msg)
} else {
getAncestorField[Log](this, 3, "LOG").info(msg)
}
private val logError = (msg: String, e: Throwable) => if (HiveUtils.isHive23) {
getAncestorField[Logger](this, 3, "LOG").error(msg, e)
} else {
getAncestorField[Log](this, 3, "LOG").error(msg, e)
}
def initCompositeService(hiveConf: HiveConf): Unit = {
// Emulating `CompositeService.init(hiveConf)`
val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
@@ -114,10 +134,30 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
setAncestorField(this, 3, "hiveConf", hiveConf)
invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
if (HiveUtils.isHive23) {
getAncestorField[Logger](this, 3, "LOG").info(s"Service: $getName is inited.")
} else {
getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
logInfo(s"Service: $getName is inited.")
}
def startCompositeService(): Unit = {
// Emulating `CompositeService.start`
val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
var serviceStartCount = 0
try {
serviceList.asScala.foreach { service =>
service.start()
serviceStartCount += 1
}
// Emulating `AbstractService.start`
val startTime = new java.lang.Long(System.currentTimeMillis())
setAncestorField(this, 3, "startTime", startTime)
invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.INITED)
invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.STARTED)
logInfo(s"Service: $getName is started.")
} catch {
case NonFatal(e) =>
logError(s"Error starting services $getName", e)
invoke(classOf[CompositeService], this, "stop",
classOf[Int] -> new Integer(serviceStartCount))
throw new ServiceException("Failed to Start " + getName, e)
}
}
}
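
For context, the `getAncestorField`/`setAncestorField`/`invoke` helpers above come from the thriftserver module's `ReflectionUtils` and walk a fixed number of levels up the class hierarchy to reach private state in Hive's service classes. A rough sketch of what such an accessor might look like (an assumption for illustration, not the actual Spark source):

```scala
import java.lang.reflect.Field

// Climb `level` superclasses up from obj's runtime class, then read the
// (possibly private) field declared there.
def getAncestorField[T](obj: AnyRef, level: Int, fieldName: String): T = {
  var clazz: Class[_] = obj.getClass
  (1 to level).foreach(_ => clazz = clazz.getSuperclass)
  val field: Field = clazz.getDeclaredField(fieldName)
  field.setAccessible(true) // bypass Java access checks
  field.get(obj).asInstanceOf[T]
}
```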


@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file hive-thriftserver/target/unit-tests.log
log4j.rootLogger=DEBUG, CA, FA
#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN
#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
# Set the logger level of File Appender to WARN
log4j.appender.FA.Threshold = DEBUG
# Some packages are noisy for no good reason.
log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
log4j.additivity.hive.log=false
log4j.logger.hive.log=OFF
log4j.additivity.parquet.hadoop.ParquetRecordReader=false
log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF
log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF
log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF
log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR


@@ -24,6 +24,8 @@ import scala.concurrent.duration._
import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftCLIService
import org.apache.spark.sql.test.SharedSparkSession
@@ -33,6 +35,8 @@ trait SharedThriftServer extends SharedSparkSession {
private var hiveServer2: HiveThriftServer2 = _
private var serverPort: Int = 0
def mode: ServerMode.Value
override def beforeAll(): Unit = {
super.beforeAll()
// Retries up to 3 times with different port numbers if the server fails to start
@@ -47,17 +51,27 @@ trait SharedThriftServer extends SharedSparkSession {
override def afterAll(): Unit = {
try {
if (hiveServer2 != null) {
hiveServer2.stop()
}
} finally {
super.afterAll()
SessionState.detachSession()
Hive.closeCurrent()
}
}
protected def jdbcUri: String = if (mode == ServerMode.http) {
s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice"
} else {
s"jdbc:hive2://localhost:$serverPort/"
}
protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
val user = System.getProperty("user.name")
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
val connections =
fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
val statements = connections.map(_.createStatement())
try {
@@ -69,16 +83,21 @@ trait SharedThriftServer extends SharedSparkSession {
}
private def startThriftServer(attempt: Int): Unit = {
logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt")
val sqlContext = spark.newSession().sqlContext
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use.
// Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could
// randomly pick any free port to use.
// It's much more robust than set a random port generated by ourselves ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
try {
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
hiveServer2.getServices.asScala.foreach {
case t: ThriftCLIService if t.getPortNumber != 0 =>
case t: ThriftCLIService =>
serverPort = t.getPortNumber
logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
logInfo(s"Started HiveThriftServer2: mode=$mode, port=$serverPort, attempt=$attempt")
case _ =>
}
@@ -87,5 +106,16 @@ trait SharedThriftServer extends SharedSparkSession {
eventually(timeout(30.seconds), interval(1.seconds)) {
withJdbcStatement { _.execute("SELECT 1") }
}
} catch {
case e: Exception =>
logError("Error start hive server with Context ", e)
if (hiveServer2 != null) {
hiveServer2.stop()
hiveServer2 = null
}
SessionState.detachSession()
Hive.closeCurrent()
throw e
}
}
}
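
Setting `HIVE_SERVER2_THRIFT_PORT` and `HIVE_SERVER2_THRIFT_HTTP_PORT` to 0 relies on standard socket semantics rather than anything Spark-specific: binding to port 0 asks the OS for any free ephemeral port, which eliminates the race of pre-picking a "random" port and hoping it is still free. A standalone illustration of that JVM behavior:

```scala
import java.net.ServerSocket

val socket = new ServerSocket(0)    // port 0: the OS assigns any free port
val boundPort = socket.getLocalPort // the actual bound port, never 0 here
println(s"bound to ephemeral port $boundPort")
socket.close()
```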


@@ -54,6 +54,9 @@ import org.apache.spark.sql.types._
*/
class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer {
override def mode: ServerMode.Value = ServerMode.binary
override protected def testFile(fileName: String): String = {
val url = Thread.currentThread().getContextClassLoader.getResource(fileName)
// Copy to avoid URISyntaxException during accessing the resources in `sql/core`


@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.thriftserver
class ThriftServerWithSparkContextSuite extends SharedThriftServer {
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
test("SPARK-29911: Uncache cached tables when session closed") {
val cacheManager = spark.sharedState.cacheManager
@@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer {
}
}
}
class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {
override def mode: ServerMode.Value = ServerMode.binary
}
class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite {
override def mode: ServerMode.Value = ServerMode.http
}
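
Turning `ThriftServerWithSparkContextSuite` into a trait is the usual way to run one shared test body once per transport mode; schematically (names below are illustrative only):

```scala
// The shared logic lives in the trait; each concrete suite only pins the mode.
trait SharedBehavior {
  def mode: String
  def runShared(): Unit = println(s"running shared tests, mode=$mode")
}

object BinarySharedBehavior extends SharedBehavior { val mode = "binary" }
object HttpSharedBehavior extends SharedBehavior { val mode = "http" }
```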


@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -45,7 +46,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
}
@Override
public void run() {
protected void initializeServer() {
try {
// Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -100,6 +101,14 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
} catch (Exception t) {
throw new ServiceException("Error initializing " + getName(), t);
}
}
@Override
public void run() {
try {
server.serve();
} catch (Throwable t) {
LOG.fatal(


@@ -175,6 +175,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
public synchronized void start() {
super.start();
if (!isStarted && !isEmbedded) {
initializeServer();
new Thread(this).start();
isStarted = true;
}
@@ -633,6 +634,8 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
return resp;
}
protected abstract void initializeServer();
@Override
public abstract void run();
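
This `start()`/`initializeServer()` split is the concurrency fix named in the title: previously the server socket was created inside `run()` on a freshly spawned thread, so a caller of `getPortNumber` right after `start()` could race the binding and observe port 0. A simplified Scala model of the before/after (not the Hive source):

```scala
import java.net.ServerSocket

class RacyService {                        // before: bind on the background thread
  @volatile var port: Int = 0
  def start(): Unit = new Thread(() => {
    val server = new ServerSocket(0)
    port = server.getLocalPort             // may run after start() has returned,
  }).start()                               // so getPortNumber can still see 0
}

class FixedService {                       // after: bind synchronously in start()
  var port: Int = 0
  private def initializeServer(): ServerSocket = new ServerSocket(0)
  def start(): Unit = {
    val server = initializeServer()
    port = server.getLocalPort             // assigned before start() returns
    new Thread(() => { /* the accept/serve loop would run here */ }).start()
  }
}
```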


@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
@@ -53,13 +54,8 @@ public class ThriftHttpCLIService extends ThriftCLIService {
super(cliService, ThriftHttpCLIService.class.getSimpleName());
}
/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
* e.g. http://gateway:port/hive2/servlets/thrifths2/
*/
@Override
public void run() {
protected void initializeServer() {
try {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
@@ -150,6 +146,19 @@
+ " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..."
+ maxWorkerThreads + " worker threads";
LOG.info(msg);
} catch (Exception t) {
throw new ServiceException("Error initializing " + getName(), t);
}
}
/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
* e.g. http://gateway:port/hive2/servlets/thrifths2/
*/
@Override
public void run() {
try {
httpServer.join();
} catch (Throwable t) {
LOG.fatal(


@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -46,7 +47,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
}
@Override
public void run() {
protected void initializeServer() {
try {
// Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -101,6 +102,14 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
} catch (Exception t) {
throw new ServiceException("Error initializing " + getName(), t);
}
}
@Override
public void run() {
try {
server.serve();
} catch (Throwable t) {
LOG.error(


@@ -176,6 +176,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
public synchronized void start() {
super.start();
if (!isStarted && !isEmbedded) {
initializeServer();
new Thread(this).start();
isStarted = true;
}
@@ -670,6 +671,8 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
return resp;
}
protected abstract void initializeServer();
@Override
public abstract void run();


@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -54,13 +55,8 @@ public class ThriftHttpCLIService extends ThriftCLIService {
super(cliService, ThriftHttpCLIService.class.getSimpleName());
}
/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
* e.g. http://gateway:port/hive2/servlets/thrifths2/
*/
@Override
public void run() {
protected void initializeServer() {
try {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
@@ -151,6 +147,19 @@
+ " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+ maxWorkerThreads + " worker threads";
LOG.info(msg);
} catch (Exception t) {
throw new ServiceException("Error initializing " + getName(), t);
}
}
/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
* e.g. http://gateway:port/hive2/servlets/thrifths2/
*/
@Override
public void run() {
try {
httpServer.join();
} catch (Throwable t) {
LOG.error(