[SPARK-9978] [PYSPARK] [SQL] fix Window.orderBy and doc of ntile()

Author: Davies Liu <davies@databricks.com>

Closes #8213 from davies/fix_window.
Authored by Davies Liu on 2015-08-14 13:55:29 -07:00; committed by Reynold Xin
parent 9407baa2a7
commit 11ed2b180e
3 changed files with 28 additions and 4 deletions

python/pyspark/sql/functions.py

@@ -530,9 +530,10 @@ def lead(col, count=1, default=None):
 @since(1.4)
 def ntile(n):
     """
-    Window function: returns a group id from 1 to `n` (inclusive) in a round-robin fashion in
-    a window partition. For example, if `n` is 3, the first row will get 1, the second row will
-    get 2, the third row will get 3, and the fourth row will get 1...
+    Window function: returns the ntile group id (from 1 to `n` inclusive)
+    in an ordered window partition. For example, if `n` is 4, the first
+    quarter of the rows will get value 1, the second quarter will get 2,
+    the third quarter will get 3, and the last quarter will get 4.
 
     This is equivalent to the NTILE function in SQL.
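To make the corrected wording concrete, here is a minimal sketch of ntile over an ordered window. It assumes a Spark 1.4/1.5-era setup with a HiveContext already bound to a made-up name sqlCtx (window functions required Hive support at the time), and the column name and data are invented for illustration.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Eight rows; ntile(4) over an ordered, unpartitioned window splits them into
# four ordered quarters rather than assigning group ids round-robin.
df = sqlCtx.createDataFrame([(i,) for i in range(1, 9)], ["score"])
w = Window.orderBy("score")
df.select("score", F.ntile(4).over(w).alias("quartile")).show()
# Scores 1-2 land in quartile 1, 3-4 in quartile 2, 5-6 in 3, and 7-8 in 4.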

python/pyspark/sql/tests.py

@@ -1124,5 +1124,28 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         for r, ex in zip(rs, expected):
             self.assertEqual(tuple(r), ex[:len(r)])
+
+    def test_window_functions_without_partitionBy(self):
+        df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
+        w = Window.orderBy("key", df.value)
+        from pyspark.sql import functions as F
+        sel = df.select(df.value, df.key,
+                        F.max("key").over(w.rowsBetween(0, 1)),
+                        F.min("key").over(w.rowsBetween(0, 1)),
+                        F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
+                        F.rowNumber().over(w),
+                        F.rank().over(w),
+                        F.denseRank().over(w),
+                        F.ntile(2).over(w))
+        rs = sorted(sel.collect())
+        expected = [
+            ("1", 1, 1, 1, 4, 1, 1, 1, 1),
+            ("2", 1, 1, 1, 4, 2, 2, 2, 1),
+            ("2", 1, 2, 1, 4, 3, 2, 2, 2),
+            ("2", 2, 2, 2, 4, 4, 4, 3, 2)
+        ]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
 
 if __name__ == "__main__":
     unittest.main()

python/pyspark/sql/window.py

@@ -64,7 +64,7 @@ class Window(object):
         Creates a :class:`WindowSpec` with the partitioning defined.
         """
         sc = SparkContext._active_spark_context
-        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
+        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
         return WindowSpec(jspec)
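A quick way to see what this one-line change buys you, again as a hedged sketch that assumes a HiveContext bound to a made-up name sqlCtx and invented data: before the fix, Window.orderBy delegated to the JVM-side partitionBy, so the resulting spec carried no ordering and ordering-sensitive functions such as rank() had nothing to rank by; with the fix they behave as documented.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = sqlCtx.createDataFrame([(1,), (2,), (2,), (3,)], ["key"])
w = Window.orderBy("key")   # with this patch, an ordered (not partitioned) WindowSpec
df.select("key", F.rank().over(w).alias("rank")).show()
# With the fix applied, the ranks over keys 1, 2, 2, 3 come out as 1, 2, 2, 4.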