diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 669de26f21..2432b81278 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -605,6 +605,22 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         """
         return self.limit(num).collect()
 
+    @ignore_unicode_prefix
+    @since(3.0)
+    def tail(self, num):
+        """
+        Returns the last ``num`` rows as a :class:`list` of :class:`Row`.
+
+        Running tail requires moving data into the application's driver process, and doing so with
+        a very large ``num`` can crash the driver process with OutOfMemoryError.
+
+        >>> df.tail(1)
+        [Row(age=5, name=u'Bob')]
+        """
+        with SCCallSiteSync(self._sc):
+            sock_info = self._jdf.tailToPython(num)
+        return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
+
     @since(1.3)
     def foreach(self, f):
         """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index dd3d0f507f..0eb1a26c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3331,6 +3331,16 @@ class Dataset[T] private[sql](
     }
   }
 
+  private[sql] def tailToPython(n: Int): Array[Any] = {
+    EvaluatePython.registerPicklers()
+    withAction("tailToPython", queryExecution) { plan =>
+      val toJava: (Any) => Any = EvaluatePython.toJava(_, schema)
+      val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
+        plan.executeTail(n).iterator.map(toJava))
+      PythonRDD.serveIterator(iter, "serve-DataFrame")
+    }
+  }
+
   private[sql] def getRowsToPython(
       _numRows: Int,
       truncate: Int): Array[Any] = {
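
For reviewers, a minimal end-to-end sketch of exercising the new API from the
Python side (this assumes a local SparkSession and the same (age, name) sample
data as the doctest; it is not part of the patch):

    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.master("local[2]").appName("tail-demo").getOrCreate()
    df = spark.createDataFrame([Row(age=2, name="Alice"), Row(age=5, name="Bob")])

    # tail(num) moves the last num rows to the driver over a local socket
    # (tailToPython -> plan.executeTail above), so keep num small to avoid
    # an OutOfMemoryError on the driver.
    print(df.tail(1))  # [Row(age=5, name='Bob')]

    spark.stop()

Like take()/head(), tail() is an action that materializes rows on the driver;
the difference is that the rows come from the end of the plan's output (via
plan.executeTail(n) in the Scala hunk) rather than the beginning.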