[SPARK-29022][SQL] Fix SparkSQLCLI cannot add jars by AddJarCommand
### What changes were proposed in this pull request? As described in [SPARK-29022](https://issues.apache.org/jira/browse/SPARK-29022), the Spark SQL CLI cannot use a class from a jar added by SQL `ADD JAR` as a SerDe class. When we create a table whose `serde` class is contained in a jar added by SQL `ADD JAR`, the table creation succeeds, because we call `HiveClientImpl.createTable` inside the `withHiveState` method, which adds `clientLoader.classLoader` to `HiveClientImpl.state.getConf.classLoader`. Jars added by SQL `ADD JAR` are added to: 1. `sparkSession.sharedState.jarClassLoader`; 2. `HiveClientLoader.clientLoader.classLoader`. In the current spark-sql mode, `HiveClientImpl.state` uses the `CliSessionState` created when `SparkSQLCLIDriver` is initialized. When we select data from the table, the `serde` class is checked: the method `HiveTableScanExec#addColumnMetadataToConf()` is called to check the table descriptor's serde class. ``` val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() deserializer.initialize(hiveConf, tableDesc.getProperties) ``` `getDeserializer` uses the class loader of `CliSessionState`'s hiveConf in `Spark SQL CLI` mode. But when we call `ADD JAR` in Spark, the jar is not added to the class loader of `CliSessionState`'s conf, so a `ClassNotFound` error occurs. Therefore we reset the class loader of `CliSessionState`'s conf to `sharedState.jarClassLoader` once `sharedState.jarClassLoader` has added the jars passed via `HIVEAUXJARS`. Then, when we use `ADD JAR` to add a jar, its path is also added to the class loader of `CliSessionState`'s conf. ### Why are the changes needed? Fix a bug. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Added UT Closes #25729 from AngersZhuuuu/SPARK-29015. Authored-by: angerszhu <angers.zhu@gmail.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
parent
bd031c2173
commit
0cf2f48dfe
|
@ -165,6 +165,13 @@ private[hive] object SparkSQLCLIDriver extends Logging {
|
|||
StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_))
|
||||
}
|
||||
|
||||
// The class loader of CliSessionState's conf is current main thread's class loader
|
||||
// used to load jars passed by --jars. One class loader used by AddJarCommand is
|
||||
// sharedState.jarClassLoader which contain jar path passed by --jars in main thread.
|
||||
// We set CliSessionState's conf class loader to sharedState.jarClassLoader.
|
||||
// Thus we can load all jars passed by --jars and AddJarCommand.
|
||||
sessionState.getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader)
|
||||
|
||||
// TODO work around for set the log output to console, because the HiveContext
|
||||
// will set the output into an invalid buffer.
|
||||
sessionState.in = System.in
|
||||
|
|
|
@ -226,6 +226,32 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
|
|||
)
|
||||
}
|
||||
|
||||
test("SPARK-29022: Commands using SerDe provided in --hive.aux.jars.path") {
|
||||
val dataFilePath =
|
||||
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
|
||||
val hiveContribJar = HiveTestJars.getHiveHcatalogCoreJar().getCanonicalPath
|
||||
runCliWithin(
|
||||
3.minute,
|
||||
Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))(
|
||||
"""CREATE TABLE addJarWithHiveAux(key string, val string)
|
||||
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
|
||||
""".stripMargin
|
||||
-> "",
|
||||
"CREATE TABLE sourceTableForWithHiveAux (key INT, val STRING);"
|
||||
-> "",
|
||||
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithHiveAux;"
|
||||
-> "",
|
||||
"INSERT INTO TABLE addJarWithHiveAux SELECT key, val FROM sourceTableForWithHiveAux;"
|
||||
-> "",
|
||||
"SELECT collect_list(array(val)) FROM addJarWithHiveAux;"
|
||||
-> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""",
|
||||
"DROP TABLE addJarWithHiveAux;"
|
||||
-> "",
|
||||
"DROP TABLE sourceTableForWithHiveAux;"
|
||||
-> ""
|
||||
)
|
||||
}
|
||||
|
||||
test("SPARK-11188 Analysis error reporting") {
|
||||
runCliWithin(timeout = 2.minute,
|
||||
errorResponses = Seq("AnalysisException"))(
|
||||
|
@ -332,4 +358,30 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
|
|||
"SELECT concat_ws(',', 'First', example_max(1234321), 'Third');" -> "First,1234321,Third"
|
||||
)
|
||||
}
|
||||
|
||||
test("SPARK-29022 Commands using SerDe provided in ADD JAR sql") {
|
||||
val dataFilePath =
|
||||
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
|
||||
val hiveContribJar = HiveTestJars.getHiveHcatalogCoreJar().getCanonicalPath
|
||||
runCliWithin(
|
||||
3.minute)(
|
||||
s"ADD JAR ${hiveContribJar};" -> "",
|
||||
"""CREATE TABLE addJarWithSQL(key string, val string)
|
||||
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
|
||||
""".stripMargin
|
||||
-> "",
|
||||
"CREATE TABLE sourceTableForWithSQL(key INT, val STRING);"
|
||||
-> "",
|
||||
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithSQL;"
|
||||
-> "",
|
||||
"INSERT INTO TABLE addJarWithSQL SELECT key, val FROM sourceTableForWithSQL;"
|
||||
-> "",
|
||||
"SELECT collect_list(array(val)) FROM addJarWithSQL;"
|
||||
-> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""",
|
||||
"DROP TABLE addJarWithSQL;"
|
||||
-> "",
|
||||
"DROP TABLE sourceTableForWithSQL;"
|
||||
-> ""
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue