[SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf
## What changes were proposed in this pull request? When we create a SparkSession at the Python side, it is possible that a SparkContext has already been created. For this case, we need to set the configs of the SparkSession builder on the Scala SparkContext's SparkConf (we need to do so because conf changes on an active Python SparkContext will not be propagated to the JVM side). Otherwise, we may create a wrong SparkSession (e.g. Hive support is not enabled even if enableHiveSupport is called). ## How was this patch tested? New tests and manual tests. Author: Yin Huai <yhuai@databricks.com> Closes #13931 from yhuai/SPARK-16224.
This commit is contained in:
parent
e158478a9f
commit
0923c4f567
|
@ -166,6 +166,8 @@ class SparkContext(object):
|
|||
|
||||
# Create the Java SparkContext through Py4J
|
||||
self._jsc = jsc or self._initialize_context(self._conf._jconf)
|
||||
# Reset the SparkConf to the one actually used by the SparkContext in JVM.
|
||||
self._conf = SparkConf(_jconf=self._jsc.sc().conf())
|
||||
|
||||
# Create a single Accumulator in Java that we'll send all our updates through;
|
||||
# they will be passed back to us through a TCP server
|
||||
|
|
|
@ -165,6 +165,13 @@ class SparkSession(object):
|
|||
for key, value in self._options.items():
|
||||
sparkConf.set(key, value)
|
||||
sc = SparkContext.getOrCreate(sparkConf)
|
||||
# This SparkContext may be an existing one.
|
||||
for key, value in self._options.items():
|
||||
# we need to propagate the confs
|
||||
# before we create the SparkSession. Otherwise, confs like
|
||||
# warehouse path and metastore url will not be set correctly (
|
||||
# these confs cannot be changed once the SparkSession is created).
|
||||
sc._conf.set(key, value)
|
||||
session = SparkSession(sc)
|
||||
for key, value in self._options.items():
|
||||
session.conf.set(key, value)
|
||||
|
|
|
@ -22,6 +22,7 @@ individual modules.
|
|||
"""
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import pydoc
|
||||
import shutil
|
||||
import tempfile
|
||||
|
@ -48,7 +49,7 @@ else:
|
|||
from pyspark.sql import SparkSession, HiveContext, Column, Row
|
||||
from pyspark.sql.types import *
|
||||
from pyspark.sql.types import UserDefinedType, _infer_type
|
||||
from pyspark.tests import ReusedPySparkTestCase
|
||||
from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests
|
||||
from pyspark.sql.functions import UserDefinedFunction, sha2
|
||||
from pyspark.sql.window import Window
|
||||
from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
|
||||
|
@ -1619,6 +1620,46 @@ class SQLTests(ReusedPySparkTestCase):
|
|||
lambda: spark.catalog.uncacheTable("does_not_exist"))
|
||||
|
||||
|
||||
class HiveSparkSubmitTests(SparkSubmitTests):
    """Spark-submit based tests that exercise Hive support (SPARK-16224)."""

    def test_hivecontext(self):
        # This test checks that HiveContext is using the Hive metastore (SPARK-16224).
        # It sets a metastore url pointing at a derby database under a fresh temp
        # dir and then checks whether that derby dir was created by the Hive
        # metastore. If the derby dir exists, HiveContext went through the
        # Hive metastore.
        metastore_path = os.path.join(tempfile.mkdtemp(), "spark16224_metastore_db")
        metastore_URL = "jdbc:derby:;databaseName=" + metastore_path + ";create=true"
        hive_site_dir = os.path.join(self.programDir, "conf")
        # Write a hive-site.xml with the metastore url above; the return value is
        # not needed — the file only has to exist under hive_site_dir so that
        # spark-submit picks it up via --driver-class-path.
        self.createTempFile("hive-site.xml", ("""
            |<configuration>
            |  <property>
            |  <name>javax.jdo.option.ConnectionURL</name>
            |  <value>%s</value>
            |  </property>
            |</configuration>
            """ % metastore_URL).lstrip(), "conf")
        # Driver script: creating a HiveContext and running a query should force
        # metastore initialization on the cluster.
        script = self.createTempFile("test.py", """
            |import os
            |
            |from pyspark.conf import SparkConf
            |from pyspark.context import SparkContext
            |from pyspark.sql import HiveContext
            |
            |conf = SparkConf()
            |sc = SparkContext(conf=conf)
            |hive_context = HiveContext(sc)
            |print(hive_context.sql("show databases").collect())
            """)
        proc = subprocess.Popen(
            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
             "--driver-class-path", hive_site_dir, script],
            stdout=subprocess.PIPE)
        # stderr is not piped, so communicate() only yields stdout here.
        out, _ = proc.communicate()
        self.assertEqual(0, proc.returncode)
        # The "default" database must be listed, and the derby dir must have
        # been created at the configured metastore path.
        self.assertIn("default", out.decode('utf-8'))
        self.assertTrue(os.path.exists(metastore_path))
|
||||
|
||||
|
||||
class HiveContextSQLTests(ReusedPySparkTestCase):
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -1921,6 +1921,14 @@ class ContextTests(unittest.TestCase):
|
|||
post_parallalize_temp_files = os.listdir(sc._temp_dir)
|
||||
self.assertEqual(temp_files, post_parallalize_temp_files)
|
||||
|
||||
def test_set_conf(self):
    """Setting a conf on an active SparkContext must reach the JVM side.

    This is for an internal use case (SPARK-16224): when there is an existing
    SparkContext, SparkSession's builder needs to set configs into the
    SparkContext's conf, and those changes must be visible on the JVM-side
    SparkConf.
    """
    sc = SparkContext()
    try:
        sc._conf.set("spark.test.SPARK16224", "SPARK16224")
        # The value set through the Python-side conf must be readable from
        # the JVM-side SparkConf of the underlying Scala SparkContext.
        self.assertEqual(sc._jsc.sc().conf().get("spark.test.SPARK16224"), "SPARK16224")
    finally:
        # Always stop the context: a failed assertion must not leak an
        # active SparkContext into the tests that run after this one.
        sc.stop()
|
||||
|
||||
def test_stop(self):
|
||||
sc = SparkContext()
|
||||
self.assertNotEqual(SparkContext._active_spark_context, None)
|
||||
|
|
Loading…
Reference in a new issue