[SPARK-14353] Dataset Time Window `window` API for Python and SQL

## What changes were proposed in this pull request?

The `window` function was added to the Dataset API in [this PR](https://github.com/apache/spark/pull/12008).
This PR adds the Python and SQL APIs for the same function.

With this PR, SQL, Java, and Scala share the same APIs; users can call:
 - `window(timeColumn, windowDuration)`
 - `window(timeColumn, windowDuration, slideDuration)`
 - `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python, users can access all of the APIs above, and in addition they can call
 - `window(timeColumn, windowDuration, startTime=...)`

that is, they can provide `startTime` without providing `slideDuration`, in which case tumbling windows are generated (see the sketch below).
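A minimal sketch of the resulting Python usage (assuming a `SQLContext` named `sqlContext` and a DataFrame `df` with a `TimestampType` column `time` and a numeric column `val`; the `events` temp table in the SQL call is likewise assumed):

```python
from pyspark.sql.functions import window, sum

# Tumbling 10-second windows.
df.groupBy(window("time", "10 seconds")).agg(sum("val"))

# Sliding windows: 10 seconds wide, starting every 5 seconds.
df.groupBy(window("time", "10 seconds", "5 seconds")).agg(sum("val"))

# Python-only shorthand: tumbling windows offset 5 seconds from
# 1970-01-01 00:00:00 UTC, without repeating the window duration
# as slideDuration.
df.groupBy(window("time", "10 seconds", startTime="5 seconds")).agg(sum("val"))

# The same function is available from SQL (against a hypothetical
# registered temp table named "events").
sqlContext.sql("""select window(time, "10 seconds"), value from events""")
```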

## How was this patch tested?

Unit tests + manual tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12136 from brkyvz/python-windows.
7 changed files with 204 additions and 15 deletions

`python/pyspark/sql/functions.py`:

@@ -1053,6 +1053,55 @@ def to_utc_timestamp(timestamp, tz):
     return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz))
 
 
+@since(2.0)
+@ignore_unicode_prefix
+def window(timeColumn, windowDuration, slideDuration=None, startTime=None):
+    """Bucketize rows into one or more time windows given a timestamp specifying column. Window
+    starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+    [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+    the order of months are not supported.
+
+    The time column must be of TimestampType.
+
+    Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
+    interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+    If the `slideDuration` is not provided, the windows will be tumbling windows.
+
+    The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
+    window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
+    past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
+
+    The output column will be a struct called 'window' by default with the nested columns 'start'
+    and 'end', where 'start' and 'end' will be of `TimestampType`.
+
+    >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
+    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+    >>> w.select(w.window.start.cast("string").alias("start"),
+    ...          w.window.end.cast("string").alias("end"), "sum").collect()
+    [Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)]
+    """
+    def check_string_field(field, fieldName):
+        if not field or type(field) is not str:
+            raise TypeError("%s should be provided as a string" % fieldName)
+
+    sc = SparkContext._active_spark_context
+    time_col = _to_java_column(timeColumn)
+    check_string_field(windowDuration, "windowDuration")
+    if slideDuration and startTime:
+        check_string_field(slideDuration, "slideDuration")
+        check_string_field(startTime, "startTime")
+        res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime)
+    elif slideDuration:
+        check_string_field(slideDuration, "slideDuration")
+        res = sc._jvm.functions.window(time_col, windowDuration, slideDuration)
+    elif startTime:
+        check_string_field(startTime, "startTime")
+        res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime)
+    else:
+        res = sc._jvm.functions.window(time_col, windowDuration)
+    return Column(res)
+
+
 # ---------------------------- misc functions ----------------------------------
 
 @since(1.5)

`sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala`:

@@ -364,7 +364,10 @@ object FunctionRegistry {
       }
       Try(f.newInstance(expressions : _*).asInstanceOf[Expression]) match {
         case Success(e) => e
-        case Failure(e) => throw new AnalysisException(e.getMessage)
+        case Failure(e) =>
+          // the exception is an invocation exception. To get a meaningful message, we need the
+          // cause.
+          throw new AnalysisException(e.getCause.getMessage)
       }
     }
   }
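A hedged illustration of the effect (the `time` column and `events` temp table are assumed, as above): before this change, a bad `window` call from SQL surfaced only the reflective `InvocationTargetException` wrapper; now the cause's message reaches the user.

```python
from pyspark.sql.utils import AnalysisException

# A non-literal duration makes TimeWindow's SQL constructor throw; the
# registry now rethrows the cause's message rather than the wrapper's.
try:
    sqlContext.sql("select window(time, rand()) from events").collect()
except AnalysisException as e:
    # Expected to mention: "The duration and time inputs to window must be
    # an integer, long or string literal."
    print(e)
```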

`sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala`:

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.commons.lang.StringUtils
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -34,6 +35,28 @@ case class TimeWindow(
   with Unevaluable
   with NonSQLExpression {
 
+  //////////////////////////
+  // SQL Constructors
+  //////////////////////////
+
+  def this(
+      timeColumn: Expression,
+      windowDuration: Expression,
+      slideDuration: Expression,
+      startTime: Expression) = {
+    this(timeColumn, TimeWindow.parseExpression(windowDuration),
+      TimeWindow.parseExpression(slideDuration), TimeWindow.parseExpression(startTime))
+  }
+
+  def this(timeColumn: Expression, windowDuration: Expression, slideDuration: Expression) = {
+    this(timeColumn, TimeWindow.parseExpression(windowDuration),
+      TimeWindow.parseExpression(slideDuration), 0)
+  }
+
+  def this(timeColumn: Expression, windowDuration: Expression) = {
+    this(timeColumn, windowDuration, windowDuration)
+  }
+
   override def child: Expression = timeColumn
   override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
   override def dataType: DataType = new StructType()
@@ -104,6 +127,18 @@ object TimeWindow {
     cal.microseconds
   }
 
+  /**
+   * Parses the duration expression to generate the long value for the original constructor so
+   * that we can use `window` in SQL.
+   */
+  private def parseExpression(expr: Expression): Long = expr match {
+    case NonNullLiteral(s, StringType) => getIntervalInMicroSeconds(s.toString)
+    case IntegerLiteral(i) => i.toLong
+    case NonNullLiteral(l, LongType) => l.toString.toLong
+    case _ => throw new AnalysisException("The duration and time inputs to window must be " +
+      "an integer, long or string literal.")
+  }
+
   def apply(
     timeColumn: Expression,
     windowDuration: String,
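A short note on the SQL forms these constructors enable, sketched in Python (the `time` column and `events` temp table are assumed): `parseExpression` accepts interval strings as well as integer or long literals, where numeric values denote microseconds, so the two queries below describe the same windows.

```python
# 10-second windows; the slide is expressed two equivalent ways.
sqlContext.sql('select window(time, "10 seconds", "10 seconds") from events')
sqlContext.sql('select window(time, "10 seconds", 10000000) from events')  # microseconds
```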

`sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala`:

@@ -22,6 +22,7 @@ import java.util.UUID
 import scala.collection.Map
 import scala.collection.mutable.Stack
 
+import org.apache.commons.lang.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -365,20 +366,32 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
    * @param newArgs the new product arguments.
    */
   def makeCopy(newArgs: Array[AnyRef]): BaseType = attachTree(this, "makeCopy") {
+    // Skip no-arg constructors that are just there for kryo.
     val ctors = getClass.getConstructors.filter(_.getParameterTypes.size != 0)
     if (ctors.isEmpty) {
       sys.error(s"No valid constructor for $nodeName")
     }
-    val defaultCtor = ctors.maxBy(_.getParameterTypes.size)
+    val allArgs: Array[AnyRef] = if (otherCopyArgs.isEmpty) {
+      newArgs
+    } else {
+      newArgs ++ otherCopyArgs
+    }
+    val defaultCtor = ctors.find { ctor =>
+      if (ctor.getParameterTypes.length != allArgs.length) {
+        false
+      } else if (allArgs.contains(null)) {
+        // if there is a `null`, we can't figure out the class, therefore we should just fallback
+        // to older heuristic
+        false
+      } else {
+        val argsArray: Array[Class[_]] = allArgs.map(_.getClass)
+        ClassUtils.isAssignable(argsArray, ctor.getParameterTypes, true /* autoboxing */)
+      }
+    }.getOrElse(ctors.maxBy(_.getParameterTypes.length)) // fall back to older heuristic
     try {
       CurrentOrigin.withOrigin(origin) {
-        // Skip no-arg constructors that are just there for kryo.
-        if (otherCopyArgs.isEmpty) {
-          defaultCtor.newInstance(newArgs: _*).asInstanceOf[BaseType]
-        } else {
-          defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[BaseType]
-        }
+        defaultCtor.newInstance(allArgs.toArray: _*).asInstanceOf[BaseType]
       }
     } catch {
       case e: java.lang.IllegalArgumentException =>

`sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala`:

@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.scalatest.PrivateMethodTester
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.types.LongType
 
-class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper {
+class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with PrivateMethodTester {
 
   test("time window is unevaluable") {
     intercept[UnsupportedOperationException] {
@@ -73,4 +76,36 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper {
         === seconds)
     }
   }
+
+  private val parseExpression = PrivateMethod[Long]('parseExpression)
+
+  test("parse sql expression for duration in microseconds - string") {
+    val dur = TimeWindow.invokePrivate(parseExpression(Literal("5 seconds")))
+    assert(dur.isInstanceOf[Long])
+    assert(dur === 5000000)
+  }
+
+  test("parse sql expression for duration in microseconds - integer") {
+    val dur = TimeWindow.invokePrivate(parseExpression(Literal(100)))
+    assert(dur.isInstanceOf[Long])
+    assert(dur === 100)
+  }
+
+  test("parse sql expression for duration in microseconds - long") {
+    val dur = TimeWindow.invokePrivate(parseExpression(Literal.create(2 << 52, LongType)))
+    assert(dur.isInstanceOf[Long])
+    assert(dur === (2 << 52))
+  }
+
+  test("parse sql expression for duration in microseconds - invalid interval") {
+    intercept[IllegalArgumentException] {
+      TimeWindow.invokePrivate(parseExpression(Literal("2 apples")))
+    }
+  }
+
+  test("parse sql expression for duration in microseconds - invalid expression") {
+    intercept[AnalysisException] {
+      TimeWindow.invokePrivate(parseExpression(Rand(123)))
+    }
+  }
 }

`sql/core/src/main/scala/org/apache/spark/sql/functions.scala`:

@@ -2574,8 +2574,7 @@ object functions {
    *                   processing time.
    *
    * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
-   *                   The time can be as TimestampType or LongType, however when using LongType,
-   *                   the time must be given in seconds.
+   *                   The time column must be of TimestampType.
    * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
    *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
    *                       valid duration identifiers.
@@ -2629,8 +2628,7 @@ object functions {
    *                   processing time.
    *
    * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
-   *                   The time can be as TimestampType or LongType, however when using LongType,
-   *                   the time must be given in seconds.
+   *                   The time column must be of TimestampType.
    * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
    *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
    *                       valid duration identifiers.
@@ -2672,8 +2670,7 @@ object functions {
    *                   processing time.
    *
    * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
-   *                   The time can be as TimestampType or LongType, however when using LongType,
-   *                   the time must be given in seconds.
+   *                   The time column must be of TimestampType.
    * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
    *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
    *                       valid duration identifiers.

`sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala`:

@@ -239,4 +239,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
       Row("2016-03-27 09:00:00.68", "2016-03-27 09:00:00.88", 1))
     )
   }
+
+  private def withTempTable(f: String => Unit): Unit = {
+    val tableName = "temp"
+    Seq(
+      ("2016-03-27 19:39:34", 1),
+      ("2016-03-27 19:39:56", 2),
+      ("2016-03-27 19:39:27", 4)).toDF("time", "value").registerTempTable(tableName)
+    try {
+      f(tableName)
+    } finally {
+      sqlContext.dropTempTable(tableName)
+    }
+  }
+
+  test("time window in SQL with single string expression") {
+    withTempTable { table =>
+      checkAnswer(
+        sqlContext.sql(s"""select window(time, "10 seconds"), value from $table""")
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+          Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+          Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+        )
+      )
+    }
+  }
+
+  test("time window in SQL with two expressions") {
+    withTempTable { table =>
+      checkAnswer(
+        sqlContext.sql(
+          s"""select window(time, "10 seconds", 10000000), value from $table""")
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+          Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+          Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+        )
+      )
+    }
+  }
+
+  test("time window in SQL with three expressions") {
+    withTempTable { table =>
+      checkAnswer(
+        sqlContext.sql(
+          s"""select window(time, "10 seconds", 10000000, "5 seconds"), value from $table""")
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1),
+          Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4),
+          Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2)
+        )
+      )
+    }
+  }
 }