[SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeoutChecker thread properly upon shutdown
### What changes were proposed in this pull request? This PR port https://issues.apache.org/jira/browse/HIVE-14817 for spark thrift server. ### Why are the changes needed? When stopping the HiveServer2, the non-daemon thread stops the server from terminating ```sql "HiveServer2-Background-Pool: Thread-79" #79 prio=5 os_prio=31 tid=0x00007fde26138800 nid=0x13713 waiting on condition [0x0000700010c32000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.hive.service.cli.session.SessionManager$1.sleepInterval(SessionManager.java:178) at org.apache.hive.service.cli.session.SessionManager$1.run(SessionManager.java:156) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` Here is an example to reproduce: https://github.com/yaooqinn/kyuubi/blob/master/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/spark/SparkSQLEngineApp.scala Also, it causes issues as HIVE-14817 described which ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Passing Jenkins Closes #28870 from yaooqinn/SPARK-32034. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
parent
d2a656c81e
commit
9f8e15bb2e
|
@ -148,14 +148,20 @@ public class SessionManager extends CompositeService {
|
|||
}
|
||||
}
|
||||
|
||||
private final Object timeoutCheckerLock = new Object();
|
||||
|
||||
private void startTimeoutChecker() {
|
||||
final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds
|
||||
Runnable timeoutChecker = new Runnable() {
|
||||
final Runnable timeoutChecker = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
|
||||
sleepFor(interval);
|
||||
while (!shutdown) {
|
||||
long current = System.currentTimeMillis();
|
||||
for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
|
||||
if (shutdown) {
|
||||
break;
|
||||
}
|
||||
if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
|
||||
&& (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
|
||||
SessionHandle handle = session.getSessionHandle();
|
||||
|
@ -170,24 +176,34 @@ public class SessionManager extends CompositeService {
|
|||
session.closeExpiredOperations();
|
||||
}
|
||||
}
|
||||
sleepFor(interval);
|
||||
}
|
||||
}
|
||||
|
||||
private void sleepInterval(long interval) {
|
||||
try {
|
||||
Thread.sleep(interval);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
private void sleepFor(long interval) {
|
||||
synchronized (timeoutCheckerLock) {
|
||||
try {
|
||||
timeoutCheckerLock.wait(interval);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore, and break.
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
backgroundOperationPool.execute(timeoutChecker);
|
||||
}
|
||||
|
||||
private void shutdownTimeoutChecker() {
|
||||
shutdown = true;
|
||||
synchronized (timeoutCheckerLock) {
|
||||
timeoutCheckerLock.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
super.stop();
|
||||
shutdown = true;
|
||||
shutdownTimeoutChecker();
|
||||
if (backgroundOperationPool != null) {
|
||||
backgroundOperationPool.shutdown();
|
||||
long timeout = hiveConf.getTimeVar(
|
||||
|
|
|
@ -148,14 +148,20 @@ public class SessionManager extends CompositeService {
|
|||
}
|
||||
}
|
||||
|
||||
private final Object timeoutCheckerLock = new Object();
|
||||
|
||||
private void startTimeoutChecker() {
|
||||
final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds
|
||||
Runnable timeoutChecker = new Runnable() {
|
||||
final Runnable timeoutChecker = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
|
||||
sleepFor(interval);
|
||||
while (!shutdown) {
|
||||
long current = System.currentTimeMillis();
|
||||
for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
|
||||
if (shutdown) {
|
||||
break;
|
||||
}
|
||||
if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
|
||||
&& (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
|
||||
SessionHandle handle = session.getSessionHandle();
|
||||
|
@ -170,24 +176,34 @@ public class SessionManager extends CompositeService {
|
|||
session.closeExpiredOperations();
|
||||
}
|
||||
}
|
||||
sleepFor(interval);
|
||||
}
|
||||
}
|
||||
|
||||
private void sleepInterval(long interval) {
|
||||
try {
|
||||
Thread.sleep(interval);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
private void sleepFor(long interval) {
|
||||
synchronized (timeoutCheckerLock) {
|
||||
try {
|
||||
timeoutCheckerLock.wait(interval);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore, and break.
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
backgroundOperationPool.execute(timeoutChecker);
|
||||
}
|
||||
|
||||
private void shutdownTimeoutChecker() {
|
||||
shutdown = true;
|
||||
synchronized (timeoutCheckerLock) {
|
||||
timeoutCheckerLock.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
super.stop();
|
||||
shutdown = true;
|
||||
shutdownTimeoutChecker();
|
||||
if (backgroundOperationPool != null) {
|
||||
backgroundOperationPool.shutdown();
|
||||
long timeout = hiveConf.getTimeVar(
|
||||
|
|
Loading…
Reference in a new issue