spark-instrumented-optimizer/python/pyspark/tests/test_broadcast.py
HyukjinKwon 7c05f61514 [SPARK-28130][PYTHON] Print pretty messages for skipped tests when xmlrunner is available in PySpark
## What changes were proposed in this pull request?

Currently, the pretty skipped-message mechanism added by f7435bec6a does not seem to work when xmlrunner is installed.

This PR fixes two things:

1. When `xmlrunner` is installed, it seems `xmlrunner` does not respect the `verbosity` level in unittest (the default is level 1).

    So the output looks like the following:

    ```
    Running tests...
     ----------------------------------------------------------------------
    SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS
    ----------------------------------------------------------------------
    ```

    So it is not caught by our message detection mechanism.

2. If we manually set the `verbosity` level on `xmlrunner`, it prints messages as below:

    ```
    test_mixed_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
    test_mixed_udf_and_sql (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
    ...
    ```

    This is different from the output on our Jenkins machines:

    ```
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
    ...
    ```

    Note that the trailing `SKIP` marker is different. This PR fixes the regular expression to catch the `SKIP` case as well, as sketched below.
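
    For illustration, here is a minimal sketch of a pattern that catches both skip formats. The regex and variable names below are hypothetical, written for this example only; they are not necessarily the exact pattern used by `python/run-tests.py`:

    ```
    import re

    # Hypothetical pattern: matches both the Jenkins-style "... skipped '<reason>'"
    # suffix and the xmlrunner-style "... SKIP (<time>)" suffix quoted above.
    SKIPPED_LINE = re.compile(r"\.\.\. (?:skipped .*|SKIP \(.*\))$")

    jenkins = ("test_createDataFrame_column_name_encoding "
               "(pyspark.sql.tests.test_arrow.ArrowTests) ... "
               "skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'")
    xml = ("test_mixed_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) "
           "... SKIP (0.000s)")

    assert SKIPPED_LINE.search(jenkins)
    assert SKIPPED_LINE.search(xml)
    ```

    The verbosity side of the fix is visible at the bottom of this file, where `verbosity=2` is passed explicitly to both `xmlrunner.XMLTestRunner` and `unittest.main`.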

## How was this patch tested?

Manually tested.

**Before:**

```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (0s)
...
Tests passed in 562 seconds

========================================================================
...
```

**After:**

```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (48s) ... 93 tests were skipped
...
Tests passed in 560 seconds

Skipped tests pyspark.... with python2.7:
      pyspark...(...) ... SKIP (0.000s)
...

========================================================================
...
```

Closes #24927 from HyukjinKwon/SPARK-28130.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-24 09:58:17 +09:00

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import random
import tempfile
import unittest

from pyspark import SparkConf, SparkContext
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import ChunkedStream


class BroadcastTest(unittest.TestCase):

    def tearDown(self):
        if getattr(self, "sc", None) is not None:
            self.sc.stop()
            self.sc = None

    def _test_encryption_helper(self, vs):
        """
        Creates a broadcast variable for each value in vs, and runs a simple job to make sure the
        value is the same when it's read in the executors. Also makes sure there are no task
        failures.
        """
        bs = [self.sc.broadcast(value=v) for v in vs]
        exec_values = self.sc.parallelize(range(2)).map(lambda x: [b.value for b in bs]).collect()
        for ev in exec_values:
            self.assertEqual(ev, vs)
        # make sure there are no task failures
        status = self.sc.statusTracker()
        for jid in status.getJobIdsForGroup():
            for sid in status.getJobInfo(jid).stageIds:
                stage_info = status.getStageInfo(sid)
                self.assertEqual(0, stage_info.numFailedTasks)

    def _test_multiple_broadcasts(self, *extra_confs):
        """
        Test that broadcast variables make it to the executors OK. Tests multiple broadcast
        variables, and also multiple jobs.
        """
        conf = SparkConf()
        for key, value in extra_confs:
            conf.set(key, value)
        conf.setMaster("local-cluster[2,1,1024]")
        self.sc = SparkContext(conf=conf)
        self._test_encryption_helper([5])
        self._test_encryption_helper([5, 10, 20])

    def test_broadcast_with_encryption(self):
        self._test_multiple_broadcasts(("spark.io.encryption.enabled", "true"))

    def test_broadcast_no_encryption(self):
        self._test_multiple_broadcasts()

    def _test_broadcast_on_driver(self, *extra_confs):
        conf = SparkConf()
        for key, value in extra_confs:
            conf.set(key, value)
        conf.setMaster("local-cluster[2,1,1024]")
        self.sc = SparkContext(conf=conf)
        bs = self.sc.broadcast(value=5)
        self.assertEqual(5, bs.value)

    def test_broadcast_value_driver_no_encryption(self):
        self._test_broadcast_on_driver()

    def test_broadcast_value_driver_encryption(self):
        self._test_broadcast_on_driver(("spark.io.encryption.enabled", "true"))


class BroadcastFrameProtocolTest(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        gateway = launch_gateway(SparkConf())
        cls._jvm = gateway.jvm
        cls.longMessage = True
        random.seed(42)

    def _test_chunked_stream(self, data, py_buf_size):
        # write data using the chunked protocol from python.
        chunked_file = tempfile.NamedTemporaryFile(delete=False)
        dechunked_file = tempfile.NamedTemporaryFile(delete=False)
        dechunked_file.close()
        try:
            out = ChunkedStream(chunked_file, py_buf_size)
            out.write(data)
            out.close()
            # now try to read it in java
            jin = self._jvm.java.io.FileInputStream(chunked_file.name)
            jout = self._jvm.java.io.FileOutputStream(dechunked_file.name)
            self._jvm.DechunkedInputStream.dechunkAndCopyToOutput(jin, jout)
            # java should have decoded it back to the original data
            self.assertEqual(len(data), os.stat(dechunked_file.name).st_size)
            with open(dechunked_file.name, "rb") as f:
                byte = f.read(1)
                idx = 0
                while byte:
                    self.assertEqual(data[idx], bytearray(byte)[0], msg="idx = " + str(idx))
                    byte = f.read(1)
                    idx += 1
        finally:
            os.unlink(chunked_file.name)
            os.unlink(dechunked_file.name)

    def test_chunked_stream(self):
        def random_bytes(n):
            return bytearray(random.getrandbits(8) for _ in range(n))

        for data_length in [1, 10, 100, 10000]:
            for buffer_length in [1, 2, 5, 8192]:
                self._test_chunked_stream(random_bytes(data_length), buffer_length)


if __name__ == '__main__':
    from pyspark.tests.test_broadcast import *

    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)
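
As a usage note, the test classes above can also be run individually; a minimal sketch, assuming `pyspark` and its test dependencies are importable in the current environment:

```
import unittest

from pyspark.tests.test_broadcast import BroadcastFrameProtocolTest

# Build a suite from a single TestCase class and run it with the same
# verbosity the __main__ block above uses.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(BroadcastFrameProtocolTest)
unittest.TextTestRunner(verbosity=2).run(suite)
```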