[SPARK-5872] [SQL] create a sqlCtx in pyspark shell
The sqlCtx will be HiveContext if Hive is built into the assembly jar, or SQLContext if not. It also skips the Hive tests in pyspark.sql.tests if Hive is not available. Author: Davies Liu <davies@databricks.com> Closes #4659 from davies/sqlctx and squashes the following commits: 0e6629a [Davies Liu] sqlCtx in pyspark
This commit is contained in:
parent
3df85dccbc
commit
4d4cc760fa
|
@ -31,8 +31,12 @@ if sys.version_info[0] != 2:
|
||||||
import atexit
|
import atexit
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
|
||||||
|
import py4j
|
||||||
|
|
||||||
import pyspark
|
import pyspark
|
||||||
from pyspark.context import SparkContext
|
from pyspark.context import SparkContext
|
||||||
|
from pyspark.sql import SQLContext, HiveContext
|
||||||
from pyspark.storagelevel import StorageLevel
|
from pyspark.storagelevel import StorageLevel
|
||||||
|
|
||||||
# this is the deprecated equivalent of ADD_JARS
|
# this is the deprecated equivalent of ADD_JARS
|
||||||
|
@ -46,6 +50,13 @@ if os.environ.get("SPARK_EXECUTOR_URI"):
|
||||||
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
|
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
|
||||||
atexit.register(lambda: sc.stop())
|
atexit.register(lambda: sc.stop())
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Try to access HiveConf, it will raise exception if Hive is not added
|
||||||
|
sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
|
||||||
|
sqlCtx = HiveContext(sc)
|
||||||
|
except py4j.protocol.Py4JError:
|
||||||
|
sqlCtx = SQLContext(sc)
|
||||||
|
|
||||||
print("""Welcome to
|
print("""Welcome to
|
||||||
____ __
|
____ __
|
||||||
/ __/__ ___ _____/ /__
|
/ __/__ ___ _____/ /__
|
||||||
|
@ -57,7 +68,7 @@ print("Using Python version %s (%s, %s)" % (
|
||||||
platform.python_version(),
|
platform.python_version(),
|
||||||
platform.python_build()[0],
|
platform.python_build()[0],
|
||||||
platform.python_build()[1]))
|
platform.python_build()[1]))
|
||||||
print("SparkContext available as sc.")
|
print("SparkContext available as sc, %s available as sqlCtx." % sqlCtx.__class__.__name__)
|
||||||
|
|
||||||
if add_files is not None:
|
if add_files is not None:
|
||||||
print("Warning: ADD_FILES environment variable is deprecated, use --py-files argument instead")
|
print("Warning: ADD_FILES environment variable is deprecated, use --py-files argument instead")
|
||||||
|
|
|
@ -25,6 +25,8 @@ import pydoc
|
||||||
import shutil
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
|
import py4j
|
||||||
|
|
||||||
if sys.version_info[:2] <= (2, 6):
|
if sys.version_info[:2] <= (2, 6):
|
||||||
try:
|
try:
|
||||||
import unittest2 as unittest
|
import unittest2 as unittest
|
||||||
|
@ -329,9 +331,12 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
ReusedPySparkTestCase.setUpClass()
|
ReusedPySparkTestCase.setUpClass()
|
||||||
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
|
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
try:
|
||||||
|
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
|
||||||
|
except py4j.protocol.Py4JError:
|
||||||
|
cls.sqlCtx = None
|
||||||
|
return
|
||||||
os.unlink(cls.tempdir.name)
|
os.unlink(cls.tempdir.name)
|
||||||
print "type", type(cls.sc)
|
|
||||||
print "type", type(cls.sc._jsc)
|
|
||||||
_scala_HiveContext =\
|
_scala_HiveContext =\
|
||||||
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
|
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
|
||||||
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
|
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
|
||||||
|
@ -344,6 +349,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
|
||||||
shutil.rmtree(cls.tempdir.name, ignore_errors=True)
|
shutil.rmtree(cls.tempdir.name, ignore_errors=True)
|
||||||
|
|
||||||
def test_save_and_load_table(self):
|
def test_save_and_load_table(self):
|
||||||
|
if self.sqlCtx is None:
|
||||||
|
return # no hive available, skipped
|
||||||
|
|
||||||
df = self.df
|
df = self.df
|
||||||
tmpPath = tempfile.mkdtemp()
|
tmpPath = tempfile.mkdtemp()
|
||||||
shutil.rmtree(tmpPath)
|
shutil.rmtree(tmpPath)
|
||||||
|
|
Loading…
Reference in a new issue