[SPARK-34373][SQL] HiveThriftServer2 startWithContext may hang with a race issue
### What changes were proposed in this pull request? fix a race issue by interrupting the thread ### Why are the changes needed? ``` 21:43:26.809 WARN org.apache.thrift.server.TThreadPoolServer: Transport error occurred during acceptance of message. org.apache.thrift.transport.TTransportException: No underlying server socket. at org.apache.thrift.transport.TServerSocket.acceptImpl(TServerSocket.java:126) at org.apache.thrift.transport.TServerSocket.acceptImpl(TServerSocket.java:35) at org.apache.thrift.transport.TServerTransport.acceException in thread "Thread-15" java.io.IOException: Stream closed at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) at java.io.BufferedInputStream.read(BufferedInputStream.java:336) at java.io.FilterInputStream.read(FilterInputStream.java:107) at scala.sys.process.BasicIO$.loop$1(BasicIO.scala:238) at scala.sys.process.BasicIO$.transferFullyImpl(BasicIO.scala:246) at scala.sys.process.BasicIO$.transferFully(BasicIO.scala:227) at scala.sys.process.BasicIO$.$anonfun$toStdOut$1(BasicIO.scala:221) ``` when the TServer try to `serve` after `stop`, it hangs with the log above forever ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? passing ci Closes #31479 from yaooqinn/SPARK-34373. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
fadd0f5d9b
commit
1fac706db5
|
@ -39,6 +39,7 @@ import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
|
|||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.TProcessorFactory;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.server.TServer;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TTransportFactory;
|
||||
|
@ -46,6 +47,8 @@ import org.apache.thrift.transport.TTransportFactory;
|
|||
|
||||
public class ThriftBinaryCLIService extends ThriftCLIService {
|
||||
|
||||
protected TServer server;
|
||||
|
||||
public ThriftBinaryCLIService(CLIService cliService) {
|
||||
super(cliService, ThriftBinaryCLIService.class.getSimpleName());
|
||||
}
|
||||
|
@ -111,6 +114,13 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopServer() {
|
||||
server.stop();
|
||||
server = null;
|
||||
LOG.info("Thrift server has stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
public TGetQueryIdResp GetQueryId(TGetQueryIdReq req) throws TException {
|
||||
try {
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hive.service.server.HiveServer2;
|
|||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.server.ServerContext;
|
||||
import org.apache.thrift.server.TServer;
|
||||
import org.apache.thrift.server.TServerEventHandler;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -61,8 +60,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
|
|||
protected int portNum;
|
||||
protected InetAddress serverIPAddress;
|
||||
protected String hiveHost;
|
||||
protected TServer server;
|
||||
protected org.eclipse.jetty.server.Server httpServer;
|
||||
private Thread serverThread = null;
|
||||
|
||||
private boolean isStarted = false;
|
||||
protected boolean isEmbedded = false;
|
||||
|
@ -177,26 +175,23 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
|
|||
super.start();
|
||||
if (!isStarted && !isEmbedded) {
|
||||
initializeServer();
|
||||
new Thread(this).start();
|
||||
serverThread = new Thread(this);
|
||||
serverThread.setName(getName());
|
||||
serverThread.start();
|
||||
isStarted = true;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void stopServer();
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
if (isStarted && !isEmbedded) {
|
||||
if(server != null) {
|
||||
server.stop();
|
||||
LOG.info("Thrift server has stopped");
|
||||
}
|
||||
if((httpServer != null) && httpServer.isStarted()) {
|
||||
try {
|
||||
httpServer.stop();
|
||||
LOG.info("Http server has stopped");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error stopping Http server: ", e);
|
||||
}
|
||||
if (serverThread != null) {
|
||||
serverThread.interrupt();
|
||||
serverThread = null;
|
||||
}
|
||||
stopServer();
|
||||
isStarted = false;
|
||||
}
|
||||
super.stop();
|
||||
|
|
|
@ -51,6 +51,8 @@ import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
|
|||
|
||||
public class ThriftHttpCLIService extends ThriftCLIService {
|
||||
|
||||
protected org.eclipse.jetty.server.Server httpServer;
|
||||
|
||||
public ThriftHttpCLIService(CLIService cliService) {
|
||||
super(cliService, ThriftHttpCLIService.class.getSimpleName());
|
||||
}
|
||||
|
@ -152,6 +154,19 @@ public class ThriftHttpCLIService extends ThriftCLIService {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopServer() {
|
||||
if ((httpServer != null) && httpServer.isStarted()) {
|
||||
try {
|
||||
httpServer.stop();
|
||||
httpServer = null;
|
||||
LOG.info("Thrift HTTP server has been stopped");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error stopping HTTP server: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure Jetty to serve http requests. Example of a client connection URL:
|
||||
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
|
||||
|
@ -162,10 +177,14 @@ public class ThriftHttpCLIService extends ThriftCLIService {
|
|||
try {
|
||||
httpServer.join();
|
||||
} catch (Throwable t) {
|
||||
LOG.error(
|
||||
"Error starting HiveServer2: could not start "
|
||||
+ ThriftHttpCLIService.class.getSimpleName(), t);
|
||||
System.exit(-1);
|
||||
if (t instanceof InterruptedException) {
|
||||
// This is likely a shutdown
|
||||
LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down thrift server.");
|
||||
} else {
|
||||
LOG.error("Error starting HiveServer2: could not start "
|
||||
+ ThriftHttpCLIService.class.getSimpleName(), t);
|
||||
System.exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue