[SPARK-32381][CORE][FOLLOWUP][TEST-HADOOP2.7] Don't remove SerializableFileStatus and SerializableBlockLocation for Hadoop 2.7

### What changes were proposed in this pull request?

Revert the change in #29959 and don't remove `SerializableFileStatus` and `SerializableBlockLocation`.

### Why are the changes needed?

In Hadoop 2.7 `FileStatus` and `BlockLocation` are not serializable, so we still need the two wrapper classes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #30447 from sunchao/SPARK-32381-followup.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Chao Sun 2020-11-20 18:45:17 -08:00 committed by Dongjoon Hyun
parent 530c0a8e28
commit b623c03456
No known key found for this signature in database
GPG key ID: EDA00CE834F0FC5C

View file

@ -136,12 +136,53 @@ private[spark] object HadoopFSUtils extends Logging {
parallelismMax = 0) parallelismMax = 0)
(path, leafFiles) (path, leafFiles)
}.iterator }.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
SerializableBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}
case _ =>
Array.empty[SerializableBlockLocation]
}
SerializableFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime,
blockLocations)
}
(path.toString, serializableStatuses)
}.collect() }.collect()
} finally { } finally {
sc.setJobDescription(previousJobDescription) sc.setJobDescription(previousJobDescription)
} }
statusMap.toSeq // turn SerializableFileStatus back to Status
statusMap.map { case (path, serializableStatuses) =>
val statuses = serializableStatuses.map { f =>
val blockLocations = f.blockLocations.map { loc =>
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
}
new LocatedFileStatus(
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
new Path(f.path)),
blockLocations)
}
(new Path(path), statuses)
}
} }
// scalastyle:off argcount // scalastyle:off argcount
@ -291,4 +332,22 @@ private[spark] object HadoopFSUtils extends Logging {
resolvedLeafStatuses resolvedLeafStatuses
} }
// scalastyle:on argcount // scalastyle:on argcount
/** A serializable variant of HDFS's BlockLocation. This is required by Hadoop 2.7. */
private case class SerializableBlockLocation(
names: Array[String],
hosts: Array[String],
offset: Long,
length: Long)
/** A serializable variant of HDFS's FileStatus. This is required by Hadoop 2.7. */
private case class SerializableFileStatus(
path: String,
length: Long,
isDir: Boolean,
blockReplication: Short,
blockSize: Long,
modificationTime: Long,
accessTime: Long,
blockLocations: Array[SerializableBlockLocation])
} }