[SPARK-31935][SQL] Hadoop file system config should be effective in data source options

### What changes were proposed in this pull request?

Make Hadoop file system configurations specified in data source options effective.

From `org.apache.hadoop.fs.FileSystem`:
```java
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }
```
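
The new tests in this PR rely on the fact that pointing `fs.defaultFS` at an unregistered scheme makes this resolution fail. Below is a minimal, self-contained sketch of that behavior (assuming a Hadoop 2.x client on the classpath; the exact message differs slightly on Hadoop 3):

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object UnknownSchemeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Same value the new tests use; no fs.nonexistFS.impl class is registered.
    conf.set("fs.defaultFS", "nonexistFS://nonexistFS")
    try {
      // scheme == null && authority == null, so FileSystem.get falls back to the default FS.
      FileSystem.get(new URI("/some/path"), conf)
    } catch {
      case e: java.io.IOException =>
        println(e.getMessage) // e.g. "No FileSystem for scheme: nonexistFS"
    }
  }
}
```
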
Before this change, the file system configurations specified in data source options were not propagated in `DataSource.scala`. After this change, users can supply scheme- and authority-specific configurations for the file systems being scanned.

This problem only exists in data source V1. V2 already uses `sparkSession.sessionState.newHadoopConfWithOptions(options)` in `FileTable`.
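
For example, a per-read override of the default file system could look like the sketch below; the `local[*]` master, the path, and the `fs.defaultFS` value are illustrative only and not part of this PR:

```scala
import org.apache.spark.sql.SparkSession

object FsConfViaOptionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fs-conf-via-options")
      .getOrCreate()

    // "fs.defaultFS" is a Hadoop configuration key, not a Spark SQL option.
    // With this change it is propagated to the FileSystem that resolves the path,
    // so this read uses the local file system regardless of the session default.
    val df = spark.read
      .option("fs.defaultFS", "file:///")
      .parquet("/tmp/example-parquet") // illustrative path

    df.show()
    spark.stop()
  }
}
```
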
### Why are the changes needed?

Allow users to specify scheme- and authority-specific Hadoop configurations when reading from file sources.

### Does this PR introduce _any_ user-facing change?

Yes. File-system-related Hadoop configurations specified in data source options now take effect when reading.

### How was this patch tested?

Unit tests in `FileBasedDataSourceSuite`, `DataSourceSuite`, and `FileStreamSourceSuite`.

Closes #28760 from gengliangwang/ds_conf.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
Gengliang Wang committed on 2020-06-09 12:15:07 -07:00
commit f3771c6b47, parent 6a424b93e5
4 changed files with 52 additions and 7 deletions

Changes in `DataSource.scala`:

```diff
@@ -110,6 +110,9 @@ case class DataSource(
   private def providingInstance() = providingClass.getConstructor().newInstance()
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -231,7 +234,7 @@
           // once the streaming job starts and some upstream source starts dropping data.
           val hdfsPath = new Path(path)
           if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-            val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+            val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
             if (!fs.exists(hdfsPath)) {
               throw new AnalysisException(s"Path does not exist: $path")
             }
@@ -358,7 +361,7 @@
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf(),
+            newHadoopConfiguration(),
             sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
@@ -450,7 +453,7 @@
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val fs = path.getFileSystem(newHadoopConfiguration())
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -570,9 +573,7 @@
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
       checkEmptyGlobPath, checkFilesExist)
   }
 }
```
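
The new `newHadoopConfiguration()` helper delegates to `sparkSession.sessionState.newHadoopConfWithOptions(options)`, which overlays the data source options on top of the session's Hadoop configuration. A simplified sketch of that overlay behavior (not the actual Spark implementation; skipping the `path`/`paths` keys is an assumption):

```scala
import org.apache.hadoop.conf.Configuration

object HadoopConfOverlaySketch {
  // Start from the session-level Hadoop conf and copy the data source options on top,
  // so per-read settings win over session defaults.
  def newHadoopConfWithOptions(
      sessionHadoopConf: Configuration,
      options: Map[String, String]): Configuration = {
    val conf = new Configuration(sessionHadoopConf)
    options.foreach { case (key, value) =>
      // "path" / "paths" are data source options rather than Hadoop settings, so skip them.
      if (value != null && key != "path" && key != "paths") {
        conf.set(key, value)
      }
    }
    conf
  }
}
```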

Changes in `FileBasedDataSourceSuite.scala`:

```diff
@@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    Seq("parquet", "").foreach { format =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+        withTempDir { dir =>
+          val path = dir.getCanonicalPath
+          val defaultFs = "nonexistFS://nonexistFS"
+          val expectMessage = "No FileSystem for scheme: nonexistFS"
+          val message1 = intercept[java.io.IOException] {
+            spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message1 == expectMessage)
+          val message2 = intercept[java.io.IOException] {
+            spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message2 == expectMessage)
+        }
+      }
+    }
+  }
+
   test("SPARK-31116: Select nested schema with case insensitive mode") {
     // This test case failed at only Parquet. ORC is added for test coverage parity.
     Seq("orc", "parquet").foreach { format =>
```

Changes in `DataSourceSuite.scala`:

```diff
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.test.SharedSparkSession
 
-class DataSourceSuite extends SharedSparkSession {
+class DataSourceSuite extends SharedSparkSession with PrivateMethodTester {
   import TestPaths._
 
   test("test glob and non glob paths") {
@@ -132,6 +133,17 @@ class DataSourceSuite extends SharedSparkSession {
       )
     )
   }
 
+  test("Data source options should be propagated in method checkAndGlobPathIfNecessary") {
+    val dataSourceOptions = Map("fs.defaultFS" -> "nonexistsFs://nonexistsFs")
+    val dataSource = DataSource(spark, "parquet", Seq("/path3"), options = dataSourceOptions)
+    val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary)
+    val message = intercept[java.io.IOException] {
+      dataSource invokePrivate checkAndGlobPathIfNecessary(false, false)
+    }.getMessage
+    assert(message.equals("No FileSystem for scheme: nonexistsFs"))
+  }
 }
 
 object TestPaths {
```
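
The test above reaches the private `checkAndGlobPathIfNecessary` through ScalaTest's `PrivateMethodTester`. For readers unfamiliar with that trait, here is a minimal self-contained sketch of the pattern (the `Calculator` class and `double` method are made up for illustration):

```scala
import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite

class Calculator {
  private def double(x: Int): Int = x * 2
}

class CalculatorSuite extends AnyFunSuite with PrivateMethodTester {
  test("invoke a private method by name") {
    // Describe the private method by its result type and name.
    val double = PrivateMethod[Int](Symbol("double"))
    // invokePrivate calls it reflectively on the target instance.
    assert((new Calculator) invokePrivate double(21) === 42)
  }
}
```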

Changes in `FileStreamSourceSuite.scala`:

```diff
@@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message == expectMessage)
+    }
+  }
+
   test("read from textfile") {
     withTempDirs { case (src, tmp) =>
       val textStream = spark.readStream.textFile(src.getCanonicalPath)
```