[SPARK-31926][SQL][TEST-HIVE1.2] Fix concurrency issue for ThriftCLIService to getPortNumber
### What changes were proposed in this pull request? When` org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#startWithContext` called, it starts `ThriftCLIService` in the background with a new Thread, at the same time we call `ThriftCLIService.getPortNumber,` we might not get the bound port if it's configured with 0. This PR moves the TServer/HttpServer initialization code out of that new Thread. ### Why are the changes needed? Fix concurrency issue, improve test robustness. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? add new tests Closes #28751 from yaooqinn/SPARK-31926. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
38873d5196
commit
02f32cfae4
|
@ -480,7 +480,8 @@ object SparkParallelTestGrouping {
|
||||||
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
|
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
|
||||||
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
|
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
|
||||||
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
|
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
|
||||||
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
|
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite",
|
||||||
|
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInBinarySuite",
|
||||||
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
|
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,8 @@ trait SharedThriftServer extends SharedSparkSession {
|
||||||
private var hiveServer2: HiveThriftServer2 = _
|
private var hiveServer2: HiveThriftServer2 = _
|
||||||
private var serverPort: Int = 0
|
private var serverPort: Int = 0
|
||||||
|
|
||||||
|
def mode: ServerMode.Value
|
||||||
|
|
||||||
override def beforeAll(): Unit = {
|
override def beforeAll(): Unit = {
|
||||||
super.beforeAll()
|
super.beforeAll()
|
||||||
// Retries up to 3 times with different port numbers if the server fails to start
|
// Retries up to 3 times with different port numbers if the server fails to start
|
||||||
|
@ -53,11 +55,17 @@ trait SharedThriftServer extends SharedSparkSession {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected def jdbcUri: String = if (mode == ServerMode.http) {
|
||||||
|
s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice"
|
||||||
|
} else {
|
||||||
|
s"jdbc:hive2://localhost:$serverPort"
|
||||||
|
}
|
||||||
|
|
||||||
protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
|
protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
|
||||||
val user = System.getProperty("user.name")
|
val user = System.getProperty("user.name")
|
||||||
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
|
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
|
||||||
val connections =
|
val connections =
|
||||||
fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
|
fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
|
||||||
val statements = connections.map(_.createStatement())
|
val statements = connections.map(_.createStatement())
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -71,21 +79,33 @@ trait SharedThriftServer extends SharedSparkSession {
|
||||||
private def startThriftServer(attempt: Int): Unit = {
|
private def startThriftServer(attempt: Int): Unit = {
|
||||||
logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
|
logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
|
||||||
val sqlContext = spark.newSession().sqlContext
|
val sqlContext = spark.newSession().sqlContext
|
||||||
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use.
|
// Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could
|
||||||
|
// randomly pick any free port to use.
|
||||||
// It's much more robust than set a random port generated by ourselves ahead
|
// It's much more robust than set a random port generated by ourselves ahead
|
||||||
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
|
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
|
||||||
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
|
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
|
||||||
hiveServer2.getServices.asScala.foreach {
|
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
|
||||||
case t: ThriftCLIService if t.getPortNumber != 0 =>
|
|
||||||
serverPort = t.getPortNumber
|
|
||||||
logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
|
|
||||||
case _ =>
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for thrift server to be ready to serve the query, via executing simple query
|
try {
|
||||||
// till the query succeeds. See SPARK-30345 for more details.
|
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
|
||||||
eventually(timeout(30.seconds), interval(1.seconds)) {
|
hiveServer2.getServices.asScala.foreach {
|
||||||
withJdbcStatement { _.execute("SELECT 1") }
|
case t: ThriftCLIService =>
|
||||||
|
serverPort = t.getPortNumber
|
||||||
|
logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
|
||||||
|
case _ =>
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for thrift server to be ready to serve the query, via executing simple query
|
||||||
|
// till the query succeeds. See SPARK-30345 for more details.
|
||||||
|
eventually(timeout(30.seconds), interval(1.seconds)) {
|
||||||
|
withJdbcStatement { _.execute("SELECT 1") }
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
case e: Exception =>
|
||||||
|
logError("Error start hive server with Context ", e)
|
||||||
|
if (hiveServer2 != null) {
|
||||||
|
hiveServer2.stop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,9 @@ import org.apache.spark.sql.types._
|
||||||
*/
|
*/
|
||||||
class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer {
|
class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer {
|
||||||
|
|
||||||
|
|
||||||
|
override def mode: ServerMode.Value = ServerMode.binary
|
||||||
|
|
||||||
override protected def testFile(fileName: String): String = {
|
override protected def testFile(fileName: String): String = {
|
||||||
val url = Thread.currentThread().getContextClassLoader.getResource(fileName)
|
val url = Thread.currentThread().getContextClassLoader.getResource(fileName)
|
||||||
// Copy to avoid URISyntaxException during accessing the resources in `sql/core`
|
// Copy to avoid URISyntaxException during accessing the resources in `sql/core`
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.hive.thriftserver
|
package org.apache.spark.sql.hive.thriftserver
|
||||||
|
|
||||||
class ThriftServerWithSparkContextSuite extends SharedThriftServer {
|
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
|
||||||
|
|
||||||
test("SPARK-29911: Uncache cached tables when session closed") {
|
test("SPARK-29911: Uncache cached tables when session closed") {
|
||||||
val cacheManager = spark.sharedState.cacheManager
|
val cacheManager = spark.sharedState.cacheManager
|
||||||
|
@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {
|
||||||
|
override def mode: ServerMode.Value = ServerMode.binary
|
||||||
|
}
|
||||||
|
|
||||||
|
class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite {
|
||||||
|
override def mode: ServerMode.Value = ServerMode.http
|
||||||
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.hive.conf.HiveConf;
|
import org.apache.hadoop.hive.conf.HiveConf;
|
||||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
||||||
import org.apache.hadoop.hive.shims.ShimLoader;
|
import org.apache.hadoop.hive.shims.ShimLoader;
|
||||||
|
import org.apache.hive.service.ServiceException;
|
||||||
import org.apache.hive.service.auth.HiveAuthFactory;
|
import org.apache.hive.service.auth.HiveAuthFactory;
|
||||||
import org.apache.hive.service.cli.CLIService;
|
import org.apache.hive.service.cli.CLIService;
|
||||||
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
|
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
|
||||||
|
@ -45,7 +46,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
protected void initializeServer() {
|
||||||
try {
|
try {
|
||||||
// Server thread pool
|
// Server thread pool
|
||||||
String threadPoolName = "HiveServer2-Handler-Pool";
|
String threadPoolName = "HiveServer2-Handler-Pool";
|
||||||
|
@ -100,6 +101,14 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
|
||||||
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
|
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
|
||||||
+ serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
|
+ serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
|
} catch (Exception t) {
|
||||||
|
throw new ServiceException("Error initializing " + getName(), t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
server.serve();
|
server.serve();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.fatal(
|
LOG.fatal(
|
||||||
|
|
|
@ -175,6 +175,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
super.start();
|
super.start();
|
||||||
if (!isStarted && !isEmbedded) {
|
if (!isStarted && !isEmbedded) {
|
||||||
|
initializeServer();
|
||||||
new Thread(this).start();
|
new Thread(this).start();
|
||||||
isStarted = true;
|
isStarted = true;
|
||||||
}
|
}
|
||||||
|
@ -633,6 +634,8 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
|
||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract void initializeServer();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public abstract void run();
|
public abstract void run();
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
||||||
import org.apache.hadoop.hive.shims.ShimLoader;
|
import org.apache.hadoop.hive.shims.ShimLoader;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hive.service.ServiceException;
|
||||||
import org.apache.hive.service.auth.HiveAuthFactory;
|
import org.apache.hive.service.auth.HiveAuthFactory;
|
||||||
import org.apache.hive.service.cli.CLIService;
|
import org.apache.hive.service.cli.CLIService;
|
||||||
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
|
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
|
||||||
|
@ -53,13 +54,8 @@ public class ThriftHttpCLIService extends ThriftCLIService {
|
||||||
super(cliService, ThriftHttpCLIService.class.getSimpleName());
|
super(cliService, ThriftHttpCLIService.class.getSimpleName());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Configure Jetty to serve http requests. Example of a client connection URL:
|
|
||||||
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
|
|
||||||
* e.g. http://gateway:port/hive2/servlets/thrifths2/
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
protected void initializeServer() {
|
||||||
try {
|
try {
|
||||||
// Server thread pool
|
// Server thread pool
|
||||||
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
|
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
|
||||||
|
@ -150,6 +146,19 @@ public class ThriftHttpCLIService extends ThriftCLIService {
|
||||||
+ " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..."
|
+ " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..."
|
||||||
+ maxWorkerThreads + " worker threads";
|
+ maxWorkerThreads + " worker threads";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
|
} catch (Exception t) {
|
||||||
|
throw new ServiceException("Error initializing " + getName(), t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure Jetty to serve http requests. Example of a client connection URL:
|
||||||
|
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
|
||||||
|
* e.g. http://gateway:port/hive2/servlets/thrifths2/
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
httpServer.join();
|
httpServer.join();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.fatal(
|
LOG.fatal(
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
|
||||||
import org.apache.hadoop.hive.conf.HiveConf;
|
import org.apache.hadoop.hive.conf.HiveConf;
|
||||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
||||||
import org.apache.hadoop.hive.shims.ShimLoader;
|
import org.apache.hadoop.hive.shims.ShimLoader;
|
||||||
|
import org.apache.hive.service.ServiceException;
|
||||||
import org.apache.hive.service.auth.HiveAuthFactory;
|
import org.apache.hive.service.auth.HiveAuthFactory;
|
||||||
import org.apache.hive.service.cli.CLIService;
|
import org.apache.hive.service.cli.CLIService;
|
||||||
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
|
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
|
||||||
|
@ -46,7 +47,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
protected void initializeServer() {
|
||||||
try {
|
try {
|
||||||
// Server thread pool
|
// Server thread pool
|
||||||
String threadPoolName = "HiveServer2-Handler-Pool";
|
String threadPoolName = "HiveServer2-Handler-Pool";
|
||||||
|
@ -101,6 +102,14 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
|
||||||
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
|
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
|
||||||
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
|
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
|
} catch (Exception t) {
|
||||||
|
throw new ServiceException("Error initializing " + getName(), t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
server.serve();
|
server.serve();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error(
|
LOG.error(
|
||||||
|
|
|
@ -176,6 +176,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
super.start();
|
super.start();
|
||||||
if (!isStarted && !isEmbedded) {
|
if (!isStarted && !isEmbedded) {
|
||||||
|
initializeServer();
|
||||||
new Thread(this).start();
|
new Thread(this).start();
|
||||||
isStarted = true;
|
isStarted = true;
|
||||||
}
|
}
|
||||||
|
@ -670,6 +671,8 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
|
||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract void initializeServer();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public abstract void run();
|
public abstract void run();
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
||||||
import org.apache.hadoop.hive.shims.ShimLoader;
|
import org.apache.hadoop.hive.shims.ShimLoader;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hive.service.ServiceException;
|
||||||
import org.apache.hive.service.auth.HiveAuthFactory;
|
import org.apache.hive.service.auth.HiveAuthFactory;
|
||||||
import org.apache.hive.service.cli.CLIService;
|
import org.apache.hive.service.cli.CLIService;
|
||||||
import org.apache.hive.service.rpc.thrift.TCLIService;
|
import org.apache.hive.service.rpc.thrift.TCLIService;
|
||||||
|
@ -54,13 +55,8 @@ public class ThriftHttpCLIService extends ThriftCLIService {
|
||||||
super(cliService, ThriftHttpCLIService.class.getSimpleName());
|
super(cliService, ThriftHttpCLIService.class.getSimpleName());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Configure Jetty to serve http requests. Example of a client connection URL:
|
|
||||||
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
|
|
||||||
* e.g. http://gateway:port/hive2/servlets/thrifths2/
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
protected void initializeServer() {
|
||||||
try {
|
try {
|
||||||
// Server thread pool
|
// Server thread pool
|
||||||
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
|
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
|
||||||
|
@ -151,6 +147,19 @@ public class ThriftHttpCLIService extends ThriftCLIService {
|
||||||
+ " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
|
+ " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
|
||||||
+ maxWorkerThreads + " worker threads";
|
+ maxWorkerThreads + " worker threads";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
|
} catch (Exception t) {
|
||||||
|
throw new ServiceException("Error initializing " + getName(), t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure Jetty to serve http requests. Example of a client connection URL:
|
||||||
|
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
|
||||||
|
* e.g. http://gateway:port/hive2/servlets/thrifths2/
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
httpServer.join();
|
httpServer.join();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error(
|
LOG.error(
|
||||||
|
|
Loading…
Reference in a new issue