[SPARK-30539][PYTHON][SQL] Add DataFrame.tail in PySpark

### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/26809 added the `Dataset.tail` API on the Scala side. It would be good to have the same API in PySpark as well.
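
For reference, a minimal usage sketch of the new API (assuming a running `SparkSession` named `spark`; the sample data mirrors the doctest in the diff below):

```python
# Build a tiny DataFrame; `spark` is an active SparkSession.
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

df.take(1)  # first row(s):  [Row(age=2, name=u'Alice')]
df.tail(1)  # last row(s):   [Row(age=5, name=u'Bob')]
```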

### Why are the changes needed?

To support consistent APIs.

### Does this PR introduce any user-facing change?

No. It only adds a new API; existing behavior is unchanged.

### How was this patch tested?

Manually tested, and a doctest was added.

Closes #27251 from HyukjinKwon/SPARK-30539.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

python/pyspark/sql/dataframe.py

@@ -605,6 +605,22 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
        """
        return self.limit(num).collect()

    @ignore_unicode_prefix
    @since(3.0)
    def tail(self, num):
        """
        Returns the last ``num`` rows as a :class:`list` of :class:`Row`.

        Running tail requires moving data into the application's driver process, and doing so with
        a very large ``num`` can crash the driver process with OutOfMemoryError.

        >>> df.tail(1)
        [Row(age=5, name=u'Bob')]
        """
        with SCCallSiteSync(self._sc):
            sock_info = self._jdf.tailToPython(num)
        return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

    @since(1.3)
    def foreach(self, f):
        """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

@@ -3331,6 +3331,16 @@ class Dataset[T] private[sql](
    }
  }

  private[sql] def tailToPython(n: Int): Array[Any] = {
    EvaluatePython.registerPicklers()
    withAction("tailToPython", queryExecution) { plan =>
      val toJava: (Any) => Any = EvaluatePython.toJava(_, schema)
      val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
        plan.executeTail(n).iterator.map(toJava))
      PythonRDD.serveIterator(iter, "serve-DataFrame")
    }
  }

  private[sql] def getRowsToPython(
      _numRows: Int,
      truncate: Int): Array[Any] = {
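
On the JVM side, `tailToPython` pickles the last `n` rows and serves them over a local socket, which the Python `tail` drains via `_load_from_socket`. A quick way to sanity-check the round trip on small data (a sketch, reusing the `df` from the doctest):

```python
# For small data, tail(n) should equal the last n rows of collect().
rows = df.collect()
assert df.tail(1) == rows[-1:]
assert df.tail(len(rows)) == rows
```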