[SPARK-7567] [SQL] Migrating Parquet data source to FSBasedRelation

This PR migrates the Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. The major differences are:

1. Partition discovery code has been factored out to `FSBasedRelation`
1. `AppendingParquetOutputFormat` is no longer used. Instead, an anonymous subclass of `ParquetOutputFormat` handles appending and writing dynamic partitions
1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition
1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push down, thus it doesn't extend `CatalystScan` anymore

   Once `JSONRelation` (which still extends `CatalystScan`) is migrated as well, `CatalystScan` can be removed.
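
For reference, a minimal usage sketch of the migrated data source (hypothetical paths, written against the Spark 1.4-era `DataFrame` API); reads go through `FSBasedParquetRelation` and pick up Hive-style partition directories automatically:

    import org.apache.spark.sql.SaveMode

    // Partition discovery handles layouts such as /data/events/date=2015-05-13/part-r-00001.parquet
    val events = sqlContext.parquetFile("/data/events")
    events.filter(events("date") === "2015-05-13").select("id").show()

    // Appending writes new part-files next to the existing ones; the anonymous
    // ParquetOutputFormat subclass picks non-conflicting part-file names.
    events.save("/data/events", "org.apache.spark.sql.parquet", SaveMode.Append)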


Author: Cheng Lian <lian@databricks.com>

Closes #6090 from liancheng/parquet-migration and squashes the following commits:

6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommitter
bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing
f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist
261d8c1 [Cheng Lian] Minor bug fix and more tests
db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
Cheng Lian 2015-05-13 11:04:10 -07:00 committed by Michael Armbrust
parent bec938f777
commit 7ff16e8abe
16 changed files with 933 additions and 1097 deletions


@@ -111,6 +111,12 @@ object MimaExcludes {
"org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache"),
// These test support classes were moved out of src/main and into src/test:
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTestData"),


@@ -27,9 +27,11 @@ import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
import com.google.common.reflect.TypeToken
import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -42,6 +44,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, e
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -641,7 +644,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
if (paths.isEmpty) {
emptyDataFrame
} else if (conf.parquetUseDataSourceApi) {
baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
baseRelationToDataFrame(
new FSBasedParquetRelation(
globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
} else {
DataFrame(this, parquet.ParquetRelation(
paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))


@@ -29,128 +29,184 @@ import parquet.io.api.Binary
import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = {
filterExpressions.flatMap(createFilter).reduceOption(FilterApi.and).map(FilterCompat.get)
filterExpressions.flatMap { filter =>
createFilter(filter)
}.reduceOption(FilterApi.and).map(FilterCompat.get)
}
private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
case IntegerType =>
(n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
(n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
case IntegerType =>
(n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
(n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
(n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
(n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
(n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
(n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
/**
* Converts data sources filters to Parquet filter predicates.
*/
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
// which can be casted to `false` implicitly. Please refer to the `eval` method of these
// operators and the `SimplifyFilters` rule for details.
predicate match {
case sources.IsNull(name) =>
makeEq.lift(dataTypeOf(name)).map(_(name, null))
case sources.IsNotNull(name) =>
makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
case sources.EqualTo(name, value) =>
makeEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.Not(sources.EqualTo(name, value)) =>
makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.LessThan(name, value) =>
makeLt.lift(dataTypeOf(name)).map(_(name, value))
case sources.LessThanOrEqual(name, value) =>
makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.GreaterThan(name, value) =>
makeGt.lift(dataTypeOf(name)).map(_(name, value))
case sources.GreaterThanOrEqual(name, value) =>
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.And(lhs, rhs) =>
(createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilter(schema, lhs)
rhsFilter <- createFilter(schema, rhs)
} yield FilterApi.or(lhsFilter, rhsFilter)
case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)
case _ => None
}
}
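// Illustration (hypothetical columns `a: INT` and `b: STRING`): a pushed-down data source filter
//
//   sources.And(sources.GreaterThan("a", 10), sources.IsNotNull("b"))
//
// is converted by `createFilter` above into, conceptually,
//
//   FilterApi.and(FilterApi.gt(intColumn("a"), 10: Integer),
//                 FilterApi.notEq(binaryColumn("b"), null))
//
// while predicates that cannot be converted simply yield `None` and are evaluated by Spark.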
/**
* Converts Catalyst predicate expressions to Parquet filter predicates.
*
* @todo This can be removed once we get rid of the old Parquet support.
*/
def createFilter(predicate: Expression): Option[FilterPredicate] = {
val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
case IntegerType =>
(n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
(n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
case IntegerType =>
(n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
(n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
(n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
(n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
(n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
(n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -170,7 +226,7 @@ private[sql] object ParquetFilters {
makeEq.lift(dataType).map(_(name, value))
case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
makeEq.lift(dataType).map(_(name, value))
case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) =>
makeNotEq.lift(dataType).map(_(name, value))
case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) =>
@@ -192,7 +248,7 @@ private[sql] object ParquetFilters {
case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
makeLtEq.lift(dataType).map(_(name, value))
case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
makeLtEq.lift(dataType).map(_(name, value))
makeLtEq.lift(dataType).map(_(name, value))
case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
makeGtEq.lift(dataType).map(_(name, value))
case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
@@ -201,7 +257,7 @@ private[sql] object ParquetFilters {
case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
makeGt.lift(dataType).map(_(name, value))
case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
makeGt.lift(dataType).map(_(name, value))
makeGt.lift(dataType).map(_(name, value))
case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
makeLt.lift(dataType).map(_(name, value))
case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
@@ -210,7 +266,7 @@ private[sql] object ParquetFilters {
case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
makeGtEq.lift(dataType).map(_(name, value))
case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
makeGtEq.lift(dataType).map(_(name, value))
makeGtEq.lift(dataType).map(_(name, value))
case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
makeLtEq.lift(dataType).map(_(name, value))
case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>


@@ -674,7 +674,7 @@ private[parquet] object FileSystemHelper {
def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
val files = FileSystemHelper.listFiles(pathStr, conf)
// filename pattern is part-r-<int>.parquet
val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
val nameP = new scala.util.matching.Regex("""part-.-(\d{1,}).*""", "taskid")
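// The relaxed pattern also matches part-file names produced by other writers and codecs,
// e.g. part-r-00001.gz.parquet or part-m-00002.parquet.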
val hiddenFileP = new scala.util.matching.Regex("_.*")
files.map(_.getName).map {
case nameP(taskid) => taskid.toInt


@@ -0,0 +1,565 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet
import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.util.Try
import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.filter2.predicate.FilterApi
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD._
import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
private[sql] class DefaultSource extends FSBasedRelationProvider {
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): FSBasedRelation = {
val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
}
}
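// For reference (hypothetical path): addressing the Parquet source by name resolves to the
// DefaultSource above, which in turn builds an FSBasedParquetRelation, e.g.
//
//   sqlContext.load("org.apache.spark.sql.parquet", Map("path" -> "/data/events"))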
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter extends OutputWriter {
private var recordWriter: RecordWriter[Void, Row] = _
private var taskAttemptContext: TaskAttemptContext = _
override def init(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): Unit = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
// overwriting existing data files, we need to find out the max task ID encoded in these data
// file names.
// TODO Make this snippet a utility function for other data source developers
val maxExistingTaskId = {
// Note that `path` may point to a temporary location. Here we retrieve the real
// destination path from the configuration
val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
val fs = outputPath.getFileSystem(conf)
if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
// part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r
fs.listStatus(outputPath).map(_.getPath.getName).map {
case partFilePattern(id) => id.toInt
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => sys.error(
s"""Trying to write Parquet files to directory $outputPath,
|but found items with illegal name "$name"
""".stripMargin.replace('\n', ' ').trim)
}.reduceOption(_ max _).getOrElse(0)
} else {
0
}
}
new ParquetOutputFormat[Row]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
// 1. To allow appending. We need to generate output file name based on the max available
// task ID computed above.
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
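// For example, with maxExistingTaskId = 4 and a task ID of 2, `split` is 7 and the file is
// named part-r-00007<extension>, so appended part-files never collide with existing ones.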
new Path(path, f"part-r-$split%05d$extension")
}
}
}
recordWriter = outputFormat.getRecordWriter(context)
taskAttemptContext = context
}
override def write(row: Row): Unit = recordWriter.write(null, row)
override def close(): Unit = recordWriter.close(taskAttemptContext)
}
private[sql] class FSBasedParquetRelation(
paths: Array[String],
private val maybeDataSchema: Option[StructType],
private val maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends FSBasedRelation(paths, maybePartitionSpec)
with Logging {
// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
private val maybeMetastoreSchema = parameters
.get(FSBasedParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])
private val metadataCache = new MetadataCache
metadataCache.refresh()
override def equals(other: scala.Any): Boolean = other match {
case that: FSBasedParquetRelation =>
val schemaEquality = if (shouldMergeSchemas) {
this.shouldMergeSchemas == that.shouldMergeSchemas
} else {
this.dataSchema == that.dataSchema &&
this.schema == that.schema
}
this.paths.toSet == that.paths.toSet &&
schemaEquality &&
this.maybeDataSchema == that.maybeDataSchema &&
this.partitionColumns == that.partitionColumns
case _ => false
}
override def hashCode(): Int = {
if (shouldMergeSchemas) {
Objects.hashCode(
Boolean.box(shouldMergeSchemas),
paths.toSet,
maybeDataSchema,
maybePartitionSpec)
} else {
Objects.hashCode(
Boolean.box(shouldMergeSchemas),
paths.toSet,
dataSchema,
schema,
maybeDataSchema,
maybePartitionSpec)
}
}
override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]
override def dataSchema: StructType = metadataCache.dataSchema
override private[sql] def refresh(): Unit = {
metadataCache.refresh()
super.refresh()
}
// Parquet data source always uses Catalyst internal representations.
override val needConversion: Boolean = false
override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
override def prepareForWrite(job: Job): Unit = {
val conf = ContextUtil.getConfiguration(job)
val committerClass =
conf.getClass(
"spark.sql.parquet.output.committer.class",
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])
conf.setClass(
"mapred.output.committer.class",
committerClass,
classOf[ParquetOutputCommitter])
// TODO There's no need to use two kinds of WriteSupport
// We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
// complex types.
val writeSupportClass =
if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
classOf[MutableRowWriteSupport]
} else {
classOf[RowWriteSupport]
}
ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
// Sets compression scheme
conf.set(
ParquetOutputFormat.COMPRESSION,
ParquetRelation
.shortParquetCompressionCodecNames
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
CompressionCodecName.UNCOMPRESSED).name())
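// For example, setting spark.sql.parquet.compression.codec to "gzip" resolves to
// CompressionCodecName.GZIP here, while unrecognized codec names fall back to UNCOMPRESSED.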
}
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String]): RDD[Row] = {
val job = Job.getInstance(SparkHadoopUtil.get.conf)
val conf = ContextUtil.getConfiguration(job)
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
if (inputPaths.nonEmpty) {
FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
}
// Try to push down filters when filter push-down is enabled.
if (sqlContext.conf.parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(dataSchema, _))
.reduceOption(FilterApi.and)
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}
conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
})
conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
ParquetTypesConverter.convertToString(dataSchema.toAttributes))
// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
val inputFileStatuses =
metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
val footers = inputFileStatuses.map(metadataCache.footers)
// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
// footers. Especially when a global arbitrative schema (either from metastore or data source
// DDL) is available.
new NewHadoopRDD(
sqlContext.sparkContext,
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row],
conf) {
val cacheMetadata = useMetadataCache
@transient val cachedStatuses = inputFileStatuses.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val pathWithAuthority = new Path(f.getPath.toUri.toString)
new FileStatus(
f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
}.toSeq
@transient val cachedFooters = footers.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
}.toSeq
// Overridden so we can inject our own cached files statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
}
} else {
new FilteringParquetRowInputFormat
}
val jobContext = newJobContext(getConf, jobId)
val rawSplits = inputFormat.getSplits(jobContext)
Array.tabulate[SparkPartition](rawSplits.size) { i =>
new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
}
}.values
}
private class MetadataCache {
// `FileStatus` objects of all "_metadata" files.
private var metadataStatuses: Array[FileStatus] = _
// `FileStatus` objects of all "_common_metadata" files.
private var commonMetadataStatuses: Array[FileStatus] = _
// Parquet footer cache.
var footers: Map[FileStatus, Footer] = _
// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
// Schema of the actual Parquet files, without partition columns discovered from partition
// directory paths.
var dataSchema: StructType = _
// Schema of the whole table, including partition columns.
var schema: StructType = _
/**
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
// Support either reading a collection of raw Parquet part-files, or a collection of folders
// containing Parquet files (e.g. partitioned Parquet table).
val baseStatuses = paths.distinct.flatMap { p =>
val path = new Path(p)
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
Try(fs.getFileStatus(qualified)).toOption
}
assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = baseStatuses.flatMap { f =>
val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}
}
dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
val parquetMetadata = ParquetFileReader.readFooter(
SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
f -> new Footer(f.getPath, parquetMetadata)
}.seq.toMap
dataSchema = {
val dataSchema0 =
maybeDataSchema
.orElse(readSchema())
.orElse(maybeMetastoreSchema)
.getOrElse(sys.error("Failed to get the schema."))
// If this Parquet relation is converted from a Hive Metastore table, we must reconcile the case
// insensitivity issue and possible schema mismatches (probably caused by schema
// evolution).
maybeMetastoreSchema
.map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
.getOrElse(dataSchema0)
}
}
private def isSummaryFile(file: Path): Boolean = {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
private def readSchema(): Option[StructType] = {
// Sees which file(s) we need to touch in order to figure out the schema.
//
// Always tries the summary files first if users don't require a merged schema. In this case,
// "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
// groups information, and could be much smaller for large Parquet files with lots of row
// groups.
//
// NOTE: Metadata stored in the summary files are merged from all part-files. However, for
// user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
// how to merge them correctly if some key is associated with different values in different
// part-files. When this happens, Parquet simply gives up generating the summary file. This
// implies that if a summary file presents, then:
//
// 1. Either all part-files have exactly the same Spark SQL schema, or
// 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
// their schemas may differ from each other).
//
// Here we tend to be pessimistic and take the second case into account. Basically this means
// we can't trust the summary files if users require a merged schema, and must touch all part-
// files to do the merge.
val filesToTouch =
if (shouldMergeSchemas) {
// Also includes summary files, 'cause there might be empty partition directories.
(metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
} else {
// Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
// don't have this.
commonMetadataStatuses.headOption
// Falls back to "_metadata"
.orElse(metadataStatuses.headOption)
// Summary file(s) not found, the Parquet file is either corrupted, or different part-
// files contain conflicting user defined metadata (two or more values are associated
// with a same key in different files). In either case, we fall back to any of the
// first part-file, and just assume all schemas are consistent.
.orElse(dataStatuses.headOption)
.toSeq
}
assert(
filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
}
}
}
private[sql] object FSBasedParquetRelation extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"
// Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
private[parquet] def readSchema(
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
footers.map { footer =>
val metadata = footer.getParquetMetadata.getFileMetaData
val parquetSchema = metadata.getSchema
val maybeSparkSchema = metadata
.getKeyValueMetaData
.toMap
.get(RowReadSupport.SPARK_METADATA_KEY)
.flatMap { serializedSchema =>
// Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
// whatever is available.
Try(DataType.fromJson(serializedSchema))
.recover { case _: Throwable =>
logInfo(
s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
"falling back to the deprecated DataType.fromCaseClassString parser.")
DataType.fromCaseClassString(serializedSchema)
}
.recover { case cause: Throwable =>
logWarning(
s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
|\t$serializedSchema
""".stripMargin,
cause)
}
.map(_.asInstanceOf[StructType])
.toOption
}
maybeSparkSchema.getOrElse {
// Falls back to Parquet schema if Spark SQL schema is absent.
StructType.fromAttributes(
// TODO Really no need to use `Attribute` here, we only need to know the data type.
ParquetTypesConverter.convertToAttributes(
parquetSchema,
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetINT96AsTimestamp))
}
}.reduceOption { (left, right) =>
try left.merge(right) catch { case e: Throwable =>
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
}
}
}
/**
* Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
* schema and Parquet schema.
*
* Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
* schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
* distinguish binary and string). This method generates a correct schema by merging Metastore
* schema data types and Parquet schema field names.
*/
private[parquet] def mergeMetastoreParquetSchema(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
def schemaConflictMessage: String =
s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
|${metastoreSchema.prettyJson}
|
|Parquet schema:
|${parquetSchema.prettyJson}
""".stripMargin
val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap
val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
StructType(metastoreSchema.zip(reorderedParquetSchema).map {
// Uses Parquet field names but retains Metastore data types.
case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
mSchema.copy(name = pSchema.name)
case _ =>
throw new SparkException(schemaConflictMessage)
})
}
/**
* Returns the original schema from the Parquet file with any missing nullable fields from the
* Hive Metastore schema merged in.
*
* When constructing a DataFrame from a collection of structured data, the resulting object has
* a schema corresponding to the union of the fields present in each element of the collection.
* Spark SQL simply assigns a null value to any field that isn't present for a particular row.
* In some cases, it is possible that a given table partition stored as a Parquet file doesn't
* contain a particular nullable field in its schema despite that field being present in the
* table schema obtained from the Hive Metastore. This method returns a schema representing the
* Parquet file schema along with any additional nullable fields from the Metastore schema
* merged in.
*/
private[parquet] def mergeMissingNullableFields(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
val missingFields = metastoreSchema
.map(_.name.toLowerCase)
.diff(parquetSchema.map(_.name.toLowerCase))
.map(fieldMap(_))
.filter(_.nullable)
StructType(parquetSchema ++ missingFields)
}
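// Worked example (hypothetical schemas): given a Metastore schema `id INT, name STRING` and a
// Parquet schema `ID INT, Name STRING`, mergeMetastoreParquetSchema keeps the Metastore data
// types but adopts the Parquet field names, yielding `ID INT, Name STRING`. A nullable field
// that exists only in the Metastore schema (say `extra STRING`) is first appended to the Parquet
// schema by mergeMissingNullableFields, so it is preserved during the reconciliation above.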
}


@@ -1,840 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet
import java.io.IOException
import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
import parquet.filter2.predicate.FilterApi
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{ParquetInputFormat, _}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, expressions}
import org.apache.spark.sql.parquet.ParquetTypesConverter._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, _}
import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
import org.apache.spark.{Logging, SerializableWritable, SparkException, TaskContext, Partition => SparkPartition}
/**
* Allows creation of Parquet based tables using the syntax:
* {{{
* CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet OPTIONS (...)
* }}}
*
* Supported options include:
*
* - `path`: Required. When reading Parquet files, `path` should point to the location of the
* Parquet file(s). It can be either a single raw Parquet file, or a directory of Parquet files.
* In the latter case, this data source tries to discover partitioning information if the
* directory is structured in the same style of Hive partitioned tables. When writing Parquet
* file, `path` should point to the destination folder.
*
* - `mergeSchema`: Optional. Indicates whether we should merge potentially different (but
* compatible) schemas stored in all Parquet part-files.
*
* - `partition.defaultName`: Optional. Partition name used when a value of a partition column is
* null or empty string. This is similar to the `hive.exec.default.partition.name` configuration
* in Hive.
*/
private[sql] class DefaultSource
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider {
private def checkPath(parameters: Map[String, String]): String = {
parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
}
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
ParquetRelation2(Seq(checkPath(parameters)), parameters, None)(sqlContext)
}
/** Returns a new base relation with the given parameters and schema. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
ParquetRelation2(Seq(checkPath(parameters)), parameters, Some(schema))(sqlContext)
}
/** Returns a new base relation with the given parameters and save given data into it. */
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val path = checkPath(parameters)
val filesystemPath = new Path(path)
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val doInsertion = (mode, fs.exists(filesystemPath)) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"path $path already exists.")
case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
true
case (SaveMode.Ignore, exists) =>
!exists
}
val relation = if (doInsertion) {
// This is a hack. We always set nullable/containsNull/valueContainsNull to true
// for the schema of a parquet data.
val df =
sqlContext.createDataFrame(
data.queryExecution.toRdd,
data.schema.asNullable,
needsConversion = false)
val createdRelation =
createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
createdRelation
} else {
// If the save mode is Ignore, we will just create the relation based on existing data.
createRelation(sqlContext, parameters)
}
relation
}
}
/**
* An alternative to [[ParquetRelation]] that plugs in using the data sources API. This class is
* intended as a full replacement of the Parquet support in Spark SQL. The old implementation will
* be deprecated and eventually removed once this version is proved to be stable enough.
*
* Compared with the old implementation, this class has the following notable differences:
*
* - Partitioning discovery: Hive style multi-level partitions are auto discovered.
* - Metadata discovery: Parquet is a format that comes with schema evolution support. This data source
* can detect and merge schemas from all Parquet part-files as long as they are compatible.
* Also, metadata and [[FileStatus]]es are cached for better performance.
* - Statistics: Statistics for the size of the table are automatically populated during schema
* discovery.
*/
@DeveloperApi
private[sql] case class ParquetRelation2(
paths: Seq[String],
parameters: Map[String, String],
maybeSchema: Option[StructType] = None,
maybePartitionSpec: Option[PartitionSpec] = None)(
@transient val sqlContext: SQLContext)
extends BaseRelation
with CatalystScan
with InsertableRelation
with SparkHadoopMapReduceUtil
with Logging {
// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
// Optional Metastore schema, used when converting Hive Metastore Parquet table
private val maybeMetastoreSchema =
parameters
.get(ParquetRelation2.METASTORE_SCHEMA)
.map(s => DataType.fromJson(s).asInstanceOf[StructType])
// Hive uses this as part of the default partition name when the partition column value is null
// or empty string
private val defaultPartitionName = parameters.getOrElse(
ParquetRelation2.DEFAULT_PARTITION_NAME, "__HIVE_DEFAULT_PARTITION__")
override def equals(other: Any): Boolean = other match {
case relation: ParquetRelation2 =>
// If schema merging is required, we don't compare the actual schemas since they may evolve.
val schemaEquality = if (shouldMergeSchemas) {
shouldMergeSchemas == relation.shouldMergeSchemas
} else {
schema == relation.schema
}
paths.toSet == relation.paths.toSet &&
schemaEquality &&
maybeMetastoreSchema == relation.maybeMetastoreSchema &&
maybePartitionSpec == relation.maybePartitionSpec
case _ => false
}
override def hashCode(): Int = {
if (shouldMergeSchemas) {
com.google.common.base.Objects.hashCode(
shouldMergeSchemas: java.lang.Boolean,
paths.toSet,
maybeMetastoreSchema,
maybePartitionSpec)
} else {
com.google.common.base.Objects.hashCode(
shouldMergeSchemas: java.lang.Boolean,
schema,
paths.toSet,
maybeMetastoreSchema,
maybePartitionSpec)
}
}
private[sql] def sparkContext = sqlContext.sparkContext
private class MetadataCache {
// `FileStatus` objects of all "_metadata" files.
private var metadataStatuses: Array[FileStatus] = _
// `FileStatus` objects of all "_common_metadata" files.
private var commonMetadataStatuses: Array[FileStatus] = _
// Parquet footer cache.
var footers: Map[FileStatus, Footer] = _
// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
// Partition spec of this table, including names, data types, and values of each partition
// column, and paths of each partition.
var partitionSpec: PartitionSpec = _
// Schema of the actual Parquet files, without partition columns discovered from partition
// directory paths.
var parquetSchema: StructType = _
// Schema of the whole table, including partition columns.
var schema: StructType = _
// Indicates whether partition columns are also included in Parquet data file schema. If not,
// we need to fill in partition column values into read rows when scanning the table.
var partitionKeysIncludedInParquetSchema: Boolean = _
def prepareMetadata(path: Path, schema: StructType, conf: Configuration): Unit = {
conf.set(
ParquetOutputFormat.COMPRESSION,
ParquetRelation
.shortParquetCompressionCodecNames
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
CompressionCodecName.UNCOMPRESSED).name())
ParquetRelation.enableLogForwarding()
ParquetTypesConverter.writeMetaData(schema.toAttributes, path, conf)
}
/**
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
// Support either reading a collection of raw Parquet part-files, or a collection of folders
// containing Parquet files (e.g. partitioned Parquet table).
val baseStatuses = paths.distinct.map { p =>
val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration)
val path = new Path(p)
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
if (!fs.exists(qualified) && maybeSchema.isDefined) {
fs.mkdirs(qualified)
prepareMetadata(qualified, maybeSchema.get, sparkContext.hadoopConfiguration)
}
fs.getFileStatus(qualified)
}.toArray
assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = baseStatuses.flatMap { f =>
val fs = FileSystem.get(f.getPath.toUri, sparkContext.hadoopConfiguration)
SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}
}
dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
val parquetMetadata = ParquetFileReader.readFooter(
sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
f -> new Footer(f.getPath, parquetMetadata)
}.seq.toMap
partitionSpec = maybePartitionSpec.getOrElse {
val partitionDirs = leaves
.filterNot(baseStatuses.contains)
.map(_.getPath.getParent)
.distinct
if (partitionDirs.nonEmpty) {
// Parses names and values of partition columns, and infer their data types.
PartitioningUtils.parsePartitions(partitionDirs, defaultPartitionName)
} else {
// No partition directories found, makes an empty specification
PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
}
}
// To get the schema. We first try to get the schema defined in maybeSchema.
// If maybeSchema is not defined, we will try to get the schema from existing parquet data
// (through readSchema). If data does not exist, we will try to get the schema defined in
// maybeMetastoreSchema (defined in the options of the data source).
// Finally, if we still could not get the schema. We throw an error.
parquetSchema =
maybeSchema
.orElse(readSchema())
.orElse(maybeMetastoreSchema)
.getOrElse(sys.error("Failed to get the schema."))
partitionKeysIncludedInParquetSchema =
isPartitioned &&
partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
schema = {
val fullRelationSchema = if (partitionKeysIncludedInParquetSchema) {
parquetSchema
} else {
StructType(parquetSchema.fields ++ partitionColumns.fields)
}
// If this Parquet relation is converted from a Hive Metastore table, must reconcile case
// insensitivity issue and possible schema mismatch.
maybeMetastoreSchema
.map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullRelationSchema))
.getOrElse(fullRelationSchema)
}
}
private def readSchema(): Option[StructType] = {
// Sees which file(s) we need to touch in order to figure out the schema.
val filesToTouch =
// Always tries the summary files first if users don't require a merged schema. In this case,
// "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
// groups information, and could be much smaller for large Parquet files with lots of row
// groups.
//
// NOTE: Metadata stored in the summary files are merged from all part-files. However, for
// user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
// how to merge them correctly if some key is associated with different values in different
// part-files. When this happens, Parquet simply gives up generating the summary file. This
// implies that if a summary file presents, then:
//
// 1. Either all part-files have exactly the same Spark SQL schema, or
// 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
// their schemas may differ from each other).
//
// Here we tend to be pessimistic and take the second case into account. Basically this means
// we can't trust the summary files if users require a merged schema, and must touch all part-
// files to do the merge.
if (shouldMergeSchemas) {
// Also includes summary files, 'cause there might be empty partition directories.
(metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
} else {
// Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
// don't have this.
commonMetadataStatuses.headOption
// Falls back to "_metadata"
.orElse(metadataStatuses.headOption)
// Summary file(s) not found, the Parquet file is either corrupted, or different part-
// files contain conflicting user defined metadata (two or more values are associated
// with a same key in different files). In either case, we fall back to any of the
// first part-file, and just assume all schemas are consistent.
.orElse(dataStatuses.headOption)
.toSeq
}
ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
}
}
@transient private val metadataCache = new MetadataCache
metadataCache.refresh()
def partitionSpec: PartitionSpec = metadataCache.partitionSpec
def partitionColumns: StructType = metadataCache.partitionSpec.partitionColumns
def partitions: Seq[Partition] = metadataCache.partitionSpec.partitions
def isPartitioned: Boolean = partitionColumns.nonEmpty
private def partitionKeysIncludedInDataSchema = metadataCache.partitionKeysIncludedInParquetSchema
private def parquetSchema = metadataCache.parquetSchema
override def schema: StructType = metadataCache.schema
private def isSummaryFile(file: Path): Boolean = {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
// Skip type conversion
override val needConversion: Boolean = false
// TODO Should calculate per scan size
// It's common that a query only scans a fraction of a large Parquet file. Returning size of the
// whole Parquet file disables some optimizations in this case (e.g. broadcast join).
override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
// This is mostly a hack so that we can use the existing parquet filter code.
override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
val job = new Job(sparkContext.hadoopConfiguration)
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
val jobConf: Configuration = ContextUtil.getConfiguration(job)
val selectedPartitions = prunePartitions(predicates, partitions)
val selectedFiles = if (isPartitioned) {
selectedPartitions.flatMap { p =>
metadataCache.dataStatuses.filter(_.getPath.getParent.toString == p.path)
}
} else {
metadataCache.dataStatuses.toSeq
}
val selectedFooters = selectedFiles.map(metadataCache.footers)
// FileInputFormat cannot handle empty lists.
if (selectedFiles.nonEmpty) {
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val selectedPaths = selectedFiles.map(status => new Path(status.getPath.toUri.toString))
FileInputFormat.setInputPaths(job, selectedPaths: _*)
}
// Try to push down filters when filter push-down is enabled.
if (sqlContext.conf.parquetFilterPushDown) {
val partitionColNames = partitionColumns.map(_.name).toSet
predicates
// Don't push down predicates which reference partition columns
.filter { pred =>
val referencedColNames = pred.references.map(_.name).toSet
referencedColNames.intersect(partitionColNames).isEmpty
}
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter)
.reduceOption(FilterApi.and)
.foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
}
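// For example (column names are illustrative): given partition column `p1` and the
// predicate `a = 1 AND p1 = 1`, only `a = 1` is considered for conversion here;
// `p1 = 1` references a partition column and is handled by partition pruning instead
// (see the SPARK-6742 regression test in ParquetFilterSuite below).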
if (isPartitioned) {
logInfo {
val percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
s"Reading $percentRead% of partitions"
}
}
val requiredColumns = output.map(_.name)
val requestedSchema = StructType(requiredColumns.map(schema(_)))
// Store both requested and original schema in `Configuration`
jobConf.set(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
convertToString(requestedSchema.toAttributes))
jobConf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
convertToString(schema.toAttributes))
// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
val useCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
jobConf.set(SQLConf.PARQUET_CACHE_METADATA, useCache.toString)
val baseRDD =
new NewHadoopRDD(
sparkContext,
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row],
jobConf) {
val cacheMetadata = useCache
@transient
val cachedStatus = selectedFiles.map { st =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val newPath = new Path(st.getPath.toUri.toString)
new FileStatus(
st.getLen,
st.isDir,
st.getReplication,
st.getBlockSize,
st.getModificationTime,
st.getAccessTime,
st.getPermission,
st.getOwner,
st.getGroup,
newPath)
}
@transient
val cachedFooters = selectedFooters.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
}
// Overridden so we can inject our own cached file statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
}
} else {
new FilteringParquetRowInputFormat
}
val jobContext = newJobContext(getConf, jobId)
val rawSplits = inputFormat.getSplits(jobContext)
Array.tabulate[SparkPartition](rawSplits.size) { i =>
new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
}
}
// The ordinals for partition keys in the result row, if requested.
val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
case (name, index) => index -> requiredColumns.indexOf(name)
}.toMap.filter {
case (_, index) => index >= 0
}
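// For example (illustrative names): with partition columns [p1, p2] and
// requiredColumns = ["b", "p2"], the map above is Map(1 -> 1): partition key p2
// (ordinal 1 among the partition columns) must be written at position 1 of the
// output row, while p1 is not requested and therefore has no entry.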
// When the data does not include the key and the key is requested, we must fill it in
// based on information from the input split.
if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
// This check is based on CatalystConverter.createRootConverter.
val primitiveRow =
requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
val partValues = selectedPartitions.collectFirst {
case p if split.getPath.getParent.toString == p.path =>
CatalystTypeConverters.convertToCatalyst(p.values).asInstanceOf[Row]
}.get
val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
if (primitiveRow) {
iterator.map { pair =>
// We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
val row = pair._2.asInstanceOf[SpecificMutableRow]
var i = 0
while (i < requiredPartOrdinal.size) {
// TODO Avoid boxing cost here!
val partOrdinal = requiredPartOrdinal(i)
row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
i += 1
}
row
}
} else {
// Create a mutable row since we need to fill in values from partition columns.
val mutableRow = new GenericMutableRow(requestedSchema.size)
iterator.map { pair =>
// We are using CatalystGroupConverter and it returns a GenericRow.
// Since GenericRow is not mutable, we just cast it to a Row.
val row = pair._2.asInstanceOf[Row]
var i = 0
while (i < row.size) {
// TODO Avoid boxing cost here!
mutableRow(i) = row(i)
i += 1
}
i = 0
while (i < requiredPartOrdinal.size) {
// TODO Avoid boxing cost here!
val partOrdinal = requiredPartOrdinal(i)
mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
i += 1
}
mutableRow
}
}
}
} else {
baseRDD.map(_._2)
}
}
private def prunePartitions(
predicates: Seq[Expression],
partitions: Seq[Partition]): Seq[Partition] = {
val partitionColumnNames = partitionColumns.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
val rawPredicate =
partitionPruningPredicates.reduceOption(expressions.And).getOrElse(Literal(true))
val boundPredicate = InterpretedPredicate.create(rawPredicate transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
})
if (isPartitioned && partitionPruningPredicates.nonEmpty) {
partitions.filter(p => boundPredicate(p.values))
} else {
partitions
}
}
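To make the pruning step concrete, here is a minimal sketch of how a partition-only predicate is bound and evaluated directly against partition values rather than being pushed into the Parquet scan. The column name, ordinal, and values are illustrative assumptions, not taken from an actual table:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, InterpretedPredicate, Literal}
import org.apache.spark.sql.types.IntegerType

// Predicate `p1 = 1`, with the partition column `p1: Int` bound at ordinal 0.
val pruningPredicate = EqualTo(BoundReference(0, IntegerType, nullable = true), Literal(1))
val boundPredicate = InterpretedPredicate.create(pruningPredicate)

boundPredicate(Row(1))  // true  -> the partition is kept
boundPredicate(Row(2))  // false -> the partition is pruned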
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
assert(paths.size == 1, s"Can't write to multiple destinations: ${paths.mkString(",")}")
// TODO: currently we do not check whether the schemas are compatible.
// That means if one first creates a table and then INSERTs data with
// an incompatible schema, the execution will fail. It would be nice
// to catch this early on, maybe by having the planner validate the schema
// before calling execute().
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
val writeSupport =
if (parquetSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
log.debug("Initializing MutableRowWriteSupport")
classOf[MutableRowWriteSupport]
} else {
classOf[RowWriteSupport]
}
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
val conf = ContextUtil.getConfiguration(job)
RowWriteSupport.setSchema(data.schema.toAttributes, conf)
val destinationPath = new Path(paths.head)
if (overwrite) {
val fs = destinationPath.getFileSystem(conf)
if (fs.exists(destinationPath)) {
var success: Boolean = false
try {
success = fs.delete(destinationPath, true)
} catch {
case e: IOException =>
throw new IOException(
s"Unable to clear output directory ${destinationPath.toString} prior" +
s" to writing to Parquet table:\n${e.toString}")
}
if (!success) {
throw new IOException(
s"Unable to clear output directory ${destinationPath.toString} prior" +
s" to writing to Parquet table.")
}
}
}
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[Row])
FileOutputFormat.setOutputPath(job, destinationPath)
val wrappedConf = new SerializableWritable(job.getConfiguration)
val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
val stageId = sqlContext.sparkContext.newRddId()
val taskIdOffset = if (overwrite) {
1
} else {
FileSystemHelper.findMaxTaskId(
FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
}
def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(
jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = new AppendingParquetOutputFormat(taskIdOffset)
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext)
try {
while (iterator.hasNext) {
val row = iterator.next()
writer.write(null, row)
}
} finally {
writer.close(hadoopContext)
}
SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
}
val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
/* apparently we need a TaskAttemptID to construct an OutputCommitter;
* however we're only going to use this local OutputCommitter for
* setupJob/commitJob, so we just use a dummy "map" task.
*/
val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
jobCommitter.commitJob(jobTaskContext)
metadataCache.refresh()
}
}
private[sql] object ParquetRelation2 extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
val MERGE_SCHEMA = "mergeSchema"
// Default partition name to use when the partition column value is null or empty string.
val DEFAULT_PARTITION_NAME = "partition.defaultName"
// Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
private[parquet] def readSchema(
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
footers.map { footer =>
val metadata = footer.getParquetMetadata.getFileMetaData
val parquetSchema = metadata.getSchema
val maybeSparkSchema = metadata
.getKeyValueMetaData
.toMap
.get(RowReadSupport.SPARK_METADATA_KEY)
.flatMap { serializedSchema =>
// Don't throw even if we failed to parse the serialized Spark schema. Just fall back to
// whatever is available.
Try(DataType.fromJson(serializedSchema))
.recover { case _: Throwable =>
logInfo(
s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
"falling back to the deprecated DataType.fromCaseClassString parser.")
DataType.fromCaseClassString(serializedSchema)
}
.recover { case cause: Throwable =>
logWarning(
s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
|\t$serializedSchema
""".stripMargin,
cause)
}
.map(_.asInstanceOf[StructType])
.toOption
}
maybeSparkSchema.getOrElse {
// Falls back to Parquet schema if Spark SQL schema is absent.
StructType.fromAttributes(
// TODO Really no need to use `Attribute` here, we only need to know the data type.
convertToAttributes(
parquetSchema,
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetINT96AsTimestamp))
}
}.reduceOption { (left, right) =>
try left.merge(right) catch { case e: Throwable =>
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
}
}
}
/**
* Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
* schema and Parquet schema.
*
* Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
 * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
* distinguish binary and string). This method generates a correct schema by merging Metastore
* schema data types and Parquet schema field names.
*/
private[parquet] def mergeMetastoreParquetSchema(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
def schemaConflictMessage: String =
s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
|${metastoreSchema.prettyJson}
|
|Parquet schema:
|${parquetSchema.prettyJson}
""".stripMargin
val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap
val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
StructType(metastoreSchema.zip(reorderedParquetSchema).map {
// Uses Parquet field names but retains Metastore data types.
case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
mSchema.copy(name = pSchema.name)
case _ =>
throw new SparkException(schemaConflictMessage)
})
}
/**
* Returns the original schema from the Parquet file with any missing nullable fields from the
* Hive Metastore schema merged in.
*
* When constructing a DataFrame from a collection of structured data, the resulting object has
* a schema corresponding to the union of the fields present in each element of the collection.
* Spark SQL simply assigns a null value to any field that isn't present for a particular row.
* In some cases, it is possible that a given table partition stored as a Parquet file doesn't
* contain a particular nullable field in its schema despite that field being present in the
* table schema obtained from the Hive Metastore. This method returns a schema representing the
* Parquet file schema along with any additional nullable fields from the Metastore schema
* merged in.
*/
private[parquet] def mergeMissingNullableFields(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
val missingFields = metastoreSchema
.map(_.name.toLowerCase)
.diff(parquetSchema.map(_.name.toLowerCase))
.map(fieldMap(_))
.filter(_.nullable)
StructType(parquetSchema ++ missingFields)
}
}
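The behaviour of the two schema helpers above can be illustrated with the case exercised in ParquetSchemaSuite (this PR moves the helpers to FSBasedParquetRelation, as the test diffs below show). The helpers are private[parquet], so the sketch only compiles inside the org.apache.spark.sql.parquet package; field names are taken from that test and are otherwise arbitrary:

import org.apache.spark.sql.types._

val metastoreSchema = StructType(Seq(
  StructField("lowercase", StringType),
  StructField("uppercase", DoubleType, nullable = false)))

val parquetSchema = StructType(Seq(
  StructField("lowerCase", BinaryType),
  StructField("UPPERCase", DoubleType, nullable = true)))

// Keeps the Parquet field names but the Metastore data types and nullability:
// StructType(Seq(
//   StructField("lowerCase", StringType),
//   StructField("UPPERCase", DoubleType, nullable = false)))
ParquetRelation2.mergeMetastoreParquetSchema(metastoreSchema, parquetSchema)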

View file

@ -293,9 +293,18 @@ private[sql] abstract class BaseWriterContainer(
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
outputFormatClass.newInstance().getOutputCommitter(context)
val committerClass = context.getConfiguration.getClass(
"mapred.output.committer.class", null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
ctor.newInstance(new Path(outputPath), context)
}.getOrElse {
outputFormatClass.newInstance().getOutputCommitter(context)
}
}
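The lookup above means a job can supply its own committer through the Hadoop configuration, as long as the class exposes a (Path, TaskAttemptContext) constructor; otherwise the output format's default committer is used. A hedged sketch with a hypothetical committer class:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical committer; the (Path, TaskAttemptContext) constructor is exactly what
// the reflective lookup above requires.
class MyOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context)

val conf = new Configuration()
conf.setClass("mapred.output.committer.class", classOf[MyOutputCommitter], classOf[OutputCommitter])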
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
@ -345,6 +354,7 @@ private[sql] class DefaultWriterContainer(
override protected def initWriters(): Unit = {
writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
writer.init(getWorkPath, dataSchema, taskAttemptContext)
}
@ -384,11 +394,14 @@ private[sql] class DynamicPartitionWriterContainer(
DynamicPartitionWriterContainer.escapePathName(string)
}
s"/$col=$valueString"
}.mkString
}.mkString.stripPrefix(Path.SEPARATOR)
outputWriters.getOrElseUpdate(partitionPath, {
val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
val path = new Path(getWorkPath, partitionPath)
val writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
writer.init(path.toString, dataSchema, taskAttemptContext)
writer
})
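For reference, a small sketch of the relative partition directory built above; `escape` is a stand-in for DynamicPartitionWriterContainer.escapePathName, and the column names and values are illustrative:

import org.apache.hadoop.fs.Path

// Stand-in for DynamicPartitionWriterContainer.escapePathName, which escapes
// characters that are unsafe in directory names.
def escape(s: String): String = java.net.URLEncoder.encode(s, "UTF-8")

val partitionValues = Seq("p1" -> "1", "p2" -> "foo")
val partitionPath = partitionValues
  .map { case (col, value) => s"/$col=${escape(value)}" }
  .mkString
  .stripPrefix(Path.SEPARATOR)  // "p1=1/p2=foo"

// The per-partition writer is then created under new Path(getWorkPath, partitionPath),
// while "spark.sql.sources.output.path" is set to new Path(outputPath, partitionPath).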

View file

@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}.flatten.reduceOption(_ && _)
val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters
}.flatten.reduceOption(_ && _)
forParquetTableScan.orElse(forParquetDataSource)
@ -350,7 +350,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
override protected def afterAll(): Unit = {
sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
test("SPARK-6742: don't push down predicates which reference partition columns") {
import sqlContext.implicits._
@ -365,7 +365,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
path,
Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
Seq(AttributeReference("part", IntegerType, false)()) ))
checkAnswer(
df.filter("a = 1 or part = 1"),
(1 to 3).map(i => Row(1, i, i.toString)))

View file

@ -119,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
// Decimals with precision above 18 are not yet supported
intercept[RuntimeException] {
intercept[Throwable] {
withTempPath { dir =>
makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
@ -127,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
// Unlimited-length decimals are not yet supported
intercept[RuntimeException] {
intercept[Throwable] {
withTempPath { dir =>
makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
@ -419,7 +419,7 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
test("SPARK-6330 regression test") {
// In 1.3.0, saving to a filesystem other than file: without configuring core-site.xml would get:
// IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
intercept[java.io.FileNotFoundException] {
intercept[Throwable] {
sqlContext.parquetFile("file:///nonexistent")
}
val errorMessage = intercept[Throwable] {

View file

@ -39,7 +39,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
import sqlContext._
import sqlContext.implicits._
val defaultPartitionName = "__NULL__"
val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
test("column type inference") {
def check(raw: String, literal: Literal): Unit = {
@ -252,9 +252,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
val parquetRelation = load(
"org.apache.spark.sql.parquet",
Map(
"path" -> base.getCanonicalPath,
ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
Map("path" -> base.getCanonicalPath))
parquetRelation.registerTempTable("t")
@ -297,9 +295,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
val parquetRelation = load(
"org.apache.spark.sql.parquet",
Map(
"path" -> base.getCanonicalPath,
ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
Map("path" -> base.getCanonicalPath))
parquetRelation.registerTempTable("t")

View file

@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {
ParquetRelation2.mergeMetastoreParquetSchema(
FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {
ParquetRelation2.mergeMetastoreParquetSchema(
FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),
@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Conflicting non-nullable field names
intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
ParquetRelation2.mergeMetastoreParquetSchema(
FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),

View file

@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
import org.apache.spark.util.Utils
/* Implicit conversions */
@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@ -238,13 +238,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.paths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.maybePartitionSpec == partitionSpecInMetastore
parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
}
if (useCached) {
Some(logical)
@ -256,7 +258,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
s"as Parquet. However, we are getting a $other from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
@ -278,8 +280,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
val created = LogicalRelation(
new FSBasedParquetRelation(
paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
@ -290,8 +293,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
val created = LogicalRelation(
new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}

View file

@ -21,21 +21,18 @@ import java.io.File
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.InvalidInputException
import org.scalatest.BeforeAndAfterEach
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.sql._
import org.apache.spark.util.Utils
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
* Tests for persisting tables created through the data sources API into the metastore.
@ -582,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case LogicalRelation(p: FSBasedParquetRelation) => // OK
case _ =>
fail(
"test_parquet_ctas should be converted to " +
s"${classOf[ParquetRelation2].getCanonicalName}")
s"${classOf[FSBasedParquetRelation].getCanonicalName}")
}
// Cleanup and reset confs.

View file

@ -19,16 +19,14 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.DefaultParserDialect
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim}
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf}
case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)
@ -176,17 +174,17 @@ class SQLQuerySuite extends QueryTest {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
case LogicalRelation(r: ParquetRelation2) =>
case LogicalRelation(r: FSBasedParquetRelation) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
s"${ParquetRelation2.getClass.getCanonicalName}.")
s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
@ -596,7 +594,7 @@ class SQLQuerySuite extends QueryTest {
sql(s"DROP TABLE $tableName")
}
}
test("SPARK-5203 union with different decimal precision") {
Seq.empty[(Decimal, Decimal)]
.toDF("d1", "d2")

View file

@ -21,16 +21,15 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{QueryTest, SQLConf}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
@ -292,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
case LogicalRelation(_: FSBasedParquetRelation) => // OK
case _ => fail(
"test_parquet_ctas should be converted to " +
s"${classOf[FSBasedParquetRelation].getCanonicalName}")
}
sql("DROP TABLE IF EXISTS test_parquet_ctas")
@ -316,12 +315,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
case ExecutedCommand(
InsertIntoDataSource(
LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
s"However, found a ${o.toString} ")
}
@ -348,11 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
case ExecutedCommand(
InsertIntoDataSource(
LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}
@ -383,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
assertResult(2) {
analyzed.collect {
case r @ LogicalRelation(_: ParquetRelation2) => r
case r @ LogicalRelation(_: FSBasedParquetRelation) => r
}.size
}
@ -395,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
@ -693,7 +688,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
intercept[RuntimeException](df2.saveAsParquetFile(filePath))
intercept[Throwable](df2.saveAsParquetFile(filePath))
val df3 = df2.toDF("str", "max_int")
df3.saveAsParquetFile(filePath2)

View file

@ -28,12 +28,14 @@ import org.apache.spark.sql.types._
// TODO Don't extend ParquetTest
// This test suite extends ParquetTest for some convenient utility methods. These methods should be
// moved to a more general place, maybe QueryTest.
class FSBasedRelationSuite extends QueryTest with ParquetTest {
class FSBasedRelationTest extends QueryTest with ParquetTest {
override val sqlContext: SQLContext = TestHive
import sqlContext._
import sqlContext.implicits._
val dataSourceName = classOf[SimpleTextSource].getCanonicalName
val dataSchema =
StructType(
Seq(
@ -92,17 +94,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempPath { file =>
testDF.save(
path = file.getCanonicalPath,
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.save(
path = file.getCanonicalPath,
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite)
checkAnswer(
load(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@ -114,17 +116,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempPath { file =>
testDF.save(
path = file.getCanonicalPath,
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.save(
path = file.getCanonicalPath,
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append)
checkAnswer(
load(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)).orderBy("a"),
@ -137,7 +139,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[RuntimeException] {
testDF.save(
path = file.getCanonicalPath,
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.ErrorIfExists)
}
}
@ -147,7 +149,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
testDF.save(
path = file.getCanonicalPath,
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Ignore)
val path = new Path(file.getCanonicalPath)
@ -159,62 +161,37 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - simple queries") {
withTempPath { file =>
partitionedTestDF.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkQueries(
load(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)))
}
}
test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
val basePath = new Path(file.getCanonicalPath)
val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
val qualifiedBasePath = fs.makeQualified(basePath)
for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
sparkContext
.parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
.saveAsTextFile(partitionDir.toString)
}
val dataSchemaWithPartition =
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
load(
source = classOf[SimpleTextSource].getCanonicalName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchemaWithPartition.json)))
}
}
test("save()/load() - partitioned table - Overwrite") {
withTempPath { file =>
partitionedTestDF.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@ -225,20 +202,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Append") {
withTempPath { file =>
partitionedTestDF.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@ -249,20 +226,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Append - new partition values") {
withTempPath { file =>
partitionedTestDF1.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF2.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@ -274,7 +251,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
intercept[RuntimeException] {
partitionedTestDF.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
@ -286,7 +263,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
partitionedTestDF.save(
path = file.getCanonicalPath,
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Ignore)
val path = new Path(file.getCanonicalPath)
@ -302,7 +279,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - non-partitioned table - Overwrite") {
testDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
@ -314,12 +291,12 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - non-partitioned table - Append") {
testDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append)
withTable("t") {
@ -334,7 +311,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[AnalysisException] {
testDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.ErrorIfExists)
}
}
@ -346,7 +323,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempTable("t") {
testDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Ignore)
assert(table("t").collect().isEmpty)
@ -356,7 +333,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - simple queries") {
partitionedTestDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
@ -368,14 +345,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Overwrite") {
partitionedTestDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@ -388,14 +365,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append") {
partitionedTestDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@ -408,14 +385,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append - new partition values") {
partitionedTestDF1.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF2.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@ -428,7 +405,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
partitionedTestDF1.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@ -437,7 +414,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[Throwable] {
partitionedTestDF2.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1"))
@ -447,7 +424,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[Throwable] {
partitionedTestDF2.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p2", "p1"))
@ -461,7 +438,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[AnalysisException] {
partitionedTestDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@ -475,7 +452,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempTable("t") {
partitionedTestDF.saveAsTable(
tableName = "t",
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Ignore,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@ -487,13 +464,13 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("Hadoop style globbing") {
withTempPath { file =>
partitionedTestDF.save(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
val df = load(
source = classOf[SimpleTextSource].getCanonicalName,
source = dataSourceName,
options = Map(
"path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
"dataSchema" -> dataSchema.json))
@ -521,3 +498,67 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
}
}
}
class SimpleTextRelationSuite extends FSBasedRelationTest {
override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
import sqlContext._
test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
val basePath = new Path(file.getCanonicalPath)
val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
val qualifiedBasePath = fs.makeQualified(basePath)
for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
sparkContext
.parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
.saveAsTextFile(partitionDir.toString)
}
val dataSchemaWithPartition =
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
load(
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchemaWithPartition.json)))
}
}
}
class FSBasedParquetRelationSuite extends FSBasedRelationTest {
override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
import sqlContext._
import sqlContext.implicits._
test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
val basePath = new Path(file.getCanonicalPath)
val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
val qualifiedBasePath = fs.makeQualified(basePath)
for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
sparkContext
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
.saveAsParquetFile(partitionDir.toString)
}
val dataSchemaWithPartition =
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
load(
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchemaWithPartition.json)))
}
}
}