[SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV cast null values properly
## Problem CSV in Spark 2.0.0: - does not read null values back correctly for certain data types such as `Boolean`, `TimestampType`, `DateType` -- this is a regression comparing to 1.6; - does not read empty values (specified by `options.nullValue`) as `null`s for `StringType` -- this is compatible with 1.6 but leads to problems like SPARK-16903. ## What changes were proposed in this pull request? This patch makes changes to read all empty values back as `null`s. ## How was this patch tested? New test cases. Author: Liwei Lin <lwlin7@gmail.com> Closes #14118 from lw-lin/csv-cast-null.
This commit is contained in:
parent
7151011b38
commit
1dbb725dbe
|
@ -329,7 +329,8 @@ class DataFrameReader(OptionUtils):
|
|||
being read should be skipped. If None is set, it uses
|
||||
the default value, ``false``.
|
||||
:param nullValue: sets the string representation of a null value. If None is set, it uses
|
||||
the default value, empty string.
|
||||
the default value, empty string. Since 2.0.1, this ``nullValue`` param
|
||||
applies to all supported types including the string type.
|
||||
:param nanValue: sets the string representation of a non-number value. If None is set, it
|
||||
uses the default value, ``NaN``.
|
||||
:param positiveInf: sets the string representation of a positive infinity value. If None
|
||||
|
|
|
@ -497,7 +497,8 @@ class DataStreamReader(OptionUtils):
|
|||
being read should be skipped. If None is set, it uses
|
||||
the default value, ``false``.
|
||||
:param nullValue: sets the string representation of a null value. If None is set, it uses
|
||||
the default value, empty string.
|
||||
the default value, empty string. Since 2.0.1, this ``nullValue`` param
|
||||
applies to all supported types including the string type.
|
||||
:param nanValue: sets the string representation of a non-number value. If None is set, it
|
||||
uses the default value, ``NaN``.
|
||||
:param positiveInf: sets the string representation of a positive infinity value. If None
|
||||
|
|
|
@ -376,7 +376,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
|
|||
* from values being read should be skipped.</li>
|
||||
* <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
|
||||
* whitespaces from values being read should be skipped.</li>
|
||||
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
|
||||
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
|
||||
* 2.0.1, this applies to all supported types including the string type.</li>
|
||||
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number" value.</li>
|
||||
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
|
||||
* value.</li>
|
||||
|
|
|
@ -232,66 +232,58 @@ private[csv] object CSVTypeCast {
|
|||
nullable: Boolean = true,
|
||||
options: CSVOptions = CSVOptions()): Any = {
|
||||
|
||||
castType match {
|
||||
case _: ByteType => if (datum == options.nullValue && nullable) null else datum.toByte
|
||||
case _: ShortType => if (datum == options.nullValue && nullable) null else datum.toShort
|
||||
case _: IntegerType => if (datum == options.nullValue && nullable) null else datum.toInt
|
||||
case _: LongType => if (datum == options.nullValue && nullable) null else datum.toLong
|
||||
case _: FloatType =>
|
||||
if (datum == options.nullValue && nullable) {
|
||||
null
|
||||
} else if (datum == options.nanValue) {
|
||||
Float.NaN
|
||||
} else if (datum == options.negativeInf) {
|
||||
Float.NegativeInfinity
|
||||
} else if (datum == options.positiveInf) {
|
||||
Float.PositiveInfinity
|
||||
} else {
|
||||
Try(datum.toFloat)
|
||||
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
|
||||
}
|
||||
case _: DoubleType =>
|
||||
if (datum == options.nullValue && nullable) {
|
||||
null
|
||||
} else if (datum == options.nanValue) {
|
||||
Double.NaN
|
||||
} else if (datum == options.negativeInf) {
|
||||
Double.NegativeInfinity
|
||||
} else if (datum == options.positiveInf) {
|
||||
Double.PositiveInfinity
|
||||
} else {
|
||||
Try(datum.toDouble)
|
||||
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
|
||||
}
|
||||
case _: BooleanType => datum.toBoolean
|
||||
case dt: DecimalType =>
|
||||
if (datum == options.nullValue && nullable) {
|
||||
null
|
||||
} else {
|
||||
if (nullable && datum == options.nullValue) {
|
||||
null
|
||||
} else {
|
||||
castType match {
|
||||
case _: ByteType => datum.toByte
|
||||
case _: ShortType => datum.toShort
|
||||
case _: IntegerType => datum.toInt
|
||||
case _: LongType => datum.toLong
|
||||
case _: FloatType =>
|
||||
datum match {
|
||||
case options.nanValue => Float.NaN
|
||||
case options.negativeInf => Float.NegativeInfinity
|
||||
case options.positiveInf => Float.PositiveInfinity
|
||||
case _ =>
|
||||
Try(datum.toFloat)
|
||||
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
|
||||
}
|
||||
case _: DoubleType =>
|
||||
datum match {
|
||||
case options.nanValue => Double.NaN
|
||||
case options.negativeInf => Double.NegativeInfinity
|
||||
case options.positiveInf => Double.PositiveInfinity
|
||||
case _ =>
|
||||
Try(datum.toDouble)
|
||||
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
|
||||
}
|
||||
case _: BooleanType => datum.toBoolean
|
||||
case dt: DecimalType =>
|
||||
val value = new BigDecimal(datum.replaceAll(",", ""))
|
||||
Decimal(value, dt.precision, dt.scale)
|
||||
}
|
||||
case _: TimestampType =>
|
||||
// This one will lose microseconds parts.
|
||||
// See https://issues.apache.org/jira/browse/SPARK-10681.
|
||||
Try(options.timestampFormat.parse(datum).getTime * 1000L)
|
||||
.getOrElse {
|
||||
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
|
||||
// compatibility.
|
||||
DateTimeUtils.stringToTime(datum).getTime * 1000L
|
||||
}
|
||||
case _: DateType =>
|
||||
// This one will lose microseconds parts.
|
||||
// See https://issues.apache.org/jira/browse/SPARK-10681.x
|
||||
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
|
||||
.getOrElse {
|
||||
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
|
||||
// compatibility.
|
||||
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
|
||||
}
|
||||
case _: StringType => UTF8String.fromString(datum)
|
||||
case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
|
||||
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
|
||||
case _: TimestampType =>
|
||||
// This one will lose microseconds parts.
|
||||
// See https://issues.apache.org/jira/browse/SPARK-10681.
|
||||
Try(options.timestampFormat.parse(datum).getTime * 1000L)
|
||||
.getOrElse {
|
||||
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
|
||||
// compatibility.
|
||||
DateTimeUtils.stringToTime(datum).getTime * 1000L
|
||||
}
|
||||
case _: DateType =>
|
||||
// This one will lose microseconds parts.
|
||||
// See https://issues.apache.org/jira/browse/SPARK-10681.x
|
||||
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
|
||||
.getOrElse {
|
||||
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
|
||||
// compatibility.
|
||||
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
|
||||
}
|
||||
case _: StringType => UTF8String.fromString(datum)
|
||||
case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
|
||||
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -232,7 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
|
|||
* from values being read should be skipped.</li>
|
||||
* <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
|
||||
* whitespaces from values being read should be skipped.</li>
|
||||
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
|
||||
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
|
||||
* 2.0.1, this applies to all supported types including the string type.</li>
|
||||
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number" value.</li>
|
||||
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
|
||||
* value.</li>
|
||||
|
|
|
@ -554,7 +554,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
|
||||
verifyCars(cars, withHeader = true, checkValues = false)
|
||||
val results = cars.collect()
|
||||
assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null"))
|
||||
assert(results(0).toSeq === Array(2012, "Tesla", "S", null, null))
|
||||
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
|
||||
}
|
||||
|
||||
|
|
|
@ -68,16 +68,46 @@ class CSVTypeCastSuite extends SparkFunSuite {
|
|||
}
|
||||
|
||||
test("Nullable types are handled") {
|
||||
assert(CSVTypeCast.castTo("", IntegerType, nullable = true, CSVOptions()) == null)
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", BooleanType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", TimestampType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", DateType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
}
|
||||
|
||||
test("String type should always return the same as the input") {
|
||||
assert(
|
||||
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) ==
|
||||
UTF8String.fromString(""))
|
||||
test("String type should also respect `nullValue`") {
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()))
|
||||
assert(
|
||||
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) ==
|
||||
UTF8String.fromString(""))
|
||||
|
||||
assert(
|
||||
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions("nullValue", "null")) ==
|
||||
UTF8String.fromString(""))
|
||||
assert(
|
||||
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions("nullValue", "null")) ==
|
||||
UTF8String.fromString(""))
|
||||
|
||||
assertNull(
|
||||
CSVTypeCast.castTo(null, StringType, nullable = true, CSVOptions("nullValue", "null")))
|
||||
}
|
||||
|
||||
test("Throws exception for empty string with non null type") {
|
||||
|
@ -170,20 +200,4 @@ class CSVTypeCastSuite extends SparkFunSuite {
|
|||
assert(doubleVal2 == Double.PositiveInfinity)
|
||||
}
|
||||
|
||||
test("Type-specific null values are used for casting") {
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
|
||||
assertNull(
|
||||
CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue