[SPARK-23715][SQL] the input of to/from_utc_timestamp can not have timezone

## What changes were proposed in this pull request?

`from_utc_timestamp` assumes its input is in the UTC timezone and shifts it to the specified timezone. When the timestamp contains a timezone (e.g. `2018-03-13T06:18:23+00:00`), Spark breaks that semantic and respects the timezone embedded in the string instead. This is not what users expect, and the result differs from Hive/Impala. `to_utc_timestamp` has the same problem.

For more details, please refer to the JIRA ticket.

This PR fixes this by returning null if the input timestamp string contains a timezone.
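
For illustration, here is a sketch of the resulting behavior, based on the migration-guide entry and the tests added below (it assumes a `SparkSession` named `spark`; exact results depend on the session timezone):

```scala
// Input without a timezone suffix: unchanged behavior, the value is shifted from UTC to GMT+1.
spark.sql("SELECT from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')").show()
// -> 2000-10-10 01:00:00

// Input with a timezone suffix: Spark 2.3 respected the '+00:00' part; after this change
// the function returns NULL instead (unless spark.sql.function.rejectTimezoneInString
// is set to false).
spark.sql("SELECT from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')").show()
// -> NULL
```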

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21169 from cloud-fan/from_utc_timezone.
Wenchen Fan 2018-05-03 19:27:01 +08:00 committed by hyukjinkwon
parent c9bfd1c6f8
commit 417ad92502
9 changed files with 283 additions and 19 deletions

View file

@@ -1805,12 +1805,13 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
- Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing an empty dataframe.
- Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
- Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file format (parquet, orc, json, text, csv, etc.) is not allowed. An exception is thrown when attempting to write dataframes with an empty schema.
- Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promoting both sides to TIMESTAMP. Setting `spark.sql.hive.compareDateTimestampInTimestamp` to `false` restores the previous behavior. This option will be removed in Spark 3.0.
- Since Spark 2.4, creating a managed table with a nonempty location is not allowed. An exception is thrown when attempting to create a managed table with a nonempty location. Setting `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` to `true` restores the previous behavior. This option will be removed in Spark 3.0.
- Since Spark 2.4, the type coercion rules can automatically promote the argument types of the variadic SQL functions (e.g., IN/COALESCE) to the widest common type, regardless of the order of the input arguments. In prior Spark versions, the promotion could fail in some specific orders (e.g., TimestampType, IntegerType and StringType) and throw an exception.
- In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains a timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. If you don't care about this problem and want to retain the previous behavior to keep your queries unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
## Upgrading From Spark SQL 2.2 to 2.3
- Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.

View file

@@ -59,7 +59,7 @@ object TypeCoercion {
IfCoercion ::
StackCoercion ::
Division ::
ImplicitTypeCasts ::
new ImplicitTypeCasts(conf) ::
DateTimeOperations ::
WindowFrameCoercion ::
Nil
@@ -776,12 +776,33 @@ object TypeCoercion {
/**
* Casts types according to the expected input types for [[Expression]]s.
*/
object ImplicitTypeCasts extends TypeCoercionRule {
class ImplicitTypeCasts(conf: SQLConf) extends TypeCoercionRule {
private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
// Skip nodes whose children have not been resolved yet.
case e if !e.childrenResolved => e
// Special rules for `from/to_utc_timestamp`. These 2 functions assume the input timestamp
// string is in a specific timezone, so the string itself should not contain timezone.
// TODO: We should move the type coercion logic to expressions instead of a central
// place to put all the rules.
case e: FromUTCTimestamp if e.left.dataType == StringType =>
if (rejectTzInString) {
e.copy(left = StringToTimestampWithoutTimezone(e.left))
} else {
e.copy(left = Cast(e.left, TimestampType))
}
case e: ToUTCTimestamp if e.left.dataType == StringType =>
if (rejectTzInString) {
e.copy(left = StringToTimestampWithoutTimezone(e.left))
} else {
e.copy(left = Cast(e.left, TimestampType))
}
case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
findTightestCommonType(left.dataType, right.dataType).map { commonType =>
if (b.inputType.acceptsType(commonType)) {
@@ -798,7 +819,7 @@ object TypeCoercion {
case e: ImplicitCastInputTypes if e.inputTypes.nonEmpty =>
val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
// If we cannot do the implicit cast, just use the original input.
implicitCast(in, expected).getOrElse(in)
ImplicitTypeCasts.implicitCast(in, expected).getOrElse(in)
}
e.withNewChildren(children)
@@ -814,6 +835,9 @@ object TypeCoercion {
}
e.withNewChildren(children)
}
}
object ImplicitTypeCasts {
/**
* Given an expected data type, try to cast the expression and return the cast expression.

View file

@@ -1016,6 +1016,48 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S
}
}
/**
* A special expression used to convert the string input of `to/from_utc_timestamp` to timestamp,
* which requires the timestamp string to not have timezone information, otherwise null is returned.
*/
case class StringToTimestampWithoutTimezone(child: Expression, timeZoneId: Option[String] = None)
extends UnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes {
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
override def dataType: DataType = TimestampType
override def nullable: Boolean = true
override def toString: String = child.toString
override def sql: String = child.sql
override def nullSafeEval(input: Any): Any = {
DateTimeUtils.stringToTimestamp(
input.asInstanceOf[UTF8String], timeZone, rejectTzInString = true).orNull
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val tz = ctx.addReferenceObj("timeZone", timeZone)
val longOpt = ctx.freshName("longOpt")
val eval = child.genCode(ctx)
val code = s"""
|${eval.code}
|${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = true;
|${CodeGenerator.JAVA_LONG} ${ev.value} = ${CodeGenerator.defaultValue(TimestampType)};
|if (!${eval.isNull}) {
| scala.Option<Long> $longOpt = $dtu.stringToTimestamp(${eval.value}, $tz, true);
| if ($longOpt.isDefined()) {
| ${ev.value} = ((Long) $longOpt.get()).longValue();
| ${ev.isNull} = false;
| }
|}
""".stripMargin
ev.copy(code = code)
}
}
/**
* Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders
* that time as a timestamp in the given time zone. For example, 'GMT+1' would yield

View file

@@ -296,10 +296,28 @@ object DateTimeUtils {
* `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m`
*/
def stringToTimestamp(s: UTF8String): Option[SQLTimestamp] = {
stringToTimestamp(s, defaultTimeZone())
stringToTimestamp(s, defaultTimeZone(), rejectTzInString = false)
}
def stringToTimestamp(s: UTF8String, timeZone: TimeZone): Option[SQLTimestamp] = {
stringToTimestamp(s, timeZone, rejectTzInString = false)
}
/**
* Converts a timestamp string to microseconds from the unix epoch, w.r.t. the given timezone.
* Returns None if the input string is not a valid timestamp format.
*
* @param s the input timestamp string.
* @param timeZone the timezone of the timestamp string, will be ignored if the timestamp string
* already contains timezone information and `forceTimezone` is false.
* @param rejectTzInString if true, rejects timezone in the input string, i.e., if the
* timestamp string contains timezone, like `2000-10-10 00:00:00+00:00`,
* return None.
*/
def stringToTimestamp(
s: UTF8String,
timeZone: TimeZone,
rejectTzInString: Boolean): Option[SQLTimestamp] = {
if (s == null) {
return None
}
@@ -417,6 +435,8 @@ object DateTimeUtils {
return None
}
if (tz.isDefined && rejectTzInString) return None
val c = if (tz.isEmpty) {
Calendar.getInstance(timeZone)
} else {

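For reference, a minimal sketch of calling the new `stringToTimestamp` overload added above (an internal catalyst API; names and behavior taken from this diff):

```scala
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

val utc = TimeZone.getTimeZone("UTC")

// No timezone suffix in the string: parsed against the given timezone as before.
DateTimeUtils.stringToTimestamp(
  UTF8String.fromString("2000-10-10 00:00:00"), utc, rejectTzInString = true)
// -> Some(<microseconds since the epoch>)

// Timezone suffix present: rejected because of the new parameter.
DateTimeUtils.stringToTimestamp(
  UTF8String.fromString("2000-10-10 00:00:00+00:00"), utc, rejectTzInString = true)
// -> None
```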
View file

@@ -1208,6 +1208,13 @@ object SQLConf {
.stringConf
.createWithDefault("")
val REJECT_TIMEZONE_IN_STRING = buildConf("spark.sql.function.rejectTimezoneInString")
.internal()
.doc("If true, `to_utc_timestamp` and `from_utc_timestamp` return null if the input string " +
"contains a timezone part, e.g. `2000-10-10 00:00:00+00:00`.")
.booleanConf
.createWithDefault(true)
object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}
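
As a usage note, the flag above can be flipped per session to restore the pre-2.4 behavior, which is what the migration-guide entry and the new `DateFunctionsSuite` test below rely on; a sketch, assuming a `SparkSession` named `spark`:

```scala
// Restore the legacy behavior: the timezone in the input string is respected again.
// The exact result then depends on the session timezone (see the migration-guide entry above).
// Note this flag is internal and slated for removal in Spark 3.0.
spark.conf.set("spark.sql.function.rejectTimezoneInString", "false")
spark.sql("SELECT from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')").show()
```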

View file

@@ -524,11 +524,11 @@ class TypeCoercionSuite extends AnalysisTest {
test("cast NullType for expressions that implement ExpectsInputTypes") {
import TypeCoercionSuite._
ruleTest(TypeCoercion.ImplicitTypeCasts,
ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
AnyTypeUnaryExpression(Literal.create(null, NullType)),
AnyTypeUnaryExpression(Literal.create(null, NullType)))
ruleTest(TypeCoercion.ImplicitTypeCasts,
ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
NumericTypeUnaryExpression(Literal.create(null, NullType)),
NumericTypeUnaryExpression(Literal.create(null, DoubleType)))
}
@@ -536,11 +536,11 @@ class TypeCoercionSuite extends AnalysisTest {
test("cast NullType for binary operators") {
import TypeCoercionSuite._
ruleTest(TypeCoercion.ImplicitTypeCasts,
ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)))
ruleTest(TypeCoercion.ImplicitTypeCasts,
ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
NumericTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
NumericTypeBinaryOperator(Literal.create(null, DoubleType), Literal.create(null, DoubleType)))
}
@@ -823,7 +823,7 @@ class TypeCoercionSuite extends AnalysisTest {
}
test("type coercion for CaseKeyWhen") {
ruleTest(TypeCoercion.ImplicitTypeCasts,
ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
CaseKeyWhen(Literal(1.toShort), Seq(Literal(1), Literal("a"))),
CaseKeyWhen(Cast(Literal(1.toShort), IntegerType), Seq(Literal(1), Literal("a")))
)
@@ -1275,7 +1275,7 @@ class TypeCoercionSuite extends AnalysisTest {
}
test("SPARK-17117 null type coercion in divide") {
val rules = Seq(FunctionArgumentConversion, Division, ImplicitTypeCasts)
val rules = Seq(FunctionArgumentConversion, Division, new ImplicitTypeCasts(conf))
val nullLit = Literal.create(null, NullType)
ruleTest(rules, Divide(1L, nullLit), Divide(Cast(1L, DoubleType), Cast(nullLit, DoubleType)))
ruleTest(rules, Divide(nullLit, 1L), Divide(Cast(nullLit, DoubleType), Cast(1L, DoubleType)))

View file

@@ -27,3 +27,36 @@ select current_date = current_date(), current_timestamp = current_timestamp(), a
select a, b from ttf2 order by a, current_date;
select weekday('2007-02-03'), weekday('2009-07-30'), weekday('2017-05-27'), weekday(null), weekday('1582-10-15 13:10:15');
select from_utc_timestamp('2015-07-24 00:00:00', 'PST');
select from_utc_timestamp('2015-01-24 00:00:00', 'PST');
select from_utc_timestamp(null, 'PST');
select from_utc_timestamp('2015-07-24 00:00:00', null);
select from_utc_timestamp(null, null);
select from_utc_timestamp(cast(0 as timestamp), 'PST');
select from_utc_timestamp(cast('2015-01-24' as date), 'PST');
select to_utc_timestamp('2015-07-24 00:00:00', 'PST');
select to_utc_timestamp('2015-01-24 00:00:00', 'PST');
select to_utc_timestamp(null, 'PST');
select to_utc_timestamp('2015-07-24 00:00:00', null);
select to_utc_timestamp(null, null);
select to_utc_timestamp(cast(0 as timestamp), 'PST');
select to_utc_timestamp(cast('2015-01-24' as date), 'PST');
-- SPARK-23715: the input of to/from_utc_timestamp can not have timezone
select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST');
select to_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST');

View file

@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 10
-- Number of queries: 26
-- !query 0
@@ -82,9 +82,138 @@ struct<a:int,b:int>
1 2
2 3
-- !query 9
select weekday('2007-02-03'), weekday('2009-07-30'), weekday('2017-05-27'), weekday(null), weekday('1582-10-15 13:10:15')
-- !query 3 schema
-- !query 9 schema
struct<weekday(CAST(2007-02-03 AS DATE)):int,weekday(CAST(2009-07-30 AS DATE)):int,weekday(CAST(2017-05-27 AS DATE)):int,weekday(CAST(NULL AS DATE)):int,weekday(CAST(1582-10-15 13:10:15 AS DATE)):int>
-- !query 3 output
-- !query 9 output
5 3 5 NULL 4
-- !query 10
select from_utc_timestamp('2015-07-24 00:00:00', 'PST')
-- !query 10 schema
struct<from_utc_timestamp(2015-07-24 00:00:00, PST):timestamp>
-- !query 10 output
2015-07-23 17:00:00
-- !query 11
select from_utc_timestamp('2015-01-24 00:00:00', 'PST')
-- !query 11 schema
struct<from_utc_timestamp(2015-01-24 00:00:00, PST):timestamp>
-- !query 11 output
2015-01-23 16:00:00
-- !query 12
select from_utc_timestamp(null, 'PST')
-- !query 12 schema
struct<from_utc_timestamp(CAST(NULL AS TIMESTAMP), PST):timestamp>
-- !query 12 output
NULL
-- !query 13
select from_utc_timestamp('2015-07-24 00:00:00', null)
-- !query 13 schema
struct<from_utc_timestamp(2015-07-24 00:00:00, CAST(NULL AS STRING)):timestamp>
-- !query 13 output
NULL
-- !query 14
select from_utc_timestamp(null, null)
-- !query 14 schema
struct<from_utc_timestamp(CAST(NULL AS TIMESTAMP), CAST(NULL AS STRING)):timestamp>
-- !query 14 output
NULL
-- !query 15
select from_utc_timestamp(cast(0 as timestamp), 'PST')
-- !query 15 schema
struct<from_utc_timestamp(CAST(0 AS TIMESTAMP), PST):timestamp>
-- !query 15 output
1969-12-31 08:00:00
-- !query 16
select from_utc_timestamp(cast('2015-01-24' as date), 'PST')
-- !query 16 schema
struct<from_utc_timestamp(CAST(CAST(2015-01-24 AS DATE) AS TIMESTAMP), PST):timestamp>
-- !query 16 output
2015-01-23 16:00:00
-- !query 17
select to_utc_timestamp('2015-07-24 00:00:00', 'PST')
-- !query 17 schema
struct<to_utc_timestamp(2015-07-24 00:00:00, PST):timestamp>
-- !query 17 output
2015-07-24 07:00:00
-- !query 18
select to_utc_timestamp('2015-01-24 00:00:00', 'PST')
-- !query 18 schema
struct<to_utc_timestamp(2015-01-24 00:00:00, PST):timestamp>
-- !query 18 output
2015-01-24 08:00:00
-- !query 19
select to_utc_timestamp(null, 'PST')
-- !query 19 schema
struct<to_utc_timestamp(CAST(NULL AS TIMESTAMP), PST):timestamp>
-- !query 19 output
NULL
-- !query 20
select to_utc_timestamp('2015-07-24 00:00:00', null)
-- !query 20 schema
struct<to_utc_timestamp(2015-07-24 00:00:00, CAST(NULL AS STRING)):timestamp>
-- !query 20 output
NULL
-- !query 21
select to_utc_timestamp(null, null)
-- !query 21 schema
struct<to_utc_timestamp(CAST(NULL AS TIMESTAMP), CAST(NULL AS STRING)):timestamp>
-- !query 21 output
NULL
-- !query 22
select to_utc_timestamp(cast(0 as timestamp), 'PST')
-- !query 22 schema
struct<to_utc_timestamp(CAST(0 AS TIMESTAMP), PST):timestamp>
-- !query 22 output
1970-01-01 00:00:00
-- !query 23
select to_utc_timestamp(cast('2015-01-24' as date), 'PST')
-- !query 23 schema
struct<to_utc_timestamp(CAST(CAST(2015-01-24 AS DATE) AS TIMESTAMP), PST):timestamp>
-- !query 23 output
2015-01-24 08:00:00
-- !query 24
select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST')
-- !query 24 schema
struct<from_utc_timestamp(2000-10-10 00:00:00+00:00, PST):timestamp>
-- !query 24 output
NULL
-- !query 25
select to_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST')
-- !query 25 schema
struct<to_utc_timestamp(2000-10-10 00:00:00+00:00, PST):timestamp>
-- !query 25 output
NULL

View file

@@ -23,6 +23,7 @@ import java.util.Locale
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.unsafe.types.CalendarInterval
@@ -696,4 +697,11 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
}
test("SPARK-23715: to/from_utc_timestamp can retain the previous behavior") {
withSQLConf(SQLConf.REJECT_TIMEZONE_IN_STRING.key -> "false") {
checkAnswer(
sql("SELECT from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')"),
Row(Timestamp.valueOf("2000-10-09 18:00:00")))
}
}
}