[SPARK-5794] [SQL] fix add jar

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #4586 from adrian-wang/addjar and squashes the following commits:

efdd602 [Daoyuan Wang] move jar to another place
6c707e8 [Daoyuan Wang] restrict hive version for test
32c4fb8 [Daoyuan Wang] fix style and add a test
9957d87 [Daoyuan Wang] use sessionstate classloader in makeRDDforTable
0810e71 [Daoyuan Wang] remove variable substitution
1898309 [Daoyuan Wang] fix classnotfound
95a40da [Daoyuan Wang] support env args in add jar, and set add jar ret to 0
Authored by Daoyuan Wang on 2015-04-13 18:26:00 -07:00; committed by Michael Armbrust
parent 3782e1f2be
commit b45059d0d7
5 changed files with 23 additions and 4 deletions

SparkSQLCLIDriver.scala

@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils,
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{SetProcessor, CommandProcessor, CommandProcessorFactory}
+import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor, CommandProcessorFactory}
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.thrift.transport.TSocket
@@ -264,7 +264,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)
       if (proc != null) {
-        if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {
+        if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
+          proc.isInstanceOf[AddResourceProcessor]) {
           val driver = new SparkSQLDriver
           driver.init()
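The hunk above widens the set of commands that SparkSQLCLIDriver runs through Spark's own driver. A minimal, self-contained sketch of that dispatch rule, using stand-in types rather than the real org.apache.hadoop.hive.ql.processors classes:

```scala
// Stand-ins for Hive's processor types; illustrative only, not Spark source.
object DispatchSketch {
  sealed trait Processor
  case object QueryDriver extends Processor     // stands in for ql.Driver
  case object SetProc extends Processor         // stands in for SetProcessor
  case object AddResourceProc extends Processor // handles ADD JAR / ADD FILE
  case object OtherProc extends Processor       // any other Hive processor

  // Before this patch, only QueryDriver and SetProc took the Spark path;
  // AddResourceProc fell through to Hive's local processor, so the added
  // jar never reached the Spark side.
  def routedThroughSpark(proc: Processor): Boolean = proc match {
    case QueryDriver | SetProc | AddResourceProc => true
    case _ => false
  }

  def main(args: Array[String]): Unit =
    println(routedThroughSpark(AddResourceProc)) // true after this patch
}
```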

TableReader.scala

@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.DateUtils
+import org.apache.spark.util.Utils

 /**
  * A trait for subclasses that handle table scans.
@@ -76,7 +77,9 @@ class HadoopTableReader(
   override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
     makeRDDForTable(
       hiveTable,
-      relation.tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]],
+      Class.forName(
+        relation.tableDesc.getSerdeClassName, true, sc.sessionState.getConf.getClassLoader)
+        .asInstanceOf[Class[Deserializer]],
       filterOpt = None)

   /**
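For context on the hunk above: Class.forName with an explicit loader can resolve classes from jars registered after startup (e.g. via ADD JAR), while the previous getDeserializerClass call resolved against whatever loader loaded Hive's TableDesc and could throw ClassNotFoundException for a freshly added serde (commit 1898309, "fix classnotfound"). A self-contained sketch with a hypothetical helper:

```scala
import java.net.{URL, URLClassLoader}

object LoaderSketch {
  // loadWith is a hypothetical helper, not Spark code: resolve className
  // through a loader that also sees jars supplied at runtime, analogous to
  // resolving the serde via the SessionState classloader above.
  def loadWith(extraJars: Seq[URL], className: String): Class[_] = {
    val loader = new URLClassLoader(extraJars.toArray, getClass.getClassLoader)
    Class.forName(className, /* initialize = */ true, loader)
  }
}
```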

commands.scala

@@ -80,7 +80,7 @@ case class AddJar(path: String) extends RunnableCommand {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     hiveContext.runSqlHive(s"ADD JAR $path")
     hiveContext.sparkContext.addJar(path)
-    Seq.empty[Row]
+    Seq(Row(0))
   }
 }
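With this change, ADD JAR returns a single row holding 0, mirroring Hive's return-code convention on success, instead of an empty result (commit 95a40da). A hedged usage sketch; the jar path is hypothetical and hiveContext is assumed to be an existing HiveContext:

```scala
// The path below is hypothetical; any jar on the local filesystem works.
val rows = hiveContext.sql("ADD JAR /tmp/my-udfs.jar").collect()
assert(rows.map(_.getInt(0)).toSeq == Seq(0)) // one Row(0) on success
```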

HiveQuerySuite.scala

@@ -813,6 +813,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql("DROP TABLE alter1")
   }

+  test("ADD JAR command 2") {
+    // this is a test case from mapjoin_addjar.q
+    val testJar = TestHive.getHiveFile("hive-hcatalog-core-0.13.1.jar").getCanonicalPath
+    val testData = TestHive.getHiveFile("data/files/sample.json").getCanonicalPath
+    if (HiveShim.version == "0.13.1") {
+      sql(s"ADD JAR $testJar")
+      sql(
+        """CREATE TABLE t1(a string, b string)
+          |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'""".stripMargin)
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""")
+      sql("select * from src join t1 on src.key = t1.a")
+      sql("DROP TABLE t1")
+    }
+  }
+
   test("ADD FILE command") {
     val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile
     sql(s"ADD FILE $testFile")