[SPARK-29423][SS] lazily initialize StreamingQueryManager in SessionState
### What changes were proposed in this pull request? This PR makes `SessionState` lazily initialize `StreamingQueryManager` to avoid constructing `StreamingQueryManager` for each session when connecting to ThriftServer. ### Why are the changes needed? Reduce memory usage. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? manual test 1. Start thriftserver: ``` build/sbt clean package -Phive -Phadoop-3.2 -Phive-thriftserver export SPARK_PREPEND_CLASSES=true sbin/start-thriftserver.sh ``` 2. Open a session: ``` bin/beeline -u jdbc:hive2://localhost:10000 ``` 3. Check `StreamingQueryManager` instance: ``` jcmd | grep HiveThriftServer2 | awk -F ' ' '{print $1}' | xargs jmap -histo | grep StreamingQueryManager ``` **Before this PR**: ``` [rootspark-3267648 spark]# jcmd | grep HiveThriftServer2 | awk -F ' ' '{print $1}' | xargs jmap -histo | grep StreamingQueryManager 1954: 2 96 org.apache.spark.sql.streaming.StreamingQueryManager ``` **After this PR**: ``` [rootspark-3267648 spark]# jcmd | grep HiveThriftServer2 | awk -F ' ' '{print $1}' | xargs jmap -histo | grep StreamingQueryManager [rootspark-3267648 spark]# ``` Closes #26089 from wangyum/SPARK-29423. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
parent
51f10ed90f
commit
e00344edc1
|
@ -316,7 +316,7 @@ abstract class BaseSessionStateBuilder(
|
|||
() => analyzer,
|
||||
() => optimizer,
|
||||
planner,
|
||||
streamingQueryManager,
|
||||
() => streamingQueryManager,
|
||||
listenerManager,
|
||||
() => resourceLoader,
|
||||
createQueryExecution,
|
||||
|
|
|
@ -49,7 +49,8 @@ import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListen
|
|||
* unresolved attributes and relations.
|
||||
* @param optimizerBuilder a function to create the logical query plan optimizer.
|
||||
* @param planner Planner that converts optimized logical plans to physical plans.
|
||||
* @param streamingQueryManager Interface to start and stop streaming queries.
|
||||
* @param streamingQueryManagerBuilder A function to create a streaming query manager to
|
||||
* start and stop streaming queries.
|
||||
* @param listenerManager Interface to register custom [[QueryExecutionListener]]s.
|
||||
* @param resourceLoaderBuilder a function to create a session shared resource loader to load JARs,
|
||||
* files, etc.
|
||||
|
@ -67,7 +68,7 @@ private[sql] class SessionState(
|
|||
analyzerBuilder: () => Analyzer,
|
||||
optimizerBuilder: () => Optimizer,
|
||||
val planner: SparkPlanner,
|
||||
val streamingQueryManager: StreamingQueryManager,
|
||||
val streamingQueryManagerBuilder: () => StreamingQueryManager,
|
||||
val listenerManager: ExecutionListenerManager,
|
||||
resourceLoaderBuilder: () => SessionResourceLoader,
|
||||
createQueryExecution: LogicalPlan => QueryExecution,
|
||||
|
@ -83,6 +84,10 @@ private[sql] class SessionState(
|
|||
|
||||
lazy val resourceLoader: SessionResourceLoader = resourceLoaderBuilder()
|
||||
|
||||
// The streamingQueryManager is lazy to avoid creating a StreamingQueryManager for each session
|
||||
// when connecting to ThriftServer.
|
||||
lazy val streamingQueryManager: StreamingQueryManager = streamingQueryManagerBuilder()
|
||||
|
||||
def catalogManager: CatalogManager = analyzer.catalogManager
|
||||
|
||||
def newHadoopConf(): Configuration = SessionState.newHadoopConf(
|
||||
|
|
Loading…
Reference in a new issue