[SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port
## What changes were proposed in this pull request?

LOAD DATA INPATH didn't work if the defaultFS included a port for hdfs. Handling this just requires a small change to use the correct URI constructor.

## How was this patch tested?

Added a unit test, ran all tests via jenkins

Closes #22733 from squito/SPARK-25738.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
parent
4cee191c04
commit
fdaa99897a
|
@ -306,7 +306,8 @@ case class LoadDataCommand(
|
|||
val loadPath = {
|
||||
if (isLocal) {
|
||||
val localFS = FileContext.getLocalFSFileContext()
|
||||
makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
|
||||
LoadDataCommand.makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(),
|
||||
new Path(path))
|
||||
} else {
|
||||
val loadPath = new Path(path)
|
||||
// Follow Hive's behavior:
|
||||
|
@ -323,7 +324,7 @@ case class LoadDataCommand(
|
|||
// by considering the wild card scenario in mind. As per old logic query param is
|
||||
// been considered while creating URI instance and if path contains wild card char '?'
|
||||
// the remaining characters after '?' will be removed while forming URI instance
|
||||
makeQualified(defaultFS, uriPath, loadPath)
|
||||
LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath)
|
||||
}
|
||||
}
|
||||
val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
|
||||
|
@ -363,7 +364,9 @@ case class LoadDataCommand(
|
|||
CommandUtils.updateTableStats(sparkSession, targetTable)
|
||||
Seq.empty[Row]
|
||||
}
|
||||
}
|
||||
|
||||
object LoadDataCommand {
|
||||
/**
|
||||
* Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class.
|
||||
*
|
||||
|
@ -372,7 +375,7 @@ case class LoadDataCommand(
|
|||
* @param path Path instance based on the path string specified by the user.
|
||||
* @return qualified path object
|
||||
*/
|
||||
private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
|
||||
private[sql] def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
|
||||
val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri()
|
||||
if (pathUri.getScheme == null || pathUri.getAuthority == null &&
|
||||
defaultUri.getAuthority != null) {
|
||||
|
@ -383,7 +386,7 @@ case class LoadDataCommand(
|
|||
pathUri.getAuthority
|
||||
}
|
||||
try {
|
||||
val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment)
|
||||
val newUri = new URI(scheme, authority, pathUri.getPath, null, pathUri.getFragment)
|
||||
new Path(newUri)
|
||||
} catch {
|
||||
case e: URISyntaxException =>
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.sql.hive.execution
|
||||
|
||||
import java.io.File
|
||||
import java.net.URI
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.sql.{Date, Timestamp}
|
||||
import java.util.{Locale, Set}
|
||||
|
@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio
|
|||
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
|
||||
import org.apache.spark.sql.execution.command.LoadDataCommand
|
||||
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
|
||||
|
@ -1985,6 +1987,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
|||
}
|
||||
}
|
||||
|
||||
test("SPARK-25738: defaultFs can have a port") {
|
||||
val defaultURI = new URI("hdfs://fizz.buzz.com:8020")
|
||||
val r = LoadDataCommand.makeQualified(defaultURI, new Path("/foo/bar"), new Path("/flim/flam"))
|
||||
assert(r === new Path("hdfs://fizz.buzz.com:8020/flim/flam"))
|
||||
}
|
||||
|
||||
test("Insert overwrite with partition") {
|
||||
withTable("tableWithPartition") {
|
||||
sql(
|
||||
|
|
Loading…
Reference in a new issue