[SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter
The check ```if (!fs.getFileStatus(path).isDir) throw Exception``` makes no sense after commit #1370. If someone is working on SPARK-2551, be careful: make sure the new change still passes the test case ```test("Read a parquet file instead of a directory")```.

Author: chutium <teng.qiu@gmail.com>

Closes #2044 from chutium/parquet-singlefile and squashes the following commits:

4ae477f [chutium] [SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter
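For context, a minimal sketch of what this patch enables, assuming the Spark 1.x `SQLContext` API; `sc` is an existing `SparkContext` and the data paths are hypothetical:

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext

// Reading a whole directory of Parquet part files worked before this patch:
val byDir = sqlContext.parquetFile("/data/events.parquet")

// Reading a single part file directly only works after this patch; the old
// code threw IllegalArgumentException for any path that was not a directory:
val byFile = sqlContext.parquetFile("/data/events.parquet/part-r-00001.parquet")

byFile.registerTempTable("events")
sqlContext.sql("SELECT COUNT(*) FROM events").collect()
```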
commit 48f42781de
parent 191d7cf2a6
In `ParquetTypes.scala`:

```diff
@@ -394,17 +394,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
-    if (!fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"Expected $path for be a directory with Parquet files/metadata")
-    }
-    ParquetRelation.enableLogForwarding()
 
     val children = fs.listStatus(path).filterNot { status =>
       val name = status.getPath.getName
       (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
     }
 
+    ParquetRelation.enableLogForwarding()
+
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that
```
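The guard can be dropped without replacement because Hadoop's `FileSystem.listStatus` already handles both cases: for a directory it returns one status per child, and for a plain file it returns a one-element array holding that file's own status, which then flows through the same `filterNot`. A small sketch of that behavior on the local filesystem (the path below is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// listStatus on a directory returns one FileStatus per child;
// listStatus on a plain file returns Array(statusOfThatFile).
// "/tmp/demo.parquet" is a hypothetical path.
val fs = FileSystem.getLocal(new Configuration())
val statuses = fs.listStatus(new Path("/tmp/demo.parquet"))
statuses.foreach(s => println(s"${s.getPath.getName} isDir=${s.isDir}"))
```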
In `ParquetQuerySuite.scala`:

```diff
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
 
-
 case class TestRDDEntry(key: Int, value: String)
 
 case class NullReflectData(
```
```diff
@@ -420,8 +419,30 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     val rdd_copy = sql("SELECT * FROM tmpx").collect()
     val rdd_orig = rdd.collect()
     for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
     }
     Utils.deleteRecursively(file)
   }
+
+  test("Read a parquet file instead of a directory") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val fsPath = new Path(path)
+    val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.coalesce(1).saveAsParquetFile(path)
+
+    val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
+    assert(children.length > 0)
+    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
+    readFile.registerTempTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+    }
+    Utils.deleteRecursively(file)
+  }
```