[SPARK-29486][SQL] CalendarInterval should have 3 fields: months, days and microseconds

### What changes were proposed in this pull request?
The current CalendarInterval has 2 fields: months and microseconds. This PR changes it to
3 fields: months, days and microseconds, because one logical day interval may contain a
different number of microseconds (daylight saving time).

### Why are the changes needed?
One logical day interval may contain a different number of microseconds depending on
daylight saving time. For example, in the PST time zone there are 25 hours from
2019-11-02 12:00:00 to 2019-11-03 12:00:00.
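
A minimal sketch of that effect with java.time (illustration only; the zone id `America/Los_Angeles` is assumed here to stand in for PST/PDT):

```java
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DstDay {
  public static void main(String[] args) {
    ZoneId pst = ZoneId.of("America/Los_Angeles");
    ZonedDateTime start = ZonedDateTime.of(2019, 11, 2, 12, 0, 0, 0, pst);
    // Adding one *calendar* day crosses the DST fall-back on 2019-11-03,
    // so the elapsed physical time is 25 hours, not 24.
    ZonedDateTime end = start.plusDays(1);
    System.out.println(Duration.between(start, end).toHours()); // 25
  }
}
```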

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing unit tests and newly added test cases.

Closes #26134 from LinhongLiu/calendarinterval.

Authored-by: Liu,Linhong <liulinhong@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
commit a4382f7fe1 (parent 8a4378c6f0)
Committed by Wenchen Fan on 2019-11-01 18:12:33 +08:00
42 changed files with 337 additions and 228 deletions


@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;
import java.io.Serializable;
import java.util.Objects;
/**
* The internal representation of interval type.
@ -31,45 +32,50 @@ public final class CalendarInterval implements Serializable {
public static final long MICROS_PER_WEEK = MICROS_PER_DAY * 7;
public final int months;
public final int days;
public final long microseconds;
public long milliseconds() {
return this.microseconds / MICROS_PER_MILLI;
}
public CalendarInterval(int months, long microseconds) {
public CalendarInterval(int months, int days, long microseconds) {
this.months = months;
this.days = days;
this.microseconds = microseconds;
}
public CalendarInterval add(CalendarInterval that) {
int months = this.months + that.months;
int days = this.days + that.days;
long microseconds = this.microseconds + that.microseconds;
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}
public CalendarInterval subtract(CalendarInterval that) {
int months = this.months - that.months;
int days = this.days - that.days;
long microseconds = this.microseconds - that.microseconds;
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}
public CalendarInterval negate() {
return new CalendarInterval(-this.months, -this.microseconds);
return new CalendarInterval(-this.months, -this.days, -this.microseconds);
}
@Override
public boolean equals(Object other) {
if (this == other) return true;
if (other == null || !(other instanceof CalendarInterval)) return false;
CalendarInterval o = (CalendarInterval) other;
return this.months == o.months && this.microseconds == o.microseconds;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CalendarInterval that = (CalendarInterval) o;
return months == that.months &&
days == that.days &&
microseconds == that.microseconds;
}
@Override
public int hashCode() {
return 31 * months + (int) microseconds;
return Objects.hash(months, days, microseconds);
}
@Override
@ -81,12 +87,13 @@ public final class CalendarInterval implements Serializable {
appendUnit(sb, months % 12, "month");
}
if (days != 0) {
appendUnit(sb, days / 7, "week");
appendUnit(sb, days % 7, "day");
}
if (microseconds != 0) {
long rest = microseconds;
appendUnit(sb, rest / MICROS_PER_WEEK, "week");
rest %= MICROS_PER_WEEK;
appendUnit(sb, rest / MICROS_PER_DAY, "day");
rest %= MICROS_PER_DAY;
appendUnit(sb, rest / MICROS_PER_HOUR, "hour");
rest %= MICROS_PER_HOUR;
appendUnit(sb, rest / MICROS_PER_MINUTE, "minute");
@ -96,7 +103,7 @@ public final class CalendarInterval implements Serializable {
appendUnit(sb, rest / MICROS_PER_MILLI, "millisecond");
rest %= MICROS_PER_MILLI;
appendUnit(sb, rest, "microsecond");
} else if (months == 0) {
} else if (months == 0 && days == 0) {
sb.append(" 0 microseconds");
}
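
A short sketch of the resulting three-argument API (months, days, microseconds), using the constants defined on the class:

```java
import org.apache.spark.unsafe.types.CalendarInterval;

public class IntervalDemo {
  public static void main(String[] args) {
    // 1 month, 2 days, 3 hours -- the day component is now tracked separately
    // instead of being folded into microseconds.
    CalendarInterval a =
        new CalendarInterval(1, 2, 3 * CalendarInterval.MICROS_PER_HOUR);
    CalendarInterval b = new CalendarInterval(0, 5, 0);

    // add() combines each field independently: months=1, days=7, micros=3h.
    // Per the toString() logic above, this prints
    // "interval 1 months 1 weeks 3 hours".
    System.out.println(a.add(b));
  }
}
```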


@ -26,59 +26,72 @@ public class CalendarIntervalSuite {
@Test
public void equalsTest() {
CalendarInterval i1 = new CalendarInterval(3, 123);
CalendarInterval i2 = new CalendarInterval(3, 321);
CalendarInterval i3 = new CalendarInterval(1, 123);
CalendarInterval i4 = new CalendarInterval(3, 123);
CalendarInterval i1 = new CalendarInterval(3, 2, 123);
CalendarInterval i2 = new CalendarInterval(3, 2, 321);
CalendarInterval i3 = new CalendarInterval(3, 4, 123);
CalendarInterval i4 = new CalendarInterval(1, 2, 123);
CalendarInterval i5 = new CalendarInterval(1, 4, 321);
CalendarInterval i6 = new CalendarInterval(3, 2, 123);
assertNotSame(i1, i2);
assertNotSame(i1, i3);
assertNotSame(i1, i4);
assertNotSame(i2, i3);
assertEquals(i1, i4);
assertNotSame(i2, i4);
assertNotSame(i3, i4);
assertNotSame(i1, i5);
assertEquals(i1, i6);
}
@Test
public void toStringTest() {
CalendarInterval i;
i = new CalendarInterval(0, 0);
i = new CalendarInterval(0, 0, 0);
assertEquals("interval 0 microseconds", i.toString());
i = new CalendarInterval(34, 0);
i = new CalendarInterval(34, 0, 0);
assertEquals("interval 2 years 10 months", i.toString());
i = new CalendarInterval(-34, 0);
i = new CalendarInterval(-34, 0, 0);
assertEquals("interval -2 years -10 months", i.toString());
i = new CalendarInterval(0, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
assertEquals("interval 3 weeks 13 hours 123 microseconds", i.toString());
i = new CalendarInterval(0, 31, 0);
assertEquals("interval 4 weeks 3 days", i.toString());
i = new CalendarInterval(0, -3 * MICROS_PER_WEEK - 13 * MICROS_PER_HOUR - 123);
assertEquals("interval -3 weeks -13 hours -123 microseconds", i.toString());
i = new CalendarInterval(0, -31, 0);
assertEquals("interval -4 weeks -3 days", i.toString());
i = new CalendarInterval(34, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
assertEquals("interval 2 years 10 months 3 weeks 13 hours 123 microseconds", i.toString());
i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
assertEquals("interval 3 hours 13 minutes 123 microseconds", i.toString());
i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123);
assertEquals("interval -3 hours -13 minutes -123 microseconds", i.toString());
i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
assertEquals("interval 2 years 10 months 4 weeks 3 days 3 hours 13 minutes 123 microseconds",
i.toString());
}
@Test
public void addTest() {
CalendarInterval input1 = new CalendarInterval(3, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 100 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(5, 101 * MICROS_PER_HOUR));
CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(5, 5, 101 * MICROS_PER_HOUR));
input1 = new CalendarInterval(-10, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 200 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(65, 119 * MICROS_PER_HOUR));
input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(65, 120, 119 * MICROS_PER_HOUR));
}
@Test
public void subtractTest() {
CalendarInterval input1 = new CalendarInterval(3, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 100 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(1, -99 * MICROS_PER_HOUR));
CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(1, -3, -99 * MICROS_PER_HOUR));
input1 = new CalendarInterval(-10, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 200 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(-85, -281 * MICROS_PER_HOUR));
input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(-85, -180, -281 * MICROS_PER_HOUR));
}
}


@ -230,9 +230,10 @@ public final class UnsafeArrayData extends ArrayData implements Externalizable,
if (isNullAt(ordinal)) return null;
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
final int months = Platform.getInt(baseObject, baseOffset + offset);
final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}
@Override


@ -401,9 +401,10 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
} else {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
final int months = Platform.getInt(baseObject, baseOffset + offset);
final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}
}


@ -134,8 +134,9 @@ public abstract class UnsafeWriter {
// grow the global buffer before writing data.
grow(16);
// Write the months and microseconds fields of Interval to the variable length portion.
Platform.putLong(getBuffer(), cursor(), input.months);
// Write the months, days and microseconds fields of Interval to the variable length portion.
Platform.putInt(getBuffer(), cursor(), input.months);
Platform.putInt(getBuffer(), cursor() + 4, input.days);
Platform.putLong(getBuffer(), cursor() + 8, input.microseconds);
setOffsetAndSize(ordinal, 16);
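
The serialized size stays 16 bytes: the months field shrinks from a long to an int, freeing 4 bytes for days. A sketch of the layout with a plain ByteBuffer (illustration only; the real writer goes through Platform.putInt/putLong):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class IntervalLayout {
  public static void main(String[] args) {
    // offset 0: months (int), offset 4: days (int), offset 8: microseconds (long)
    ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder());
    buf.putInt(0, 1);               // months
    buf.putInt(4, 2);               // days
    buf.putLong(8, 3_600_000_000L); // microseconds (1 hour)
    System.out.println(buf.getInt(0) + " " + buf.getInt(4) + " " + buf.getLong(8));
  }
}
```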


@ -267,21 +267,24 @@ public abstract class ColumnVector implements AutoCloseable {
* Returns the calendar interval type value for rowId. If the slot for rowId is null, it should
* return null.
*
* In Spark, calendar interval type value is basically an integer value representing the number of
* months in this interval, and a long value representing the number of microseconds in this
* interval. An interval type vector is the same as a struct type vector with 2 fields: `months`
* and `microseconds`.
* In Spark, calendar interval type value is basically two integer values representing the number
* of months and days in this interval, and a long value representing the number of microseconds
* in this interval. An interval type vector is the same as a struct type vector with 3 fields:
* `months`, `days` and `microseconds`.
*
* To support interval type, implementations must implement {@link #getChild(int)} and define 2
* To support interval type, implementations must implement {@link #getChild(int)} and define 3
* child vectors: the first child vector is an int type vector, containing all the month values of
* all the interval values in this vector. The second child vector is a long type vector,
* containing all the microsecond values of all the interval values in this vector.
* all the interval values in this vector. The second child vector is an int type vector,
* containing all the day values of all the interval values in this vector. The third child vector
* is a long type vector, containing all the microsecond values of all the interval values in this
* vector.
*/
public final CalendarInterval getInterval(int rowId) {
if (isNullAt(rowId)) return null;
final int months = getChild(0).getInt(rowId);
final long microseconds = getChild(1).getLong(rowId);
return new CalendarInterval(months, microseconds);
final int days = getChild(1).getInt(rowId);
final long microseconds = getChild(2).getLong(rowId);
return new CalendarInterval(months, days, microseconds);
}
/**


@ -264,7 +264,8 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
s"watermark calculation. Use interval in terms of day instead.")
Literal(0.0)
} else {
Literal(calendarInterval.microseconds.toDouble)
Literal(calendarInterval.days * CalendarInterval.MICROS_PER_DAY.toDouble +
calendarInterval.microseconds.toDouble)
}
case DoubleType =>
Multiply(lit, Literal(1000000.0))


@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
case class TimeWindow(
timeColumn: Expression,
@ -107,7 +108,7 @@ object TimeWindow {
throw new IllegalArgumentException(
s"Intervals greater than a month is not supported ($interval).")
}
cal.microseconds
cal.days * CalendarInterval.MICROS_PER_DAY + cal.microseconds
}
/**


@ -2610,25 +2610,33 @@ object Sequence {
override val defaultStep: DefaultStep = new DefaultStep(
(dt.ordering.lteq _).asInstanceOf[LessThanOrEqualFn],
CalendarIntervalType,
new CalendarInterval(0, MICROS_PER_DAY))
new CalendarInterval(0, 1, 0))
private val backedSequenceImpl = new IntegralSequenceImpl[T](dt)
private val microsPerMonth = 28 * CalendarInterval.MICROS_PER_DAY
private val microsPerDay = 24 * CalendarInterval.MICROS_PER_HOUR
// We choose a minimum month length of 28 days when calculating `intervalStepInMicros`
// in order to make sure the estimated array length is long enough
private val microsPerMonth = 28 * microsPerDay
override def eval(input1: Any, input2: Any, input3: Any): Array[T] = {
val start = input1.asInstanceOf[T]
val stop = input2.asInstanceOf[T]
val step = input3.asInstanceOf[CalendarInterval]
val stepMonths = step.months
val stepDays = step.days
val stepMicros = step.microseconds
if (stepMonths == 0) {
backedSequenceImpl.eval(start, stop, fromLong(stepMicros / scale))
if (stepMonths == 0 && stepMicros == 0 && scale == MICROS_PER_DAY) {
backedSequenceImpl.eval(start, stop, fromLong(stepDays))
} else if (stepMonths == 0 && stepDays == 0 && scale == 1) {
backedSequenceImpl.eval(start, stop, fromLong(stepMicros))
} else {
// To estimate the resulted array length we need to make assumptions
// about a month length in microseconds
val intervalStepInMicros = stepMicros + stepMonths * microsPerMonth
// about a month length in days and a day length in microseconds
val intervalStepInMicros =
stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay
val startMicros: Long = num.toLong(start) * scale
val stopMicros: Long = num.toLong(stop) * scale
val maxEstimatedArrayLength =
@ -2643,7 +2651,8 @@ object Sequence {
while (t < exclusiveItem ^ stepSign < 0) {
arr(i) = fromLong(t / scale)
i += 1
t = timestampAddInterval(startMicros, i * stepMonths, i * stepMicros, zoneId)
t = timestampAddInterval(
startMicros, i * stepMonths, i * stepDays, i * stepMicros, zoneId)
}
// truncate array to the correct length
@ -2659,6 +2668,7 @@ object Sequence {
arr: String,
elemType: String): String = {
val stepMonths = ctx.freshName("stepMonths")
val stepDays = ctx.freshName("stepDays")
val stepMicros = ctx.freshName("stepMicros")
val stepScaled = ctx.freshName("stepScaled")
val intervalInMicros = ctx.freshName("intervalInMicros")
@ -2673,18 +2683,21 @@ object Sequence {
val sequenceLengthCode =
s"""
|final long $intervalInMicros = $stepMicros + $stepMonths * ${microsPerMonth}L;
|final long $intervalInMicros =
| $stepMicros + $stepMonths * ${microsPerMonth}L + $stepDays * ${microsPerDay}L;
|${genSequenceLengthCode(ctx, startMicros, stopMicros, intervalInMicros, arrLength)}
""".stripMargin
s"""
|final int $stepMonths = $step.months;
|final int $stepDays = $step.days;
|final long $stepMicros = $step.microseconds;
|
|if ($stepMonths == 0) {
| final $elemType $stepScaled = ($elemType) ($stepMicros / ${scale}L);
| ${backedSequenceImpl.genCode(ctx, start, stop, stepScaled, arr, elemType)};
|if ($stepMonths == 0 && $stepMicros == 0 && ${scale}L == ${MICROS_PER_DAY}L) {
| ${backedSequenceImpl.genCode(ctx, start, stop, stepDays, arr, elemType)};
|
|} else if ($stepMonths == 0 && $stepDays == 0 && ${scale}L == 1) {
| ${backedSequenceImpl.genCode(ctx, start, stop, stepMicros, arr, elemType)};
|} else {
| final long $startMicros = $start * ${scale}L;
| final long $stopMicros = $stop * ${scale}L;
@ -2702,7 +2715,7 @@ object Sequence {
| $arr[$i] = ($elemType) ($t / ${scale}L);
| $i += 1;
| $t = org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampAddInterval(
| $startMicros, $i * $stepMonths, $i * $stepMicros, $zid);
| $startMicros, $i * $stepMonths, $i * $stepDays, $i * $stepMicros, $zid);
| }
|
| if ($arr.length > $i) {
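
To bound the result length, the step is flattened to microseconds under the stated assumptions (a month is at least 28 days, a day is 24 hours); underestimating the step overestimates the length, so the allocated array is always long enough. A hedged sketch of that arithmetic:

```java
public class StepEstimate {
  public static void main(String[] args) {
    long microsPerDay = 24L * 60 * 60 * 1_000_000L;
    long microsPerMonth = 28 * microsPerDay; // minimum month length assumption

    int stepMonths = 1, stepDays = 2;
    long stepMicros = 0L;
    // Matches `intervalStepInMicros` in the diff above.
    long intervalStepInMicros =
        stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay;
    System.out.println(intervalStepInMicros); // 2592000000000 (30 days)
  }
}
```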


@ -1090,14 +1090,14 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S
override def nullSafeEval(start: Any, interval: Any): Any = {
val itvl = interval.asInstanceOf[CalendarInterval]
DateTimeUtils.timestampAddInterval(
start.asInstanceOf[Long], itvl.months, itvl.microseconds, zoneId)
start.asInstanceOf[Long], itvl.months, itvl.days, itvl.microseconds, zoneId)
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (sd, i) => {
s"""$dtu.timestampAddInterval($sd, $i.months, $i.microseconds, $zid)"""
s"""$dtu.timestampAddInterval($sd, $i.months, $i.days, $i.microseconds, $zid)"""
})
}
}
@ -1205,14 +1205,14 @@ case class TimeSub(start: Expression, interval: Expression, timeZoneId: Option[S
override def nullSafeEval(start: Any, interval: Any): Any = {
val itvl = interval.asInstanceOf[CalendarInterval]
DateTimeUtils.timestampAddInterval(
start.asInstanceOf[Long], 0 - itvl.months, 0 - itvl.microseconds, zoneId)
start.asInstanceOf[Long], 0 - itvl.months, 0 - itvl.days, 0 - itvl.microseconds, zoneId)
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (sd, i) => {
s"""$dtu.timestampAddInterval($sd, 0 - $i.months, 0 - $i.microseconds, $zid)"""
s"""$dtu.timestampAddInterval($sd, 0 - $i.months, 0 - $i.days, 0 - $i.microseconds, $zid)"""
})
}
}
@ -2121,7 +2121,7 @@ case class DatePart(field: Expression, source: Expression, child: Expression)
}
/**
* Returns the interval from startTimestamp to endTimestamp in which the `months` field
* Returns the interval from startTimestamp to endTimestamp in which the `months` and `days` field
* is set to 0 and the `microseconds` field is initialized to the microsecond difference
* between the given timestamps.
*/
@ -2134,12 +2134,12 @@ case class SubtractTimestamps(endTimestamp: Expression, startTimestamp: Expressi
override def dataType: DataType = CalendarIntervalType
override def nullSafeEval(end: Any, start: Any): Any = {
new CalendarInterval(0, end.asInstanceOf[Long] - start.asInstanceOf[Long])
new CalendarInterval(0, 0, end.asInstanceOf[Long] - start.asInstanceOf[Long])
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
defineCodeGen(ctx, ev, (end, start) =>
s"new org.apache.spark.unsafe.types.CalendarInterval(0, $end - $start)")
s"new org.apache.spark.unsafe.types.CalendarInterval(0, 0, $end - $start)")
}
}


@ -495,7 +495,7 @@ abstract class InterpretedHashFunction {
val bytes = d.toJavaBigDecimal.unscaledValue().toByteArray
hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, seed)
}
case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed))
case c: CalendarInterval => hashInt(c.months, hashInt(c.days, hashLong(c.microseconds, seed)))
case a: Array[Byte] =>
hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
case s: UTF8String =>
@ -902,11 +902,13 @@ object HiveHashFunction extends InterpretedHashFunction {
* with nanosecond values will lead to wrong output hashes (i.e. non-adherent with Hive output)
*/
def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt
val totalMicroSeconds =
calendarInterval.days * CalendarInterval.MICROS_PER_DAY + calendarInterval.microseconds
val totalSeconds = totalMicroSeconds / CalendarInterval.MICROS_PER_SECOND.toInt
val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt
val nanoSeconds =
(calendarInterval.microseconds -
(totalMicroSeconds -
(totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
(result * 37) + nanoSeconds
}
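
A hedged sketch of the Hive-compatible interval hash with the day component folded in first (mirrors the code above; the truncation to whole seconds is deliberate to match Hive):

```java
public class HiveIntervalHash {
  public static void main(String[] args) {
    long microsPerSecond = 1_000_000L;
    long microsPerDay = 86_400L * microsPerSecond;

    int days = 1;
    long microseconds = 123_456L;
    long totalMicroSeconds = days * microsPerDay + microseconds;
    long totalSeconds = totalMicroSeconds / microsPerSecond;
    int result = (17 * 37) + (int) (totalSeconds ^ (totalSeconds >> 32));
    int nanoSeconds = (int) (totalMicroSeconds - totalSeconds * microsPerSecond) * 1000;
    System.out.println((result * 37) + nanoSeconds);
  }
}
```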


@ -63,10 +63,10 @@ case class ExtractIntervalMonths(child: Expression)
extends ExtractIntervalPart(child, ByteType, getMonths, "getMonths")
case class ExtractIntervalDays(child: Expression)
extends ExtractIntervalPart(child, LongType, getDays, "getDays")
extends ExtractIntervalPart(child, IntegerType, getDays, "getDays")
case class ExtractIntervalHours(child: Expression)
extends ExtractIntervalPart(child, ByteType, getHours, "getHours")
extends ExtractIntervalPart(child, LongType, getHours, "getHours")
case class ExtractIntervalMinutes(child: Expression)
extends ExtractIntervalPart(child, ByteType, getMinutes, "getMinutes")


@ -162,7 +162,7 @@ object Literal {
case TimestampType => create(0L, TimestampType)
case StringType => Literal("")
case BinaryType => Literal("".getBytes(StandardCharsets.UTF_8))
case CalendarIntervalType => Literal(new CalendarInterval(0, 0))
case CalendarIntervalType => Literal(new CalendarInterval(0, 0, 0))
case arr: ArrayType => create(Array(), arr)
case map: MapType => create(Map(), map)
case struct: StructType =>


@ -575,11 +575,13 @@ object DateTimeUtils {
def timestampAddInterval(
start: SQLTimestamp,
months: Int,
days: Int,
microseconds: Long,
zoneId: ZoneId): SQLTimestamp = {
val resultTimestamp = microsToInstant(start)
.atZone(zoneId)
.plusMonths(months)
.plusDays(days)
.plus(microseconds, ChronoUnit.MICROS)
instantToMicros(resultTimestamp.toInstant)
}
@ -963,7 +965,7 @@ object DateTimeUtils {
LocalDate.ofEpochDay(startDate),
LocalDate.ofEpochDay(endDate))
val months = period.getMonths + 12 * period.getYears
val microseconds = period.getDays * MICROS_PER_DAY
new CalendarInterval(months, microseconds)
val days = period.getDays
new CalendarInterval(months, days, 0)
}
}
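
With the extra field, the date-difference path can keep Period's day component exact instead of converting it to microseconds. A hedged sketch using java.time.Period:

```java
import java.time.LocalDate;
import java.time.Period;

public class DateDiff {
  public static void main(String[] args) {
    Period p = Period.between(LocalDate.of(2019, 1, 31), LocalDate.of(2019, 11, 3));
    int months = p.getMonths() + 12 * p.getYears(); // as in the diff above
    int days = p.getDays();
    // becomes new CalendarInterval(months, days, 0) -- no MICROS_PER_DAY conversion
    System.out.println(months + " months " + days + " days"); // 9 months 3 days
  }
}
```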


@ -64,12 +64,12 @@ object IntervalUtils {
(getMonths(interval) / MONTHS_PER_QUARTER + 1).toByte
}
def getDays(interval: CalendarInterval): Long = {
interval.microseconds / DateTimeUtils.MICROS_PER_DAY
def getDays(interval: CalendarInterval): Int = {
interval.days
}
def getHours(interval: CalendarInterval): Byte = {
((interval.microseconds % DateTimeUtils.MICROS_PER_DAY) / MICROS_PER_HOUR).toByte
def getHours(interval: CalendarInterval): Long = {
interval.microseconds / MICROS_PER_HOUR
}
def getMinutes(interval: CalendarInterval): Byte = {
@ -91,6 +91,7 @@ object IntervalUtils {
// Returns total number of seconds with microseconds fractional part in the given interval.
def getEpoch(interval: CalendarInterval): Decimal = {
var result = interval.microseconds
result += DateTimeUtils.MICROS_PER_DAY * interval.days
result += MICROS_PER_YEAR * (interval.months / MONTHS_PER_YEAR)
result += MICROS_PER_MONTH * (interval.months % MONTHS_PER_YEAR)
Decimal(result, 18, 6)
@ -150,7 +151,7 @@ object IntervalUtils {
val years = toLongWithRange("year", yearStr, 0, Integer.MAX_VALUE).toInt
val months = toLongWithRange("month", monthStr, 0, 11).toInt
val totalMonths = Math.addExact(Math.multiplyExact(years, 12), months)
new CalendarInterval(totalMonths, 0)
new CalendarInterval(totalMonths, 0, 0)
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(
@ -201,7 +202,7 @@ object IntervalUtils {
val days = if (m.group(2) == null) {
0
} else {
toLongWithRange("day", m.group(3), 0, Integer.MAX_VALUE)
toLongWithRange("day", m.group(3), 0, Integer.MAX_VALUE).toInt
}
var hours: Long = 0L
var minutes: Long = 0L
@ -234,11 +235,10 @@ object IntervalUtils {
s"Cannot support (interval '$input' $from to $to) expression")
}
var micros = secondsFraction
micros = Math.addExact(micros, Math.multiplyExact(days, DateTimeUtils.MICROS_PER_DAY))
micros = Math.addExact(micros, Math.multiplyExact(hours, MICROS_PER_HOUR))
micros = Math.addExact(micros, Math.multiplyExact(minutes, MICROS_PER_MINUTE))
micros = Math.addExact(micros, Math.multiplyExact(seconds, DateTimeUtils.MICROS_PER_SECOND))
new CalendarInterval(0, sign * micros)
new CalendarInterval(0, sign * days, sign * micros)
} catch {
case e: Exception =>
throw new IllegalArgumentException(
@ -249,6 +249,7 @@ object IntervalUtils {
def fromUnitStrings(units: Array[String], values: Array[String]): CalendarInterval = {
assert(units.length == values.length)
var months: Int = 0
var days: Int = 0
var microseconds: Long = 0
var i = 0
while (i < units.length) {
@ -259,11 +260,9 @@ object IntervalUtils {
case "month" =>
months = Math.addExact(months, values(i).toInt)
case "week" =>
val weeksUs = Math.multiplyExact(values(i).toLong, 7 * DateTimeUtils.MICROS_PER_DAY)
microseconds = Math.addExact(microseconds, weeksUs)
days = Math.addExact(days, Math.multiplyExact(values(i).toInt, 7))
case "day" =>
val daysUs = Math.multiplyExact(values(i).toLong, DateTimeUtils.MICROS_PER_DAY)
microseconds = Math.addExact(microseconds, daysUs)
days = Math.addExact(days, values(i).toInt)
case "hour" =>
val hoursUs = Math.multiplyExact(values(i).toLong, MICROS_PER_HOUR)
microseconds = Math.addExact(microseconds, hoursUs)
@ -284,7 +283,7 @@ object IntervalUtils {
}
i += 1
}
new CalendarInterval(months, microseconds)
new CalendarInterval(months, days, microseconds)
}
// Parses a string with nanoseconds, truncates the result and returns microseconds
@ -345,7 +344,10 @@ object IntervalUtils {
val monthsDuration = Math.multiplyExact(
daysPerMonth * DateTimeUtils.MICROS_PER_DAY,
interval.months)
val result = Math.addExact(interval.microseconds, monthsDuration)
val daysDuration = Math.multiplyExact(
DateTimeUtils.MICROS_PER_DAY,
interval.days)
val result = Math.addExact(interval.microseconds, Math.addExact(daysDuration, monthsDuration))
targetUnit.convert(result, TimeUnit.MICROSECONDS)
}
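
getDuration now folds in the day component exactly, while months still rely on a caller-supplied daysPerMonth assumption. A hedged sketch of the arithmetic (variable names mirror the diff; daysPerMonth = 31 is an assumed input, not something the interval stores):

```java
import java.util.concurrent.TimeUnit;

public class DurationSketch {
  public static void main(String[] args) {
    long microsPerDay = TimeUnit.DAYS.toMicros(1);
    long daysPerMonth = 31L; // caller-supplied assumption

    int months = 1, days = 2;
    long microseconds = 123L;
    long monthsDuration = Math.multiplyExact(daysPerMonth * microsPerDay, (long) months);
    long daysDuration = Math.multiplyExact(microsPerDay, (long) days);
    long result = Math.addExact(microseconds, Math.addExact(daysDuration, monthsDuration));
    System.out.println(TimeUnit.MICROSECONDS.toMillis(result));
  }
}
```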


@ -193,8 +193,9 @@ object RandomDataGenerator {
Some(generator)
case CalendarIntervalType => Some(() => {
val months = rand.nextInt(1000)
val days = rand.nextInt(10000)
val ns = rand.nextLong()
new CalendarInterval(months, ns)
new CalendarInterval(months, days, ns)
})
case DecimalType.Fixed(precision, scale) => Some(
() => BigDecimal.apply(


@ -1405,7 +1405,7 @@ class TypeCoercionSuite extends AnalysisTest {
val dateTimeOperations = TypeCoercion.DateTimeOperations
val date = Literal(new java.sql.Date(0L))
val timestamp = Literal(new Timestamp(0L))
val interval = Literal(new CalendarInterval(0, 0))
val interval = Literal(new CalendarInterval(0, 0, 0))
val str = Literal("2015-01-01")
val intValue = Literal(0, IntegerType)


@ -664,16 +664,16 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
import org.apache.spark.unsafe.types.CalendarInterval
checkEvaluation(Cast(Literal(""), CalendarIntervalType), null)
checkEvaluation(Cast(Literal("interval -3 month 7 hours"), CalendarIntervalType),
new CalendarInterval(-3, 7 * CalendarInterval.MICROS_PER_HOUR))
checkEvaluation(Cast(Literal("interval -3 month 1 day 7 hours"), CalendarIntervalType),
new CalendarInterval(-3, 1, 7 * CalendarInterval.MICROS_PER_HOUR))
checkEvaluation(Cast(Literal.create(
new CalendarInterval(15, -3 * CalendarInterval.MICROS_PER_DAY), CalendarIntervalType),
new CalendarInterval(15, 9, -3 * CalendarInterval.MICROS_PER_HOUR), CalendarIntervalType),
StringType),
"interval 1 years 3 months -3 days")
"interval 1 years 3 months 1 weeks 2 days -3 hours")
checkEvaluation(Cast(Literal("INTERVAL 1 Second 1 microsecond"), CalendarIntervalType),
new CalendarInterval(0, 1000001))
new CalendarInterval(0, 0, 1000001))
checkEvaluation(Cast(Literal("1 MONTH 1 Microsecond"), CalendarIntervalType),
new CalendarInterval(1, 1))
new CalendarInterval(1, 0, 1))
}
test("cast string to boolean") {


@ -377,15 +377,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
TimeAdd(
Literal(new Timestamp(sdf.parse("2016-01-29 10:00:00.000").getTime)),
Literal(new CalendarInterval(1, 123000L)),
Literal(new CalendarInterval(1, 2, 123000L)),
timeZoneId),
DateTimeUtils.fromJavaTimestamp(
new Timestamp(sdf.parse("2016-02-29 10:00:00.123").getTime)))
new Timestamp(sdf.parse("2016-03-02 10:00:00.123").getTime)))
checkEvaluation(
TimeAdd(
Literal.create(null, TimestampType),
Literal(new CalendarInterval(1, 123000L)),
Literal(new CalendarInterval(1, 2, 123000L)),
timeZoneId),
null)
checkEvaluation(
@ -415,22 +415,36 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
TimeSub(
Literal(new Timestamp(sdf.parse("2016-03-31 10:00:00.000").getTime)),
Literal(new CalendarInterval(1, 0)),
Literal(new CalendarInterval(1, 0, 0)),
timeZoneId),
DateTimeUtils.fromJavaTimestamp(
new Timestamp(sdf.parse("2016-02-29 10:00:00.000").getTime)))
checkEvaluation(
TimeSub(
Literal(new Timestamp(sdf.parse("2016-03-31 10:00:00.000").getTime)),
Literal(new CalendarInterval(1, 1, 0)),
timeZoneId),
DateTimeUtils.fromJavaTimestamp(
new Timestamp(sdf.parse("2016-02-28 10:00:00.000").getTime)))
checkEvaluation(
TimeSub(
Literal(new Timestamp(sdf.parse("2016-03-30 00:00:01.000").getTime)),
Literal(new CalendarInterval(1, 2000000.toLong)),
Literal(new CalendarInterval(1, 0, 2000000.toLong)),
timeZoneId),
DateTimeUtils.fromJavaTimestamp(
new Timestamp(sdf.parse("2016-02-28 23:59:59.000").getTime)))
checkEvaluation(
TimeSub(
Literal(new Timestamp(sdf.parse("2016-03-30 00:00:01.000").getTime)),
Literal(new CalendarInterval(1, 1, 2000000.toLong)),
timeZoneId),
DateTimeUtils.fromJavaTimestamp(
new Timestamp(sdf.parse("2016-02-27 23:59:59.000").getTime)))
checkEvaluation(
TimeSub(
Literal.create(null, TimestampType),
Literal(new CalendarInterval(1, 123000L)),
Literal(new CalendarInterval(1, 2, 123000L)),
timeZoneId),
null)
checkEvaluation(
@ -1073,25 +1087,25 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("timestamps difference") {
val end = Instant.parse("2019-10-04T11:04:01.123456Z")
checkEvaluation(SubtractTimestamps(Literal(end), Literal(end)),
new CalendarInterval(0, 0))
new CalendarInterval(0, 0, 0))
checkEvaluation(SubtractTimestamps(Literal(end), Literal(Instant.EPOCH)),
IntervalUtils.fromString("interval 18173 days " +
"11 hours 4 minutes 1 seconds 123 milliseconds 456 microseconds"))
IntervalUtils.fromString("interval " +
"436163 hours 4 minutes 1 seconds 123 milliseconds 456 microseconds"))
checkEvaluation(SubtractTimestamps(Literal(Instant.EPOCH), Literal(end)),
IntervalUtils.fromString("interval -18173 days " +
"-11 hours -4 minutes -1 seconds -123 milliseconds -456 microseconds"))
IntervalUtils.fromString("interval " +
"-436163 hours -4 minutes -1 seconds -123 milliseconds -456 microseconds"))
checkEvaluation(
SubtractTimestamps(
Literal(Instant.parse("9999-12-31T23:59:59.999999Z")),
Literal(Instant.parse("0001-01-01T00:00:00Z"))),
IntervalUtils.fromString("interval 521722 weeks 4 days " +
"23 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds"))
IntervalUtils.fromString("interval " +
"87649415 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds"))
}
test("subtract dates") {
val end = LocalDate.of(2019, 10, 5)
checkEvaluation(SubtractDates(Literal(end), Literal(end)),
new CalendarInterval(0, 0))
new CalendarInterval(0, 0, 0))
checkEvaluation(SubtractDates(Literal(end.plusDays(1)), Literal(end)),
IntervalUtils.fromString("interval 1 days"))
checkEvaluation(SubtractDates(Literal(end.minusDays(1)), Literal(end)),


@ -165,16 +165,16 @@ class ExpressionSQLBuilderSuite extends SparkFunSuite {
}
test("interval arithmetic") {
val interval = Literal(new CalendarInterval(0, CalendarInterval.MICROS_PER_DAY))
val interval = Literal(new CalendarInterval(0, 0, CalendarInterval.MICROS_PER_HOUR))
checkSQL(
TimeAdd('a, interval),
"`a` + interval 1 days"
"`a` + interval 1 hours"
)
checkSQL(
TimeSub('a, interval),
"`a` - interval 1 days"
"`a` - interval 1 hours"
)
}
}


@ -103,25 +103,27 @@ class IntervalExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
"31 days 11 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds"
test("days") {
checkEvaluation(ExtractIntervalDays("0 days"), 0L)
checkEvaluation(ExtractIntervalDays("1 days 100 seconds"), 1L)
checkEvaluation(ExtractIntervalDays("-1 days -100 seconds"), -1L)
checkEvaluation(ExtractIntervalDays("-365 days"), -365L)
checkEvaluation(ExtractIntervalDays("365 days"), 365L)
checkEvaluation(ExtractIntervalDays("0 days"), 0)
checkEvaluation(ExtractIntervalDays("1 days 100 seconds"), 1)
checkEvaluation(ExtractIntervalDays("-1 days -100 seconds"), -1)
checkEvaluation(ExtractIntervalDays("-365 days"), -365)
checkEvaluation(ExtractIntervalDays("365 days"), 365)
// Years and months must not be taken into account
checkEvaluation(ExtractIntervalDays("100 year 10 months 5 days"), 5L)
checkEvaluation(ExtractIntervalDays(largeInterval), 31L)
checkEvaluation(ExtractIntervalDays("100 year 10 months 5 days"), 5)
checkEvaluation(ExtractIntervalDays(largeInterval), 31)
}
test("hours") {
checkEvaluation(ExtractIntervalHours("0 hours"), 0.toByte)
checkEvaluation(ExtractIntervalHours("1 hour"), 1.toByte)
checkEvaluation(ExtractIntervalHours("-1 hour"), -1.toByte)
checkEvaluation(ExtractIntervalHours("23 hours"), 23.toByte)
checkEvaluation(ExtractIntervalHours("-23 hours"), -23.toByte)
// Years and months must not be taken into account
checkEvaluation(ExtractIntervalHours("100 year 10 months 10 hours"), 10.toByte)
checkEvaluation(ExtractIntervalHours(largeInterval), 11.toByte)
checkEvaluation(ExtractIntervalHours("0 hours"), 0L)
checkEvaluation(ExtractIntervalHours("1 hour"), 1L)
checkEvaluation(ExtractIntervalHours("-1 hour"), -1L)
checkEvaluation(ExtractIntervalHours("23 hours"), 23L)
checkEvaluation(ExtractIntervalHours("-23 hours"), -23L)
// Years, months and days must not be taken into account
checkEvaluation(ExtractIntervalHours("100 year 10 months 10 days 10 hours"), 10L)
// Minutes should be taken into account
checkEvaluation(ExtractIntervalHours("10 hours 100 minutes"), 11L)
checkEvaluation(ExtractIntervalHours(largeInterval), 11L)
}
test("minutes") {


@ -75,7 +75,7 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal.default(DateType), LocalDate.ofEpochDay(0))
checkEvaluation(Literal.default(TimestampType), Instant.ofEpochSecond(0))
}
checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0L))
checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0, 0L))
checkEvaluation(Literal.default(ArrayType(StringType)), Array())
checkEvaluation(Literal.default(MapType(IntegerType, StringType)), Map())
checkEvaluation(Literal.default(StructType(StructField("a", StringType) :: Nil)), Row(""))


@ -135,10 +135,12 @@ object LiteralGenerator {
Instant.parse("0001-01-01T00:00:00.000000Z"),
Instant.parse("9999-12-31T23:59:59.999999Z")).getSeconds
val maxMicros = TimeUnit.SECONDS.toMicros(maxDurationInSec)
val maxDays = TimeUnit.SECONDS.toDays(maxDurationInSec).toInt
for {
months <- Gen.choose(-1 * maxIntervalInMonths, maxIntervalInMonths)
micros <- Gen.choose(-1 * maxMicros, maxMicros)
} yield Literal.create(new CalendarInterval(months, micros), CalendarIntervalType)
days <- Gen.choose(-1 * maxDays, maxDays)
} yield Literal.create(new CalendarInterval(months, days, micros), CalendarIntervalType)
}


@ -1154,7 +1154,7 @@ class FilterPushdownSuite extends PlanTest {
}
test("watermark pushdown: no pushdown on watermark attribute #1") {
val interval = new CalendarInterval(2, 2000L)
val interval = new CalendarInterval(2, 2, 2000L)
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
@ -1169,7 +1169,7 @@ class FilterPushdownSuite extends PlanTest {
}
test("watermark pushdown: no pushdown for nondeterministic filter") {
val interval = new CalendarInterval(2, 2000L)
val interval = new CalendarInterval(2, 2, 2000L)
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
@ -1184,7 +1184,7 @@ class FilterPushdownSuite extends PlanTest {
}
test("watermark pushdown: full pushdown") {
val interval = new CalendarInterval(2, 2000L)
val interval = new CalendarInterval(2, 2, 2000L)
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
@ -1198,7 +1198,7 @@ class FilterPushdownSuite extends PlanTest {
}
test("watermark pushdown: no pushdown on watermark attribute #2") {
val interval = new CalendarInterval(2, 2000L)
val interval = new CalendarInterval(2, 2, 2000L)
val originalQuery = EventTimeWatermark('a, interval, testRelation)
.where('a === 5 && 'b === 10)


@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.types.CalendarInterval
class LeftSemiPushdownSuite extends PlanTest {


@ -631,11 +631,13 @@ class ExpressionParserSuite extends AnalysisTest {
checkIntervals(
"-13.123456789 second",
Literal(new CalendarInterval(
0,
0,
-13 * DateTimeUtils.MICROS_PER_SECOND - 123 * DateTimeUtils.MICROS_PER_MILLIS - 456)))
checkIntervals(
"13.123456 second",
Literal(new CalendarInterval(
0,
0,
13 * DateTimeUtils.MICROS_PER_SECOND + 123 * DateTimeUtils.MICROS_PER_MILLIS + 456)))
checkIntervals("1.001 second", Literal(IntervalUtils.fromString("1 second 1 millisecond")))
@ -682,12 +684,11 @@ class ExpressionParserSuite extends AnalysisTest {
// Composed intervals.
checkIntervals(
"3 months 22 seconds 1 millisecond",
Literal(new CalendarInterval(3, 22001000L)))
"3 months 4 days 22 seconds 1 millisecond",
Literal(new CalendarInterval(3, 4, 22001000L)))
checkIntervals(
"3 years '-1-10' year to month 3 weeks '1 0:0:2' day to second",
Literal(new CalendarInterval(14,
22 * CalendarInterval.MICROS_PER_DAY + 2 * CalendarInterval.MICROS_PER_SECOND)))
Literal(new CalendarInterval(14, 22, 2 * CalendarInterval.MICROS_PER_SECOND)))
}
test("SPARK-23264 Interval Compatibility tests") {


@ -28,7 +28,7 @@ import org.scalatest.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
class DateTimeUtilsSuite extends SparkFunSuite with Matchers {
@ -373,13 +373,39 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers {
test("timestamp add months") {
val ts1 = date(1997, 2, 28, 10, 30, 0)
val ts2 = date(2000, 2, 28, 10, 30, 0, 123000)
assert(timestampAddInterval(ts1, 36, 123000, defaultZoneId) === ts2)
assert(timestampAddInterval(ts1, 36, 0, 123000, defaultZoneId) === ts2)
val ts3 = date(1997, 2, 27, 16, 0, 0, 0, TimeZonePST)
val ts4 = date(2000, 2, 27, 16, 0, 0, 123000, TimeZonePST)
val ts5 = date(2000, 2, 28, 0, 0, 0, 123000, TimeZoneGMT)
assert(timestampAddInterval(ts3, 36, 123000, TimeZonePST.toZoneId) === ts4)
assert(timestampAddInterval(ts3, 36, 123000, TimeZoneGMT.toZoneId) === ts5)
assert(timestampAddInterval(ts3, 36, 0, 123000, TimeZonePST.toZoneId) === ts4)
assert(timestampAddInterval(ts3, 36, 0, 123000, TimeZoneGMT.toZoneId) === ts5)
}
test("timestamp add days") {
// 2019-3-9 is the end of Pacific Standard Time
val ts1 = date(2019, 3, 9, 12, 0, 0, 123000, TimeZonePST)
// 2019-3-10 is the start of Pacific Daylight Time
val ts2 = date(2019, 3, 10, 12, 0, 0, 123000, TimeZonePST)
val ts3 = date(2019, 5, 9, 12, 0, 0, 123000, TimeZonePST)
val ts4 = date(2019, 5, 10, 12, 0, 0, 123000, TimeZonePST)
// 2019-11-2 is the end of Pacific Daylight Time
val ts5 = date(2019, 11, 2, 12, 0, 0, 123000, TimeZonePST)
// 2019-11-3 is the start of Pacific Standard Time
val ts6 = date(2019, 11, 3, 12, 0, 0, 123000, TimeZonePST)
// transit from Pacific Standard Time to Pacific Daylight Time
assert(timestampAddInterval(
ts1, 0, 0, 23 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts2)
assert(timestampAddInterval(ts1, 0, 1, 0, TimeZonePST.toZoneId) === ts2)
// just a normal day
assert(timestampAddInterval(
ts3, 0, 0, 24 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts4)
assert(timestampAddInterval(ts3, 0, 1, 0, TimeZonePST.toZoneId) === ts4)
// transit from Pacific Daylight Time to Pacific Standard Time
assert(timestampAddInterval(
ts5, 0, 0, 25 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts6)
assert(timestampAddInterval(ts5, 0, 1, 0, TimeZonePST.toZoneId) === ts6)
}
test("monthsBetween") {


@ -27,15 +27,15 @@ import org.apache.spark.unsafe.types.CalendarInterval._
class IntervalUtilsSuite extends SparkFunSuite {
test("fromString: basic") {
testSingleUnit("YEAR", 3, 36, 0)
testSingleUnit("Month", 3, 3, 0)
testSingleUnit("Week", 3, 0, 3 * MICROS_PER_WEEK)
testSingleUnit("DAY", 3, 0, 3 * MICROS_PER_DAY)
testSingleUnit("HouR", 3, 0, 3 * MICROS_PER_HOUR)
testSingleUnit("MiNuTe", 3, 0, 3 * MICROS_PER_MINUTE)
testSingleUnit("Second", 3, 0, 3 * MICROS_PER_SECOND)
testSingleUnit("MilliSecond", 3, 0, 3 * MICROS_PER_MILLI)
testSingleUnit("MicroSecond", 3, 0, 3)
testSingleUnit("YEAR", 3, 36, 0, 0)
testSingleUnit("Month", 3, 3, 0, 0)
testSingleUnit("Week", 3, 0, 21, 0)
testSingleUnit("DAY", 3, 0, 3, 0)
testSingleUnit("HouR", 3, 0, 0, 3 * MICROS_PER_HOUR)
testSingleUnit("MiNuTe", 3, 0, 0, 3 * MICROS_PER_MINUTE)
testSingleUnit("Second", 3, 0, 0, 3 * MICROS_PER_SECOND)
testSingleUnit("MilliSecond", 3, 0, 0, 3 * MICROS_PER_MILLI)
testSingleUnit("MicroSecond", 3, 0, 0, 3)
for (input <- Seq(null, "", " ")) {
try {
@ -64,36 +64,37 @@ class IntervalUtilsSuite extends SparkFunSuite {
test("fromString: random order field") {
val input = "1 day 1 year"
val result = new CalendarInterval(12, MICROS_PER_DAY)
val result = new CalendarInterval(12, 1, 0)
assert(fromString(input) == result)
}
test("fromString: duplicated fields") {
val input = "1 day 1 day"
val result = new CalendarInterval(0, 2 * MICROS_PER_DAY)
val result = new CalendarInterval(0, 2, 0)
assert(fromString(input) == result)
}
test("fromString: value with +/-") {
val input = "+1 year -1 day"
val result = new CalendarInterval(12, -MICROS_PER_DAY)
val result = new CalendarInterval(12, -1, 0)
assert(fromString(input) == result)
}
private def testSingleUnit(unit: String, number: Int, months: Int, microseconds: Long): Unit = {
private def testSingleUnit(
unit: String, number: Int, months: Int, days: Int, microseconds: Long): Unit = {
for (prefix <- Seq("interval ", "")) {
val input1 = prefix + number + " " + unit
val input2 = prefix + number + " " + unit + "s"
val result = new CalendarInterval(months, microseconds)
val result = new CalendarInterval(months, days, microseconds)
assert(fromString(input1) == result)
assert(fromString(input2) == result)
}
}
test("from year-month string") {
assert(fromYearMonthString("99-10") === new CalendarInterval(99 * 12 + 10, 0L))
assert(fromYearMonthString("+99-10") === new CalendarInterval(99 * 12 + 10, 0L))
assert(fromYearMonthString("-8-10") === new CalendarInterval(-8 * 12 - 10, 0L))
assert(fromYearMonthString("99-10") === new CalendarInterval(99 * 12 + 10, 0, 0L))
assert(fromYearMonthString("+99-10") === new CalendarInterval(99 * 12 + 10, 0, 0L))
assert(fromYearMonthString("-8-10") === new CalendarInterval(-8 * 12 - 10, 0, 0L))
try {
fromYearMonthString("99-15")
@ -116,15 +117,16 @@ class IntervalUtilsSuite extends SparkFunSuite {
assert(fromDayTimeString("5 12:40:30.999999999") ===
new CalendarInterval(
0,
5 * MICROS_PER_DAY +
5,
12 * MICROS_PER_HOUR +
40 * MICROS_PER_MINUTE +
30 * MICROS_PER_SECOND + 999999L))
assert(fromDayTimeString("10 0:12:0.888") ===
new CalendarInterval(
0,
10 * MICROS_PER_DAY + 12 * MICROS_PER_MINUTE + 888 * MICROS_PER_MILLI))
assert(fromDayTimeString("-3 0:0:0") === new CalendarInterval(0, -3 * MICROS_PER_DAY))
10,
12 * MICROS_PER_MINUTE + 888 * MICROS_PER_MILLI))
assert(fromDayTimeString("-3 0:0:0") === new CalendarInterval(0, -3, 0L))
try {
fromDayTimeString("5 30:12:20")


@ -53,7 +53,8 @@ class UnsafeArraySuite extends SparkFunSuite {
BigDecimal("1.2345678901234567890123456").setScale(21, BigDecimal.RoundingMode.FLOOR),
BigDecimal("2.3456789012345678901234567").setScale(21, BigDecimal.RoundingMode.FLOOR))
val calenderintervalArray = Array(new CalendarInterval(3, 321), new CalendarInterval(1, 123))
val calenderintervalArray = Array(
new CalendarInterval(3, 2, 321), new CalendarInterval(1, 2, 123))
val intMultiDimArray = Array(Array(1), Array(2, 20), Array(3, 30, 300))
val doubleMultiDimArray = Array(


@ -165,7 +165,8 @@ public class ColumnVectorUtils {
CalendarInterval c = (CalendarInterval)o;
dst.appendStruct(false);
dst.getChild(0).appendInt(c.months);
dst.getChild(1).appendLong(c.microseconds);
dst.getChild(1).appendInt(c.days);
dst.getChild(2).appendLong(c.microseconds);
} else if (t instanceof DateType) {
dst.appendInt(DateTimeUtils.fromJavaDate((Date)o));
} else {


@ -736,10 +736,11 @@ public abstract class WritableColumnVector extends ColumnVector {
this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
} else if (type instanceof CalendarIntervalType) {
// Two columns. Months as int. Microseconds as Long.
this.childColumns = new WritableColumnVector[2];
// Three columns. Months as int. Days as Int. Microseconds as Long.
this.childColumns = new WritableColumnVector[3];
this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[1] = reserveNewColumn(capacity, DataTypes.LongType);
this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
} else {
this.childColumns = null;
}


@ -323,7 +323,8 @@ private object RowToColumnConverter {
val c = row.getInterval(column)
cv.appendStruct(false)
cv.getChild(0).appendInt(c.months)
cv.getChild(1).appendLong(c.microseconds)
cv.getChild(1).appendInt(c.days)
cv.getChild(2).appendLong(c.microseconds)
}
}


@ -23,6 +23,7 @@ import scala.concurrent.duration.Duration
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval
private object Triggers {
def validate(intervalMs: Long): Unit = {
@ -34,7 +35,7 @@ private object Triggers {
if (cal.months != 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
TimeUnit.MICROSECONDS.toMillis(cal.microseconds + cal.days * CalendarInterval.MICROS_PER_DAY)
}
def convert(interval: Duration): Long = interval.toMillis
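
Since a ProcessingTime trigger measures physical time, the day component is folded back into microseconds before converting to milliseconds. A hedged sketch of the conversion (Java, for consistency with the other examples):

```java
import java.util.concurrent.TimeUnit;

public class TriggerMillis {
  public static void main(String[] args) {
    // "interval 1 day 500 milliseconds" in the new representation:
    int days = 1;
    long microseconds = 500_000L;
    long microsPerDay = TimeUnit.DAYS.toMicros(1);
    long ms = TimeUnit.MICROSECONDS.toMillis(microseconds + days * microsPerDay);
    System.out.println(ms); // 86400500
  }
}
```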


@ -238,7 +238,7 @@ from interval_arithmetic
-- !query 17 schema
struct<tsval:timestamp,CAST(tsval - interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds AS TIMESTAMP):timestamp,CAST(tsval - interval -14 weeks -1 days -11 hours -22 minutes -33 seconds -123 milliseconds -456 microseconds AS TIMESTAMP):timestamp,CAST(tsval + interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds AS TIMESTAMP):timestamp,CAST(tsval + interval -14 weeks -1 days -11 hours -22 minutes -33 seconds -123 milliseconds -456 microseconds AS TIMESTAMP):timestamp,CAST(tsval + (- interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds) AS TIMESTAMP):timestamp,CAST(tsval + interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds AS TIMESTAMP):timestamp>
-- !query 17 output
2012-01-01 00:00:00 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456 2012-04-09 12:22:33.123456 2011-09-23 13:37:26.876544 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456
2012-01-01 00:00:00 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456 2012-04-09 11:22:33.123456 2011-09-23 12:37:26.876544 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456
-- !query 18
@ -254,7 +254,7 @@ from interval_arithmetic
-- !query 18 schema
struct<tsval:timestamp,CAST(tsval - interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds AS TIMESTAMP):timestamp,CAST(tsval - interval -14 weeks -1 days -11 hours -22 minutes -33 seconds -123 milliseconds -456 microseconds AS TIMESTAMP):timestamp,CAST(tsval + interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds AS TIMESTAMP):timestamp,CAST(tsval + interval -14 weeks -1 days -11 hours -22 minutes -33 seconds -123 milliseconds -456 microseconds AS TIMESTAMP):timestamp,CAST(tsval + (- interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds) AS TIMESTAMP):timestamp,CAST(tsval + interval 14 weeks 1 days 11 hours 22 minutes 33 seconds 123 milliseconds 456 microseconds AS TIMESTAMP):timestamp>
-- !query 18 output
2012-01-01 00:00:00 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456 2012-04-09 12:22:33.123456 2011-09-23 13:37:26.876544 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456
2012-01-01 00:00:00 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456 2012-04-09 11:22:33.123456 2011-09-23 12:37:26.876544 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456
-- !query 19


@ -615,7 +615,7 @@ struct<date_part('months', t2.`c`):tinyint>
-- !query 76
select date_part('day', c) from t2
-- !query 76 schema
struct<date_part('day', t2.`c`):bigint>
struct<date_part('day', t2.`c`):int>
-- !query 76 output
8
@ -623,7 +623,7 @@ struct<date_part('day', t2.`c`):bigint>
-- !query 77
select date_part('d', c) from t2
-- !query 77 schema
struct<date_part('d', t2.`c`):bigint>
struct<date_part('d', t2.`c`):int>
-- !query 77 output
8
@ -631,7 +631,7 @@ struct<date_part('d', t2.`c`):bigint>
-- !query 78
select date_part('days', c) from t2
-- !query 78 schema
struct<date_part('days', t2.`c`):bigint>
struct<date_part('days', t2.`c`):int>
-- !query 78 output
8
@ -639,7 +639,7 @@ struct<date_part('days', t2.`c`):bigint>
-- !query 79
select date_part('hour', c) from t2
-- !query 79 schema
struct<date_part('hour', t2.`c`):tinyint>
struct<date_part('hour', t2.`c`):bigint>
-- !query 79 output
7
@ -647,7 +647,7 @@ struct<date_part('hour', t2.`c`):tinyint>
-- !query 80
select date_part('h', c) from t2
-- !query 80 schema
struct<date_part('h', t2.`c`):tinyint>
struct<date_part('h', t2.`c`):bigint>
-- !query 80 output
7
@ -655,7 +655,7 @@ struct<date_part('h', t2.`c`):tinyint>
-- !query 81
select date_part('hours', c) from t2
-- !query 81 schema
struct<date_part('hours', t2.`c`):tinyint>
struct<date_part('hours', t2.`c`):bigint>
-- !query 81 output
7
@ -663,7 +663,7 @@ struct<date_part('hours', t2.`c`):tinyint>
-- !query 82
select date_part('hr', c) from t2
-- !query 82 schema
struct<date_part('hr', t2.`c`):tinyint>
struct<date_part('hr', t2.`c`):bigint>
-- !query 82 output
7
@ -671,7 +671,7 @@ struct<date_part('hr', t2.`c`):tinyint>
-- !query 83
select date_part('hrs', c) from t2
-- !query 83 schema
struct<date_part('hrs', t2.`c`):tinyint>
struct<date_part('hrs', t2.`c`):bigint>
-- !query 83 output
7


@ -136,7 +136,7 @@ select date'2020-01-01' - timestamp'2019-10-06 10:11:12.345678'
-- !query 15 schema
struct<subtracttimestamps(CAST(DATE '2020-01-01' AS TIMESTAMP), TIMESTAMP('2019-10-06 10:11:12.345678')):interval>
-- !query 15 output
interval 12 weeks 2 days 14 hours 48 minutes 47 seconds 654 milliseconds 322 microseconds
interval 2078 hours 48 minutes 47 seconds 654 milliseconds 322 microseconds
-- !query 16
@ -144,4 +144,4 @@ select timestamp'2019-10-06 10:11:12.345678' - date'2020-01-01'
-- !query 16 schema
struct<subtracttimestamps(TIMESTAMP('2019-10-06 10:11:12.345678'), CAST(DATE '2020-01-01' AS TIMESTAMP)):interval>
-- !query 16 output
interval -12 weeks -2 days -14 hours -48 minutes -47 seconds -654 milliseconds -322 microseconds
interval -2078 hours -48 minutes -47 seconds -654 milliseconds -322 microseconds


@ -339,9 +339,9 @@ interval 1 years 2 months 3 weeks 4 days 5 hours 6 minutes 7 seconds 8 milliseco
-- !query 36
select interval '30' year '25' month '-100' day '40' hour '80' minute '299.889987299' second
-- !query 36 schema
struct<interval 32 years 1 months -14 weeks -6 hours -35 minutes -110 milliseconds -13 microseconds:interval>
struct<interval 32 years 1 months -14 weeks -2 days 41 hours 24 minutes 59 seconds 889 milliseconds 987 microseconds:interval>
-- !query 36 output
interval 32 years 1 months -14 weeks -6 hours -35 minutes -110 milliseconds -13 microseconds
interval 32 years 1 months -14 weeks -2 days 41 hours 24 minutes 59 seconds 889 milliseconds 987 microseconds
-- !query 37


@ -21,9 +21,9 @@ interval 16 hours 39 minutes
-- !query 2
SELECT interval '999' hour
-- !query 2 schema
struct<interval 5 weeks 6 days 15 hours:interval>
struct<interval 999 hours:interval>
-- !query 2 output
interval 5 weeks 6 days 15 hours
interval 999 hours
-- !query 3


@ -219,11 +219,11 @@ SELECT '' AS `54`, d1 - timestamp '1997-01-02' AS diff
-- !query 24 schema
struct<54:string,diff:interval>
-- !query 24 output
interval -1409 weeks -8 hours
interval -236720 hours
interval 0 microseconds
interval 246 weeks 2 days 17 hours 19 minutes 20 seconds
interval 3 hours 4 minutes 5 seconds
interval 5 weeks 4 days 17 hours 32 minutes 1 seconds
interval 41393 hours 19 minutes 20 seconds
interval 953 hours 32 minutes 1 seconds
-- !query 25
@ -242,11 +242,11 @@ SELECT '' AS `54`, d1 - timestamp '1997-01-02' AS diff
-- !query 26 schema
struct<54:string,diff:interval>
-- !query 26 output
interval -1409 weeks -8 hours
interval -236720 hours
interval 0 microseconds
interval 246 weeks 2 days 17 hours 19 minutes 20 seconds
interval 3 hours 4 minutes 5 seconds
interval 5 weeks 4 days 17 hours 32 minutes 1 seconds
interval 41393 hours 19 minutes 20 seconds
interval 953 hours 32 minutes 1 seconds
-- !query 27


@ -290,15 +290,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
val t2 = Timestamp.valueOf("2015-12-31 00:00:00")
val d1 = Date.valueOf("2015-07-31")
val d2 = Date.valueOf("2015-12-31")
val i = new CalendarInterval(2, 2000000L)
val i = new CalendarInterval(2, 2, 2000000L)
val df = Seq((1, t1, d1), (3, t2, d2)).toDF("n", "t", "d")
checkAnswer(
df.selectExpr(s"d + $i"),
Seq(Row(Date.valueOf("2015-09-30")), Row(Date.valueOf("2016-02-29"))))
Seq(Row(Date.valueOf("2015-10-02")), Row(Date.valueOf("2016-03-02"))))
checkAnswer(
df.selectExpr(s"t + $i"),
Seq(Row(Timestamp.valueOf("2015-10-01 00:00:01")),
Row(Timestamp.valueOf("2016-02-29 00:00:02"))))
Seq(Row(Timestamp.valueOf("2015-10-03 00:00:01")),
Row(Timestamp.valueOf("2016-03-02 00:00:02"))))
}
test("time_sub") {
@ -306,15 +306,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
val t2 = Timestamp.valueOf("2016-02-29 00:00:02")
val d1 = Date.valueOf("2015-09-30")
val d2 = Date.valueOf("2016-02-29")
val i = new CalendarInterval(2, 2000000L)
val i = new CalendarInterval(2, 2, 2000000L)
val df = Seq((1, t1, d1), (3, t2, d2)).toDF("n", "t", "d")
checkAnswer(
df.selectExpr(s"d - $i"),
Seq(Row(Date.valueOf("2015-07-29")), Row(Date.valueOf("2015-12-28"))))
Seq(Row(Date.valueOf("2015-07-27")), Row(Date.valueOf("2015-12-26"))))
checkAnswer(
df.selectExpr(s"t - $i"),
Seq(Row(Timestamp.valueOf("2015-07-31 23:59:59")),
Row(Timestamp.valueOf("2015-12-29 00:00:00"))))
Seq(Row(Timestamp.valueOf("2015-07-29 23:59:59")),
Row(Timestamp.valueOf("2015-12-27 00:00:00"))))
}
test("function add_months") {


@ -1556,7 +1556,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
import org.apache.spark.unsafe.types.CalendarInterval
val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123 )))
checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7 * 7, 123 )))
withTempPath(f => {
// Currently we don't yet support saving out values of interval data type.
val e = intercept[AnalysisException] {
@ -1582,17 +1582,17 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
import org.apache.spark.unsafe.types.CalendarInterval.MICROS_PER_WEEK
val df = sql("select interval 3 years -3 month 7 week 123 microseconds as i")
checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7L * MICROS_PER_WEEK + 123)))
checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7 * 7, 123)))
checkAnswer(df.select(df("i") + new CalendarInterval(2, 123)),
Row(new CalendarInterval(12 * 3 - 3 + 2, 7L * MICROS_PER_WEEK + 123 + 123)))
checkAnswer(df.select(df("i") + new CalendarInterval(2, 1, 123)),
Row(new CalendarInterval(12 * 3 - 3 + 2, 7 * 7 + 1, 123 + 123)))
checkAnswer(df.select(df("i") - new CalendarInterval(2, 123)),
Row(new CalendarInterval(12 * 3 - 3 - 2, 7L * MICROS_PER_WEEK + 123 - 123)))
checkAnswer(df.select(df("i") - new CalendarInterval(2, 1, 123)),
Row(new CalendarInterval(12 * 3 - 3 - 2, 7 * 7 - 1, 123 - 123)))
// unary minus
checkAnswer(df.select(-df("i")),
Row(new CalendarInterval(-(12 * 3 - 3), -(7L * MICROS_PER_WEEK + 123))))
Row(new CalendarInterval(-(12 * 3 - 3), -7 * 7, -123)))
}
test("aggregation with codegen updates peak execution memory") {


@ -636,30 +636,40 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 0)
}
testVector("CalendarInterval APIs", 4, CalendarIntervalType) {
testVector("CalendarInterval APIs", 5, CalendarIntervalType) {
column =>
val reference = mutable.ArrayBuffer.empty[CalendarInterval]
val months = column.getChild(0)
val microseconds = column.getChild(1)
val days = column.getChild(1)
val microseconds = column.getChild(2)
assert(months.dataType() == IntegerType)
assert(days.dataType() == IntegerType)
assert(microseconds.dataType() == LongType)
months.putInt(0, 1)
days.putInt(0, 10)
microseconds.putLong(0, 100)
reference += new CalendarInterval(1, 100)
reference += new CalendarInterval(1, 10, 100)
months.putInt(1, 0)
days.putInt(1, 0)
microseconds.putLong(1, 2000)
reference += new CalendarInterval(0, 2000)
reference += new CalendarInterval(0, 0, 2000)
column.putNull(2)
assert(column.getInterval(2) == null)
reference += null
months.putInt(3, 20)
days.putInt(3, 0)
microseconds.putLong(3, 0)
reference += new CalendarInterval(20, 0)
reference += new CalendarInterval(20, 0, 0)
months.putInt(4, 0)
days.putInt(4, 200)
microseconds.putLong(4, 0)
reference += new CalendarInterval(0, 200, 0)
reference.zipWithIndex.foreach { case (v, i) =>
val errMsg = "VectorType=" + column.getClass.getSimpleName
@ -1311,7 +1321,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
Decimal("1234.23456"),
DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")),
new CalendarInterval(1, 0),
new CalendarInterval(1, 0, 0),
new GenericArrayData(Array(1, 2, 3, 4, null)),
new GenericInternalRow(Array[Any](5.asInstanceOf[Any], 10)),
mapBuilder.build()
@ -1332,7 +1342,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
Decimal("0.01000"),
DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("1875-12-12")),
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("1880-01-05 12:45:21.321")),
new CalendarInterval(-10, -100),
new CalendarInterval(-10, -50, -100),
new GenericArrayData(Array(5, 10, -100)),
new GenericInternalRow(Array[Any](20.asInstanceOf[Any], null)),
mapBuilder.build()
@ -1424,8 +1434,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(columns(10).isNullAt(2))
assert(columns(11).dataType() == CalendarIntervalType)
assert(columns(11).getInterval(0) == new CalendarInterval(1, 0))
assert(columns(11).getInterval(1) == new CalendarInterval(-10, -100))
assert(columns(11).getInterval(0) == new CalendarInterval(1, 0, 0))
assert(columns(11).getInterval(1) == new CalendarInterval(-10, -50, -100))
assert(columns(11).isNullAt(2))
assert(columns(12).dataType() == ArrayType(IntegerType))