[SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files
## What changes were proposed in this pull request?

SPARK-27534 missed addressing my own comments at https://github.com/WeichenXu123/spark/pull/8. It's better to push this in since the code is already cleaned up.

## How was this patch tested?

Unit tests were fixed.

Closes #25003 from HyukjinKwon/SPARK-27534.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
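For context, column pruning here means the `binaryFile` data source only materializes the columns a query actually selects, so it can skip reading file contents entirely. A minimal user-facing sketch (the path is hypothetical; the source's fixed schema of `path`, `modificationTime`, `length`, `content` is part of the data source itself):

```scala
// Load binary files; selecting only metadata columns means the reader
// never has to open the files' contents.
val df = spark.read.format("binaryFile").load("/tmp/data") // hypothetical path
df.select("path", "length", "modificationTime").show()

// count() likewise does not need to read content.
println(df.count())
```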
parent 8299600575
commit facf9c30a2
```diff
@@ -290,56 +290,54 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
     ), true)
   }
 
+  private def readBinaryFile(file: File, requiredSchema: StructType): Row = {
+    val format = new BinaryFileFormat
+    val reader = format.buildReaderWithPartitionValues(
+      sparkSession = spark,
+      dataSchema = schema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = requiredSchema,
+      filters = Seq.empty,
+      options = Map.empty,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val partitionedFile = mock(classOf[PartitionedFile])
+    when(partitionedFile.filePath).thenReturn(file.getPath)
+    val encoder = RowEncoder(requiredSchema).resolveAndBind()
+    encoder.fromRow(reader(partitionedFile).next())
+  }
+
   test("column pruning") {
-    def getRequiredSchema(fieldNames: String*): StructType = {
-      StructType(fieldNames.map {
-        case f if schema.fieldNames.contains(f) => schema(f)
-        case other => StructField(other, NullType)
-      })
-    }
-    def read(file: File, requiredSchema: StructType): Row = {
-      val format = new BinaryFileFormat
-      val reader = format.buildReaderWithPartitionValues(
-        sparkSession = spark,
-        dataSchema = schema,
-        partitionSchema = StructType(Nil),
-        requiredSchema = requiredSchema,
-        filters = Seq.empty,
-        options = Map.empty,
-        hadoopConf = spark.sessionState.newHadoopConf()
-      )
-      val partitionedFile = mock(classOf[PartitionedFile])
-      when(partitionedFile.filePath).thenReturn(file.getPath)
-      val encoder = RowEncoder(requiredSchema).resolveAndBind()
-      encoder.fromRow(reader(partitionedFile).next())
-    }
-    val file = new File(Utils.createTempDir(), "data")
-    val content = "123".getBytes
-    Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
-
-    read(file, getRequiredSchema(MODIFICATION_TIME, CONTENT, LENGTH, PATH)) match {
-      case Row(t, c, len, p) =>
-        assert(t === new Timestamp(file.lastModified()))
-        assert(c === content)
-        assert(len === content.length)
-        assert(p.asInstanceOf[String].endsWith(file.getAbsolutePath))
-    }
+    withTempPath { file =>
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
 
-    file.setReadable(false)
-    withClue("cannot read content") {
-      intercept[IOException] {
-        read(file, getRequiredSchema(CONTENT))
-      }
+      val actual = readBinaryFile(file, StructType(schema.takeRight(3)))
+      val expected = Row(new Timestamp(file.lastModified()), content.length, content)
+
+      assert(actual === expected)
     }
-    assert(read(file, getRequiredSchema(LENGTH)) === Row(content.length),
-      "Get length should not read content.")
-    intercept[RuntimeException] {
-      read(file, getRequiredSchema(LENGTH, "other"))
-    }
+  }
 
-    val df = spark.read.format(BINARY_FILE).load(file.getPath)
-    assert(df.count() === 1, "Count should not read content.")
-    assert(df.select("LENGTH").first().getLong(0) === content.length,
-      "column pruning should be case insensitive")
+  test("column pruning - non-readable file") {
+    withTempPath { file =>
+      val content = "abc".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+      file.setReadable(false)
+
+      // If content is selected, it throws an exception because it's not readable.
+      intercept[IOException] {
+        readBinaryFile(file, StructType(schema(CONTENT) :: Nil))
+      }
+
+      // Otherwise, it should be able to read.
+      assert(
+        readBinaryFile(file, StructType(schema(LENGTH) :: Nil)) === Row(content.length),
+        "Get length should not read content.")
+      assert(
+        spark.read.format(BINARY_FILE).load(file.getPath).count() === 1,
+        "Count should not read content.")
+    }
   }
 
   test("fail fast and do not attempt to read if a file is too big") {
```
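As a side note, one way to observe the pruning from the outside (a sketch, not part of this patch) is to inspect the scan's `ReadSchema` in the physical plan:

```scala
// In spark-shell: with only `length` selected, the file scan's ReadSchema
// in the explain output should list just that column.
val pruned = spark.read.format("binaryFile").load("/tmp/data").select("length") // hypothetical path
pruned.explain()
```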