[SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession
### What changes were proposed in this pull request?

If no active SparkSession is available, let `FileSourceScanExec.needsUnsafeRowConversion` look at the default SQL config of the ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fixes a bug where, if no active SparkSession is available, a file-based data source scan for ParquetSource throws an exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
514bf563a7
commit
de0dc52a84
|
@@ -177,7 +177,7 @@ case class FileSourceScanExec(
|
|||
|
||||
// Whether rows produced by this scan must be converted to UnsafeRow before
// downstream operators consume them. Only relevant for the Parquet source,
// where the vectorized reader emits columnar rows.
// SPARK-32813: read the flag from sqlContext.conf rather than
// SparkSession.getActiveSession — the active session is thread-local and may
// be unset when the scan executes on a different thread than the one that
// created the session, which previously made `.get` throw.
private lazy val needsUnsafeRowConversion: Boolean = {
  if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    sqlContext.conf.parquetVectorizedReaderEnabled
  } else {
    false
  }
}
|
||||
|
|
|
@@ -17,11 +17,17 @@
|
|||
|
||||
package org.apache.spark.sql.execution
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import scala.collection.parallel.immutable.ParRange
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.util.ThreadUtils
|
||||
|
||||
class SQLExecutionSuite extends SparkFunSuite {
|
||||
|
||||
|
@@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {
|
|||
|
||||
spark.stop()
|
||||
}
|
||||
|
||||
test("SPARK-32813: Table scan should work in different thread") {
  // Two dedicated single-thread pools guarantee that session creation/write
  // and the subsequent scan provably run on different threads.
  val executor1 = Executors.newSingleThreadExecutor()
  val executor2 = Executors.newSingleThreadExecutor()
  var session: SparkSession = null
  // Make sure no session from a previous test leaks into this one.
  SparkSession.cleanupAnyExistingSession()

  withTempDir { tempDir =>
    try {
      val tablePath = tempDir.toString + "/table"
      // Thread 1: create the session, write a Parquet table, and return a
      // DataFrame that reads it back.
      val df = ThreadUtils.awaitResult(Future {
        session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

        session.createDataFrame(
          session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
          StructType(Seq(
            StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false))))
          .write.parquet(tablePath)

        session.read.parquet(tablePath)
      }(ExecutionContext.fromExecutorService(executor1)), 1.minute)

      // Thread 2: scan the table. Before SPARK-32813 this threw because no
      // active SparkSession was registered for this thread.
      ThreadUtils.awaitResult(Future {
        assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
    } finally {
      executor1.shutdown()
      executor2.shutdown()
      session.stop()
    }
  }
}
|
||||
}
|
||||
|
||||
object SQLExecutionSuite {
|
||||
|
|
Loading…
Reference in a new issue