[SPARK-29757][SQL] Move calendar interval constants together

### What changes were proposed in this pull request?
```java
  public static final int YEARS_PER_DECADE = 10;
  public static final int YEARS_PER_CENTURY = 100;
  public static final int YEARS_PER_MILLENNIUM = 1000;

  public static final byte MONTHS_PER_QUARTER = 3;
  public static final int MONTHS_PER_YEAR = 12;

  public static final byte DAYS_PER_WEEK = 7;
  public static final long DAYS_PER_MONTH = 30L;

  public static final long HOURS_PER_DAY = 24L;

  public static final long MINUTES_PER_HOUR = 60L;

  public static final long SECONDS_PER_MINUTE = 60L;
  public static final long SECONDS_PER_HOUR = MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
  public static final long SECONDS_PER_DAY = HOURS_PER_DAY * SECONDS_PER_HOUR;

  public static final long MILLIS_PER_SECOND = 1000L;
  public static final long MILLIS_PER_MINUTE = SECONDS_PER_MINUTE * MILLIS_PER_SECOND;
  public static final long MILLIS_PER_HOUR = MINUTES_PER_HOUR * MILLIS_PER_MINUTE;
  public static final long MILLIS_PER_DAY = HOURS_PER_DAY * MILLIS_PER_HOUR;

  public static final long MICROS_PER_MILLIS = 1000L;
  public static final long MICROS_PER_SECOND = MILLIS_PER_SECOND * MICROS_PER_MILLIS;
  public static final long MICROS_PER_MINUTE = SECONDS_PER_MINUTE * MICROS_PER_SECOND;
  public static final long MICROS_PER_HOUR = MINUTES_PER_HOUR * MICROS_PER_MINUTE;
  public static final long MICROS_PER_DAY = HOURS_PER_DAY * MICROS_PER_HOUR;
  public static final long MICROS_PER_MONTH = DAYS_PER_MONTH * MICROS_PER_DAY;
  /* 365.25 days per year assumes leap year every four years */
  public static final long MICROS_PER_YEAR = (36525L * MICROS_PER_DAY) / 100;

  public static final long NANOS_PER_MICROS = 1000L;
  public static final long NANOS_PER_MILLIS = MICROS_PER_MILLIS * NANOS_PER_MICROS;
  public static final long NANOS_PER_SECOND = MILLIS_PER_SECOND * NANOS_PER_MILLIS;
```
The constants above were previously scattered across `IntervalUtils`, `DateTimeUtils`, and `CalendarInterval`; some of them were redundant, and some cross-referenced definitions in the other classes. This change gathers them into a single `DateTimeConstants` class.
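
To make the consolidation concrete, here is a minimal sketch of the intended usage pattern, assuming the new `DateTimeConstants` class introduced in the diff below; the `daysToMicros` helper is hypothetical and only for illustration. Instead of reaching for `DateTimeUtils.MICROS_PER_DAY`, `CalendarInterval.MICROS_PER_DAY`, or `IntervalUtils`' own copies, call sites take everything from one place:

```scala
// Single source of truth for time-unit constants (a Java class whose static
// members are importable from Scala).
import org.apache.spark.sql.catalyst.util.DateTimeConstants._

object Example {
  // Hypothetical helper: converts a day count to microseconds using the
  // consolidated MICROS_PER_DAY rather than a per-class duplicate.
  def daysToMicros(days: Int): Long = days * MICROS_PER_DAY
}
```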

### Why are the changes needed?
To simplify the code, improve consistency, and reduce the risk of duplicated constants drifting out of sync.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Modified existing unit tests.

Closes #26399 from yaooqinn/SPARK-29757.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 9562b26914 (parent 9b61f90987)
Kent Yao, 2019-11-07 19:48:19 +08:00; committed by Wenchen Fan
32 changed files with 141 additions and 114 deletions

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.util;
public class DateTimeConstants {
public static final int YEARS_PER_DECADE = 10;
public static final int YEARS_PER_CENTURY = 100;
public static final int YEARS_PER_MILLENNIUM = 1000;
public static final byte MONTHS_PER_QUARTER = 3;
public static final int MONTHS_PER_YEAR = 12;
public static final byte DAYS_PER_WEEK = 7;
public static final long DAYS_PER_MONTH = 30L;
public static final long HOURS_PER_DAY = 24L;
public static final long MINUTES_PER_HOUR = 60L;
public static final long SECONDS_PER_MINUTE = 60L;
public static final long SECONDS_PER_HOUR = MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
public static final long SECONDS_PER_DAY = HOURS_PER_DAY * SECONDS_PER_HOUR;
public static final long MILLIS_PER_SECOND = 1000L;
public static final long MILLIS_PER_MINUTE = SECONDS_PER_MINUTE * MILLIS_PER_SECOND;
public static final long MILLIS_PER_HOUR = MINUTES_PER_HOUR * MILLIS_PER_MINUTE;
public static final long MILLIS_PER_DAY = HOURS_PER_DAY * MILLIS_PER_HOUR;
public static final long MICROS_PER_MILLIS = 1000L;
public static final long MICROS_PER_SECOND = MILLIS_PER_SECOND * MICROS_PER_MILLIS;
public static final long MICROS_PER_MINUTE = SECONDS_PER_MINUTE * MICROS_PER_SECOND;
public static final long MICROS_PER_HOUR = MINUTES_PER_HOUR * MICROS_PER_MINUTE;
public static final long MICROS_PER_DAY = HOURS_PER_DAY * MICROS_PER_HOUR;
public static final long MICROS_PER_MONTH = DAYS_PER_MONTH * MICROS_PER_DAY;
/* 365.25 days per year assumes leap year every four years */
public static final long MICROS_PER_YEAR = (36525L * MICROS_PER_DAY) / 100;
public static final long NANOS_PER_MICROS = 1000L;
public static final long NANOS_PER_MILLIS = MICROS_PER_MILLIS * NANOS_PER_MICROS;
public static final long NANOS_PER_SECOND = MILLIS_PER_SECOND * NANOS_PER_MILLIS;
}
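
As a hedged sanity check (illustrative only, not part of the commit), the derived values in the class above resolve to the expected magnitudes; the snippet assumes only the constants defined in `DateTimeConstants`:

```scala
import org.apache.spark.sql.catalyst.util.DateTimeConstants._

object DateTimeConstantsSanityCheck {
  def main(args: Array[String]): Unit = {
    assert(MILLIS_PER_DAY == 86400000L)      // 24 * 60 * 60 * 1000
    assert(MICROS_PER_DAY == 86400000000L)   // MILLIS_PER_DAY * 1000
    // 365.25 days per year, matching the comment in the class.
    assert(MICROS_PER_YEAR == 31557600000000L)
  }
}
```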

@ -24,25 +24,16 @@ import java.time.Period;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;
/**
* The internal representation of interval type.
*/
public final class CalendarInterval implements Serializable {
public static final long MICROS_PER_MILLI = 1000L;
public static final long MICROS_PER_SECOND = MICROS_PER_MILLI * 1000;
public static final long MICROS_PER_MINUTE = MICROS_PER_SECOND * 60;
public static final long MICROS_PER_HOUR = MICROS_PER_MINUTE * 60;
public static final long MICROS_PER_DAY = MICROS_PER_HOUR * 24;
public static final long MICROS_PER_WEEK = MICROS_PER_DAY * 7;
public final int months;
public final int days;
public final long microseconds;
public long milliseconds() {
return this.microseconds / MICROS_PER_MILLI;
}
public CalendarInterval(int months, int days, long microseconds) {
this.months = months;
this.days = days;

@ -23,7 +23,7 @@ import java.time.Duration;
import java.time.Period;
import static org.junit.Assert.*;
import static org.apache.spark.unsafe.types.CalendarInterval.*;
import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;
public class CalendarIntervalSuite {

@ -32,10 +32,10 @@ import org.apache.avro.util.Utf8
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
* A deserializer to deserialize data in avro format to data in catalyst format.
*/
@ -110,7 +110,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
// Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
// For backward compatibility, we still keep this conversion.
case (LONG, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, (value.asInstanceOf[Long] / DateTimeUtils.MILLIS_PER_DAY).toInt)
updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
case (FLOAT, FloatType) => (updater, ordinal, value) =>
updater.setFloat(ordinal, value.asInstanceOf[Float])

@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
/**
* Helper object for stream joins. See [[StreamingSymmetricHashJoinExec]] in SQL for more details.
*/
@ -264,7 +264,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
s"watermark calculation. Use interval in terms of day instead.")
Literal(0.0)
} else {
Literal(calendarInterval.days * CalendarInterval.MICROS_PER_DAY.toDouble +
Literal(calendarInterval.days * MICROS_PER_DAY.toDouble +
calendarInterval.microseconds.toDouble)
}
case DoubleType =>

@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@ -22,9 +22,9 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
case class TimeWindow(
timeColumn: Expression,
@ -108,7 +108,7 @@ object TimeWindow {
throw new IllegalArgumentException(
s"Intervals greater than a month is not supported ($interval).")
}
cal.days * CalendarInterval.MICROS_PER_DAY + cal.microseconds
cal.days * MICROS_PER_DAY + cal.microseconds
}
/**

@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@ -28,14 +28,14 @@ import org.apache.spark.sql.catalyst.expressions.ArraySortLike.NullOrder
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.UTF8StringBuilder
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.unsafe.types.{ByteArray, CalendarInterval, UTF8String}
import org.apache.spark.util.collection.OpenHashSet
/**
@ -2613,7 +2613,7 @@ object Sequence {
new CalendarInterval(0, 1, 0))
private val backedSequenceImpl = new IntegralSequenceImpl[T](dt)
private val microsPerDay = 24 * CalendarInterval.MICROS_PER_HOUR
private val microsPerDay = HOURS_PER_DAY * MICROS_PER_HOUR
// We choose a minimum days(28) in one month to calculate the `intervalStepInMicros`
// in order to make sure the estimated array length is long enough
private val microsPerMonth = 28 * microsPerDay

@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
@ -902,14 +902,11 @@ object HiveHashFunction extends InterpretedHashFunction {
* with nanosecond values will lead to wrong output hashes (ie. non adherent with Hive output)
*/
def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
val totalMicroSeconds =
calendarInterval.days * CalendarInterval.MICROS_PER_DAY + calendarInterval.microseconds
val totalSeconds = totalMicroSeconds / CalendarInterval.MICROS_PER_SECOND.toInt
val totalMicroSeconds = calendarInterval.days * MICROS_PER_DAY + calendarInterval.microseconds
val totalSeconds = totalMicroSeconds / MICROS_PER_SECOND.toInt
val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt
val nanoSeconds =
(totalMicroSeconds -
(totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
val nanoSeconds = (totalMicroSeconds - (totalSeconds * MICROS_PER_SECOND.toInt)).toInt * 1000
(result * 37) + nanoSeconds
}

@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import com.google.common.util.concurrent.AtomicLongMap
import org.apache.spark.sql.catalyst.util.DateTimeUtils.NANOS_PER_SECOND
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
case class QueryExecutionMetering() {
private val timeMap = AtomicLongMap.create[String]()

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit._
import scala.util.control.NonFatal
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@ -46,20 +47,6 @@ object DateTimeUtils {
// it's 2440587.5, rounding up to compatible with Hive
final val JULIAN_DAY_OF_EPOCH = 2440588
// Pre-calculated values can provide an opportunity of additional optimizations
// to the compiler like constants propagation and folding.
final val NANOS_PER_MICROS: Long = 1000
final val MICROS_PER_MILLIS: Long = 1000
final val MILLIS_PER_SECOND: Long = 1000
final val SECONDS_PER_DAY: Long = 24 * 60 * 60
final val MICROS_PER_SECOND: Long = MILLIS_PER_SECOND * MICROS_PER_MILLIS
final val NANOS_PER_MILLIS: Long = NANOS_PER_MICROS * MICROS_PER_MILLIS
final val NANOS_PER_SECOND: Long = NANOS_PER_MICROS * MICROS_PER_SECOND
final val MICROS_PER_DAY: Long = SECONDS_PER_DAY * MICROS_PER_SECOND
final val MILLIS_PER_MINUTE: Long = 60 * MILLIS_PER_SECOND
final val MILLIS_PER_HOUR: Long = 60 * MILLIS_PER_MINUTE
final val MILLIS_PER_DAY: Long = SECONDS_PER_DAY * MILLIS_PER_SECOND
// number of days between 1.1.1970 and 1.1.2001
final val to2001 = -11323

@ -22,24 +22,11 @@ import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
object IntervalUtils {
final val MONTHS_PER_YEAR: Int = 12
final val MONTHS_PER_QUARTER: Byte = 3
final val YEARS_PER_MILLENNIUM: Int = 1000
final val YEARS_PER_CENTURY: Int = 100
final val YEARS_PER_DECADE: Int = 10
final val MICROS_PER_HOUR: Long =
DateTimeUtils.MILLIS_PER_HOUR * DateTimeUtils.MICROS_PER_MILLIS
final val MICROS_PER_MINUTE: Long =
DateTimeUtils.MILLIS_PER_MINUTE * DateTimeUtils.MICROS_PER_MILLIS
final val DAYS_PER_MONTH: Byte = 30
final val MICROS_PER_MONTH: Long = DAYS_PER_MONTH * DateTimeUtils.MICROS_PER_DAY
/* 365.25 days per year assumes leap year every four years */
final val MICROS_PER_YEAR: Long = (36525L * DateTimeUtils.MICROS_PER_DAY) / 100
final val DAYS_PER_WEEK: Byte = 7
def getYears(interval: CalendarInterval): Int = {
interval.months / MONTHS_PER_YEAR
@ -92,7 +79,7 @@ object IntervalUtils {
// Returns total number of seconds with microseconds fractional part in the given interval.
def getEpoch(interval: CalendarInterval): Decimal = {
var result = interval.microseconds
result += DateTimeUtils.MICROS_PER_DAY * interval.days
result += MICROS_PER_DAY * interval.days
result += MICROS_PER_YEAR * (interval.months / MONTHS_PER_YEAR)
result += MICROS_PER_MONTH * (interval.months % MONTHS_PER_YEAR)
Decimal(result, 18, 6)
@ -238,7 +225,7 @@ object IntervalUtils {
var micros = secondsFraction
micros = Math.addExact(micros, Math.multiplyExact(hours, MICROS_PER_HOUR))
micros = Math.addExact(micros, Math.multiplyExact(minutes, MICROS_PER_MINUTE))
micros = Math.addExact(micros, Math.multiplyExact(seconds, DateTimeUtils.MICROS_PER_SECOND))
micros = Math.addExact(micros, Math.multiplyExact(seconds, MICROS_PER_SECOND))
new CalendarInterval(0, sign * days, sign * micros)
} catch {
case e: Exception =>
@ -273,7 +260,7 @@ object IntervalUtils {
case "second" =>
microseconds = Math.addExact(microseconds, parseSecondNano(values(i)))
case "millisecond" =>
val millisUs = Math.multiplyExact(values(i).toLong, DateTimeUtils.MICROS_PER_MILLIS)
val millisUs = Math.multiplyExact(values(i).toLong, MICROS_PER_MILLIS)
microseconds = Math.addExact(microseconds, millisUs)
case "microsecond" =>
microseconds = Math.addExact(microseconds, values(i).toLong)
@ -295,7 +282,7 @@ object IntervalUtils {
(nanosStr + "000000000").substring(0, maxNanosLen)
} else nanosStr
val nanos = toLongWithRange("nanosecond", alignedStr, 0L, 999999999L)
val micros = nanos / DateTimeUtils.NANOS_PER_MICROS
val micros = nanos / NANOS_PER_MICROS
if (isNegative) -micros else micros
} else {
0L
@ -310,8 +297,8 @@ object IntervalUtils {
toLongWithRange(
"second",
secondsStr,
Long.MinValue / DateTimeUtils.MICROS_PER_SECOND,
Long.MaxValue / DateTimeUtils.MICROS_PER_SECOND) * DateTimeUtils.MICROS_PER_SECOND
Long.MinValue / MICROS_PER_SECOND,
Long.MaxValue / MICROS_PER_SECOND) * MICROS_PER_SECOND
}
secondNano.split("\\.") match {
@ -343,10 +330,10 @@ object IntervalUtils {
targetUnit: TimeUnit,
daysPerMonth: Int = 31): Long = {
val monthsDuration = Math.multiplyExact(
daysPerMonth * DateTimeUtils.MICROS_PER_DAY,
daysPerMonth * MICROS_PER_DAY,
interval.months)
val daysDuration = Math.multiplyExact(
DateTimeUtils.MICROS_PER_DAY,
MICROS_PER_DAY,
interval.days)
val result = Math.addExact(interval.microseconds, Math.addExact(daysDuration, monthsDuration))
targetUnit.convert(result, TimeUnit.MICROSECONDS)
@ -378,7 +365,7 @@ object IntervalUtils {
val truncatedMonths = Math.toIntExact(monthsWithFraction.toLong)
val days = daysWithFraction + DAYS_PER_MONTH * (monthsWithFraction - truncatedMonths)
val truncatedDays = Math.toIntExact(days.toLong)
val micros = microsWithFraction + DateTimeUtils.MICROS_PER_DAY * (days - truncatedDays)
val micros = microsWithFraction + MICROS_PER_DAY * (days - truncatedDays)
new CalendarInterval(truncatedMonths, truncatedDays, micros.round)
}
@ -481,7 +468,7 @@ object IntervalUtils {
case ' ' =>
state = BEGIN_UNIT_NAME
case '.' =>
fractionScale = (DateTimeUtils.NANOS_PER_SECOND / 10).toInt
fractionScale = (NANOS_PER_SECOND / 10).toInt
state = FRACTIONAL_PART
case _ => return null
}
@ -492,7 +479,7 @@ object IntervalUtils {
fraction += (b - '0') * fractionScale
fractionScale /= 10
case ' ' =>
fraction /= DateTimeUtils.NANOS_PER_MICROS.toInt
fraction /= NANOS_PER_MICROS.toInt
state = BEGIN_UNIT_NAME
case _ => return null
}
@ -527,7 +514,7 @@ object IntervalUtils {
microseconds = Math.addExact(microseconds, hoursUs)
i += hourStr.numBytes()
case 's' if s.matchAt(secondStr, i) =>
val secondsUs = Math.multiplyExact(currentValue, DateTimeUtils.MICROS_PER_SECOND)
val secondsUs = Math.multiplyExact(currentValue, MICROS_PER_SECOND)
microseconds = Math.addExact(Math.addExact(microseconds, secondsUs), fraction)
i += secondStr.numBytes()
case 'm' =>
@ -541,7 +528,7 @@ object IntervalUtils {
} else if (s.matchAt(millisStr, i)) {
val millisUs = Math.multiplyExact(
currentValue,
DateTimeUtils.MICROS_PER_MILLIS)
MICROS_PER_MILLIS)
microseconds = Math.addExact(microseconds, millisUs)
i += millisStr.numBytes()
} else if (s.matchAt(microsStr, i)) {

@ -23,10 +23,10 @@ import scala.collection.mutable
import scala.util.Random
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
/**
* Random data generators for Spark SQL DataTypes. These generators do not generate uniformly random
* values; instead, they're biased to return "interesting" values (such as maximum / minimum values)
@ -172,7 +172,7 @@ object RandomDataGenerator {
// January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
milliseconds = rand.nextLong() % 253402329599999L
}
DateTimeUtils.toJavaDate((milliseconds / DateTimeUtils.MILLIS_PER_DAY).toInt)
DateTimeUtils.toJavaDate((milliseconds / MILLIS_PER_DAY).toInt)
}
Some(generator)
case TimestampType =>

@ -25,6 +25,7 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@ -141,11 +142,11 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
"dateFormat" -> "yyyy-MM-dd"), false, "UTC")
parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
val expected = 1420070400 * DateTimeUtils.MICROS_PER_SECOND
val expected = 1420070400 * MICROS_PER_SECOND
assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
expected)
assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
expected / DateTimeUtils.MICROS_PER_DAY)
expected / MICROS_PER_DAY)
}
test("Throws exception for casting an invalid string to Float and Double Types") {

@ -28,6 +28,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.numericPrecedence
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
@ -665,9 +666,9 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Cast(Literal(""), CalendarIntervalType), null)
checkEvaluation(Cast(Literal("interval -3 month 1 day 7 hours"), CalendarIntervalType),
new CalendarInterval(-3, 1, 7 * CalendarInterval.MICROS_PER_HOUR))
new CalendarInterval(-3, 1, 7 * MICROS_PER_HOUR))
checkEvaluation(Cast(Literal.create(
new CalendarInterval(15, 9, -3 * CalendarInterval.MICROS_PER_HOUR), CalendarIntervalType),
new CalendarInterval(15, 9, -3 * MICROS_PER_HOUR), CalendarIntervalType),
StringType),
"1 years 3 months 9 days -3 hours")
checkEvaluation(Cast(Literal("INTERVAL 1 Second 1 microsecond"), CalendarIntervalType),

@ -27,6 +27,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
@ -914,7 +915,7 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
Literal(Date.valueOf("1970-02-01")),
Literal(IntervalUtils.fromString("interval 1 month").negate())),
EmptyRow,
s"sequence boundaries: 0 to 2678400000000 by -${28 * CalendarInterval.MICROS_PER_DAY}")
s"sequence boundaries: 0 to 2678400000000 by -${28 * MICROS_PER_DAY}")
}
}

@ -29,6 +29,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
import org.apache.spark.sql.internal.SQLConf
@ -1053,11 +1054,11 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val nanos = 123456000
val timestamp = Epoch(MakeTimestamp(
Literal(2019), Literal(8), Literal(9), Literal(0), Literal(0),
Literal(Decimal(nanos / DateTimeUtils.NANOS_PER_SECOND.toDouble, 8, 6)),
Literal(Decimal(nanos / NANOS_PER_SECOND.toDouble, 8, 6)),
Some(Literal(zoneId.getId))))
val instant = LocalDateTime.of(2019, 8, 9, 0, 0, 0, nanos)
.atZone(zoneId).toInstant
val expected = Decimal(BigDecimal(nanos) / DateTimeUtils.NANOS_PER_SECOND +
val expected = Decimal(BigDecimal(nanos) / NANOS_PER_SECOND +
instant.getEpochSecond +
zoneId.getRules.getOffset(instant).getTotalSeconds)
checkEvaluation(timestamp, expected)

@ -21,10 +21,10 @@ import java.time.LocalDateTime
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_HOUR
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{FloatType, TimestampType}
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.unsafe.types.CalendarInterval
class ExpressionSQLBuilderSuite extends SparkFunSuite {
@ -165,7 +165,7 @@ class ExpressionSQLBuilderSuite extends SparkFunSuite {
}
test("interval arithmetic") {
val interval = Literal(new CalendarInterval(0, 0, CalendarInterval.MICROS_PER_HOUR))
val interval = Literal(new CalendarInterval(0, 0, MICROS_PER_HOUR))
checkSQL(
TimeAdd('a, interval),

@ -27,12 +27,12 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection}
import org.apache.spark.sql.catalyst.encoders.ExamplePointUDT
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
test("null") {
@ -187,7 +187,7 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
checkArrayLiteral(Array(1, 2, 3))
checkArrayLiteral(Array("a", "b", "c"))
checkArrayLiteral(Array(1.0, 4.0))
checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR))
checkArrayLiteral(Array(MICROS_PER_DAY, MICROS_PER_HOUR))
val arr = collection.mutable.WrappedArray.make(Array(1.0, 4.0))
checkEvaluation(Literal(arr), toCatalyst(arr))
}
@ -199,7 +199,7 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
checkSeqLiteral(Seq(1, 2, 3), IntegerType)
checkSeqLiteral(Seq("a", "b", "c"), StringType)
checkSeqLiteral(Seq(1.0, 4.0), DoubleType)
checkSeqLiteral(Seq(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR),
checkSeqLiteral(Seq(MICROS_PER_DAY, MICROS_PER_HOUR),
CalendarIntervalType)
}

@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import org.scalacheck.{Arbitrary, Gen}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@ -107,7 +107,7 @@ object LiteralGenerator {
val minDay = LocalDate.of(1, 1, 1).toEpochDay
val maxDay = LocalDate.of(9999, 12, 31).toEpochDay
for { day <- Gen.choose(minDay, maxDay) }
yield Literal.create(new Date(day * DateTimeUtils.MILLIS_PER_DAY), DateType)
yield Literal.create(new Date(day * MILLIS_PER_DAY), DateType)
}
lazy val timestampLiteralGen: Gen[Literal] = {

@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, _}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@ -633,13 +634,13 @@ class ExpressionParserSuite extends AnalysisTest {
Literal(new CalendarInterval(
0,
0,
-13 * DateTimeUtils.MICROS_PER_SECOND - 123 * DateTimeUtils.MICROS_PER_MILLIS - 456)))
-13 * MICROS_PER_SECOND - 123 * MICROS_PER_MILLIS - 456)))
checkIntervals(
"13.123456 second",
Literal(new CalendarInterval(
0,
0,
13 * DateTimeUtils.MICROS_PER_SECOND + 123 * DateTimeUtils.MICROS_PER_MILLIS + 456)))
13 * MICROS_PER_SECOND + 123 * MICROS_PER_MILLIS + 456)))
checkIntervals("1.001 second", Literal(IntervalUtils.fromString("1 second 1 millisecond")))
// Non Existing unit

@ -27,9 +27,10 @@ import org.scalatest.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.unsafe.types.UTF8String
class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
@ -397,15 +398,15 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
// transit from Pacific Standard Time to Pacific Daylight Time
assert(timestampAddInterval(
ts1, 0, 0, 23 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts2)
ts1, 0, 0, 23 * MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts2)
assert(timestampAddInterval(ts1, 0, 1, 0, TimeZonePST.toZoneId) === ts2)
// just a normal day
assert(timestampAddInterval(
ts3, 0, 0, 24 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts4)
ts3, 0, 0, 24 * MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts4)
assert(timestampAddInterval(ts3, 0, 1, 0, TimeZonePST.toZoneId) === ts4)
// transit from Pacific Daylight Time to Pacific Standard Time
assert(timestampAddInterval(
ts5, 0, 0, 25 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts6)
ts5, 0, 0, 25 * MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts6)
assert(timestampAddInterval(ts5, 0, 1, 0, TimeZonePST.toZoneId) === ts6)
}

@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.util
import java.util.concurrent.TimeUnit
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{MICROS_PER_MILLIS, MICROS_PER_SECOND}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.IntervalUtils._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.execution.metric.SQLMetrics
/**

@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval

@ -26,7 +26,8 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, SparkDataStream}
import org.apache.spark.sql.execution.QueryExecution
@ -88,7 +89,7 @@ trait ProgressReporter extends Logging {
private var lastNoDataProgressEventTime = Long.MinValue
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(getTimeZone("UTC"))
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@volatile
protected var currentStatus: StreamingQueryStatus = {

@ -21,9 +21,9 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval
private object Triggers {
def validate(intervalMs: Long): Unit = {
@ -35,7 +35,7 @@ private object Triggers {
if (cal.months != 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
TimeUnit.MICROSECONDS.toMillis(cal.microseconds + cal.days * CalendarInterval.MICROS_PER_DAY)
TimeUnit.MICROSECONDS.toMillis(cal.microseconds + cal.days * MICROS_PER_DAY)
}
def convert(interval: Duration): Long = interval.toMillis

@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext}
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
class SQLQuerySuite extends QueryTest with SharedSparkSession {
import testImplicits._
@ -1567,9 +1568,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
}
test("SPARK-8945: add and subtract expressions for interval type") {
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.unsafe.types.CalendarInterval.MICROS_PER_WEEK
val df = sql("select interval 3 years -3 month 7 week 123 microseconds as i")
checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7 * 7, 123)))

@ -18,7 +18,7 @@
package org.apache.spark.sql.streaming
import java.io.File
import java.util.{Locale, TimeZone}
import java.util.Locale
import org.apache.commons.io.FileUtils
import org.scalatest.Assertions
@ -28,7 +28,7 @@ import org.apache.spark.rdd.BlockRDD
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.streaming._
@ -348,25 +348,25 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
val inputData = MemoryStream[Long]
val aggregated =
inputData.toDF()
.select(($"value" * DateTimeUtils.SECONDS_PER_DAY).cast("timestamp").as("value"))
.select(($"value" * SECONDS_PER_DAY).cast("timestamp").as("value"))
.groupBy($"value")
.agg(count("*"))
.where($"value".cast("date") >= date_sub(current_timestamp().cast("date"), 10))
.select(
($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
($"value".cast("long") / SECONDS_PER_DAY).cast("long"), $"count(1)")
testStream(aggregated, Complete)(
StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
// advance clock to 10 days, should retain all keys
AddData(inputData, 0L, 5L, 5L, 10L),
AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
AdvanceManualClock(MILLIS_PER_DAY * 10),
CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
// advance clock to 20 days, should retain keys >= 10
AddData(inputData, 15L, 15L, 20L),
AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
AdvanceManualClock(MILLIS_PER_DAY * 10),
CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
// advance clock to 30 days, should retain keys >= 20
AddData(inputData, 85L),
AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
AdvanceManualClock(MILLIS_PER_DAY * 10),
CheckLastBatch((20L, 1), (85L, 1)),
// bounce stream and ensure correct batch timestamp is used
@ -376,7 +376,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
q.sink.asInstanceOf[MemorySink].clear()
q.commitLog.purge(3)
// advance by 60 days i.e., 90 days total
clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
clock.advance(MILLIS_PER_DAY * 60)
true
},
StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
@ -385,7 +385,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
// advance clock to 100 days, should retain keys >= 90
AddData(inputData, 85L, 90L, 100L, 105L),
AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
AdvanceManualClock(MILLIS_PER_DAY * 10),
CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
)
}