[SPARK-22544][SS] FileStreamSource should use its own hadoop conf to call globPathIfNecessary
## What changes were proposed in this pull request?

Pass the `FileSystem` created with the correct Hadoop conf into `globPathIfNecessary`, so that it picks up the user's Hadoop configuration, such as credentials.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19771 from zsxwing/fix-file-stream-conf.
This commit is contained in:
parent
fccb337f9d
commit
bf0c0ae2dc
|
@ -47,8 +47,9 @@ class FileStreamSource(
|
|||
|
||||
// Hadoop configuration built from the session state, so user-supplied Hadoop
// settings (e.g. credentials) are honored rather than the default conf.
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
|
||||
|
||||
// FileSystem resolved against `hadoopConf` (the fix in this commit): passing this
// instance to globPathIfNecessary makes glob expansion use the user's Hadoop conf.
// @transient presumably because FileSystem is not meant to be serialized — verify.
@transient private val fs = new Path(path).getFileSystem(hadoopConf)
|
||||
|
||||
// Base path made fully qualified against its FileSystem. The result may still
// contain glob patterns; they are expanded later via globPathIfNecessary.
private val qualifiedBasePath: Path = {
|
||||
val fs = new Path(path).getFileSystem(hadoopConf)
|
||||
fs.makeQualified(new Path(path)) // can contain glob patterns
|
||||
}
|
||||
|
||||
|
@ -187,7 +188,7 @@ class FileStreamSource(
|
|||
if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
|
||||
|
||||
// Lists all files under the (possibly glob) base path via an InMemoryFileIndex.
// NOTE(review): this span is a rendered diff — the first `globbedPaths` line is
// the removed version (implicit default FileSystem) and the second is the added
// version that passes the correctly-configured `fs` explicitly.
private def allFilesUsingInMemoryFileIndex() = {
|
||||
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
|
||||
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualifiedBasePath)
|
||||
// Empty partition schema (Some(new StructType)) — no partition discovery here.
val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
|
||||
fileIndex.allFiles()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue