diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index c1adbb18b4..ce5909a094 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -177,7 +177,7 @@ case class FileSourceScanExec(
 
   private lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-      SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+      sqlContext.conf.parquetVectorizedReaderEnabled
     } else {
       false
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 8bf7fe62cd..81e692076b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -17,11 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.Executors
+
 import scala.collection.parallel.immutable.ParRange
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.ThreadUtils
 
 class SQLExecutionSuite extends SparkFunSuite {
 
@@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {
 
     spark.stop()
   }
+
+  test("SPARK-32813: Table scan should work in different thread") {
+    val executor1 = Executors.newSingleThreadExecutor()
+    val executor2 = Executors.newSingleThreadExecutor()
+    var session: SparkSession = null
+    SparkSession.cleanupAnyExistingSession()
+
+    withTempDir { tempDir =>
+      try {
+        val tablePath = tempDir.toString + "/table"
+        val df = ThreadUtils.awaitResult(Future {
+          session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
+
+          session.createDataFrame(
+            session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
+            StructType(Seq(
+              StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false))))
+            .write.parquet(tablePath)
+
+          session.read.parquet(tablePath)
+        }(ExecutionContext.fromExecutorService(executor1)), 1.minute)
+
+        ThreadUtils.awaitResult(Future {
+          assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
+        }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
+      } finally {
+        executor1.shutdown()
+        executor2.shutdown()
+        session.stop()
+      }
+    }
+  }
 }
 
 object SQLExecutionSuite {
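
Context for the change above (not part of the patch): `SparkSession.getActiveSession` is backed by a thread-local (an `InheritableThreadLocal` in the `SparkSession` companion object), so evaluating the lazy `needsUnsafeRowConversion` on a thread that never had an active session set makes the `.get` throw `NoSuchElementException`; `sqlContext.conf`, captured when the plan is constructed, avoids that execution-time thread-local lookup. Below is a minimal, standalone sketch of the thread-local behaviour the new test guards against — the object name `ActiveSessionRepro` and its structure are invented for illustration and assume Spark on the classpath.

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession

object ActiveSessionRepro {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    // Force the single worker thread into existence before any session is
    // created: InheritableThreadLocal values are copied at thread creation,
    // so a pre-existing thread never observes a later getOrCreate().
    pool.submit(new Runnable { override def run(): Unit = () }).get()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    val spark = SparkSession.builder().appName("repro").master("local[*]").getOrCreate()
    try {
      // The thread-local is unset on the pool thread, so the pre-patch
      // SparkSession.getActiveSession.get would throw NoSuchElementException
      // here during query execution.
      val active = Await.result(Future { SparkSession.getActiveSession }, 1.minute)
      assert(active.isEmpty)
    } finally {
      spark.stop()
      pool.shutdown()
    }
  }
}

The new test in the patch takes the same shape: the session is created and the table written on executor1's thread, while the scan is collected on executor2's thread, where no active session was ever set.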