diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b7b2a5923c..0aff9cebe9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1980,6 +1980,41 @@ class HiveContextSQLTests(ReusedPySparkTestCase): # Regression test for SPARK-17514: limit(n).collect() should the perform same as take(n) assert_runs_only_one_job_stage_and_task("collect_limit", lambda: df.limit(1).collect()) + @unittest.skipIf(sys.version_info < (3, 3), "Unittest < 3.3 doesn't support mocking") + def test_unbounded_frames(self): + from unittest.mock import patch + from pyspark.sql import functions as F + from pyspark.sql import window + import importlib + + df = self.spark.range(0, 3) + + def rows_frame_match(): + return "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select( + F.count("*").over(window.Window.rowsBetween(-sys.maxsize, sys.maxsize)) + ).columns[0] + + def range_frame_match(): + return "RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select( + F.count("*").over(window.Window.rangeBetween(-sys.maxsize, sys.maxsize)) + ).columns[0] + + with patch("sys.maxsize", 2 ** 31 - 1): + importlib.reload(window) + self.assertTrue(rows_frame_match()) + self.assertTrue(range_frame_match()) + + with patch("sys.maxsize", 2 ** 63 - 1): + importlib.reload(window) + self.assertTrue(rows_frame_match()) + self.assertTrue(range_frame_match()) + + with patch("sys.maxsize", 2 ** 127 - 1): + importlib.reload(window) + self.assertTrue(rows_frame_match()) + self.assertTrue(range_frame_match()) + + importlib.reload(window) if __name__ == "__main__": from pyspark.sql.tests import * diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py index c345e623f1..7ce27f9b10 100644 --- a/python/pyspark/sql/window.py +++ b/python/pyspark/sql/window.py @@ -49,6 +49,8 @@ class Window(object): _JAVA_MIN_LONG = -(1 << 63) # -9223372036854775808 _JAVA_MAX_LONG = (1 << 63) - 1 # 9223372036854775807 + _PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG) + _FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG) unboundedPreceding = _JAVA_MIN_LONG @@ -98,9 +100,9 @@ class Window(object): The frame is unbounded if this is ``Window.unboundedFollowing``, or any value greater than or equal to 9223372036854775807. """ - if start <= Window._JAVA_MIN_LONG: + if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding - if end >= Window._JAVA_MAX_LONG: + if end >= Window._FOLLOWING_THRESHOLD: end = Window.unboundedFollowing sc = SparkContext._active_spark_context jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end) @@ -123,14 +125,14 @@ class Window(object): :param start: boundary start, inclusive. The frame is unbounded if this is ``Window.unboundedPreceding``, or - any value less than or equal to -9223372036854775808. + any value less than or equal to max(-sys.maxsize, -9223372036854775808). :param end: boundary end, inclusive. The frame is unbounded if this is ``Window.unboundedFollowing``, or - any value greater than or equal to 9223372036854775807. + any value greater than or equal to min(sys.maxsize, 9223372036854775807). """ - if start <= Window._JAVA_MIN_LONG: + if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding - if end >= Window._JAVA_MAX_LONG: + if end >= Window._FOLLOWING_THRESHOLD: end = Window.unboundedFollowing sc = SparkContext._active_spark_context jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end) @@ -185,14 +187,14 @@ class WindowSpec(object): :param start: boundary start, inclusive. The frame is unbounded if this is ``Window.unboundedPreceding``, or - any value less than or equal to -9223372036854775808. + any value less than or equal to max(-sys.maxsize, -9223372036854775808). :param end: boundary end, inclusive. The frame is unbounded if this is ``Window.unboundedFollowing``, or - any value greater than or equal to 9223372036854775807. + any value greater than or equal to min(sys.maxsize, 9223372036854775807). """ - if start <= Window._JAVA_MIN_LONG: + if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding - if end >= Window._JAVA_MAX_LONG: + if end >= Window._FOLLOWING_THRESHOLD: end = Window.unboundedFollowing return WindowSpec(self._jspec.rowsBetween(start, end)) @@ -211,14 +213,14 @@ class WindowSpec(object): :param start: boundary start, inclusive. The frame is unbounded if this is ``Window.unboundedPreceding``, or - any value less than or equal to -9223372036854775808. + any value less than or equal to max(-sys.maxsize, -9223372036854775808). :param end: boundary end, inclusive. The frame is unbounded if this is ``Window.unboundedFollowing``, or - any value greater than or equal to 9223372036854775807. + any value greater than or equal to min(sys.maxsize, 9223372036854775807). """ - if start <= Window._JAVA_MIN_LONG: + if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding - if end >= Window._JAVA_MAX_LONG: + if end >= Window._FOLLOWING_THRESHOLD: end = Window.unboundedFollowing return WindowSpec(self._jspec.rangeBetween(start, end))