[SPARK-23698][PYTHON] Resolve undefined names in Python 3

## What changes were proposed in this pull request?

Fix issues arising from the fact that the Python 2 builtins `file`, `long`, `raw_input()`, `unicode`, `xrange()`, etc. were all removed in Python 3. Each undefined name can raise a [NameError](https://docs.python.org/3/library/exceptions.html#NameError) at runtime.
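
As an aside, a minimal sketch (not part of this patch) of the version-guard pattern these fixes rely on: on Python 3, alias the removed Python 2 builtins so the same code runs under both interpreters.

```python
import sys

if sys.version_info[0] >= 3:
    # Python 3 removed these Python 2 builtins; alias them so code
    # written for Python 2 keeps working under Python 3.
    xrange = range
    unicode = str
    raw_input = input
    long = int
```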

## How was this patch tested?
* `$ python2 -m flake8 . --count --select=E9,F82 --show-source --statistics`
* `$ python3 -m flake8 . --count --select=E9,F82 --show-source --statistics`
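
Here `--select=E9,F82` selects error codes by prefix, covering E901/E999 and F821-F823; F821 is the undefined-name check. For example, flake8 running under Python 3 flags the Python 2 idiom below (illustrative snippet, not from the patch):

```python
# Python 2 idiom that flake8 reports as F821 under Python 3:
name = raw_input("Enter a name: ")
# Portable replacement:
# name = input("Enter a name: ")
```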

holdenk

flake8 testing of https://github.com/apache/spark on Python 3.6.3

`$ python3 -m flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics`
```
./dev/merge_spark_pr.py:98:14: F821 undefined name 'raw_input'
    result = raw_input("\n%s (y/n): " % prompt)
             ^
./dev/merge_spark_pr.py:136:22: F821 undefined name 'raw_input'
    primary_author = raw_input(
                     ^
./dev/merge_spark_pr.py:186:16: F821 undefined name 'raw_input'
    pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
               ^
./dev/merge_spark_pr.py:233:15: F821 undefined name 'raw_input'
    jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
              ^
./dev/merge_spark_pr.py:278:20: F821 undefined name 'raw_input'
    fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions)
                   ^
./dev/merge_spark_pr.py:317:28: F821 undefined name 'raw_input'
            raw_assignee = raw_input(
                           ^
./dev/merge_spark_pr.py:430:14: F821 undefined name 'raw_input'
    pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): ")
             ^
./dev/merge_spark_pr.py:442:18: F821 undefined name 'raw_input'
        result = raw_input("Would you like to use the modified title? (y/n): ")
                 ^
./dev/merge_spark_pr.py:493:11: F821 undefined name 'raw_input'
    while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
          ^
./dev/create-release/releaseutils.py:58:16: F821 undefined name 'raw_input'
    response = raw_input("%s [y/n]: " % msg)
               ^
./dev/create-release/releaseutils.py:152:38: F821 undefined name 'unicode'
        author = unidecode.unidecode(unicode(author, "UTF-8")).strip()
                                     ^
./python/setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
          ^
./python/pyspark/cloudpickle.py:275:18: F821 undefined name 'buffer'
        dispatch[buffer] = save_buffer
                 ^
./python/pyspark/cloudpickle.py:807:18: F821 undefined name 'file'
        dispatch[file] = save_file
                 ^
./python/pyspark/sql/conf.py:61:61: F821 undefined name 'unicode'
        if not isinstance(obj, str) and not isinstance(obj, unicode):
                                                            ^
./python/pyspark/sql/streaming.py:25:21: F821 undefined name 'long'
    intlike = (int, long)
                    ^
./python/pyspark/streaming/dstream.py:405:35: F821 undefined name 'long'
        return self._sc._jvm.Time(long(timestamp * 1000))
                                  ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:21:10: F821 undefined name 'xrange'
for i in xrange(50):
         ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:22:14: F821 undefined name 'xrange'
    for j in xrange(5):
             ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:23:18: F821 undefined name 'xrange'
        for k in xrange(20022):
                 ^
20    F821 undefined name 'raw_input'
20
```

Closes #20838 from cclauss/fix-undefined-names.

Authored-by: cclauss <cclauss@bluewin.ch>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
Commit 71f38ac242 (parent e754887182), authored by cclauss on 2018-08-22 and committed by Bryan Cutler.
7 changed files with 50 additions and 9 deletions.

**dev/create-release/releaseutils.py**

```diff
@@ -50,7 +50,7 @@ except ImportError:
     sys.exit(-1)
 
 if sys.version < '3':
-    input = raw_input
+    input = raw_input  # noqa
 
 # Contributors list file name
 contributors_file_name = "contributors.txt"
@@ -152,7 +152,11 @@ def get_commits(tag):
         if not is_valid_author(author):
             author = github_username
         # Guard against special characters
-        author = unidecode.unidecode(unicode(author, "UTF-8")).strip()
+        try:  # Python 2
+            author = unicode(author, "UTF-8")
+        except NameError:  # Python 3
+            author = str(author)
+        author = unidecode.unidecode(author).strip()
         commit = Commit(_hash, author, title, pr_number)
         commits.append(commit)
     return commits
```
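
The `try`/`except NameError` idiom above generalizes to any use of a removed builtin; a hedged standalone sketch (the helper name `to_text` is illustrative, not from the patch):

```python
def to_text(s):
    """Return a text string on both Python 2 and Python 3."""
    try:
        return unicode(s, "UTF-8")  # Python 2: decode bytes to unicode
    except NameError:
        return str(s)  # Python 3: 'unicode' no longer exists; str is text
```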

**dev/merge_spark_pr.py**

```diff
@@ -40,7 +40,7 @@ except ImportError:
     JIRA_IMPORTED = False
 
 if sys.version < '3':
-    input = raw_input
+    input = raw_input  # noqa
 
 # Location of your Spark git development area
 SPARK_HOME = os.environ.get("SPARK_HOME", os.getcwd())
```

**python/pyspark/sql/conf.py**

```diff
@@ -20,6 +20,9 @@ import sys
 
 from pyspark import since, _NoValue
 from pyspark.rdd import ignore_unicode_prefix
 
+if sys.version_info[0] >= 3:
+    basestring = str
+
 
 class RuntimeConfig(object):
     """User-facing configuration API, accessible through `SparkSession.conf`.
@@ -59,7 +62,7 @@ class RuntimeConfig(object):
 
     def _checkType(self, obj, identifier):
         """Assert that an object is of type str."""
-        if not isinstance(obj, str) and not isinstance(obj, unicode):
+        if not isinstance(obj, basestring):
             raise TypeError("expected %s '%s' to be a string (was '%s')" %
                             (identifier, obj, type(obj).__name__))
```
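
Aliasing `basestring = str` on Python 3 lets a single `isinstance` check cover both interpreters; a small sketch under that assumption (`is_stringy` is an illustrative name):

```python
import sys

if sys.version_info[0] >= 3:
    basestring = str  # Python 3 has no basestring builtin

def is_stringy(obj):
    """True for str on Python 3; True for str or unicode on Python 2."""
    return isinstance(obj, basestring)
```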

**python/pyspark/sql/streaming.py**

```diff
@@ -19,10 +19,7 @@ import sys
 import json
 
 if sys.version >= '3':
-    intlike = int
     basestring = unicode = str
 else:
-    intlike = (int, long)
     basestring = str
-
 from py4j.java_gateway import java_import
```

**python/pyspark/streaming/dstream.py**

```diff
@@ -23,6 +23,8 @@ from datetime import datetime
 
 if sys.version < "3":
     from itertools import imap as map, ifilter as filter
+else:
+    long = int
 
 from py4j.protocol import Py4JJavaError
```
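
With `long = int` defined on Python 3, the millisecond conversion used by `DStream.slice` works unchanged on either interpreter; a hedged sketch of the same arithmetic (`to_millis` is an illustrative name):

```python
import sys

if sys.version_info[0] >= 3:
    long = int  # Python 3 folded long into int

def to_millis(timestamp):
    """Convert seconds (int or float) to whole milliseconds."""
    return long(timestamp * 1000)
```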

**python/pyspark/streaming/tests.py**

```diff
@@ -179,7 +179,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
         self._test_func(input, func, expected)
 
     def test_flatMap(self):
-        """Basic operation test for DStream.faltMap."""
+        """Basic operation test for DStream.flatMap."""
         input = [range(1, 5), range(5, 9), range(9, 13)]
 
         def func(dstream):
@@ -206,6 +206,38 @@ class BasicOperationTests(PySparkStreamingTestCase):
         expected = [[len(x)] for x in input]
         self._test_func(input, func, expected)
 
+    def test_slice(self):
+        """Basic operation test for DStream.slice."""
+        import datetime as dt
+        self.ssc = StreamingContext(self.sc, 1.0)
+        self.ssc.remember(4.0)
+        input = [[1], [2], [3], [4]]
+        stream = self.ssc.queueStream([self.sc.parallelize(d, 1) for d in input])
+
+        time_vals = []
+
+        def get_times(t, rdd):
+            if rdd and len(time_vals) < len(input):
+                time_vals.append(t)
+
+        stream.foreachRDD(get_times)
+
+        self.ssc.start()
+        self.wait_for(time_vals, 4)
+        begin_time = time_vals[0]
+
+        def get_sliced(begin_delta, end_delta):
+            begin = begin_time + dt.timedelta(seconds=begin_delta)
+            end = begin_time + dt.timedelta(seconds=end_delta)
+            rdds = stream.slice(begin, end)
+            result_list = [rdd.collect() for rdd in rdds]
+            return [r for result in result_list for r in result]
+
+        self.assertEqual(set([1]), set(get_sliced(0, 0)))
+        self.assertEqual(set([2, 3]), set(get_sliced(1, 2)))
+        self.assertEqual(set([2, 3, 4]), set(get_sliced(1, 4)))
+        self.assertEqual(set([1, 2, 3, 4]), set(get_sliced(0, 4)))
+
     def test_reduce(self):
         """Basic operation test for DStream.reduce."""
         input = [range(1, 5), range(5, 9), range(9, 13)]
```

**sql/hive/src/test/resources/data/scripts/dumpdata_script.py**

```diff
@@ -18,6 +18,9 @@
 #
 import sys
 
+if sys.version_info[0] >= 3:
+    xrange = range
+
 for i in xrange(50):
     for j in xrange(5):
         for k in xrange(20022):
```