[SPARK-29530][SQL] Make SQLConf in SQL parse process thread safe
### What changes were proposed in this pull request? As I have comment in [SPARK-29516](https://github.com/apache/spark/pull/26172#issuecomment-544364977) SparkSession.sql() method parse process not under current sparksession's conf, so some configuration about parser is not valid in multi-thread situation. In this pr, we add a SQLConf parameter to AbstractSqlParser and initial it with SessionState's conf. Then for each SparkSession's parser process. It will use's it's own SessionState's SQLConf and to be thread safe ### Why are the changes needed? Fix bug ### Does this PR introduce any user-facing change? NO ### How was this patch tested? NO Closes #26187 from AngersZhuuuu/SPARK-29530. Authored-by: angerszhu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
3d567a357c
commit
484f93e255
|
@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
|
|||
/**
|
||||
* Base SQL parsing infrastructure.
|
||||
*/
|
||||
abstract class AbstractSqlParser extends ParserInterface with Logging {
|
||||
abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging {
|
||||
|
||||
/** Creates/Resolves DataType for a given SQL string. */
|
||||
override def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
|
||||
|
@ -91,16 +91,16 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
|
|||
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
|
||||
lexer.removeErrorListeners()
|
||||
lexer.addErrorListener(ParseErrorListener)
|
||||
lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
|
||||
lexer.ansi = SQLConf.get.ansiEnabled
|
||||
lexer.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
|
||||
lexer.ansi = conf.ansiEnabled
|
||||
|
||||
val tokenStream = new CommonTokenStream(lexer)
|
||||
val parser = new SqlBaseParser(tokenStream)
|
||||
parser.addParseListener(PostProcessor)
|
||||
parser.removeErrorListeners()
|
||||
parser.addErrorListener(ParseErrorListener)
|
||||
parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
|
||||
parser.ansi = SQLConf.get.ansiEnabled
|
||||
parser.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
|
||||
parser.ansi = conf.ansiEnabled
|
||||
|
||||
try {
|
||||
try {
|
||||
|
@ -134,12 +134,12 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
|
|||
/**
|
||||
* Concrete SQL parser for Catalyst-only SQL statements.
|
||||
*/
|
||||
class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser {
|
||||
class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
|
||||
val astBuilder = new AstBuilder(conf)
|
||||
}
|
||||
|
||||
/** For test-only. */
|
||||
object CatalystSqlParser extends AbstractSqlParser {
|
||||
object CatalystSqlParser extends AbstractSqlParser(SQLConf.get) {
|
||||
val astBuilder = new AstBuilder(SQLConf.get)
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
|
|||
/**
|
||||
* Concrete parser for Spark SQL statements.
|
||||
*/
|
||||
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
|
||||
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
|
||||
val astBuilder = new SparkSqlAstBuilder(conf)
|
||||
|
||||
private val substitutor = new VariableSubstitution(conf)
|
||||
|
|
Loading…
Reference in a new issue