[SPARK-14355][BUILD] Fix typos in Exception/Testcase/Comments and static analysis results

## What changes were proposed in this pull request?

This PR contains the following five types of maintenance fixes across 59 files (+94 lines, -93 lines).
- Fix typos (exception/log strings, test case names, comments) in 44 lines.
- Fix lint-java errors (MaxLineLength) in 6 lines (new code added after SPARK-14011).
- Use diamond operators in 40 lines (new code added after SPARK-13702); see the sketch after this list.
- Remove redundant semicolons in 5 lines.
- Rename the class `InferSchemaSuite` to `CSVInferSchemaSuite` in CSVInferSchemaSuite.scala.
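
As a quick illustration of the diamond-operator and redundant-semicolon cleanups, here is a minimal, hypothetical Java sketch; the class and members below are invented for illustration and are not taken from the 59 files touched by this patch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical example class, not part of the patch.
public class MaintenanceFixSketch {
  // Before: new ConcurrentHashMap<String, String>()  -- type arguments repeated on the right.
  // After:  the diamond operator (Java 7+) lets the compiler infer them.
  private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();

  public void put(String key, String value) {
    cache.put(key, value);
  }  // A stray ';' after a closing brace like this one is a redundant semicolon,
     // the kind of token removed in 5 lines of this patch.

  public static void main(String[] args) {
    MaintenanceFixSketch sketch = new MaintenanceFixSketch();
    sketch.put("k", "v");
    System.out.println(sketch.cache.get("k"));
  }
}
```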

## How was this patch tested?

Manual testing, plus the Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12139 from dongjoon-hyun/SPARK-14355.
Commit 3f749f7ed4 (parent 9023015f05), authored by Dongjoon Hyun on 2016-04-03 18:14:16 -07:00 and committed by Reynold Xin.
59 changed files with 94 additions and 93 deletions.

@@ -94,7 +94,7 @@ public class TransportClientFactory implements Closeable {
 this.context = Preconditions.checkNotNull(context);
 this.conf = context.getConf();
 this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
-this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>();
+this.connectionPool = new ConcurrentHashMap<>();
 this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
 this.rand = new Random();

@@ -64,9 +64,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
 public TransportResponseHandler(Channel channel) {
 this.channel = channel;
-this.outstandingFetches = new ConcurrentHashMap<StreamChunkId, ChunkReceivedCallback>();
-this.outstandingRpcs = new ConcurrentHashMap<Long, RpcResponseCallback>();
-this.streamCallbacks = new ConcurrentLinkedQueue<StreamCallback>();
+this.outstandingFetches = new ConcurrentHashMap<>();
+this.outstandingRpcs = new ConcurrentHashMap<>();
+this.streamCallbacks = new ConcurrentLinkedQueue<>();
 this.timeOfLastRequestNs = new AtomicLong(0);
 }

@@ -63,7 +63,7 @@ public class OneForOneStreamManager extends StreamManager {
 // For debugging purposes, start with a random stream id to help identifying different streams.
 // This does not need to be globally unique, only unique to this class.
 nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
-streams = new ConcurrentHashMap<Long, StreamState>();
+streams = new ConcurrentHashMap<>();
 }
 @Override

@@ -37,7 +37,7 @@ public class ShuffleSecretManager implements SecretKeyHolder {
 private static final String SPARK_SASL_USER = "sparkSaslUser";
 public ShuffleSecretManager() {
-shuffleSecretMap = new ConcurrentHashMap<String, String>();
+shuffleSecretMap = new ConcurrentHashMap<>();
 }
 /**

@@ -45,7 +45,7 @@ final class UnsafeSorterSpillMerger {
 }
 }
 };
-priorityQueue = new PriorityQueue<UnsafeSorterIterator>(numSpills, comparator);
+priorityQueue = new PriorityQueue<>(numSpills, comparator);
 }
 /**

@@ -182,7 +182,7 @@ private[spark] class RRunner[U](
 }
 stream.flush()
 } catch {
-// TODO: We should propogate this error to the task thread
+// TODO: We should propagate this error to the task thread
 case e: Exception =>
 logError("R Writer thread got an exception", e)
 } finally {

@@ -326,7 +326,7 @@ class GapSamplingReplacement(
 /**
 * Skip elements with replication factor zero (i.e. elements that won't be sampled).
 * Samples 'k' from geometric distribution P(k) = (1-q)(q)^k, where q = e^(-f), that is
-* q is the probabililty of Poisson(0; f)
+* q is the probability of Poisson(0; f)
 */
 private def advance(): Unit = {
 val u = math.max(rng.nextDouble(), epsilon)

@@ -170,11 +170,11 @@ public class UnsafeShuffleWriterSuite {
 private UnsafeShuffleWriter<Object, Object> createWriter(
 boolean transferToEnabled) throws IOException {
 conf.set("spark.file.transferTo", String.valueOf(transferToEnabled));
-return new UnsafeShuffleWriter<Object, Object>(
+return new UnsafeShuffleWriter<>(
 blockManager,
 shuffleBlockResolver,
 taskMemoryManager,
-new SerializedShuffleHandle<Object, Object>(0, 1, shuffleDep),
+new SerializedShuffleHandle<>(0, 1, shuffleDep),
 0, // map id
 taskContext,
 conf

@@ -82,10 +82,10 @@ public final class JavaLogQuery {
 String user = m.group(3);
 String query = m.group(5);
 if (!user.equalsIgnoreCase("-")) {
-return new Tuple3<String, String, String>(ip, user, query);
+return new Tuple3<>(ip, user, query);
 }
 }
-return new Tuple3<String, String, String>(null, null, null);
+return new Tuple3<>(null, null, null);
 }
 public static Stats extractStats(String line) {

@@ -34,13 +34,13 @@ public class JavaMultiLabelClassificationMetricsExample {
 JavaSparkContext sc = new JavaSparkContext(conf);
 // $example on$
 List<Tuple2<double[], double[]>> data = Arrays.asList(
-new Tuple2<double[], double[]>(new double[]{0.0, 1.0}, new double[]{0.0, 2.0}),
-new Tuple2<double[], double[]>(new double[]{0.0, 2.0}, new double[]{0.0, 1.0}),
-new Tuple2<double[], double[]>(new double[]{}, new double[]{0.0}),
-new Tuple2<double[], double[]>(new double[]{2.0}, new double[]{2.0}),
-new Tuple2<double[], double[]>(new double[]{2.0, 0.0}, new double[]{2.0, 0.0}),
-new Tuple2<double[], double[]>(new double[]{0.0, 1.0, 2.0}, new double[]{0.0, 1.0}),
-new Tuple2<double[], double[]>(new double[]{1.0}, new double[]{1.0, 2.0})
+new Tuple2<>(new double[]{0.0, 1.0}, new double[]{0.0, 2.0}),
+new Tuple2<>(new double[]{0.0, 2.0}, new double[]{0.0, 1.0}),
+new Tuple2<>(new double[]{}, new double[]{0.0}),
+new Tuple2<>(new double[]{2.0}, new double[]{2.0}),
+new Tuple2<>(new double[]{2.0, 0.0}, new double[]{2.0, 0.0}),
+new Tuple2<>(new double[]{0.0, 1.0, 2.0}, new double[]{0.0, 1.0}),
+new Tuple2<>(new double[]{1.0}, new double[]{1.0, 2.0})
 );
 JavaRDD<Tuple2<double[], double[]>> scoreAndLabels = sc.parallelize(data);

@@ -40,11 +40,11 @@ public class JavaPowerIterationClusteringExample {
 @SuppressWarnings("unchecked")
 // $example on$
 JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
-new Tuple3<Long, Long, Double>(0L, 1L, 0.9),
-new Tuple3<Long, Long, Double>(1L, 2L, 0.9),
-new Tuple3<Long, Long, Double>(2L, 3L, 0.9),
-new Tuple3<Long, Long, Double>(3L, 4L, 0.1),
-new Tuple3<Long, Long, Double>(4L, 5L, 0.9)));
+new Tuple3<>(0L, 1L, 0.9),
+new Tuple3<>(1L, 2L, 0.9),
+new Tuple3<>(2L, 3L, 0.9),
+new Tuple3<>(3L, 4L, 0.1),
+new Tuple3<>(4L, 5L, 0.9)));
 PowerIterationClustering pic = new PowerIterationClustering()
 .setK(2)

@@ -36,7 +36,7 @@ public class JavaStratifiedSamplingExample {
 JavaSparkContext jsc = new JavaSparkContext(conf);
 // $example on$
-List<Tuple2<Integer, Character>> list = new ArrayList<Tuple2<Integer, Character>>(
+List<Tuple2<Integer, Character>> list = new ArrayList<>(
 Arrays.<Tuple2<Integer, Character>>asList(
 new Tuple2(1, 'a'),
 new Tuple2(1, 'b'),

@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.examples.streaming.StreamingExamples;
 import org.apache.spark.streaming.*;
 import org.apache.spark.streaming.api.java.*;
 import org.apache.spark.streaming.flume.FlumeUtils;
@@ -58,7 +57,8 @@ public final class JavaFlumeEventCount {
 Duration batchInterval = new Duration(2000);
 SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
-JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
+JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
+FlumeUtils.createStream(ssc, host, port);
 flumeStream.count();

@@ -27,10 +27,11 @@ public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
 @Test
 public void testFlumeStream() {
 // tests the API, does not actually test data receiving
-JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
-StorageLevel.MEMORY_AND_DISK_SER_2());
-JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
-StorageLevel.MEMORY_AND_DISK_SER_2(), false);
+JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost",
+12345);
+JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost",
+12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost",
+12345, StorageLevel.MEMORY_AND_DISK_SER_2(), false);
 }
 }

@@ -34,7 +34,7 @@ class CommandBuilderUtils {
 /** The set of known JVM vendors. */
 enum JavaVendor {
 Oracle, IBM, OpenJDK, Unknown
-};
+}
 /** Returns whether the given string is null or empty. */
 static boolean isEmpty(String s) {

@@ -477,6 +477,6 @@ public class SparkLauncher {
 // No op.
 }
-};
+}
 }

@@ -175,7 +175,7 @@ public class LauncherServerSuite extends BaseSuite {
 TestClient(Socket s) throws IOException {
 super(s);
-this.inbound = new LinkedBlockingQueue<Message>();
+this.inbound = new LinkedBlockingQueue<>();
 this.clientThread = new Thread(this);
 clientThread.setName("TestClient");
 clientThread.setDaemon(true);

@@ -160,7 +160,7 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
 "SparkPi",
 "42");
-Map<String, String> env = new HashMap<String, String>();
+Map<String, String> env = new HashMap<>();
 List<String> cmd = buildCommand(sparkSubmitArgs, env);
 assertEquals("foo", findArgValue(cmd, parser.MASTER));
 assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));

@@ -205,7 +205,7 @@ final class DecisionTreeClassificationModel private[ml] (
 @Since("2.0.0")
 lazy val featureImportances: Vector = TreeEnsembleModel.featureImportances(this, numFeatures)
-/** Convert to spark.mllib DecisionTreeModel (losing some infomation) */
+/** Convert to spark.mllib DecisionTreeModel (losing some information) */
 override private[spark] def toOld: OldDecisionTreeModel = {
 new OldDecisionTreeModel(rootNode.toOld(1), OldAlgo.Classification)
 }

@@ -62,7 +62,7 @@ private[shared] object SharedParamsCodeGen {
 "every 10 iterations", isValid = "(interval: Int) => interval == -1 || interval >= 1"),
 ParamDesc[Boolean]("fitIntercept", "whether to fit an intercept term", Some("true")),
 ParamDesc[String]("handleInvalid", "how to handle invalid entries. Options are skip (which " +
-"will filter out rows with bad values), or error (which will throw an errror). More " +
+"will filter out rows with bad values), or error (which will throw an error). More " +
 "options may be added later",
 isValid = "ParamValidators.inArray(Array(\"skip\", \"error\"))"),
 ParamDesc[Boolean]("standardization", "whether to standardize the training features" +

@@ -270,10 +270,10 @@ private[ml] trait HasFitIntercept extends Params {
 private[ml] trait HasHandleInvalid extends Params {
 /**
-* Param for how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an errror). More options may be added later.
+* Param for how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error). More options may be added later.
 * @group param
 */
-final val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", "how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an errror). More options may be added later", ParamValidators.inArray(Array("skip", "error")))
+final val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", "how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error). More options may be added later", ParamValidators.inArray(Array("skip", "error")))
 /** @group getParam */
 final def getHandleInvalid: String = $(handleInvalid)

@@ -205,7 +205,7 @@ final class DecisionTreeRegressionModel private[ml] (
 @Since("2.0.0")
 lazy val featureImportances: Vector = TreeEnsembleModel.featureImportances(this, numFeatures)
-/** Convert to spark.mllib DecisionTreeModel (losing some infomation) */
+/** Convert to spark.mllib DecisionTreeModel (losing some information) */
 override private[spark] def toOld: OldDecisionTreeModel = {
 new OldDecisionTreeModel(rootNode.toOld(1), OldAlgo.Regression)
 }

@@ -71,7 +71,7 @@ private[spark] trait DecisionTreeModel {
 */
 private[ml] def maxSplitFeatureIndex(): Int = rootNode.maxSplitFeatureIndex()
-/** Convert to spark.mllib DecisionTreeModel (losing some infomation) */
+/** Convert to spark.mllib DecisionTreeModel (losing some information) */
 private[spark] def toOld: OldDecisionTreeModel
 }

@@ -418,7 +418,7 @@ class LogisticRegressionWithLBFGS
 private def run(input: RDD[LabeledPoint], initialWeights: Vector, userSuppliedWeights: Boolean):
 LogisticRegressionModel = {
-// ml's Logisitic regression only supports binary classifcation currently.
+// ml's Logistic regression only supports binary classification currently.
 if (numOfLinearPredictor == 1) {
 def runWithMlLogisitcRegression(elasticNetParam: Double) = {
 // Prepare the ml LogisticRegression based on our settings

@@ -89,7 +89,7 @@ public class JavaTestParams extends JavaParams {
 myDoubleParam_ = new DoubleParam(this, "myDoubleParam", "this is a double param",
 ParamValidators.inRange(0.0, 1.0));
 List<String> validStrings = Arrays.asList("a", "b");
-myStringParam_ = new Param<String>(this, "myStringParam", "this is a string param",
+myStringParam_ = new Param<>(this, "myStringParam", "this is a string param",
 ParamValidators.inArray(validStrings));
 myDoubleArrayParam_ =
 new DoubleArrayParam(this, "myDoubleArrayParam", "this is a double param");

@@ -66,8 +66,8 @@ public class JavaStreamingLogisticRegressionSuite implements Serializable {
 JavaDStream<LabeledPoint> training =
 attachTestInputStream(ssc, Arrays.asList(trainingBatch, trainingBatch), 2);
 List<Tuple2<Integer, Vector>> testBatch = Arrays.asList(
-new Tuple2<Integer, Vector>(10, Vectors.dense(1.0)),
-new Tuple2<Integer, Vector>(11, Vectors.dense(0.0)));
+new Tuple2<>(10, Vectors.dense(1.0)),
+new Tuple2<>(11, Vectors.dense(0.0)));
 JavaPairDStream<Integer, Vector> test = JavaPairDStream.fromJavaDStream(
 attachTestInputStream(ssc, Arrays.asList(testBatch, testBatch), 2));
 StreamingLogisticRegressionWithSGD slr = new StreamingLogisticRegressionWithSGD()

@@ -66,8 +66,8 @@ public class JavaStreamingKMeansSuite implements Serializable {
 JavaDStream<Vector> training =
 attachTestInputStream(ssc, Arrays.asList(trainingBatch, trainingBatch), 2);
 List<Tuple2<Integer, Vector>> testBatch = Arrays.asList(
-new Tuple2<Integer, Vector>(10, Vectors.dense(1.0)),
-new Tuple2<Integer, Vector>(11, Vectors.dense(0.0)));
+new Tuple2<>(10, Vectors.dense(1.0)),
+new Tuple2<>(11, Vectors.dense(0.0)));
 JavaPairDStream<Integer, Vector> test = JavaPairDStream.fromJavaDStream(
 attachTestInputStream(ssc, Arrays.asList(testBatch, testBatch), 2));
 StreamingKMeans skmeans = new StreamingKMeans()

@@ -37,8 +37,8 @@ public class JavaVectorsSuite implements Serializable {
 public void sparseArrayConstruction() {
 @SuppressWarnings("unchecked")
 Vector v = Vectors.sparse(3, Arrays.asList(
-new Tuple2<Integer, Double>(0, 2.0),
-new Tuple2<Integer, Double>(2, 3.0)));
+new Tuple2<>(0, 2.0),
+new Tuple2<>(2, 3.0)));
 assertArrayEquals(new double[]{2.0, 0.0, 3.0}, v.toArray(), 0.0);
 }
 }

@@ -65,8 +65,8 @@ public class JavaStreamingLinearRegressionSuite implements Serializable {
 JavaDStream<LabeledPoint> training =
 attachTestInputStream(ssc, Arrays.asList(trainingBatch, trainingBatch), 2);
 List<Tuple2<Integer, Vector>> testBatch = Arrays.asList(
-new Tuple2<Integer, Vector>(10, Vectors.dense(1.0)),
-new Tuple2<Integer, Vector>(11, Vectors.dense(0.0)));
+new Tuple2<>(10, Vectors.dense(1.0)),
+new Tuple2<>(11, Vectors.dense(0.0)));
 JavaPairDStream<Integer, Vector> test = JavaPairDStream.fromJavaDStream(
 attachTestInputStream(ssc, Arrays.asList(testBatch, testBatch), 2));
 StreamingLinearRegressionWithSGD slr = new StreamingLinearRegressionWithSGD()

@@ -759,7 +759,7 @@ class LinearRegressionSuite
 .sliding(2)
 .forall(x => x(0) >= x(1)))
 } else {
-// To clalify that the normal solver is used here.
+// To clarify that the normal solver is used here.
 assert(model.summary.objectiveHistory.length == 1)
 assert(model.summary.objectiveHistory(0) == 0.0)
 val devianceResidualsR = Array(-0.47082, 0.34635)

@@ -151,7 +151,7 @@ public final class UnsafeExternalRowSorter {
 Platform.throwException(e);
 }
 throw new RuntimeException("Exception should have been re-thrown in next()");
-};
+}
 };
 } catch (IOException e) {
 cleanupResources();

@@ -26,7 +26,7 @@ private[spark] trait CatalystConf {
 def groupByOrdinal: Boolean
 /**
-* Returns the [[Resolver]] for the current configuration, which can be used to determin if two
+* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
 * identifiers are equal.
 */
 def resolver: Resolver = {

@@ -153,8 +153,8 @@ abstract class Expression extends TreeNode[Expression] {
 * evaluate to the same result.
 */
 lazy val canonicalized: Expression = {
-val canonicalizedChildred = children.map(_.canonicalized)
-Canonicalize.execute(withNewChildren(canonicalizedChildred))
+val canonicalizedChildren = children.map(_.canonicalized)
+Canonicalize.execute(withNewChildren(canonicalizedChildren))
 }
 /**

@@ -509,7 +509,7 @@ class CodegenContext {
 /**
 * Checks and sets up the state and codegen for subexpression elimination. This finds the
-* common subexpresses, generates the functions that evaluate those expressions and populates
+* common subexpressions, generates the functions that evaluate those expressions and populates
 * the mapping of common subexpressions to the generated functions.
 */
 private def subexpressionElimination(expressions: Seq[Expression]) = {

@@ -222,7 +222,7 @@ object CaseWhen {
 }
 /**
-* A factory method to faciliate the creation of this expression when used in parsers.
+* A factory method to facilitate the creation of this expression when used in parsers.
 * @param branches Expressions at even position are the branch conditions, and expressions at odd
 * position are branch values.
 */

@@ -965,7 +965,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
 /**
 * Create a binary arithmetic expression. The following arithmetic operators are supported:
-* - Mulitplication: '*'
+* - Multiplication: '*'
 * - Division: '/'
 * - Hive Long Division: 'DIV'
 * - Modulo: '%'
@@ -1270,7 +1270,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
 }
 /**
-* Create a double literal for a number denoted in scientifc notation.
+* Create a double literal for a number denoted in scientific notation.
 */
 override def visitScientificDecimalLiteral(
 ctx: ScientificDecimalLiteralContext): Literal = withOrigin(ctx) {

@@ -121,7 +121,7 @@ class RowTest extends FunSpec with Matchers {
 externalRow should be theSameInstanceAs externalRow.copy()
 }
-it("copy should return same ref for interal rows") {
+it("copy should return same ref for internal rows") {
 internalRow should be theSameInstanceAs internalRow.copy()
 }

@@ -272,5 +272,5 @@ public final class UnsafeKVExternalSorter {
 public void close() {
 cleanupResources();
 }
-};
+}
 }

@@ -79,7 +79,7 @@ public final class ColumnarBatch {
 /**
 * Called to close all the columns in this batch. It is not valid to access the data after
-* calling this. This must be called at the end to clean up memory allcoations.
+* calling this. This must be called at the end to clean up memory allocations.
 */
 public void close() {
 for (ColumnVector c: columns) {
@@ -315,7 +315,7 @@ public final class ColumnarBatch {
 public int numRows() { return numRows; }
 /**
-* Returns the number of valid rowss.
+* Returns the number of valid rows.
 */
 public int numValidRows() {
 assert(numRowsFiltered <= numRows);

@@ -212,7 +212,7 @@ public final class OnHeapColumnVector extends ColumnVector {
 public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
 int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
 for (int i = 0; i < count; ++i) {
-intData[i + rowId] = Platform.getInt(src, srcOffset);;
+intData[i + rowId] = Platform.getInt(src, srcOffset);
 srcIndex += 4;
 srcOffset += 4;
 }

@@ -94,7 +94,7 @@ trait ContinuousQuery {
 /**
 * Blocks until all available data in the source has been processed an committed to the sink.
 * This method is intended for testing. Note that in the case of continually arriving data, this
-* method may block forever. Additionally, this method is only guranteed to block until data that
+* method may block forever. Additionally, this method is only guaranteed to block until data that
 * has been synchronously appended data to a [[org.apache.spark.sql.execution.streaming.Source]]
 * prior to invocation. (i.e. `getOffset` must immediately reflect the addition).
 */

@@ -2077,7 +2077,7 @@ class Dataset[T] private[sql](
 /**
 * Returns a new [[Dataset]] partitioned by the given partitioning expressions into
-* `numPartitions`. The resulting Datasetis hash partitioned.
+* `numPartitions`. The resulting Dataset is hash partitioned.
 *
 * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
 *

@@ -108,7 +108,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 /**
 * Matches a plan whose single partition should be small enough to build a hash table.
 *
-* Note: this assume that the number of partition is fixed, requires addtional work if it's
+* Note: this assume that the number of partition is fixed, requires additional work if it's
 * dynamic.
 */
 def canBuildHashMap(plan: LogicalPlan): Boolean = {

@@ -811,7 +811,7 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame(
 *
 * This is a very expensive operator to use, O(n * (n - 1) /2), because we need to maintain a
 * buffer and must do full recalculation after each row. Reverse iteration would be possible, if
-* the communitativity of the used window functions can be guaranteed.
+* the commutativity of the used window functions can be guaranteed.
 *
 * @param target to write results to.
 * @param processor to calculate the row values with.

@@ -146,7 +146,7 @@ case class Filter(condition: Expression, child: SparkPlan)
 // This has the property of not doing redundant IsNotNull checks and taking better advantage of
 // short-circuiting, not loading attributes until they are needed.
 // This is very perf sensitive.
-// TODO: revisit this. We can consider reodering predicates as well.
+// TODO: revisit this. We can consider reordering predicates as well.
 val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
 val generated = otherPreds.map { c =>
 val nullChecks = c.references.map { r =>

@@ -185,7 +185,7 @@ private[columnar] object ColumnBuilder {
 case udt: UserDefinedType[_] =>
 return apply(udt.sqlType, initialSize, columnName, useCompression)
 case other =>
-throw new Exception(s"not suppported type: $other")
+throw new Exception(s"not supported type: $other")
 }
 builder.initialize(initialSize, columnName, useCompression)

@@ -296,7 +296,7 @@ private[sql] object StatFunctions extends Logging {
 val defaultRelativeError: Double = 0.01
 /**
-* Statisttics from the Greenwald-Khanna paper.
+* Statistics from the Greenwald-Khanna paper.
 * @param value the sampled value
 * @param g the minimum rank jump from the previous value's minimum rank
 * @param delta the maximum span of the rank.

@@ -32,7 +32,7 @@ object FileStreamSink {
 /**
 * A sink that writes out results to parquet files. Each batch is written out to a unique
-* directory. After all of the files in a batch have been succesfully written, the list of
+* directory. After all of the files in a batch have been successfully written, the list of
 * file paths is appended to the log atomically. In the case of partial failures, some duplicate
 * data may be present in the target directory, but only one copy of each file will be present
 * in the log.

@@ -178,7 +178,7 @@ private[state] class HDFSBackedStateStoreProvider(
 * This can be called only after committing all the updates made in the current thread.
 */
 override def iterator(): Iterator[(UnsafeRow, UnsafeRow)] = {
-verify(state == COMMITTED, "Cannot get iterator of store data before comitting")
+verify(state == COMMITTED, "Cannot get iterator of store data before committing")
 HDFSBackedStateStoreProvider.this.iterator(newVersion)
 }

@@ -220,7 +220,7 @@ private[state] object StateStore extends Logging {
 val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
 val verified =
 coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false)
-logDebug(s"Verifyied whether the loaded instance $storeId is active: $verified" )
+logDebug(s"Verified whether the loaded instance $storeId is active: $verified" )
 verified
 } catch {
 case NonFatal(e) =>

@@ -126,7 +126,7 @@ object JdbcDialects {
 /**
 * Register a dialect for use on all new matching jdbc [[org.apache.spark.sql.DataFrame]].
-* Readding an existing dialect will cause a move-to-front.
+* Reading an existing dialect will cause a move-to-front.
 *
 * @param dialect The new dialect.
 */

@@ -318,14 +318,14 @@ public class JavaDatasetSuite implements Serializable {
 Encoder<Tuple3<Integer, Long, String>> encoder3 =
 Encoders.tuple(Encoders.INT(), Encoders.LONG(), Encoders.STRING());
 List<Tuple3<Integer, Long, String>> data3 =
-Arrays.asList(new Tuple3<Integer, Long, String>(1, 2L, "a"));
+Arrays.asList(new Tuple3<>(1, 2L, "a"));
 Dataset<Tuple3<Integer, Long, String>> ds3 = context.createDataset(data3, encoder3);
 Assert.assertEquals(data3, ds3.collectAsList());
 Encoder<Tuple4<Integer, String, Long, String>> encoder4 =
 Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING());
 List<Tuple4<Integer, String, Long, String>> data4 =
-Arrays.asList(new Tuple4<Integer, String, Long, String>(1, "b", 2L, "a"));
+Arrays.asList(new Tuple4<>(1, "b", 2L, "a"));
 Dataset<Tuple4<Integer, String, Long, String>> ds4 = context.createDataset(data4, encoder4);
 Assert.assertEquals(data4, ds4.collectAsList());
@@ -333,7 +333,7 @@ public class JavaDatasetSuite implements Serializable {
 Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING(),
 Encoders.BOOLEAN());
 List<Tuple5<Integer, String, Long, String, Boolean>> data5 =
-Arrays.asList(new Tuple5<Integer, String, Long, String, Boolean>(1, "b", 2L, "a", true));
+Arrays.asList(new Tuple5<>(1, "b", 2L, "a", true));
 Dataset<Tuple5<Integer, String, Long, String, Boolean>> ds5 =
 context.createDataset(data5, encoder5);
 Assert.assertEquals(data5, ds5.collectAsList());
@@ -354,7 +354,7 @@ public class JavaDatasetSuite implements Serializable {
 Encoders.tuple(Encoders.INT(),
 Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.LONG()));
 List<Tuple2<Integer, Tuple3<String, String, Long>>> data2 =
-Arrays.asList(tuple2(1, new Tuple3<String, String, Long>("a", "b", 3L)));
+Arrays.asList(tuple2(1, new Tuple3<>("a", "b", 3L)));
 Dataset<Tuple2<Integer, Tuple3<String, String, Long>>> ds2 =
 context.createDataset(data2, encoder2);
 Assert.assertEquals(data2, ds2.collectAsList());
@@ -376,7 +376,7 @@ public class JavaDatasetSuite implements Serializable {
 Encoders.tuple(Encoders.DOUBLE(), Encoders.DECIMAL(), Encoders.DATE(), Encoders.TIMESTAMP(),
 Encoders.FLOAT());
 List<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> data =
-Arrays.asList(new Tuple5<Double, BigDecimal, Date, Timestamp, Float>(
+Arrays.asList(new Tuple5<>(
 1.7976931348623157E308, new BigDecimal("0.922337203685477589"),
 Date.valueOf("1970-01-01"), new Timestamp(System.currentTimeMillis()), Float.MAX_VALUE));
 Dataset<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> ds =

@@ -105,10 +105,10 @@ abstract class QueryTest extends PlanTest {
 val expected = expectedAnswer.toSet.toSeq.map((a: Any) => a.toString).sorted
 val actual = decoded.toSet.toSeq.map((a: Any) => a.toString).sorted
-val comparision = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
+val comparison = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
 fail(
 s"""Decoded objects do not match expected objects:
-|$comparision
+|$comparison
 |${ds.resolvedTEncoder.deserializer.treeString}
 """.stripMargin)
 }

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.types._
-class InferSchemaSuite extends SparkFunSuite {
+class CSVInferSchemaSuite extends SparkFunSuite {
 test("String fields types are inferred correctly from null types") {
 assert(CSVInferSchema.inferField(NullType, "") == NullType)

@@ -469,7 +469,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 }
 }
-testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatiblity") {
+testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatibility") {
 val clonedConf = new Configuration(hadoopConfiguration)
 // Write to a parquet file and let it fail.

@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 /**
-* A stress test for streamign queries that read and write files. This test constists of
+* A stress test for streaming queries that read and write files. This test consists of
 * two threads:
 * - one that writes out `numRecords` distinct integers to files of random sizes (the total
 * number of records is fixed but each files size / creation time is random).

@@ -380,8 +380,8 @@ class TestHiveContext private[hive](
 """.stripMargin.cmd,
 s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}' INTO TABLE episodes".cmd
 ),
-// THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC PARITIONING
-// IS NOT YET SUPPORTED
+// THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
+// PARTITIONING IS NOT YET SUPPORTED
 TestTable("episodes_part",
 s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
 |PARTITIONED BY (doctor_pt INT)

@@ -482,7 +482,7 @@ abstract class HiveComparisonTest
 val tablesGenerated = queryList.zip(executions).flatMap {
 // We should take executedPlan instead of sparkPlan, because in following codes we
 // will run the collected plans. As we will do extra processing for sparkPlan such
-// as adding exchage, collapsing codegen stages, etc., collecing sparkPlan here
+// as adding exchange, collapsing codegen stages, etc., collecting sparkPlan here
 // will cause some errors when running these plans later.
 case (q, e) => e.executedPlan.collect {
 case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>

@@ -311,7 +311,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK
 case o => fail("test_insert_parquet should be converted to a " +
 s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
+s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
 s"However, found a ${o.toString} ")
 }
@@ -341,7 +341,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK
 case o => fail("test_insert_parquet should be converted to a " +
 s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan." +
 s"However, found a ${o.toString} ")
 }