[MINOR][DOCS] Remove consecutive duplicated words/typo in Spark Repo

## What changes were proposed in this pull request?
There are many locations in the Spark repo where the same word occurs consecutively. Sometimes they are appropriately placed, but many times they are not. This PR removes the inappropriately duplicated words.

## How was this patch tested?
N/A since only docs or comments were updated.

Author: Niranjan Padmanabhan <niranjan.padmanabhan@gmail.com>

Closes #16455 from neurons/np.structure_streaming_doc.
This commit is contained in:
Niranjan Padmanabhan 2017-01-04 15:07:29 +00:00 committed by Sean Owen
parent 7a82505817
commit a1e40b1f5d
No known key found for this signature in database
GPG key ID: BEB3956D6717BDDC
52 changed files with 57 additions and 57 deletions

View file

@ -86,7 +86,7 @@ public final class UnsafeInMemorySorter {
private final PrefixComparators.RadixSortSupport radixSortSupport;
/**
* Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at
* Within this buffer, position {@code 2 * i} holds a pointer to the record at
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
*
* Only part of the array will be used to store the pointers, the rest part is preserved as

View file

@ -25,7 +25,7 @@ import org.apache.spark.util.collection.SortDataFormat;
* Supports sorting an array of (record pointer, key prefix) pairs.
* Used in {@link UnsafeInMemorySorter}.
* <p>
* Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at
* Within each long[] buffer, position {@code 2 * i} holds a pointer to the record at
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
*/
public final class UnsafeSortDataFormat

View file

@ -317,7 +317,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
pool
}
// Make sure that that we aren't going to exceed the max RPC message size by making sure
// Make sure that we aren't going to exceed the max RPC message size by making sure
// we use broadcast to send large map output statuses.
if (minSizeForBroadcast > maxRpcMessageSize) {
val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +

View file

@ -98,7 +98,7 @@ case class FetchFailed(
* 4 task failures, instead we immediately go back to the stage which generated the map output,
* and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since
* presumably its not the fault of the executor where the task ran, but the executor which
* stored the data. This is especially important because we we might rack up a bunch of
* stored the data. This is especially important because we might rack up a bunch of
* fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node.
*/
override def countTowardsTaskFailures: Boolean = false

View file

@ -43,7 +43,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
* Execute using
* ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
*
* Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
* Make sure that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
* *and* SPARK_JAVA_OPTS:
* - spark.deploy.recoveryMode=ZOOKEEPER
* - spark.deploy.zookeeper.url=172.17.42.1:2181

View file

@ -291,7 +291,7 @@ object HistoryServer extends Logging {
/**
* Create a security manager.
* This turns off security in the SecurityManager, so that the the History Server can start
* This turns off security in the SecurityManager, so that the History Server can start
* in a Spark cluster where security is enabled.
* @param config configuration for the SecurityManager constructor
* @return the security manager for use in constructing the History Server.

View file

@ -92,7 +92,7 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
/**
* Resets the value of the current metrics (`this`) and and merges all the independent
* Resets the value of the current metrics (`this`) and merges all the independent
* [[TempShuffleReadMetrics]] into `this`.
*/
private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {

View file

@ -37,7 +37,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
/**
* A BlockTransferService that uses Netty to fetch a set of blocks at at time.
* A BlockTransferService that uses Netty to fetch a set of blocks at time.
*/
private[spark] class NettyBlockTransferService(
conf: SparkConf,

View file

@ -350,7 +350,7 @@ object SizeEstimator extends Logging {
// 3. consistent fields layouts throughout the hierarchy: This means we should layout
// superclass first. And we can use superclass's shellSize as a starting point to layout the
// other fields in this class.
// 4. class alignment: HotSpot rounds field blocks up to to HeapOopSize not 4 bytes, confirmed
// 4. class alignment: HotSpot rounds field blocks up to HeapOopSize not 4 bytes, confirmed
// with Aleksey. see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
//
// The real world field layout is much more complicated. There are three kinds of fields

View file

@ -253,7 +253,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
assertNotFound(appId, None)
}
test("Test that if an attempt ID is is set, it must be used in lookups") {
test("Test that if an attempt ID is set, it must be used in lookups") {
val operations = new StubCacheOperations()
val clock = new ManualClock(1)
implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock)

View file

@ -1819,7 +1819,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA")))
// Reducer should run where RDD 2 has preferences, even though though it also has a shuffle dep
// Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep
val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(Seq("hostB")))
complete(reduceTaskSet, Seq((Success, 42)))
@ -2058,7 +2058,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on on hostA
assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))

View file

@ -53,7 +53,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
conf
}
test("single insert insert") {
test("single insert") {
val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
val map = createExternalMap[Int]

View file

@ -752,7 +752,7 @@ for more details on the API.
`Interaction` is a `Transformer` which takes vector or double-valued columns, and generates a single vector column that contains the product of all combinations of one value from each input column.
For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then then you'll get a 9-dimensional vector as the output column.
For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then you'll get a 9-dimensional vector as the output column.
**Examples**

View file

@ -354,7 +354,7 @@ v = u.map(lambda x: 1.0 + 2.0 * x)
useful for visualizing empirical probability distributions without requiring assumptions about the
particular distribution that the observed samples are drawn from. It computes an estimate of the
probability density function of a random variables, evaluated at a given set of points. It achieves
this estimate by expressing the PDF of the empirical distribution at a particular point as the the
this estimate by expressing the PDF of the empirical distribution at a particular point as the
mean of PDFs of normal distributions centered around each of the samples.
<div class="codetabs">

View file

@ -244,7 +244,7 @@ Note that the following Kafka params cannot be set and the Kafka source will thr
- **group.id**: Kafka source will create a unique group id for each query automatically.
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
where to start instead. Structured Streaming manages which offsets are consumed internally, rather
than rely on the kafka Consumer to do it. This will ensure that no data is missed when when new
than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
Streaming query is started, and that resuming will always pick up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use

View file

@ -680,7 +680,7 @@ windowedCounts = words.groupBy(
### Handling Late Data and Watermarking
Now consider what happens if one of the events arrives late to the application.
For example, say, a word generated at 12:04 (i.e. event time) could be received received by
For example, say, a word generated at 12:04 (i.e. event time) could be received by
the application at 12:11. The application should use the time 12:04 instead of 12:11
to update the older counts for the window `12:00 - 12:10`. This occurs
naturally in our window-based grouping Structured Streaming can maintain the intermediate state

View file

@ -52,7 +52,7 @@ public class JavaLDAExample {
double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound bound on perplexity: " + lp);
System.out.println("The upper bound on perplexity: " + lp);
// Describe topics.
Dataset<Row> topics = model.describeTopics(3);

View file

@ -46,7 +46,7 @@ if __name__ == "__main__":
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound bound on perplexity: " + str(lp))
print("The upper bound on perplexity: " + str(lp))
# Describe topics.
topics = model.describeTopics(3)

View file

@ -50,7 +50,7 @@ object LDAExample {
val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound bound on perplexity: $lp")
println(s"The upper bound on perplexity: $lp")
// Describe topics.
val topics = model.describeTopics(3)

View file

@ -45,7 +45,7 @@ import org.apache.flume.sink.AbstractSink
* the thread itself is blocked and a reference to it saved off.
*
* When the ack for that batch is received,
* the thread which created the transaction is is retrieved and it commits the transaction with the
* the thread which created the transaction is retrieved and it commits the transaction with the
* channel from the same thread it was originally created in (since Flume transactions are
* thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
* is received within the specified timeout, the transaction is rolled back too. If an ack comes

View file

@ -212,7 +212,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
|Instead set the source option '$STARTING_OFFSETS_OPTION_KEY' to 'earliest' or 'latest'
|to specify where to start. Structured Streaming manages which offsets are consumed
|internally, rather than relying on the kafkaConsumer to do it. This will ensure that no
|data is missed when when new topics/partitions are dynamically subscribed. Note that
|data is missed when new topics/partitions are dynamically subscribed. Note that
|'$STARTING_OFFSETS_OPTION_KEY' only applies when a new Streaming query is started, and
|that resuming will always pick up from where the query left off. See the docs for more
|details.

View file

@ -129,7 +129,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
/**
* Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
* and the rest to a write ahead log, and then reading reading it all back using the RDD.
* and the rest to a write ahead log, and then reading it all back using the RDD.
* It can also test if the partitions that were read from the log were again stored in
* block manager.
*

View file

@ -512,7 +512,7 @@ abstract class LDAModel private[ml] (
}
/**
* Calculate an upper bound bound on perplexity. (Lower is better.)
* Calculate an upper bound on perplexity. (Lower is better.)
* See Equation (16) in the Online LDA paper (Hoffman et al., 2010).
*
* WARNING: If this model is an instance of [[DistributedLDAModel]] (produced when [[optimizer]]

View file

@ -54,7 +54,7 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) {
}
/**
* Finds words similar to the the vector representation of a word without
* Finds words similar to the vector representation of a word without
* filtering results.
* @param vector a vector
* @param num number of synonyms to find

View file

@ -245,7 +245,7 @@ class LocalLDAModel private[spark] (
}
/**
* Calculate an upper bound bound on perplexity. (Lower is better.)
* Calculate an upper bound on perplexity. (Lower is better.)
* See Equation (16) in original Online LDA paper.
*
* @param documents test corpus to use for calculating perplexity

View file

@ -29,7 +29,7 @@ import org.apache.spark.streaming.dstream.DStream
/**
* :: DeveloperApi ::
* StreamingLinearAlgorithm implements methods for continuously
* training a generalized linear model model on streaming data,
* training a generalized linear model on streaming data,
* and using it for prediction on (possibly different) streaming data.
*
* This class takes as type parameters a GeneralizedLinearModel,

View file

@ -699,7 +699,7 @@ class LDAModel(JavaModel):
@since("2.0.0")
def logPerplexity(self, dataset):
"""
Calculate an upper bound bound on perplexity. (Lower is better.)
Calculate an upper bound on perplexity. (Lower is better.)
See Equation (16) in the Online LDA paper (Hoffman et al., 2010).
WARNING: If this model is an instance of :py:class:`DistributedLDAModel` (produced when

View file

@ -481,7 +481,7 @@ class SparseVector(Vector):
>>> SparseVector(4, {1:1.0, 6:2.0})
Traceback (most recent call last):
...
AssertionError: Index 6 is out of the the size of vector with size=4
AssertionError: Index 6 is out of the size of vector with size=4
>>> SparseVector(4, {-1:1.0})
Traceback (most recent call last):
...
@ -521,7 +521,7 @@ class SparseVector(Vector):
if self.indices.size > 0:
assert np.max(self.indices) < self.size, \
"Index %d is out of the the size of vector with size=%d" \
"Index %d is out of the size of vector with size=%d" \
% (np.max(self.indices), self.size)
assert np.min(self.indices) >= 0, \
"Contains negative index %d" % (np.min(self.indices))

View file

@ -95,7 +95,7 @@ def install_exception_handler():
original = py4j.protocol.get_return_value
# The original `get_return_value` is not patched, it's idempotent.
patched = capture_sql_exception(original)
# only patch the one used in in py4j.java_gateway (call Java API)
# only patch the one used in py4j.java_gateway (call Java API)
py4j.java_gateway.get_return_value = patched

View file

@ -32,7 +32,7 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
/**
* This strategy is calculating the optimal locality preferences of YARN containers by considering
* the node ratio of pending tasks, number of required cores/containers and and locality of current
* the node ratio of pending tasks, number of required cores/containers and locality of current
* existing and pending allocated containers. The target of this algorithm is to maximize the number
* of tasks that would run locally.
*

View file

@ -196,7 +196,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
assertIndexIsValid(i);
BitSetMethods.set(baseObject, baseOffset, i);
// To preserve row equality, zero out the value when setting the column to null.
// Since this row does does not currently support updates to variable-length values, we don't
// Since this row does not currently support updates to variable-length values, we don't
// have to worry about zeroing out that data.
Platform.putLong(baseObject, getFieldOffset(i), 0);
}

View file

@ -516,7 +516,7 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
* into the number of buckets); both variables are based on the size of the current partition.
* During the calculation process the function keeps track of the current row number, the current
* bucket number, and the row number at which the bucket will change (bucketThreshold). When the
* current row number reaches bucket threshold, the bucket value is increased by one and the the
* current row number reaches bucket threshold, the bucket value is increased by one and the
* threshold is increased by the bucket size (plus one extra if the current bucket is padded).
*
* This documentation has been based upon similar documentation for the Hive and Presto projects.

View file

@ -795,7 +795,7 @@ case object OneRowRelation extends LeafNode {
/**
* Computes [[Statistics]] for this plan. The default implementation assumes the output
* cardinality is the product of of all child plan's cardinality, i.e. applies in the case
* cardinality is the product of all child plan's cardinality, i.e. applies in the case
* of cartesian joins.
*
* [[LeafNode]]s must override this.

View file

@ -142,7 +142,7 @@ object DateTimeUtils {
}
/**
* Returns the number of days since epoch from from java.sql.Date.
* Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
millisToDays(date.getTime)
@ -503,7 +503,7 @@ object DateTimeUtils {
}
/**
* Calculates the year and and the number of the day in the year for the given
* Calculates the year and the number of the day in the year for the given
* number of days. The given days is the number of days since 1.1.1970.
*
* The calculation uses the fact that the period 1.1.2001 until 31.12.2400 is

View file

@ -52,7 +52,7 @@ class AttributeSetSuite extends SparkFunSuite {
assert((aSet ++ bSet).contains(aLower) === true)
}
test("extracts all references references") {
test("extracts all references ") {
val addSet = AttributeSet(Add(aUpper, Alias(bUpper, "test")()):: Nil)
assert(addSet.contains(aUpper))
assert(addSet.contains(aLower))

View file

@ -361,7 +361,7 @@ class Dataset[T] private[sql](
* method used to map columns depend on the type of `U`:
* - When `U` is a class, fields for the class will be mapped to columns of the same name
* (case sensitivity is determined by `spark.sql.caseSensitive`).
* - When `U` is a tuple, the columns will be be mapped by ordinal (i.e. the first column will
* - When `U` is a tuple, the columns will be mapped by ordinal (i.e. the first column will
* be assigned to `_1`).
* - When `U` is a primitive type (i.e. String, Int, etc), then the first column of the
* `DataFrame` will be used.

View file

@ -41,7 +41,7 @@ object PartitionPath {
}
/**
* Holds a directory in a partitioned collection of files as well as as the partition values
* Holds a directory in a partitioned collection of files as well as the partition values
* in the form of a Row. Before scanning, the files at `path` need to be enumerated.
*/
case class PartitionPath(values: InternalRow, path: Path)

View file

@ -285,7 +285,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
/**
* Starts the execution of the streaming query, which will continually send results to the given
* `ForeachWriter` as as new data arrives. The `ForeachWriter` can be used to send the data
* `ForeachWriter` as new data arrives. The `ForeachWriter` can be used to send the data
* generated by the `DataFrame`/`Dataset` to an external system.
*
* Scala example:

View file

@ -257,7 +257,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
}
}
test("time window in SQL with with two expressions") {
test("time window in SQL with two expressions") {
withTempTable { table =>
checkAnswer(
spark.sql(
@ -272,7 +272,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
}
}
test("time window in SQL with with three expressions") {
test("time window in SQL with three expressions") {
withTempTable { table =>
checkAnswer(
spark.sql(

View file

@ -363,7 +363,7 @@ class PlannerSuite extends SharedSQLContext {
// This is a regression test for SPARK-9703
test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") {
// Consider an operator that imposes both output distribution and ordering requirements on its
// children, such as sort sort merge join. If the distribution requirements are satisfied but
// children, such as sort merge join. If the distribution requirements are satisfied but
// the output ordering requirements are unsatisfied, then the planner should only add sorts and
// should not need to add additional shuffles / exchanges.
val outputOrdering = Seq(SortOrder(Literal(1), Ascending))

View file

@ -144,7 +144,7 @@ class FileStreamSinkSuite extends StreamTest {
}
// This tests whether FileStreamSink works with aggregations. Specifically, it tests
// whether the the correct streaming QueryExecution (i.e. IncrementalExecution) is used to
// whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used to
// to execute the trigger for writing data to file sink. See SPARK-18440 for more details.
test("writing with aggregation") {

View file

@ -151,7 +151,7 @@ public abstract class AbstractService implements Service {
}
/**
* Verify that that a service is in a given state.
* Verify that a service is in a given state.
*
* @param currentState
* the desired state

View file

@ -33,7 +33,7 @@ public final class ServiceOperations {
}
/**
* Verify that that a service is in a given state.
* Verify that a service is in a given state.
* @param state the actual state a service is in
* @param expectedState the desired state
* @throws IllegalStateException if the service state is different from

View file

@ -29,7 +29,7 @@ public interface ServiceStateChangeListener {
* have changed state before this callback is invoked.
*
* This operation is invoked on the thread that initiated the state change,
* while the service itself in in a synchronized section.
* while the service itself in a synchronized section.
* <ol>
* <li>Any long-lived operation here will prevent the service state
* change from completing in a timely manner.</li>

View file

@ -98,7 +98,7 @@ public class TypeDescriptor {
* For datetime types this is the length in characters of the String representation
* (assuming the maximum allowed precision of the fractional seconds component).
* For binary data this is the length in bytes.
* Null is returned for for data types where the column size is not applicable.
* Null is returned for data types where the column size is not applicable.
*/
public Integer getColumnSize() {
if (type.isNumericType()) {

View file

@ -178,7 +178,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"skewjoin",
"database",
// These tests fail and and exit the JVM.
// These tests fail and exit the JVM.
"auto_join18_multi_distinct",
"join18_multi_distinct",
"input44",

View file

@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
* The Hive table scan operator. Column and partition pruning are both handled.
*
* @param requestedAttributes Attributes to be fetched from the Hive table.
* @param relation The Hive table be be scanned.
* @param relation The Hive table be scanned.
* @param partitionPruningPred An optional partition pruning predicate for partitioned table.
*/
private[hive]

View file

@ -178,7 +178,7 @@ private[streaming] class StateImpl[S] extends State[S] {
removed
}
/** Whether the state has been been updated */
/** Whether the state has been updated */
def isUpdated(): Boolean = {
updated
}

View file

@ -88,7 +88,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
if (!super.isTimeValid(time)) {
false // Time not valid
} else {
// Time is valid, but check it it is more than lastValidTime
// Time is valid, but check it is more than lastValidTime
if (lastValidTime != null && time < lastValidTime) {
logWarning(s"isTimeValid called with $time whereas the last valid time " +
s"is $lastValidTime")

View file

@ -326,7 +326,7 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
// Create a MapWithStateRDD that has a long lineage using the data RDD with a long lineage
val stateRDDWithLongLineage = makeStateRDDWithLongLineageDataRDD(longLineageRDD)
// Create a new MapWithStateRDD, with the lineage lineage MapWithStateRDD as the parent
// Create a new MapWithStateRDD, with the lineage MapWithStateRDD as the parent
new MapWithStateRDD[Int, Int, Int, Int](
stateRDDWithLongLineage,
stateRDDWithLongLineage.sparkContext.emptyRDD[(Int, Int)].partitionBy(partitioner),

View file

@ -108,7 +108,7 @@ class WriteAheadLogBackedBlockRDDSuite
/**
* Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
* and the rest to a write ahead log, and then reading reading it all back using the RDD.
* and the rest to a write ahead log, and then reading it all back using the RDD.
* It can also test if the partitions that were read from the log were again stored in
* block manager.
*

View file

@ -90,7 +90,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (data1)
assert(listener.onAddDataCalled === false) // should be called only with addDataWithCallback()
// Verify addDataWithCallback() add data+metadata and and callbacks are called correctly
// Verify addDataWithCallback() add data+metadata and callbacks are called correctly
val data2 = 11 to 20
val metadata2 = data2.map { _.toString }
data2.zip(metadata2).foreach { case (d, m) => blockGenerator.addDataWithCallback(d, m) }
@ -103,7 +103,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined
}
// Verify addMultipleDataWithCallback() add data+metadata and and callbacks are called correctly
// Verify addMultipleDataWithCallback() add data+metadata and callbacks are called correctly
val data3 = 21 to 30
val metadata3 = "metadata"
blockGenerator.addMultipleDataWithCallback(data3.iterator, metadata3)