[SPARK-5840][SQL] HiveContext cannot be serialized due to tuple extraction

Also added test cases for checking the serializability of HiveContext and SQLContext.

Author: Reynold Xin <rxin@databricks.com>

Closes #4628 from rxin/SPARK-5840 and squashes the following commits:

ecb3bcd [Reynold Xin] test cases and reviews.
55eb822 [Reynold Xin] [SPARK-5840][SQL] HiveContext cannot be serialized due to tuple extraction.
This commit is contained in:
Reynold Xin 2015-02-18 14:02:32 -08:00 committed by Michael Armbrust
parent a8eb92dcb9
commit f0e3b71077
3 changed files with 84 additions and 16 deletions

View file

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.test.TestSQLContext
class SerializationSuite extends FunSuite {
test("[SPARK-5235] SQLContext should be serializable") {
val sqlContext = new SQLContext(TestSQLContext.sparkContext)
new JavaSerializer(new SparkConf()).newInstance().serialize(sqlContext)
}
}

View file

@ -222,22 +222,25 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
* set in the SQLConf *as well as* in the HiveConf.
*/
@transient protected[hive] lazy val (hiveconf, sessionState) =
Option(SessionState.get())
.orElse {
val newState = new SessionState(new HiveConf(classOf[SessionState]))
// Only starts newly created `SessionState` instance. Any existing `SessionState` instance
// returned by `SessionState.get()` must be the most recently started one.
SessionState.start(newState)
Some(newState)
@transient protected[hive] lazy val sessionState: SessionState = {
var state = SessionState.get()
if (state == null) {
state = new SessionState(new HiveConf(classOf[SessionState]))
SessionState.start(state)
}
.map { state =>
setConf(state.getConf.getAllProperties)
if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
(state.getConf, state)
if (state.out == null) {
state.out = new PrintStream(outputBuffer, true, "UTF-8")
}
if (state.err == null) {
state.err = new PrintStream(outputBuffer, true, "UTF-8")
}
state
}
@transient protected[hive] lazy val hiveconf: HiveConf = {
setConf(sessionState.getConf.getAllProperties)
sessionState.getConf
}
.get
override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.hive.test.TestHive
class SerializationSuite extends FunSuite {
test("[SPARK-5840] HiveContext should be serializable") {
val hiveContext = new HiveContext(TestHive.sparkContext)
hiveContext.hiveconf
new JavaSerializer(new SparkConf()).newInstance().serialize(hiveContext)
}
}