Commit graph

22943 commits

Author SHA1 Message Date
Marco Gaido 6ad8d4c375 [SPARK-25289][ML] Avoid exception in ChiSqSelector with FDR when no feature is selected
## What changes were proposed in this pull request?

Currently, when FDR is used for `ChiSqSelector` and no feature is selected an exception is thrown because the max operation fails.

The PR fixes the problem by handling this case and returning an empty array in that case, as sklearn (which was the reference for the initial implementation of FDR) does.

## How was this patch tested?

added UT

Closes #22303 from mgaido91/SPARK-25289.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-09-01 08:41:07 -05:00
Liang-Chi Hsieh 7c36ee46d9 [SPARK-25290][CORE][TEST] Reduce the size of acquired arrays to avoid OOM error
## What changes were proposed in this pull request?

`BytesToBytesMapOnHeapSuite`.`randomizedStressTest` caused `OutOfMemoryError` on several test runs. Seems better to reduce memory usage in this test.

## How was this patch tested?

Unit tests.

Closes #22297 from viirya/SPARK-25290.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-09-01 16:25:29 +08:00
Kazuaki Ishizaki c5583fdcd2 [SPARK-23466][SQL] Remove redundant null checks in generated Java code by GenerateUnsafeProjection
## What changes were proposed in this pull request?

This PR works for one of TODOs in `GenerateUnsafeProjection` "if the nullability of field is correct, we can use it to save null check" to simplify generated code.
When `nullable=false` in `DataType`, `GenerateUnsafeProjection` removed code for null checks in the generated Java code.

## How was this patch tested?

Added new test cases into `GenerateUnsafeProjectionSuite`

Closes #20637 from kiszk/SPARK-23466.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2018-09-01 12:19:19 +09:00
Ilan Filonenko e1d72f2c07 [SPARK-25264][K8S] Fix comma-delineated arguments passed into PythonRunner and RRunner
## What changes were proposed in this pull request?

Fixes the issue brought up in https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/273 where the arguments were being comma-delineated, which was incorrect wrt to the PythonRunner and RRunner.

## How was this patch tested?

Modified unit test to test this change.

Author: Ilan Filonenko <if56@cornell.edu>

Closes #22257 from ifilonenko/SPARK-25264.
2018-08-31 15:46:45 -07:00
Maxim Gekk 32da87dfa4 [SPARK-25286][CORE] Removing the dangerous parmap
## What changes were proposed in this pull request?

I propose to remove one of `parmap` methods which accepts an execution context as a parameter. The method should be removed to eliminate any deadlocks that can occur if `parmap` is called recursively on thread pools restricted by size.

Closes #22292 from MaxGekk/remove-overloaded-parmap.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-31 10:43:30 -07:00
Xiao Li 7fc8881b0f [SPARK-25296][SQL][TEST] Create ExplainSuite
## What changes were proposed in this pull request?
Move the output verification of Explain test cases to a new suite ExplainSuite.

## How was this patch tested?
N/A

Closes #22300 from gatorsmile/test3200.

Authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-31 08:47:20 -07:00
huangtengfei02 339859c4e4 [SPARK-25261][MINOR][DOC] update the description for spark.executor|driver.memory in configuration.md
## What changes were proposed in this pull request?

As described in [SPARK-25261](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-25261),the unit of spark.executor.memory and spark.driver.memory is parsed as bytes in some cases if no unit specified, while in https://spark.apache.org/docs/latest/configuration.html#application-properties, they are descibed as MiB, which may lead to some misunderstandings.

## How was this patch tested?

N/A

Closes #22252 from ivoson/branch-correct-configuration.

Lead-authored-by: huangtengfei02 <huangtengfei02@baidu.com>
Co-authored-by: Huang Tengfei <tengfei.h@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-31 09:06:38 -05:00
yucai 8d9495a8f1 [SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet
## What changes were proposed in this pull request?

Currently, filter pushdown will not work if Parquet schema and Hive metastore schema are in different letter cases even spark.sql.caseSensitive is false.

Like the below case:
```scala
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.parquet("/tmp/t")
sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/t'")
sql("select * from t where id < 100L").write.csv("/tmp/id")
```

Although filter "ID < 100L" is generated by Spark, it fails to pushdown into parquet actually, Spark still does the full table scan when reading.
This PR provides a case-insensitive field resolution to make it work.

Before - "ID < 100L" fail to pushedown:
<img width="273" alt="screen shot 2018-08-23 at 10 08 26 pm" src="https://user-images.githubusercontent.com/2989575/44530558-40ef8b00-a721-11e8-8abc-7f97671590d3.png">
After - "ID < 100L" pushedown sucessfully:
<img width="267" alt="screen shot 2018-08-23 at 10 08 40 pm" src="https://user-images.githubusercontent.com/2989575/44530567-44831200-a721-11e8-8634-e9f664b33d39.png">

## How was this patch tested?

Added UTs.

Closes #22197 from yucai/SPARK-25207.

Authored-by: yucai <yyu1@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-31 19:24:09 +08:00
Steve Loughran 515708d5f3 [SPARK-25183][SQL] Spark HiveServer2 to use Spark ShutdownHookManager
## What changes were proposed in this pull request?

Switch `org.apache.hive.service.server.HiveServer2` to register its shutdown callback with Spark's `ShutdownHookManager`, rather than direct with the Java Runtime callback.

This avoids race conditions in shutdown where the filesystem is shutdown before the flush/write/rename of the event log is completed, particularly on object stores where the write and rename can be slow.

## How was this patch tested?

There's no explicit unit for test this, which is consistent with every other shutdown hook in the codebase.

* There's an implicit test when the scalatest process is halted.
* More manual/integration testing is needed.

HADOOP-15679 has added the ability to explicitly execute the hadoop shutdown hook sequence which spark uses; that could be stabilized for testing if desired, after which all the spark hooks could be tested. Until then: external system tests only.

Author: Steve Loughran <stevel@hortonworks.com>

Closes #22186 from steveloughran/BUG/SPARK-25183-shutdown.
2018-08-31 14:45:29 +08:00
Shixiong Zhu aa70a0a1a4
[SPARK-25288][TESTS] Fix flaky Kafka transaction tests
## What changes were proposed in this pull request?

Here are the failures:

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

I found the Kafka consumer may not see the committed messages for a short time. This PR just adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result.

## How was this patch tested?

Jenkins

Closes #22293 from zsxwing/SPARK-25288.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-30 23:23:11 -07:00
忍冬 f29c2b5287 [SPARK-25256][SQL][TEST] Plan mismatch errors in Hive tests in Scala 2.12
## What changes were proposed in this pull request?

### For `SPARK-5775 read array from partitioned_parquet_with_key_and_complextypes`:

scala2.12
```
scala> (1 to 10).toString
res4: String = Range 1 to 10
```

scala2.11
```
scala> (1 to 10).toString
res2: String = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```
And

```
  def prepareAnswer(answer: Seq[Row], isSorted: Boolean): Seq[Row] = {
    val converted: Seq[Row] = answer.map(prepareRow)
    if (!isSorted) converted.sortBy(_.toString()) else converted
  }
```
sortBy `_.toString` is not a good idea.

### Other failures are caused by

```
Array(Int.box(1)).toSeq == Array(Double.box(1.0)).toSeq
```

It is false in 2.12.2 + and is true in 2.11.x , 2.12.0, 2.12.1

## How was this patch tested?

This is a  patch on a specific unit test.

Closes #22264 from sadhen/SPARK25256.

Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-30 22:37:40 -05:00
Erik Erlandson bb3e6ed921 [SPARK-25287][INFRA] Add up-front check for JIRA_USERNAME and JIRA_PASSWORD
## What changes were proposed in this pull request?

Add an up-front check that `JIRA_USERNAME` and `JIRA_PASSWORD` have been set. If they haven't, ask user if they want to continue. This prevents the JIRA state update from failing at the very end of the process because user forgot to set these environment variables.

## How was this patch tested?

I ran the script with environment vars set, and unset, to verify it works as specified.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22294 from erikerlandson/spark-25287.

Authored-by: Erik Erlandson <eerlands@redhat.com>
Signed-off-by: Erik Erlandson <eerlands@redhat.com>
2018-08-30 15:08:12 -07:00
Erik Erlandson d6d1224ffa [SPARK-25275][K8S] require memberhip in wheel to run 'su' in dockerfiles
## What changes were proposed in this pull request?
Add a PAM configuration in k8s dockerfile to require authentication into wheel to run as `su`

## How was this patch tested?
Verify against CI that PAM config succeeds & causes no regressions

Closes #22285 from erikerlandson/spark-25275.

Authored-by: Erik Erlandson <eerlands@redhat.com>
Signed-off-by: Erik Erlandson <eerlands@redhat.com>
2018-08-30 14:07:04 -07:00
忍冬 a5fb5b62c3 [SPARK-25235][BUILD][SHELL][FOLLOWUP] Fix repl compile for 2.12
## What changes were proposed in this pull request?

Error messages from https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/183/

```
[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first)  spark-repl_2.12 ---
[INFO] Using zinc server for incremental compilation
[warn] Pruning sources from previous analysis, due to incompatible CompileSetup.
[info] Compiling 6 Scala sources to /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/target/scala-2.12/classes...
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala:80: overriding lazy value importableSymbolsWithRenames in class ImportHandler of type List[(this.intp.global.Symbol, this.intp.global.Name)];
[error]  lazy value importableSymbolsWithRenames needs `override' modifier
[error]       lazy val importableSymbolsWithRenames: List[(Symbol, Name)] = {
[error]                ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala:53: variable addedClasspath in class ILoop is deprecated (since 2.11.0): use reset, replay or require to update class path
[warn]       if (addedClasspath != "") {
[warn]           ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala:54: variable addedClasspath in class ILoop is deprecated (since 2.11.0): use reset, replay or require to update class path
[warn]         settings.classpath append addedClasspath
[warn]                                   ^
[warn] two warnings found
[error] one error found
[error] Compile failed at Aug 29, 2018 5:28:22 PM [0.679s]
```

Readd the profile for `scala-2.12`. Using `-Pscala-2.12` will overrides `extra.source.dir` and `extra.testsource.dir` with two non-exist directories.

## How was this patch tested?

First, make sure it compiles.
```
dev/change-scala-version.sh 2.12

mvn -Pscala-2.12 -DskipTests compile install
```

Then, make a distribution to try the repl:

`./dev/make-distribution.sh --name custom-spark --tgz -Phadoop-2.7 -Phive -Pyarn -Pscala-2.12`

```
18/08/30 16:04:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://172.16.131.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1535616298812).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.12.6 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("select percentile(key, 1) from values (1, 1),(2, 1) T(key, value)").show
+-------------------------------------+
|percentile(key, CAST(1 AS DOUBLE), 1)|
+-------------------------------------+
|                                  2.0|
+-------------------------------------+
```

Closes #22280 from sadhen/SPARK_24785_FOLLOWUP.

Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-30 15:54:07 -05:00
aai95 c685b5f56a
[SPARK-24411][SQL] Adding native Java tests for 'isInCollection'
## What changes were proposed in this pull request?
`JavaColumnExpressionSuite.java` was added and `org.apache.spark.sql.ColumnExpressionSuite#test("isInCollection: Java Collection")` was removed.
It provides native Java tests for the method `org.apache.spark.sql.Column#isInCollection`.

Closes #22253 from aai95/isInCollectionJavaTest.

Authored-by: aai95 <aai95@yandex.ru>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-30 20:38:03 +00:00
Reza Safi 135ff16a35 [SPARK-25233][STREAMING] Give the user the option of specifying a minimum message per partition per batch when using kafka direct API with backpressure
After SPARK-18371, it is guaranteed that there would be at least one message per partition per batch using direct kafka API when new messages exist in the topics. This change will give the user the option of setting the minimum instead of just a hard coded 1 limit
The related unit test is updated and some internal tests verified that the topic partitions with new messages will be progressed by the specified minimum.

Author: Reza Safi <rezasafi@cloudera.com>

Closes #22223 from rezasafi/streaminglag.
2018-08-30 13:26:03 -05:00
Kazuaki Ishizaki 9e0f9591af [SPARK-23997][SQL][FOLLOWUP] Update exception message
## What changes were proposed in this pull request?

This PR is an follow-up PR of #21087 based on [a discussion thread](https://github.com/apache/spark/pull/21087#discussion_r211080067]. Since #21087 changed a condition of `if` statement, the message in an exception is not consistent of the current behavior.
This PR updates the exception message.

## How was this patch tested?

Existing UTs

Closes #22269 from kiszk/SPARK-23997-followup.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-30 11:21:40 -05:00
Maxim Gekk 3c67cb0b52 [SPARK-25273][DOC] How to install testthat 1.0.2
## What changes were proposed in this pull request?

R tests require `testthat` v1.0.2. In the PR, I described how to install the version in the section http://spark.apache.org/docs/latest/building-spark.html#running-r-tests.

Closes #22272 from MaxGekk/r-testthat-doc.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-30 20:25:26 +08:00
Yuming Wang e9fce2a4c1 [SPARK-24716][TESTS][FOLLOW-UP] Test Hive metastore schema and parquet schema are in different letter cases
## What changes were proposed in this pull request?

Since https://github.com/apache/spark/pull/21696. Spark uses Parquet schema instead of Hive metastore schema to do pushdown.
That change can avoid wrong records returned when Hive metastore schema and parquet schema are in different letter cases. This pr add a test case for it.

More details:
https://issues.apache.org/jira/browse/SPARK-25206

## How was this patch tested?

unit tests

Closes #22267 from wangyum/SPARK-24716-TESTS.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-30 16:24:47 +08:00
忍冬 56bc70047e [SQL][MINOR] Fix compiling for scala 2.12
## What changes were proposed in this pull request?
Introduced by #21320 and #11744

```
$ sbt
> ++2.12.6
> project sql
> compile
...
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:41: match may not be exhaustive.
[error] It would fail on the following inputs: (_, ArrayType(_, _)), (_, _)
[error] [warn]         getProjection(a.child).map(p => (p, p.dataType)).map {
[error] [warn]
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:52: match may not be exhaustive.
[error] It would fail on the following input: (_, _)
[error] [warn]         getProjection(child).map(p => (p, p.dataType)).map {
[error] [warn]
...
```

And

```
$ sbt
> ++2.12.6
> project hive
> testOnly *ParquetMetastoreSuite
...
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:22: object tools is not a member of package scala
[error] import scala.tools.nsc.Properties
[error]              ^
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:146: not found: value Properties
[error]     val version = Properties.versionNumberString match {
[error]                   ^
[error] two errors found
...
```

## How was this patch tested?
Existing tests.

Closes #22260 from sadhen/fix_exhaustive_match.

Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-30 15:05:36 +08:00
cclauss 3a66a7fca9 [SPARK-25253][PYSPARK][FOLLOWUP] Undefined name: from pyspark.util import _exception_message
HyukjinKwon

## What changes were proposed in this pull request?

add __from pyspark.util import \_exception_message__ to python/pyspark/java_gateway.py

## How was this patch tested?

[flake8](http://flake8.pycqa.org) testing of https://github.com/apache/spark on Python 3.7.0

$ __flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics__
```
./python/pyspark/java_gateway.py:172:20: F821 undefined name '_exception_message'
            emsg = _exception_message(e)
                   ^
1     F821 undefined name '_exception_message'
1
```

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22265 from cclauss/patch-2.

Authored-by: cclauss <cclauss@bluewin.ch>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-30 08:13:11 +08:00
Thomas Graves ec3e998638 [SPARK-24909][CORE] Always unregister pending partition on task completion.
Spark scheduler can hang when fetch failures, executor lost, task running on lost executor, and multiple stage attempts. To fix this we change to always unregister the pending partition on task completion.

## What changes were proposed in this pull request?
this PR is actually reverting the change in SPARK-19263, so that it always does shuffleStage.pendingPartitions -= task.partitionId.   The change in SPARK-23433, should fix the issue originally from SPARK-19263.

## How was this patch tested?

Unit tests.  The condition happens on a race which I haven't reproduced on a real customer, just see it sometimes on customers jobs in a real cluster.
I am also working on adding spark scheduler integration tests.

Closes #21976 from tgravescs/SPARK-24909.

Authored-by: Thomas Graves <tgraves@unharmedunarmed.corp.ne1.yahoo.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2018-08-29 16:32:02 -07:00
Juliusz Sompolski 6b1b10ca85 [DOC] Fix comment on SparkPlanGraphEdge
## What changes were proposed in this pull request?

`fromId` is the child, and `toId` is the parent, see line 127 in `buildSparkPlanGraphNode` above.
The edges in Spark UI also go from child to parent.

## How was this patch tested?

Comment change only. Inspected code above. Inspected how the edges in Spark UI look like.

Closes #22268 from juliuszsompolski/sparkplangraphedgedoc.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-29 12:55:44 -07:00
Xiangrui Meng 20b7c684cc [SPARK-25248][.1][PYSPARK] update barrier Python API
## What changes were proposed in this pull request?

I made one pass over the Python APIs for barrier mode and updated them to match the Scala doc in #22240 . Major changes:

* export the public classes
* expand the docs
* add doc for BarrierTaskInfo.addresss

cc: jiangxb1987

Closes #22261 from mengxr/SPARK-25248.1.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2018-08-29 07:22:03 -07:00
sarutak 3864480e14 [SPARK-25266][CORE] Fix memory leak in Barrier Execution Mode
## What changes were proposed in this pull request?

BarrierCoordinator uses Timer and TimerTask. `TimerTask#cancel()` is invoked in ContextBarrierState#cancelTimerTask but `Timer#purge()` is never invoked.

Once a TimerTask is scheduled, the reference to it is not released until `Timer#purge()` is invoked even though `TimerTask#cancel()` is invoked.

## How was this patch tested?

I checked the number of instances related to the TimerTask using jmap.

Closes #22258 from sarutak/fix-barrierexec-oom.

Authored-by: sarutak <sarutak@oss.nttdata.co.jp>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2018-08-29 07:13:13 -07:00
Sean Owen 1fd59c129a [WIP][SPARK-25044][SQL] (take 2) Address translation of LMF closure primitive args to Object in Scala 2.12
## What changes were proposed in this pull request?

Alternative take on https://github.com/apache/spark/pull/22063 that does not introduce udfInternal.
Resolve issue with inferring func types in 2.12 by instead using info captured when UDF is registered -- capturing which types are nullable (i.e. not primitive)

## How was this patch tested?

Existing tests.

Closes #22259 from srowen/SPARK-25044.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-29 15:23:16 +08:00
Bryan Cutler 82c18c240a [SPARK-23030][SQL][PYTHON] Use Arrow stream format for creating from and collecting Pandas DataFrames
## What changes were proposed in this pull request?

This changes the calls of `toPandas()` and `createDataFrame()` to use the Arrow stream format, when Arrow is enabled.  Previously, Arrow data was written to byte arrays where each chunk is an output of the Arrow file format.  This was mainly due to constraints at the time, and caused some overhead by writing the schema/footer on each chunk of data and then having to read multiple Arrow file inputs and concat them together.

Using the Arrow stream format has improved these by increasing performance, lower memory overhead for the average case, and simplified the code.  Here are the details of this change:

**toPandas()**

_Before:_
Spark internal rows are converted to Arrow file format, each group of records is a complete Arrow file which contains the schema and other metadata.  Next a collect is done and an Array of Arrow files is the result.  After that each Arrow file is sent to Python driver which then loads each file and concats them to a single Arrow DataFrame.

_After:_
Spark internal rows are converted to ArrowRecordBatches directly, which is the simplest Arrow component for IPC data transfers.  The driver JVM then immediately starts serving data to Python as an Arrow stream, sending the schema first. It then starts a Spark job with a custom handler that sends Arrow RecordBatches to Python. Partitions arriving in order are sent immediately, and out-of-order partitions are buffered until the ones that precede it come in. This improves performance, simplifies memory usage on executors, and improves the average memory usage on the JVM driver.  Since the order of partitions must be preserved, the worst case is that the first partition will be the last to arrive all data must be buffered in memory until then. This case is no worse that before when doing a full collect.

**createDataFrame()**

_Before:_
A Pandas DataFrame is split into parts and each part is made into an Arrow file.  Then each file is prefixed by the buffer size and written to a temp file.  The temp file is read and each Arrow file is parallelized as a byte array.

_After:_
A Pandas DataFrame is split into parts, then an Arrow stream is written to a temp file where each part is an ArrowRecordBatch.  The temp file is read as a stream and the Arrow messages are examined.  If the message is an ArrowRecordBatch, the data is saved as a byte array.  After reading the file, each ArrowRecordBatch is parallelized as a byte array.  This has slightly more processing than before because we must look each Arrow message to extract the record batches, but performance ends up a litle better.  It is cleaner in the sense that IPC from Python to JVM is done over a single Arrow stream.

## How was this patch tested?

Added new unit tests for the additions to ArrowConverters in Scala, existing tests for Python.

## Performance Tests - toPandas

Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8
measured wall clock time to execute `toPandas()` and took the average best time of 5 runs/5 loops each.

Test code
```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
	start = time.time()
	_ = df.toPandas()
	elapsed = time.time() - start
```

Current Master | This PR
---------------------|------------
5.803557 | 5.16207
5.409119 | 5.133671
5.493509 | 5.147513
5.433107 | 5.105243
5.488757 | 5.018685

Avg Master | Avg This PR
------------------|--------------
5.5256098 | 5.1134364

Speedup of **1.08060595**

## Performance Tests - createDataFrame

Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8
measured wall clock time to execute `createDataFrame()` and get the first record. Took the average best time of 5 runs/5 loops each.

Test code
```python
def run():
	pdf = pd.DataFrame(np.random.rand(10000000, 10))
	spark.createDataFrame(pdf).first()

for i in range(6):
	start = time.time()
	run()
	elapsed = time.time() - start
	gc.collect()
	print("Run %d: %f" % (i, elapsed))
```

Current Master | This PR
--------------------|----------
6.234608 | 5.665641
6.32144 | 5.3475
6.527859 | 5.370803
6.95089 | 5.479151
6.235046 | 5.529167

Avg Master | Avg This PR
---------------|----------------
6.4539686 | 5.4784524

Speedup of **1.178064192**

## Memory Improvements

**toPandas()**

The most significant improvement is reduction of the upper bound space complexity in the JVM driver.  Before, the entire dataset was collected in the JVM first before sending it to Python.  With this change, as soon as a partition is collected, the result handler immediately sends it to Python, so the upper bound is the size of the largest partition.  Also, using the Arrow stream format is more efficient because the schema is written once per stream, followed by record batches.  The schema is now only send from driver JVM to Python.  Before, multiple Arrow file formats were used that each contained the schema.  This duplicated schema was created in the executors, sent to the driver JVM, and then Python where all but the first one received are discarded.

I verified the upper bound limit by running a test that would collect data that would exceed the amount of driver JVM memory available.  Using these settings on a standalone cluster:
```
spark.driver.memory 1g
spark.executor.memory 5g
spark.sql.execution.arrow.enabled true
spark.sql.execution.arrow.fallback.enabled false
spark.sql.execution.arrow.maxRecordsPerBatch 0
spark.driver.maxResultSize 2g
```

Test code:
```python
from pyspark.sql.functions import rand
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand())
df.toPandas()
```

This makes total data size of 33554432×8×4 = 1073741824

With the current master, it fails with OOM but passes using this PR.

**createDataFrame()**

No significant change in memory except that using the stream format instead of separate file formats avoids duplicated the schema, similar to toPandas above.  The process of reading the stream and parallelizing the batches does cause the record batch message metadata to be copied, but it's size is insignificant.

Closes #21546 from BryanCutler/arrow-toPandas-stream-SPARK-23030.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-29 15:01:12 +08:00
DB Tsai ff8dcc1d4c
[SPARK-25235][SHELL] Merge the REPL code in Scala 2.11 and 2.12 branches
## What changes were proposed in this pull request?

Using some reflection tricks to merge Scala 2.11 and 2.12 codebase.

## How was this patch tested?

Existing tests.

Closes #22246 from dbtsai/repl.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-29 04:30:31 +00:00
Imran Rashid 38391c9aa8 [SPARK-25253][PYSPARK] Refactor local connection & auth code
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm.  Also it gives consistent ipv6 and error handling.  Two other incidental changes, that shouldn't matter:
1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator)
2) for `rdd._load_from_socket`, the timeout is only increased after authentication.

Closes #22247 from squito/py_connection_refactor.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-29 09:47:38 +08:00
Arun Mahadevan 68ec207a32 [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
## What changes were proposed in this pull request?

`toAvroType` converts spark data type to avro schema. It always appends the record name to namespace so its impossible to have an Avro namespace independent of the record name.

When invoked with a spark data type like,

```java
val sparkSchema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("address", StructType(Seq(
        StructField("city", StringType, nullable = false),
        StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace "foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema,  false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is

```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22251 from arunmahadevan/avro-fix.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-29 09:25:49 +08:00
Marco Gaido 32c8a3d7be [MINOR] Avoid code duplication for nullable in Higher Order function
## What changes were proposed in this pull request?

Most of  `HigherOrderFunction`s have the same `nullable` definition, ie. they are nullable when one of their arguments is nullable. The PR refactors it in order to avoid code duplication.

## How was this patch tested?

NA

Closes #22243 from mgaido91/MINOR_nullable_hof.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-29 09:20:32 +08:00
Bo Meng bbbf814691 [SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter
## What changes were proposed in this pull request?
Fix the issue that minPartitions was not used in the method. This is a simple fix and I am not trying to make it complicated. The purpose is to still allow user to control the defaultParallelism through the value of minPartitions, while also via sc.defaultParallelism parameters.

## How was this patch tested?
I have not provided the additional test since the fix is very straightforward.

Closes #21638 from bomeng/22357.

Lead-authored-by: Bo Meng <mengbo@hotmail.com>
Co-authored-by: Bo Meng <bo.meng@jd.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-28 19:39:13 -05:00
Bogdan Raducanu 103854028e [SPARK-25212][SQL] Support Filter in ConvertToLocalRelation
## What changes were proposed in this pull request?
Support Filter in ConvertToLocalRelation, similar to how Project works.
Additionally, in Optimizer, run ConvertToLocalRelation earlier to simplify the plan. This is good for very short queries which often are queries on local relations.

## How was this patch tested?
New test. Manual benchmark.

Author: Bogdan Raducanu <bogdan@databricks.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: Yinan Li <ynli@google.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: s71955 <sujithchacko.2010@gmail.com>
Author: DB Tsai <d_tsai@apple.com>
Author: jaroslav chládek <mastermism@gmail.com>
Author: Huangweizhe <huangweizhe@bbdservice.com>
Author: Xiangrui Meng <meng@databricks.com>
Author: hyukjinkwon <gurwls223@apache.org>
Author: Kent Yao <yaooqinn@hotmail.com>
Author: caoxuewen <cao.xuewen@zte.com.cn>
Author: liuxian <liu.xian3@zte.com.cn>
Author: Adam Bradbury <abradbury@users.noreply.github.com>
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Author: Yuming Wang <yumwang@ebay.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #22205 from bogdanrdc/local-relation-filter.
2018-08-28 15:50:25 -07:00
Ryan Blue 7ad18ee9f2 [SPARK-25004][CORE] Add spark.executor.pyspark.memory limit.
## What changes were proposed in this pull request?

This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory:

```
  File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer
    fe_eval_rec.update(f(src_rec_prep, mat_rec_prep))
  File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp
    comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, []))
  File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare
    permutations = sorted(permutations, reverse=True)
  MemoryError
```

The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity.

## How was this patch tested?

Tested memory limits in our YARN cluster and verified that MemoryError is thrown.

Author: Ryan Blue <blue@apache.org>

Closes #21977 from rdblue/SPARK-25004-add-python-memory-limit.
2018-08-28 12:31:33 -07:00
Maxim Gekk aff8f15c15 [SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS
## What changes were proposed in this pull request?

In the PR, I propose to not perform recursive parallel listening of files in the `scanPartitions` method because it can cause a deadlock. Instead of that I propose to do `scanPartitions` in parallel for top level partitions only.

## How was this patch tested?

I extended an existing test to trigger the deadlock.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #22233 from MaxGekk/fix-recover-partitions.
2018-08-28 11:29:05 -07:00
jerryshao 4e3f3cebe4 [SPARK-23679][YARN] Setting RM_HA_URLS for AmIpFilter to avoid redirect failure in YARN mode
## What changes were proposed in this pull request?

YARN `AmIpFilter` adds a new parameter "RM_HA_URLS" to support RM HA, but Spark on YARN doesn't provide a such parameter, so it will be failed to redirect when running on RM HA. The detailed exception can be checked from JIRA. So here fixing this issue by adding "RM_HA_URLS" parameter.

## How was this patch tested?

Local verification.

Closes #22164 from jerryshao/SPARK-23679.

Authored-by: jerryshao <sshao@hortonworks.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2018-08-28 10:33:39 -07:00
Fernando Pereira de46df549a [SPARK-23997][SQL] Configurable maximum number of buckets
## What changes were proposed in this pull request?
This PR implements the possibility of the user to override the maximum number of buckets when saving to a table.
Currently the limit is a hard-coded 100k, which might be insufficient for large workloads.
A new configuration entry is proposed: `spark.sql.bucketing.maxBuckets`, which defaults to the previous 100k.

## How was this patch tested?
Added unit tests in the following spark.sql test suites:

- CreateTableAsSelectSuite
- BucketedWriteSuite

Author: Fernando Pereira <fernando.pereira@epfl.ch>

Closes #21087 from ferdonline/enh/configurable_bucket_limit.
2018-08-28 10:31:47 -07:00
Shixiong Zhu 1149c4efbc
[SPARK-25005][SS] Support non-consecutive offsets for Kafka
## What changes were proposed in this pull request?

As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. It will contains some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the date returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.

## How was this patch tested?

The new unit tests.

Closes #22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-28 08:38:07 -07:00
Shixiong Zhu 592e3a42c2
[SPARK-25218][CORE] Fix potential resource leaks in TransportServer and SocketAuthHelper
## What changes were proposed in this pull request?

Make sure TransportServer and SocketAuthHelper close the resources for all types of errors.

## How was this patch tested?

Jenkins

Closes #22210 from zsxwing/SPARK-25218.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-28 08:36:06 -07:00
Li Jin 8198ea5019 [SPARK-24721][SQL] Exclude Python UDFs filters in FileSourceStrategy
## What changes were proposed in this pull request?
The PR excludes Python UDFs filters in FileSourceStrategy so that they don't ExtractPythonUDF rule to throw exception. It doesn't make sense to pass Python UDF filters in FileSourceStrategy anyway because they cannot be used as push down filters.

## How was this patch tested?
Add a new regression test

Closes #22104 from icexelloss/SPARK-24721-udf-filter.

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-28 10:57:13 +08:00
Yinan Li dac099d082 [SPARK-24090][K8S] Update running-on-kubernetes.md
## What changes were proposed in this pull request?

Updated documentation for Spark on Kubernetes for the upcoming 2.4.0.

Please review http://spark.apache.org/contributing.html before opening a pull request.

mccheah erikerlandson

Closes #22224 from liyinan926/master.

Authored-by: Yinan Li <ynli@google.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-27 15:55:34 -05:00
Yuming Wang c3f285c939 [SPARK-24149][YARN][FOLLOW-UP] Only get the delegation tokens of the filesystem explicitly specified by the user
## What changes were proposed in this pull request?

Our HDFS cluster configured 5 nameservices: `nameservices1`, `nameservices2`, `nameservices3`, `nameservices-dev1` and `nameservices4`, but `nameservices-dev1` unstable. So sometimes an error occurred and causing the entire job failed since [SPARK-24149](https://issues.apache.org/jira/browse/SPARK-24149):

![image](https://user-images.githubusercontent.com/5399861/42434779-f10c48fc-8386-11e8-98b0-4d9786014744.png)

I think it's best to add a switch here.

## How was this patch tested?

manual tests

Closes #21734 from wangyum/SPARK-24149.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2018-08-27 13:26:55 -07:00
Jose Torres 810d59ce44
[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.
## What changes were proposed in this pull request?

Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration.

We caught most instances of this in the original PR, but this one slipped through.

## How was this patch tested?

n/a

Closes #22245 from jose-torres/fixflake.

Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-27 11:04:39 -07:00
liuxian 381a967a76 [SPARK-25249][CORE][TEST] add a unit test for OpenHashMap
## What changes were proposed in this pull request?

This PR adds a unit test for OpenHashMap , this can help developers  to distinguish between the 0/0.0/0L and null

## How was this patch tested?

Closes #22241 from 10110346/openhashmap.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-27 12:05:33 -05:00
caoxuewen 6193a202aa [SPARK-24978][SQL] Add spark.sql.fast.hash.aggregate.row.max.capacity to configure the capacity of fast aggregation.
## What changes were proposed in this pull request?

this pr add a configuration parameter to configure the capacity of fast aggregation.
Performance comparison:

```
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
 Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
 Aggregate w multiple keys:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
 fasthash = default                            5612 / 5882          3.7         267.6       1.0X
 fasthash = config                             3586 / 3595          5.8         171.0       1.6X

```

## How was this patch tested?
the existed test cases.

Closes #21931 from heary-cao/FastHashCapacity.

Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-27 15:45:48 +08:00
Liang-Chi Hsieh 5c27b0d4f8 [SPARK-19355][SQL][FOLLOWUP] Remove the child.outputOrdering check in global limit
## What changes were proposed in this pull request?

This is based on the discussion https://github.com/apache/spark/pull/16677/files#r212805327.

As SQL standard doesn't mandate that a nested order by followed by a limit has to respect that ordering clause, this patch removes the `child.outputOrdering` check.

## How was this patch tested?

Unit tests.

Closes #22239 from viirya/improve-global-limit-parallelism-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-27 14:02:50 +08:00
hyukjinkwon 5cdb8a23df [SPARK-23698][PYTHON][FOLLOWUP] Resolve undefiend names in setup.py
## What changes were proposed in this pull request?

`__version__` in `setup.py` is currently being dynamically read by `exec`; so the linter complains. Better just switch it off for this line for now.

**Before:**

```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
./setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
          ^
1     F821 undefined name '__version__'
1
```

**After:**

```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
0
```

## How was this patch tested?

Manually tested.

Closes #22235 from HyukjinKwon/SPARK-23698.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-27 10:02:31 +08:00
Adam Bradbury ad43e2c1e8 [SPARK-23792][DOCS] Documentation improvements for datetime functions
## What changes were proposed in this pull request?

Improved the documentation for the datetime functions in `org.apache.spark.sql.functions` by adding details about the supported column input types, the column return type, behaviour on invalid input, supporting examples and clarifications.

## How was this patch tested?

Manually testing each of the datetime functions with different input to ensure that the corresponding Javadoc/Scaladoc matches the behaviour of the function. Successfully ran the `unidoc` SBT process.

Closes #20901 from abradbury/SPARK-23792.

Authored-by: Adam Bradbury <abradbury@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-26 08:37:52 -05:00
Shixiong Zhu c17a8ff523
[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss=false
## What changes were proposed in this pull request?

This is a follow up PR for #22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

Closes #22230 from zsxwing/SPARK-25214-2.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-25 09:17:40 -07:00
Huangweizhe 6c66ab8b33 [SPARK-24688][EXAMPLES] Modify the comments about LabeledPoint
## What changes were proposed in this pull request?

An RDD is created using LabeledPoint, but the comment is like #LabeledPoint(feature, label).
Although in the method ChiSquareTest.test, the second parameter is feature and the third parameter is label, it it better to write label in front of feature here because if an RDD is created using LabeldPoint, what we get are actually (label, feature) pairs.
Now it is changed as LabeledPoint(label, feature).

The comments in Scala and Java example have the same typos.

## How was this patch tested?

tested

https://issues.apache.org/jira/browse/SPARK-24688

Author: Weizhe Huang 492816239qq.com

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21665 from uzmijnlm/my_change.

Authored-by: Huangweizhe <huangweizhe@bbdservice.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-25 09:24:20 -05:00