diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index 0c694910b0..1825af6191 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -86,20 +86,21 @@ case class InsertIntoHiveDirCommand( val jobConf = new JobConf(hadoopConf) val targetPath = new Path(storage.locationUri.get) - val writeToPath = + val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf) + val (writeToPath: Path, fs: FileSystem) = if (isLocal) { val localFileSystem = FileSystem.getLocal(jobConf) - localFileSystem.makeQualified(targetPath) + (localFileSystem.makeQualified(targetPath), localFileSystem) } else { - val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf) - val dfs = qualifiedPath.getFileSystem(jobConf) - if (!dfs.exists(qualifiedPath)) { - dfs.mkdirs(qualifiedPath.getParent) - } - qualifiedPath + val dfs = qualifiedPath.getFileSystem(hadoopConf) + (qualifiedPath, dfs) } + if (!fs.exists(writeToPath)) { + fs.mkdirs(writeToPath) + } - val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath) + // The temporary path must be a HDFS path, not a local path. + val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath) val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc( tmpPath.toString, tableDesc, false) @@ -111,15 +112,20 @@ case class InsertIntoHiveDirCommand( fileSinkConf = fileSinkConf, outputLocation = tmpPath.toString) - val fs = writeToPath.getFileSystem(hadoopConf) if (overwrite && fs.exists(writeToPath)) { fs.listStatus(writeToPath).foreach { existFile => if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true) } } - fs.listStatus(tmpPath).foreach { - tmpFile => fs.rename(tmpFile.getPath, writeToPath) + val dfs = tmpPath.getFileSystem(hadoopConf) + dfs.listStatus(tmpPath).foreach { + tmpFile => + if (isLocal) { + dfs.copyToLocalFile(tmpFile.getPath, writeToPath) + } else { + dfs.rename(tmpFile.getPath, writeToPath) + } } } catch { case e: Throwable =>