[SPARK-26640][CORE][ML][SQL][STREAMING][PYSPARK] Code cleanup from lgtm.com analysis

## What changes were proposed in this pull request?

Miscellaneous code cleanup prompted by lgtm.com analysis: removing unused imports, variables, and dead code; declaring JavaScript loop variables with `var` and adding missing semicolons; replacing `raise NotImplemented` with `raise NotImplementedError`; dropping redundant `pass` statements and misleading `else` clauses on loops; and updating a few examples and doctests (e.g. label-specific `MulticlassMetrics` calls, module-qualified `numpy` usage). See comments below for details.

## How was this patch tested?

Existing tests.

Closes #23571 from srowen/SPARK-26640.

Lead-authored-by: Sean Owen <sean.owen@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sean Owen 2019-01-17 19:40:39 -06:00
parent 0b3abef195
commit c2d0d700b5
43 changed files with 71 additions and 128 deletions

View file

@ -114,7 +114,6 @@ $(document).ready(function () {
var endPoint = createRESTEndPointForExecutorsPage(appId);
$.getJSON(endPoint, function (response, status, jqXHR) {
var summary = [];
var allExecCnt = 0;
var allRDDBlocks = 0;
var allMemoryUsed = 0;
@ -505,7 +504,7 @@ $(document).ready(function () {
{data: 'allTotalTasks'},
{
data: function (row, type) {
return type === 'display' ? (formatDuration(row.allTotalDuration, type) + ' (' + formatDuration(row.allTotalGCTime, type) + ')') : row.allTotalDuration
return type === 'display' ? (formatDuration(row.allTotalDuration) + ' (' + formatDuration(row.allTotalGCTime) + ')') : row.allTotalDuration
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
if (oData.allTotalDuration > 0) {

View file

@ -103,12 +103,12 @@ $(document).ready(function() {
pageLength: 20
});
historySummary = $("#history-summary");
searchString = historySummary["context"]["location"]["search"];
requestedIncomplete = getParameterByName("showIncomplete", searchString);
var historySummary = $("#history-summary");
var searchString = historySummary["context"]["location"]["search"];
var requestedIncomplete = getParameterByName("showIncomplete", searchString);
requestedIncomplete = (requestedIncomplete == "true" ? true : false);
appParams = {
var appParams = {
limit: appLimit,
status: (requestedIncomplete ? "running" : "completed")
};
@ -116,7 +116,7 @@ $(document).ready(function() {
$.getJSON(uiRoot + "/api/v1/applications", appParams, function(response,status,jqXHR) {
var array = [];
var hasMultipleAttempts = false;
for (i in response) {
for (var i in response) {
var app = response[i];
if (app["attempts"][0]["completed"] == requestedIncomplete) {
continue; // if we want to show for Incomplete, we skip the completed apps; otherwise skip incomplete ones.
@ -127,7 +127,7 @@ $(document).ready(function() {
hasMultipleAttempts = true;
}
var num = app["attempts"].length;
for (j in app["attempts"]) {
for (var j in app["attempts"]) {
var attempt = app["attempts"][j];
attempt["startTime"] = formatTimeMillis(attempt["startTimeEpoch"]);
attempt["endTime"] = formatTimeMillis(attempt["endTimeEpoch"]);
@ -149,7 +149,7 @@ $(document).ready(function() {
"applications": array,
"hasMultipleAttempts": hasMultipleAttempts,
"showCompletedColumns": !requestedIncomplete,
}
};
$.get(uiRoot + "/static/historypage-template.html", function(template) {
var sibling = historySummary.prev();
@ -157,7 +157,7 @@ $(document).ready(function() {
var apps = $(Mustache.render($(template).filter("#history-summary-template").html(),data));
var attemptIdColumnName = 'attemptId';
var startedColumnName = 'started';
var defaultSortColumn = completedColumnName = 'completed';
var completedColumnName = 'completed';
var durationColumnName = 'duration';
var conf = {
"columns": [

View file

@ -220,7 +220,7 @@ function renderDagVizForJob(svgContainer) {
} else {
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
// Use the link from the stage table so it also works for the history server
var attemptId = 0
var attemptId = 0;
var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
.select("a.name-link")
.attr("href");
@ -236,7 +236,7 @@ function renderDagVizForJob(svgContainer) {
// existing ones, taking into account the position and width of the last stage's
// container. We do not need to do this for the first stage of this job.
if (i > 0) {
var existingStages = svgContainer.selectAll("g.cluster.stage")
var existingStages = svgContainer.selectAll("g.cluster.stage");
if (!existingStages.empty()) {
var lastStage = d3.select(existingStages[0].pop());
var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@ -369,8 +369,8 @@ function resizeSvg(svg) {
* here this function is to enable line break.
*/
function interpretLineBreak(svg) {
var allTSpan = svg.selectAll("tspan").each(function() {
node = d3.select(this);
svg.selectAll("tspan").each(function() {
var node = d3.select(this);
var original = node[0][0].innerHTML;
if (original.indexOf("\\n") != -1) {
var arr = original.split("\\n");

View file

@ -263,7 +263,6 @@ function reselectCheckboxesBasedOnTaskTableState() {
function getStageAttemptId() {
var words = document.baseURI.split('?');
var attemptIdStr = words[1].split('&')[1];
var digitsRegex = /[0-9]+/;
// We are using regex here to extract the stage attempt id as there might be certain url's with format
// like /proxy/application_1539986433979_27115/stages/stage/?id=0&attempt=0#tasksTitle
@ -433,7 +432,7 @@ $(document).ready(function () {
"oLanguage": {
"sEmptyTable": "No data to show yet"
}
}
};
var executorSummaryTableSelector =
$("#summary-executor-table").DataTable(executorSummaryConf);
$('#parent-container [data-toggle="tooltip"]').tooltip();
@ -612,7 +611,7 @@ $(document).ready(function () {
"searching": false,
"order": [[0, "asc"]],
"bAutoWidth": false
}
};
$("#accumulator-table").DataTable(accumulatorConf);
// building tasks table that uses server side functionality

View file

@ -89,7 +89,7 @@ function onSearchStringChange() {
if($(this).attr('id') && $(this).attr('id').match(/thread_[0-9]+_tr/) ) {
var children = $(this).children()
var found = false
for (i = 0; i < children.length; i++) {
for (var i = 0; i < children.length; i++) {
if (children.eq(i).text().toLowerCase().indexOf(searchString) >= 0) {
found = true
}

View file

@ -170,7 +170,7 @@ function createRESTEndPointForExecutorsPage(appId) {
if (ind > 0) {
var appId = words[ind + 1];
var newBaseURI = words.slice(0, ind + 2).join('/');
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors"
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
}
ind = words.indexOf("history");
if (ind > 0) {

View file

@ -33,7 +33,7 @@ function collapseTable(thisName, table){
var status = window.localStorage.getItem(thisName) == "true";
status = !status;
thisClass = '.' + thisName
var thisClass = '.' + thisName
// Expand the list of additional metrics.
var tableDiv = $(thisClass).parent().find('.' + table);

View file

@ -18,7 +18,6 @@
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.param import Params
from pyspark.mllib.linalg import *
import sys

View file

@ -29,7 +29,7 @@ from collections import namedtuple
from sparktestsupport import SPARK_HOME, USER_HOME, ERROR_CODES
from sparktestsupport.shellutils import exit_from_command_with_retcode, run_cmd, rm_r, which
from sparktestsupport.toposort import toposort_flatten, toposort
from sparktestsupport.toposort import toposort_flatten
import sparktestsupport.modules as modules
@ -153,30 +153,6 @@ def determine_java_executable():
return java_exe if java_exe else which("java")
JavaVersion = namedtuple('JavaVersion', ['major', 'minor', 'patch'])
def determine_java_version(java_exe):
"""Given a valid java executable will return its version in named tuple format
with accessors '.major', '.minor', '.patch', '.update'"""
raw_output = subprocess.check_output([java_exe, "-version"],
stderr=subprocess.STDOUT,
universal_newlines=True)
raw_output_lines = raw_output.split('\n')
# find raw version string, eg 'java version "1.8.0_25"'
raw_version_str = next(x for x in raw_output_lines if " version " in x)
match = re.search(r'(\d+)\.(\d+)\.(\d+)', raw_version_str)
major = int(match.group(1))
minor = int(match.group(2))
patch = int(match.group(3))
return JavaVersion(major, minor, patch)
# -------------------------------------------------------------------------------------------------
# Functions for running the other build and test scripts
# -------------------------------------------------------------------------------------------------
@ -443,7 +419,6 @@ def run_python_packaging_tests():
def run_build_tests():
set_title_and_block("Running build tests", "BLOCK_BUILD_TESTS")
run_cmd([os.path.join(SPARK_HOME, "dev", "test-dependencies.sh")])
pass
def run_sparkr_tests():
@ -495,8 +470,6 @@ def main():
" install one and retry.")
sys.exit(2)
java_version = determine_java_version(java_exe)
# install SparkR
if which("R"):
run_cmd([os.path.join(SPARK_HOME, "R", "install-dev.sh")])

View file

@ -50,7 +50,7 @@ public class JavaRandomForestClassificationExample {
// Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
Integer numTrees = 3; // Use more in practice.
int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
int maxDepth = 5;

View file

@ -23,7 +23,7 @@ from numpy import array
from pyspark import SparkContext
# $example on$
from pyspark.mllib.clustering import BisectingKMeans, BisectingKMeansModel
from pyspark.mllib.clustering import BisectingKMeans
# $example off$
if __name__ == "__main__":
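Several of the Python examples import names they never use; here the unused `BisectingKMeansModel` import is dropped. A minimal sketch of the trimmed import in use, assuming a plain `SparkContext` and toy data that is purely illustrative (not the data file the bundled example reads):

```python
# Hedged sketch: only BisectingKMeans is needed to train; the returned model is
# a BisectingKMeansModel even though that name is never imported.
from pyspark import SparkContext
from pyspark.mllib.clustering import BisectingKMeans

sc = SparkContext(appName="BisectingKMeansImportSketch")
vectors = sc.parallelize([[0.0, 0.0], [0.1, 0.1], [9.0, 9.0], [9.1, 9.1]])
model = BisectingKMeans.train(vectors, k=2)
print(model.clusterCenters)
sc.stop()
```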

View file

@ -23,7 +23,7 @@ from __future__ import print_function
from pyspark import SparkContext
# $example on$
import math
from pyspark.mllib.regression import LabeledPoint, IsotonicRegression, IsotonicRegressionModel
from pyspark.mllib.regression import IsotonicRegression, IsotonicRegressionModel
from pyspark.mllib.util import MLUtils
# $example off$

View file

@ -45,9 +45,9 @@ if __name__ == "__main__":
metrics = MulticlassMetrics(predictionAndLabels)
# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
precision = metrics.precision(1.0)
recall = metrics.recall(1.0)
f1Score = metrics.fMeasure(1.0)
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)

View file

@ -17,7 +17,7 @@
# $example on$
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.mllib.evaluation import RegressionMetrics
# $example off$
from pyspark import SparkContext

View file

@ -19,7 +19,7 @@ from __future__ import print_function
from pyspark import SparkContext
# $example on$
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
# $example off$

View file

@ -23,7 +23,7 @@ Run with:
from __future__ import print_function
# $example on:spark_hive$
from os.path import expanduser, join, abspath
from os.path import join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row

View file

@ -455,6 +455,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
break;
case CONF:
checkArgument(value != null, "Missing argument to %s", CONF);
String[] setConf = value.split("=", 2);
checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
conf.put(setConf[0], setConf[1]);

View file

@ -17,7 +17,6 @@
import gc
import os
import socket
import sys
from tempfile import NamedTemporaryFile
import threading

View file

@ -439,7 +439,6 @@ class SparkContext(object):
' been killed or may also be in a zombie state.',
RuntimeWarning
)
pass
finally:
self._jsc = None
if getattr(self, "_accumulatorServer", None):

View file

@ -18,7 +18,6 @@
import atexit
import os
import sys
import select
import signal
import shlex
import shutil
@ -174,7 +173,6 @@ def local_connect_and_auth(port, auth_secret):
errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg))
sock.close()
sock = None
else:
raise Exception("could not open socket: %s" % errors)

View file

@ -23,7 +23,7 @@ from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import *
from pyspark.ml.regression import DecisionTreeModel, DecisionTreeRegressionModel, \
GBTParams, HasVarianceImpurity, RandomForestParams, TreeEnsembleModel, TreeEnsembleParams
GBTParams, HasVarianceImpurity, RandomForestParams, TreeEnsembleModel
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams
from pyspark.ml.wrapper import JavaWrapper

View file

@ -18,7 +18,7 @@
from pyspark import keyword_only, since
from pyspark.sql import DataFrame
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, _jvm
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams
from pyspark.ml.param.shared import *
__all__ = ["FPGrowth", "FPGrowthModel", "PrefixSpan"]

View file

@ -16,7 +16,6 @@
#
import sys
import warnings
from pyspark import since, keyword_only
from pyspark.ml.param.shared import *

View file

@ -20,12 +20,10 @@ import sys
import warnings
import numpy
from numpy import array
from pyspark import RDD, since
from pyspark.streaming import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.regression import (
LabeledPoint, LinearModel, _regression_train_wrapper,
StreamingLinearAlgorithm)
@ -126,9 +124,9 @@ class LogisticRegressionModel(LinearClassificationModel):
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
>>> lrm.predict(array([0.0, 1.0]))
>>> lrm.predict(numpy.array([0.0, 1.0]))
1
>>> lrm.predict(array([1.0, 0.0]))
>>> lrm.predict(numpy.array([1.0, 0.0]))
0
>>> lrm.predict(SparseVector(2, {1: 1.0}))
1
@ -138,7 +136,7 @@ class LogisticRegressionModel(LinearClassificationModel):
>>> path = tempfile.mkdtemp()
>>> lrm.save(sc, path)
>>> sameModel = LogisticRegressionModel.load(sc, path)
>>> sameModel.predict(array([0.0, 1.0]))
>>> sameModel.predict(numpy.array([0.0, 1.0]))
1
>>> sameModel.predict(SparseVector(2, {0: 1.0}))
0
@ -424,7 +422,7 @@ class SVMModel(LinearClassificationModel):
>>> svm.predict(sc.parallelize([[1.0]])).collect()
[1]
>>> svm.clearThreshold()
>>> svm.predict(array([1.0]))
>>> svm.predict(numpy.array([1.0]))
1.44...
>>> sparse_data = [
@ -577,9 +575,9 @@ class NaiveBayesModel(Saveable, Loader):
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
>>> model.predict(numpy.array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
>>> model.predict(numpy.array([1.0, 0.0]))
1.0
>>> model.predict(sc.parallelize([[1.0, 0.0]])).collect()
[1.0]
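With `from numpy import array` gone (and the unused `DenseVector` import dropped), the doctests qualify the call as `numpy.array(...)`. A minimal sketch mirroring the sparse-data doctest above, assuming an existing `SparkContext` named `sc`:

```python
# Hedged sketch of the updated doctest style: numpy is referenced by module name.
import numpy
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

sparse_data = [
    LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    LabeledPoint(1.0, SparseVector(2, {1: 2.0})),
]
lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
print(lrm.predict(numpy.array([0.0, 1.0])))  # 1, per the doctest
print(lrm.predict(numpy.array([1.0, 0.0])))  # 0, per the doctest
```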

View file

@ -33,7 +33,6 @@ from pyspark import SparkContext, since
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
from pyspark.streaming import DStream

View file

@ -16,12 +16,11 @@
#
import sys
import warnings
from pyspark import since
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, DoubleType, IntegerType, ArrayType
from pyspark.sql.types import StructField, StructType, DoubleType
__all__ = ['BinaryClassificationMetrics', 'RegressionMetrics',
'MulticlassMetrics', 'RankingMetrics']

View file

@ -22,8 +22,6 @@ from __future__ import absolute_import
import sys
import warnings
import random
import binascii
if sys.version >= '3':
basestring = str
unicode = str

View file

@ -17,11 +17,9 @@
import sys
import numpy
from numpy import array
from collections import namedtuple
from pyspark import SparkContext, since
from pyspark import since
from pyspark.rdd import ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc
from pyspark.mllib.util import JavaSaveable, JavaLoader, inherit_doc

View file

@ -19,12 +19,11 @@ import sys
import warnings
import numpy as np
from numpy import array
from pyspark import RDD, since
from pyspark.streaming.dstream import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader
__all__ = ['LabeledPoint', 'LinearModel',
@ -168,15 +167,15 @@ class LinearRegressionModel(LinearRegressionModelBase):
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
... initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... miniBatchFraction=1.0, initialWeights=array([1.0]), regParam=0.1, regType="l2",
... miniBatchFraction=1.0, initialWeights=np.array([1.0]), regParam=0.1, regType="l2",
... intercept=True, validateData=True)
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@ -305,7 +304,8 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, initialWeights=array([1.0]))
>>> lrm = LassoWithSGD.train(
... sc.parallelize(data), iterations=10, initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@ -336,13 +336,13 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
... initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=np.array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
@ -449,7 +449,7 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
... initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@ -480,13 +480,13 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
... initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=np.array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True

View file

@ -20,7 +20,7 @@ from __future__ import absolute_import
import sys
import random
from pyspark import SparkContext, RDD, since
from pyspark import RDD, since
from pyspark.mllib.common import callMLlibFunc, inherit_doc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector
from pyspark.mllib.regression import LabeledPoint

View file

@ -17,7 +17,6 @@
import sys
import numpy as np
import warnings
if sys.version > '3':
xrange = range
@ -420,7 +419,7 @@ class Loader(object):
was saved.
:return: model instance
"""
raise NotImplemented
raise NotImplementedError
@inherit_doc

View file

@ -104,11 +104,11 @@ class Profiler(object):
def profile(self, func):
""" Do profiling on the function `func`"""
raise NotImplemented
raise NotImplementedError
def stats(self):
""" Return the collected profiling stats (pstats.Stats)"""
raise NotImplemented
raise NotImplementedError
def show(self, id):
""" Print the profile stats to stdout, id is the RDD id """

View file

@ -25,7 +25,6 @@ import warnings
import heapq
import bisect
import random
import socket
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
@ -42,8 +41,7 @@ else:
from pyspark.java_gateway import local_connect_and_auth
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer, write_with_length, \
UTF8Deserializer
PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_full_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@ -53,7 +51,7 @@ from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, ExternalMerger, \
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.util import fail_on_stopiteration, _exception_message
from pyspark.util import fail_on_stopiteration
__all__ = ["RDD"]

View file

@ -537,10 +537,8 @@ def _test():
globs['df'] = rdd.toDF()
jsonStrings = [
'{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
'{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
'"field6":[{"field7": "row2"}]}',
'{"field1" : null, "field2": "row3", '
'"field3":{"field4":33, "field5": []}}'
'{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},"field6":[{"field7": "row2"}]}',
'{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}'
]
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)

View file

@ -1018,13 +1018,11 @@ def _infer_type(obj):
for key, value in obj.items():
if key is not None and value is not None:
return MapType(_infer_type(key), _infer_type(value), True)
else:
return MapType(NullType(), NullType(), True)
elif isinstance(obj, list):
for v in obj:
if v is not None:
return ArrayType(_infer_type(obj[0]), True)
else:
return ArrayType(NullType(), True)
elif isinstance(obj, array):
if obj.typecode in _array_type_mappings:
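The removed `else:` clauses were attached to `for` loops (Python's for/else), which only behaves differently from plain post-loop code when the loop can `break`; these loops return instead, so lgtm flags the clause as misleading and it is dropped. A simplified sketch of the resulting shape (the helper name and parameters here are illustrative, not Spark's `_infer_type` itself):

```python
# Simplified, hypothetical helper showing the cleaned-up control flow: no
# for/else is needed because the loop body returns as soon as it can.
def infer_element_type(values, infer, default):
    for v in values:
        if v is not None:
            return infer(v)   # early return; the loop never uses `break`
    return default            # reached only when the loop exhausts without returning
```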

View file

@ -17,10 +17,7 @@
from __future__ import print_function
import os
import sys
from py4j.java_gateway import java_import, JavaObject
from py4j.java_gateway import java_import
from pyspark import RDD, SparkConf
from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer

View file

@ -15,9 +15,7 @@
# limitations under the License.
#
from py4j.protocol import Py4JJavaError
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.serializers import NoOpSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.streaming import DStream

View file

@ -16,7 +16,6 @@
#
from __future__ import print_function
import socket
from pyspark.java_gateway import local_connect_and_auth
from pyspark.serializers import write_int, UTF8Deserializer

View file

@ -28,7 +28,6 @@ try:
import resource
except ImportError:
has_resource_module = False
import socket
import traceback
from pyspark.accumulators import _accumulatorRegistry

View file

@ -33,7 +33,6 @@ if sys.version < '3':
import Queue
else:
import queue as Queue
from distutils.version import LooseVersion
from multiprocessing import Manager

View file

@ -20,7 +20,7 @@ from __future__ import print_function
import glob
import os
import sys
from setuptools import setup, find_packages
from setuptools import setup
from shutil import copyfile, copytree, rmtree
if sys.version_info < (2, 7):

View file

@ -42,7 +42,7 @@ function renderPlanViz() {
setupTooltipForSparkPlanNode(i);
}
resizeSvg(svg)
resizeSvg(svg);
}
/* -------------------- *

View file

@ -129,7 +129,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
svg.append("g")
.attr("class", "x axis")
.attr("transform", "translate(0," + height + ")")
.call(xAxis)
.call(xAxis);
svg.append("g")
.attr("class", "y axis")
@ -198,7 +198,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
lastClickedBatch = null;
}
lastClickedBatch = d.x;
highlightBatchRow(lastClickedBatch)
highlightBatchRow(lastClickedBatch);
lastTimeout = window.setTimeout(function () {
lastTimeout = null;
if (lastClickedBatch != null) {
@ -261,9 +261,9 @@ function drawHistogram(id, values, minY, maxY, unitY, batchInterval) {
svg.append("g")
.attr("class", "y axis")
.call(yAxis)
.call(yAxis);
var bar = svg.selectAll(".bar")
svg.selectAll(".bar")
.data(data)
.enter()
.append("g")