diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
index 47644a3267..99ea8f6b4a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
@@ -25,7 +25,7 @@ import org.apache.spark.sql.connector.read.PartitionReader;
* at the executor side. During query execution, Spark will collect the task metrics per partition
* by {@link PartitionReader} and update internal metrics based on collected metric values.
* For streaming query, Spark will collect and combine metrics for a final result per micro batch.
- *
+ * <p>
* The metrics will be gathered during query execution back to the driver and then combined. How
* the task metrics are combined is defined by corresponding {@link CustomMetric} with same metric
* name. The final result will be shown up in the data source scan operator in Spark UI.
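For illustration, a minimal sketch of a task metric a connector might report from its reader; the class and metric name are hypothetical, not part of this patch:

```java
import org.apache.spark.sql.connector.CustomTaskMetric;

// Hypothetical executor-side metric reported by a PartitionReader; Spark combines the
// per-task values using the driver-side CustomMetric registered under the same name.
class RowsReadTaskMetric implements CustomTaskMetric {
  private final long rowsRead;

  RowsReadTaskMetric(long rowsRead) {
    this.rowsRead = rowsRead;
  }

  @Override
  public String name() {
    return "rowsRead"; // must match the corresponding CustomMetric's name
  }

  @Override
  public long value() {
    return rowsRead;
  }
}
```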
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java
index 6fd271cf91..e2c693f2d0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java
@@ -30,15 +30,16 @@ import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
*
* These APIs are used to modify table partition or partition metadata,
* they will change the table data as well.
- * ${@link #createPartitions}:
- * add an array of partitions and any data they contain to the table
- * ${@link #dropPartitions}:
- * remove an array of partitions and any data they contain from the table
- * ${@link #purgePartitions}:
- * remove an array of partitions and any data they contain from the table by skipping
- * a trash even if it is supported
- * ${@link #truncatePartitions}:
- * truncate an array of partitions by removing partitions data
+ * <ul>
+ *   <li>{@link #createPartitions}: add an array of partitions and any data they contain to the
+ *   table</li>
+ *   <li>{@link #dropPartitions}: remove an array of partitions and any data they contain from
+ *   the table</li>
+ *   <li>{@link #purgePartitions}: remove an array of partitions and any data they contain from
+ *   the table by skipping a trash even if it is supported</li>
+ *   <li>{@link #truncatePartitions}: truncate an array of partitions by removing partitions
+ *   data</li>
+ * </ul>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
* These APIs are used to modify table partition identifier or partition metadata.
* In some cases, they will change the table data as well.
- * ${@link #createPartition}:
- * add a partition and any data it contains to the table
- * ${@link #dropPartition}:
- * remove a partition and any data it contains from the table
- * ${@link #purgePartition}:
- * remove a partition and any data it contains from the table by skipping a trash
- * even if it is supported.
- * ${@link #replacePartitionMetadata}:
- * point a partition to a new location, which will swap one location's data for the other
- * ${@link #truncatePartition}:
- * remove partition data from the table
+ * <ul>
+ *   <li>{@link #createPartition}: add a partition and any data it contains to the table</li>
+ *   <li>{@link #dropPartition}: remove a partition and any data it contains from the table</li>
+ *   <li>{@link #purgePartition}: remove a partition and any data it contains from the table by
+ *   skipping a trash even if it is supported.</li>
+ *   <li>{@link #replacePartitionMetadata}: point a partition to a new location, which will swap
+ *   one location's data for the other</li>
+ *   <li>{@link #truncatePartition}: remove partition data from the table</li>
+ * </ul>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
* By default this method returns false, which means the schema and partitioning passed to
- * `getTable` are from the infer methods. Please override it if this source has expensive
- * schema/partitioning inference and wants external table metadata to avoid inference.
+ * {@link #getTable(StructType, Transform[], Map)} are from the infer methods. Please override it
+ * if this source has expensive schema/partitioning inference and wants external table metadata
+ * to avoid inference.
*/
default boolean supportsExternalMetadata() {
return false;
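A sketch of what overriding this hook can look like for a source with costly inference; the provider and its helper methods are hypothetical, not from this patch:

```java
import java.util.Map;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical provider whose schema inference samples remote data and is expensive,
// so it also accepts user-specified schemas instead of always inferring.
abstract class SampledFileProvider implements TableProvider {

  @Override
  public boolean supportsExternalMetadata() {
    return true; // lets users supply a schema and skip inferSchema() entirely
  }

  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    return expensiveSampleBasedInference(options); // hypothetical helper
  }

  @Override
  public Table getTable(StructType schema, Transform[] partitioning,
      Map<String, String> properties) {
    // `schema` may be user-provided rather than inferred when external metadata is used.
    return newTable(schema, partitioning, properties); // hypothetical helper
  }

  abstract StructType expensiveSampleBasedInference(CaseInsensitiveStringMap options);

  abstract Table newTable(StructType schema, Transform[] partitioning,
      Map<String, String> properties);
}
```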
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Batch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Batch.java
index 73aefa55ae..6861f168b9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Batch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Batch.java
@@ -36,10 +36,8 @@ public interface Batch {
* If the {@link Scan} supports filter pushdown, this Batch is likely configured with a filter
* and is responsible for creating splits for that filter, which is not a full scan.
- *
+ * <p>
* This method will be called only once during a data source scan, to launch one Spark job.
- *
*/
InputPartition[] planInputPartitions();
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/InputPartition.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/InputPartition.java
index 03eec00168..b9e7c375b4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/InputPartition.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/InputPartition.java
@@ -24,7 +24,7 @@ import org.apache.spark.annotation.Evolving;
/**
* A serializable representation of an input partition returned by
* {@link Batch#planInputPartitions()} and the corresponding ones in streaming .
- *
+ * <p>
* Note that {@link InputPartition} will be serialized and sent to executors, then
* {@link PartitionReader} will be created by
* {@link PartitionReaderFactory#createReader(InputPartition)} or
@@ -42,11 +42,11 @@ public interface InputPartition extends Serializable {
* faster, but Spark does not guarantee to run the input partition reader on these locations.
* The implementations should make sure that it can be run on any location.
* The location is a string representing the host name.
- *
+ * <p>
* Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
* the returned locations. The default return value is empty string array, which means this
* input partition's reader has no location preference.
- *
+ * <p>
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*/
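A minimal sketch of a split that reports locality hints; the class and fields are illustrative only:

```java
import org.apache.spark.sql.connector.read.InputPartition;

// Hypothetical split of one file range, carrying the hosts that hold the data locally.
class FileSplitPartition implements InputPartition {
  final String path;
  final long start;
  final long length;
  private final String[] cachedHosts;

  FileSplitPartition(String path, long start, long length, String[] cachedHosts) {
    this.path = path;
    this.start = start;
    this.length = length;
    this.cachedHosts = cachedHosts;
  }

  @Override
  public String[] preferredLocations() {
    // Host names only; Spark ignores names it cannot recognize and may still schedule
    // the reader elsewhere, so the reader must not depend on locality.
    return cachedHosts;
  }
}
```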
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index dfecb77c66..c91f2b4bf3 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.CustomTaskMetric;
* A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
* {@link PartitionReaderFactory#createColumnarReader(InputPartition)}. It's responsible for
* outputting data for a RDD partition.
- *
+ * <p>
* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
* for normal data sources, or {@link org.apache.spark.sql.vectorized.ColumnarBatch} for columnar
* data sources(whose {@link PartitionReaderFactory#supportColumnarReads(InputPartition)}
* returns true).
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderFactory.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderFactory.java
index 9dded247e8..52204fd3bf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderFactory.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderFactory.java
@@ -25,7 +25,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
/**
* A factory used to create {@link PartitionReader} instances.
- *
+ * <p>
* If Spark fails to execute any methods in the implementations of this interface or in the returned
* {@link PartitionReader} (by throwing an exception), corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
@@ -37,7 +37,7 @@ public interface PartitionReaderFactory extends Serializable {
/**
* Returns a row-based partition reader to read data from the given {@link InputPartition}.
- *
+ * <p>
* Implementations probably need to cast the input partition to the concrete
* {@link InputPartition} class defined for the data source.
*/
@@ -45,7 +45,7 @@
/**
* Returns a columnar partition reader to read data from the given {@link InputPartition}.
- *
+ * <p>
* Implementations probably need to cast the input partition to the concrete
* {@link InputPartition} class defined for the data source.
*/
@@ -57,7 +57,7 @@
* Returns true if the given {@link InputPartition} should be read by Spark in a columnar way.
* This means, implementations must also implement {@link #createColumnarReader(InputPartition)}
* for the input partitions that this method returns true.
- *
+ * <p>
* As of Spark 2.4, Spark can only read all input partition in a columnar way, or none of them.
* Data source can't mix columnar and row-based partitions. This may be relaxed in future
* versions.
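A sketch of the cast-to-concrete-class pattern described above, reusing the hypothetical FileSplitPartition from the earlier sketch; the reader body is a stub:

```java
import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

// Hypothetical factory: casts each InputPartition back to the concrete split class this
// source produced, and answers columnar support uniformly for the whole scan.
class SplitReaderFactory implements PartitionReaderFactory {

  @Override
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    // Safe because this source only ever creates FileSplitPartition instances.
    FileSplitPartition split = (FileSplitPartition) partition;
    return new PartitionReader<InternalRow>() {
      @Override public boolean next() throws IOException {
        return false; // a real reader would advance through split.path here
      }
      @Override public InternalRow get() {
        throw new IllegalStateException("next() never returned true");
      }
      @Override public void close() { }
    };
  }

  @Override
  public boolean supportColumnarReads(InputPartition partition) {
    // As of Spark 2.4 the answer must be the same for every partition of one scan,
    // so do not derive it from the individual partition.
    return false;
  }
}
```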
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownFilters.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownFilters.java
index 6594af2773..5ab9092c9a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownFilters.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownFilters.java
@@ -39,14 +39,17 @@ public interface SupportsPushDownFilters extends ScanBuilder {
/**
* Returns the filters that are pushed to the data source via {@link #pushFilters(Filter[])}.
- *
+ * <p>
* There are 3 kinds of filters:
- * 1. pushable filters which don't need to be evaluated again after scanning.
- * 2. pushable filters which still need to be evaluated after scanning, e.g. parquet
- * row group filter.
- * 3. non-pushable filters.
+ * <ol>
+ *   <li>pushable filters which don't need to be evaluated again after scanning.</li>
+ *   <li>pushable filters which still need to be evaluated after scanning, e.g. parquet
+ *   row group filter.</li>
+ *   <li>non-pushable filters.</li>
+ * </ol>
+ * <p>
* Both case 1 and 2 should be considered as pushed filters and should be returned by this method.
- *
+ * <p>
* It's possible that there is no filters in the query and {@link #pushFilters(Filter[])}
* is never called, empty array should be returned for this case.
*/
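To make the three filter kinds concrete, a minimal sketch of a builder that fully handles one filter type and leaves the rest to Spark; the class is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.IsNotNull;

// Hypothetical builder that can push IsNotNull down to storage (kind 1) but must let
// Spark re-evaluate everything else after the scan (kind 3).
abstract class NullSkippingScanBuilder implements SupportsPushDownFilters {
  private final List<Filter> pushed = new ArrayList<>();

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    List<Filter> postScan = new ArrayList<>();
    for (Filter f : filters) {
      if (f instanceof IsNotNull) {
        pushed.add(f);   // kind 1: fully handled by the source
      } else {
        postScan.add(f); // kind 3: Spark evaluates these after the scan
      }
    }
    return postScan.toArray(new Filter[0]);
  }

  @Override
  public Filter[] pushedFilters() {
    // Kinds 1 and 2 belong here; stays empty if pushFilters was never called.
    return pushed.toArray(new Filter[0]);
  }
}
```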
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownRequiredColumns.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownRequiredColumns.java
index 4f7da3c2a3..34de67bcf4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownRequiredColumns.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownRequiredColumns.java
@@ -32,11 +32,11 @@ public interface SupportsPushDownRequiredColumns extends ScanBuilder {
/**
* Applies column pruning w.r.t. the given requiredSchema.
- *
+ * <p>
* Implementation should try its best to prune the unnecessary columns or nested fields, but it's
* also OK to do the pruning partially, e.g., a data source may not be able to prune nested
* fields, and only prune top-level columns.
- *
+ * <p>
* Note that, {@link Scan#readSchema()} implementation should take care of the column
* pruning applied here.
*/
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportPartitioning.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportPartitioning.java
index e7a27e0749..51d56bdf0a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportPartitioning.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportPartitioning.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.connector.read.partitioning.Partitioning;
/**
* A mix in interface for {@link Scan}. Data sources can implement this interface to
* report data partitioning and try to avoid shuffle at Spark side.
- *
+ * <p>
* Note that, when a {@link Scan} implementation creates exactly one {@link InputPartition},
* Spark may avoid adding a shuffle even if the reader does not implement this interface.
*
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportStatistics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportStatistics.java
index 1e0c9ca7c7..031749dee0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportStatistics.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportStatistics.java
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* A mix in interface for {@link Scan}. Data sources can implement this interface to
* report statistics to Spark.
- *
+ * <p>
* As of Spark 3.0, statistics are reported to the optimizer after operators are pushed to the
* data source. Implementations may return more accurate statistics based on pushed operators
* which may improve query performance by providing better information to the optimizer.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Distribution.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Distribution.java
index 264b268e24..a5911a820a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Distribution.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Distribution.java
@@ -24,14 +24,15 @@ import org.apache.spark.sql.connector.read.PartitionReader;
* An interface to represent data distribution requirement, which specifies how the records should
* be distributed among the data partitions (one {@link PartitionReader} outputs data for one
* partition).
+ * <p>
* Note that this interface has nothing to do with the data ordering inside one
* partition(the output records of a single {@link PartitionReader}).
- *
+ * <p>
* The instance of this interface is created and provided by Spark, then consumed by
* {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to
* implement this interface, but need to catch as more concrete implementations of this interface
* as possible in {@link Partitioning#satisfy(Distribution)}.
- *
+ * <p>
* Concrete implementations until now:
* <ul>
*   <li>{@link ClusteredDistribution}</li>
* </ul>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Partitioning.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Partitioning.java
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Partitioning.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Partitioning.java
* Returns true if this partitioning can satisfy the given distribution, which means Spark does
* not need to shuffle the output data of this data source for some certain operations.
- *
+ * <p>
* Note that, Spark may add new concrete implementations of {@link Distribution} in new releases.
* This method should be aware of it and always return false for unrecognized distributions. It's
* recommended to check every Spark new release and support new distributions if possible, to
* avoid shuffle at Spark side for more cases.
*/
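A sketch of the defensive satisfy() pattern the doc asks for; the partitioning class is hypothetical:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution;
import org.apache.spark.sql.connector.read.partitioning.Distribution;
import org.apache.spark.sql.connector.read.partitioning.Partitioning;

// Hypothetical partitioning: the source guarantees rows with equal values of
// `clusterKeys` land in the same input partition.
class KeyClusteredPartitioning implements Partitioning {
  private final String[] clusterKeys;
  private final int numPartitions;

  KeyClusteredPartitioning(String[] clusterKeys, int numPartitions) {
    this.clusterKeys = clusterKeys;
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    if (distribution instanceof ClusteredDistribution) {
      // Rows equal on our keys are co-located, so any clustering over a superset
      // of our keys is satisfied as well.
      Set<String> requested = new HashSet<>(
          Arrays.asList(((ClusteredDistribution) distribution).clusteredColumns));
      return requested.containsAll(Arrays.asList(clusterKeys));
    }
    // Unrecognized distributions may appear in newer Spark releases: be conservative.
    return false;
  }
}
```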
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousPartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousPartitionReader.java
index c2ad9ec244..a33c64192d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousPartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousPartitionReader.java
@@ -30,7 +30,7 @@ public interface ContinuousPartitionReader<T> extends PartitionReader<T> {
/**
* Get the offset of the current record, or the start offset if no records have been read.
- *
+ * <p>
* The execution engine will call this method along with get() to keep track of the current
* offset. When an epoch ends, the offset of the previous record in each partition will be saved
* as a restart checkpoint.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousStream.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousStream.java
index a84578fe46..98943f6429 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousStream.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ContinuousStream.java
@@ -62,7 +62,7 @@ public interface ContinuousStream extends SparkDataStream {
* The execution engine will call this method in every epoch to determine if new input
* partitions need to be generated, which may be required if for example the underlying
* source system has had partitions added or removed.
- *
+ * <p>
* If true, the Spark job to scan this continuous data stream will be interrupted and Spark will
* launch it again with a new list of {@link InputPartition input partitions}.
*/
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/Offset.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/Offset.java
index efb8ebb684..818f378624 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/Offset.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/Offset.java
@@ -22,6 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* An abstract representation of progress through a {@link MicroBatchStream} or
* {@link ContinuousStream}.
+ * <p>
* During execution, offsets provided by the data source implementation will be logged and used as
* restart checkpoints. Each source should provide an offset implementation which the source can use
* to reconstruct a position in the stream up to which data has been seen/processed.
@@ -33,6 +34,7 @@ public abstract class Offset {
/**
* A JSON-serialized representation of an Offset that is
* used for saving offsets to the offset log.
+ * <p>
* Note: We assume that equivalent/equal offsets serialize to
* identical JSON strings.
*
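A minimal sketch of the contract above: an offset with a canonical JSON form, so that equal offsets always serialize identically; the class is hypothetical:

```java
import org.apache.spark.sql.connector.read.streaming.Offset;

// Hypothetical offset: a single monotonically increasing position in the stream.
class PositionOffset extends Offset {
  final long position;

  PositionOffset(long position) {
    this.position = position;
  }

  @Override
  public String json() {
    // Keep the format canonical: equal offsets must produce identical strings,
    // because the offset log compares serialized forms.
    return Long.toString(position);
  }
}
```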
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/PartitionOffset.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/PartitionOffset.java
index faee230467..2d194f52ee 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/PartitionOffset.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/PartitionOffset.java
@@ -24,7 +24,7 @@ import org.apache.spark.annotation.Evolving;
/**
* Used for per-partition offsets in continuous processing. ContinuousReader implementations will
* provide a method to merge these into a global Offset.
- *
+ * <p>
* These offsets must be serializable.
*
* @since 3.0.0
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
index 5a946ad14b..4aab6b578f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan all the data
* available at the streaming source. This is meant to be a hard specification as being able
- * to return all available data is necessary for Trigger.Once() to work correctly.
+ * to return all available data is necessary for {@code Trigger.Once()} to work correctly.
* If a source is unable to scan all available data, then it must throw an error.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SparkDataStream.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SparkDataStream.java
index 95703e255e..c8cf2419ce 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SparkDataStream.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SparkDataStream.java
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* The base interface representing a readable data stream in a Spark streaming query. It's
* responsible to manage the offsets of the streaming source in the streaming query.
- *
+ * <p>
* Data sources should implement concrete data stream interfaces:
* {@link MicroBatchStream} and {@link ContinuousStream}.
*
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
index c808b9a306..b60a37a223 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.Evolving;
* contract of triggers, e.g. Trigger.Once() requires that a micro-batch process all data
* available to the system at the start of the micro-batch. Alternatively, sources can decide to
* limit ingest through data source options.
- *
+ * <p>
* Through this interface, a MicroBatchStream should be able to return the next offset that it will
* process until given a {@link ReadLimit}.
*
@@ -44,11 +44,11 @@ public interface SupportsAdmissionControl extends SparkDataStream {
* Returns the most recent offset available given a read limit. The start offset can be used
* to figure out how much new data should be read given the limit. Users should implement this
* method instead of latestOffset for a MicroBatchStream or getOffset for Source.
- *
+ * <p>
* When this method is called on a `Source`, the source can return `null` if there is no
* data to process. In addition, for the very first micro-batch, the `startOffset` will be
* null as well.
- *
+ * <p>
* When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
* for the very first micro-batch. The source can return `null` if there is no data to process.
*/
@@ -56,7 +56,7 @@ public interface SupportsAdmissionControl extends SparkDataStream {
/**
* Returns the most recent offset available.
- *
+ * <p>
* The source can return `null`, if there is no data to process or the source does not support
* to this method.
*/
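A sketch of admission control under a row-count limit, reusing the hypothetical PositionOffset from the Offset sketch above; the stream class and its helper are illustrative:

```java
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;

// Hypothetical micro-batch source that caps each batch at a configured number of rows.
abstract class RateLimitedStream implements SupportsAdmissionControl {

  abstract long latestPosition(); // hypothetical: newest position known to the source

  @Override
  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    // startOffset is null for the very first micro-batch of a Source.
    long start = startOffset == null ? 0L : ((PositionOffset) startOffset).position;
    long latest = latestPosition();
    if (limit instanceof ReadMaxRows) {
      // Admit at most maxRows new records in this micro-batch.
      latest = Math.min(latest, start + ((ReadMaxRows) limit).maxRows());
    }
    return latest == start ? null : new PositionOffset(latest); // null: nothing to process
  }
}
```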
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index c4b073458e..8c06892841 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -21,20 +21,22 @@ import org.apache.spark.annotation.Evolving;
/**
* An interface that defines how to write the data to data source for batch processing.
- *
+ * <p>
* The writing procedure is:
- * 1. Create a writer factory by {@link #createBatchWriterFactory(PhysicalWriteInfo)}, serialize
- * and send it to all the partitions of the input data(RDD).
- * 2. For each partition, create the data writer, and write the data of the partition with this
- * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
- * exception happens during the writing, call {@link DataWriter#abort()}.
- * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
- * some writers are aborted, or the job failed with an unknown reason, call
- * {@link #abort(WriterCommitMessage[])}.
- *
+ * <ol>
+ *   <li>Create a writer factory by {@link #createBatchWriterFactory(PhysicalWriteInfo)}, serialize
+ *   and send it to all the partitions of the input data(RDD).</li>
+ *   <li>For each partition, create the data writer, and write the data of the partition with this
+ *   writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
+ *   exception happens during the writing, call {@link DataWriter#abort()}.</li>
+ *   <li>If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
+ *   some writers are aborted, or the job failed with an unknown reason, call
+ *   {@link #abort(WriterCommitMessage[])}.</li>
+ * </ol>
+ * <p>
* While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
* do it manually in their Spark applications if they want to retry.
- *
+ * <p>
* Please refer to the documentation of commit/abort methods for detailed specifications.
*
* @since 3.0.0
@@ -44,7 +46,7 @@ public interface BatchWrite {
/**
* Creates a writer factory which will be serialized and sent to executors.
- *
+ * <p>
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
index 0022a71491..1c07480148 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
@@ -25,17 +25,17 @@ import org.apache.spark.annotation.Evolving;
/**
* A data writer returned by {@link DataWriterFactory#createWriter(int, long)} and is
* responsible for writing data for an input RDD partition.
- *
+ * <p>
* One Spark task has one exclusive data writer, so there is no thread-safe concern.
- *
+ * <p>
* {@link #write(Object)} is called for each record in the input RDD partition. If one record fails
* the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
* not be processed. If all records are successfully written, {@link #commit()} is called.
- *
+ * <p>
* Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, Spark will
* call {@link #close()} to let DataWriter doing resource cleanup. After calling {@link #close()},
* its lifecycle is over and Spark will not use it again.
- *
+ * <p>
* If this data writer succeeds(all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
* {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data
@@ -44,7 +44,7 @@ import org.apache.spark.annotation.Evolving;
* In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
* different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])}
* when the configured number of retries is exhausted.
- *
+ * <p>
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
* takes too long to finish. Different from retried tasks, which are launched one by one after the
* previous one fails, speculative tasks are running simultaneously. It's possible that one input
@@ -54,8 +54,9 @@ import org.apache.spark.annotation.Evolving;
* these data writers can commit successfully. Or implementations can allow all of them to commit
* successfully, and have a way to revert committed data writers without the commit message, because
* Spark only accepts the commit message that arrives first and ignore others.
- *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}.
+ * <p>
+ * Note that, Currently the type {@code T} can only be
+ * {@link org.apache.spark.sql.catalyst.InternalRow}.
*
* @since 3.0.0
*/
@@ -64,7 +65,7 @@ public interface DataWriter<T> extends Closeable {
/**
* Writes one record.
- *
+ * <p>
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
* data writer is considered to have been failed.
*
@@ -76,12 +77,12 @@ public interface DataWriter<T> extends Closeable {
/**
* Commits this writer after all records are successfully written and sends a commit message to
* the driver side, which will be passed to {@link BatchWrite#commit(WriterCommitMessage[])}.
- *
+ * <p>
* The written data should only be visible to data source readers after
* {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link BatchWrite} at driver side to
* do the final commit via {@link WriterCommitMessage}.
- *
+ * <p>
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
* data writer is considered to have been failed.
*
@@ -92,10 +93,10 @@ public interface DataWriter<T> extends Closeable {
/**
* Aborts this writer if it is failed. Implementations should clean up the data for already
* written records.
- *
+ * <p>
* This method will only be called if there is one record failed to write, or {@link #commit()}
* failed.
- *
+ * <p>
* If this method fails(by throwing an exception), the underlying data source may have garbage
* that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually,
* but these garbage should not be visible to data source readers.
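A sketch of the write/commit/abort lifecycle described above, using a hypothetical staged-file scheme; all names and the message class are illustrative:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Hypothetical commit message: the staging path this task wrote to.
class StagedFileMessage implements WriterCommitMessage {
  final String stagingPath;

  StagedFileMessage(String stagingPath) {
    this.stagingPath = stagingPath;
  }
}

// Hypothetical writer that stages rows in a hidden location; data becomes visible only
// after the driver-side BatchWrite publishes the staged path in the job-level commit.
class StagedFileWriter implements DataWriter<InternalRow> {
  private final String stagingPath; // unique per (partitionId, taskId), so retries don't clash
  private final List<InternalRow> buffer = new ArrayList<>();

  StagedFileWriter(String stagingPath) {
    this.stagingPath = stagingPath;
  }

  @Override
  public void write(InternalRow record) {
    buffer.add(record.copy()); // copy: Spark may reuse the same row instance
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    flushToStaging(); // persist the buffer; still hidden from readers
    return new StagedFileMessage(stagingPath);
  }

  @Override
  public void abort() {
    buffer.clear(); // nothing was published, so dropping the buffer suffices
  }

  @Override
  public void close() { }

  private void flushToStaging() throws IOException { /* elided */ }
}
```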
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriterFactory.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriterFactory.java
index 33b7ff3dd2..cb14a70607 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriterFactory.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriterFactory.java
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow;
* A factory of {@link DataWriter} returned by
* {@link BatchWrite#createBatchWriterFactory(PhysicalWriteInfo)}, which is responsible for
* creating and initializing the actual data writer at executor side.
- *
+ * <p>
* Note that, the writer factory will be serialized and sent to executors, then the data writer
* will be created on executors and do the actual writing. So this interface must be
* serializable and {@link DataWriter} doesn't need to be.
@@ -42,7 +42,7 @@ public interface DataWriterFactory extends Serializable {
* object instance when sending data to the data writer, for better performance. Data writers
* are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
* list.
- *
+ * <p>
* If this method fails (by throwing an exception), the corresponding Spark write task would fail
* and get retried until hitting the maximum retry times.
*
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
index 0638970df9..e472a13018 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
@@ -35,7 +35,7 @@ public interface LogicalWriteInfo {
CaseInsensitiveStringMap options();
/**
- * `queryId` is a unique string of the query. It's possible that there are many queries
+ * {@code queryId} is a unique string of the query. It's possible that there are many queries
* running at the same time, or a query is restarted and resumed. {@link BatchWrite} can use
* this id to identify the query.
*/
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java
index 0c72f31af1..51ca167a4c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
/**
* An interface for building the {@link Write}. Implementations can mix in some interfaces to
* support different ways to write data to data sources.
- *
+ * <p>
* Unless modified by a mixin interface, the {@link Write} configured by this builder is to
* append data without affecting existing data.
*
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriterCommitMessage.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriterCommitMessage.java
index f340d20c35..b76d531f78 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriterCommitMessage.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriterCommitMessage.java
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
* A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
* as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
* {@link StreamingWrite#commit(long, WriterCommitMessage[])}.
- *
+ * <p>
* This is an empty interface, data sources should define their own message class and use it when
* generating messages at executor side and handling the messages at driver side.
*
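To show the driver side of the message flow, a sketch of a BatchWrite consuming the hypothetical StagedFileMessage from the DataWriter sketch above; the helpers are illustrative:

```java
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Hypothetical driver-side handling: commit() publishes every staged path the tasks
// reported, abort() deletes whatever was staged.
abstract class StagedFileBatchWrite implements BatchWrite {

  @Override
  public void commit(WriterCommitMessage[] messages) {
    for (WriterCommitMessage m : messages) {
      publish(((StagedFileMessage) m).stagingPath); // hypothetical atomic publish
    }
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    for (WriterCommitMessage m : messages) {
      if (m != null) { // some slots may be null if those writers never committed
        deleteStaged(((StagedFileMessage) m).stagingPath);
      }
    }
  }

  abstract void publish(String stagingPath);

  abstract void deleteStaged(String stagingPath);
}
```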
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingDataWriterFactory.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingDataWriterFactory.java
index 0923d07e7e..b430e1e597 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingDataWriterFactory.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingDataWriterFactory.java
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
* A factory of {@link DataWriter} returned by
* {@link StreamingWrite#createStreamingWriterFactory(PhysicalWriteInfo)}, which is responsible for
* creating and initializing the actual data writer at executor side.
- *
+ * <p>
* Note that, the writer factory will be serialized and sent to executors, then the data writer
* will be created on executors and do the actual writing. So this interface must be
* serializable and {@link DataWriter} doesn't need to be.
@@ -44,7 +44,7 @@ public interface StreamingDataWriterFactory extends Serializable {
* object instance when sending data to the data writer, for better performance. Data writers
* are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
* list.
- *
+ * <p>
* If this method fails (by throwing an exception), the corresponding Spark write task would fail
* and get retried until hitting the maximum retry times.
*
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
index e3dec3b2ff..20694f0b05 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
@@ -26,19 +26,21 @@ import org.apache.spark.sql.connector.write.WriterCommitMessage;
* An interface that defines how to write the data to data source in streaming queries.
*
* The writing procedure is:
- * 1. Create a writer factory by {@link #createStreamingWriterFactory(PhysicalWriteInfo)},
- * serialize and send it to all the partitions of the input data(RDD).
- * 2. For each epoch in each partition, create the data writer, and write the data of the epoch in
- * the partition with this writer. If all the data are written successfully, call
- * {@link DataWriter#commit()}. If exception happens during the writing, call
- * {@link DataWriter#abort()}.
- * 3. If writers in all partitions of one epoch are successfully committed, call
- * {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed
- * with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.
- *
+ * <ol>
+ *   <li>Create a writer factory by {@link #createStreamingWriterFactory(PhysicalWriteInfo)},
+ *   serialize and send it to all the partitions of the input data(RDD).</li>
+ *   <li>For each epoch in each partition, create the data writer, and write the data of the epoch
+ *   in the partition with this writer. If all the data are written successfully, call
+ *   {@link DataWriter#commit()}. If exception happens during the writing, call
+ *   {@link DataWriter#abort()}.</li>
+ *   <li>If writers in all partitions of one epoch are successfully committed, call
+ *   {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed
+ *   with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.</li>
+ * </ol>
+ * <p>
* While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
* do it manually in their Spark applications if they want to retry.
- *
+ * <p>
* Please refer to the documentation of commit/abort methods for detailed specifications.
*
* @since 3.0.0
@@ -48,7 +50,7 @@ public interface StreamingWrite {
/**
* Creates a writer factory which will be serialized and sent to executors.
- *
+ * <p>
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*
@@ -60,14 +62,14 @@ public interface StreamingWrite {
* Commits this writing job for the specified epoch with a list of commit messages. The commit
* messages are collected from successful data writers and are produced by
* {@link DataWriter#commit()}.
- *
+ * <p>
* If this method fails (by throwing an exception), this writing job is considered to have been
* failed, and the execution engine will attempt to call
* {@link #abort(long, WriterCommitMessage[])}.
- *
- * The execution engine may call `commit` multiple times for the same epoch in some circumstances.
- * To support exactly-once data semantics, implementations must ensure that multiple commits for
- * the same epoch are idempotent.
+ * <p>
+ * The execution engine may call {@code commit} multiple times for the same epoch in some
+ * circumstances. To support exactly-once data semantics, implementations must ensure that
+ * multiple commits for the same epoch are idempotent.
*/
void commit(long epochId, WriterCommitMessage[] messages);
@@ -75,10 +77,10 @@ public interface StreamingWrite {
* Aborts this writing job because some data writers are failed and keep failing when retried, or
* the Spark job fails with some unknown reasons, or {@link #commit(long, WriterCommitMessage[])}
* fails.
- *
+ * <p>
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
- *
+ * <p>
* Unless the abort is triggered by the failure of commit, the given messages will have some
* null slots, as there may be only a few data writers that were committed before the abort
* happens, or some data writers were committed but their commit messages haven't reached the
* driver when the abort is triggered. So this is just a "best effort" for data sources to
* clean up the data left by data writers.
*/
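A sketch of the idempotent-commit requirement stated above; the epoch-tracking helpers are hypothetical:

```java
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

// Hypothetical sink that records the last committed epoch so a replayed commit for the
// same epoch becomes a no-op, preserving exactly-once semantics.
abstract class IdempotentStreamingWrite implements StreamingWrite {

  abstract long readLastCommittedEpoch(); // hypothetical durable metadata read

  abstract void publish(long epochId, WriterCommitMessage[] messages); // hypothetical

  abstract void cleanUp(WriterCommitMessage message); // hypothetical

  @Override
  public void commit(long epochId, WriterCommitMessage[] messages) {
    if (epochId <= readLastCommittedEpoch()) {
      return; // the engine may legally call commit twice for one epoch
    }
    // Must atomically publish the data and advance the epoch marker together.
    publish(epochId, messages);
  }

  @Override
  public void abort(long epochId, WriterCommitMessage[] messages) {
    // Best effort: some slots may be null if their writers never committed.
    for (WriterCommitMessage m : messages) {
      if (m != null) {
        cleanUp(m);
      }
    }
  }
}
```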
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
index 5f6a46f2b8..a814525f87 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
@@ -23,8 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.*;
/**
* Represents the type of timeouts possible for the Dataset operations
- * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
- * `GroupState` for more details.
+ * {@code mapGroupsWithState} and {@code flatMapGroupsWithState}.
+ * <p>
+ * See documentation on {@code GroupState} for more details.
*
* @since 2.2.0
*/
@@ -33,21 +34,29 @@ import org.apache.spark.sql.catalyst.plans.logical.*;
public class GroupStateTimeout {
/**
- * Timeout based on processing time. The duration of timeout can be set for each group in
- * `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutDuration()`. See documentation
- * on `GroupState` for more details.
+ * Timeout based on processing time.
+ * <p>
+ * The duration of timeout can be set for each group in
+ * {@code map/flatMapGroupsWithState} by calling {@code GroupState.setTimeoutDuration()}.
+ * <p>
+ * See documentation on {@code GroupState} for more details.
*/
public static GroupStateTimeout ProcessingTimeTimeout() {
return ProcessingTimeTimeout$.MODULE$;
}
/**
- * Timeout based on event-time. The event-time timestamp for timeout can be set for each
- * group in `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutTimestamp()`.
- * In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
+ * Timeout based on event-time.
+ * <p>
+ * The event-time timestamp for timeout can be set for each
+ * group in {@code map/flatMapGroupsWithState} by calling
+ * {@code GroupState.setTimeoutTimestamp()}.
+ * In addition, you have to define the watermark in the query using
+ * {@code Dataset.withWatermark}.
* When the watermark advances beyond the set timestamp of a group and the group has not
- * received any data, then the group times out. See documentation on
- * `GroupState` for more details.
+ * received any data, then the group times out.
+ * <p>
+ * See documentation on {@code GroupState} for more details.
*/
public static GroupStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 2158ef8324..5ef4fba193 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -26,22 +26,23 @@ import org.apache.spark.unsafe.types.UTF8String;
* An interface representing in-memory columnar data in Spark. This interface defines the main APIs
* to access the data, as well as their batched versions. The batched versions are considered to be
* faster and preferable whenever possible.
- *
+ * <p>
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in this ColumnVector.
- *
- * Spark only calls specific `get` method according to the data type of this {@link ColumnVector},
+ * <p>
+ * Spark only calls specific {@code get} method according to the data type of this
+ * {@link ColumnVector},
* e.g. if it's int type, Spark is guaranteed to only call {@link #getInt(int)} or
* {@link #getInts(int, int)}.
- *
+ * <p>
* ColumnVector supports all the data types including nested types. To handle nested types,
* ColumnVector can have children and is a tree structure. Please refer to {@link #getStruct(int)},
* {@link #getArray(int)} and {@link #getMap(int)} for the details about how to implement nested
* types.
- *
+ * <p>
* ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
* memory again and again.
- *
+ * <p>
* ColumnVector is meant to maximize CPU efficiency but not to minimize storage footprint.
* Implementations should prefer computing efficiency over storage efficiency when design the
* format. Since it is expected to reuse the ColumnVector instance while loading data, the storage
@@ -57,9 +58,10 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* Cleans up memory for this column vector. The column vector is not usable after this.
- *
- * This overwrites `AutoCloseable.close` to remove the `throws` clause, as column vector is
- * in-memory and we don't expect any exception to happen during closing.
+ * <p>
+ * This overwrites {@link AutoCloseable#close} to remove the
+ * {@code throws} clause, as column vector is in-memory and we don't expect any exception to
+ * happen during closing.
*/
@Override
public abstract void close();
@@ -75,19 +77,19 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract int numNulls();
/**
- * Returns whether the value at rowId is NULL.
+ * Returns whether the value at {@code rowId} is NULL.
*/
public abstract boolean isNullAt(int rowId);
/**
- * Returns the boolean type value for rowId. The return value is undefined and can be anything,
- * if the slot for rowId is null.
+ * Returns the boolean type value for {@code rowId}. The return value is undefined and can be
+ * anything, if the slot for {@code rowId} is null.
*/
public abstract boolean getBoolean(int rowId);
/**
- * Gets boolean type values from [rowId, rowId + count). The return values for the null slots
- * are undefined and can be anything.
+ * Gets boolean type values from {@code [rowId, rowId + count)}. The return values for the null
+ * slots are undefined and can be anything.
*/
public boolean[] getBooleans(int rowId, int count) {
boolean[] res = new boolean[count];
@@ -98,13 +100,13 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the byte type value for rowId. The return value is undefined and can be anything,
- * if the slot for rowId is null.
+ * Returns the byte type value for {@code rowId}. The return value is undefined and can be
+ * anything, if the slot for {@code rowId} is null.
*/
public abstract byte getByte(int rowId);
/**
- * Gets byte type values from [rowId, rowId + count). The return values for the null slots
+ * Gets byte type values from {@code [rowId, rowId + count)}. The return values for the null slots
* are undefined and can be anything.
*/
public byte[] getBytes(int rowId, int count) {
@@ -116,14 +118,14 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the short type value for rowId. The return value is undefined and can be anything,
- * if the slot for rowId is null.
+ * Returns the short type value for {@code rowId}. The return value is undefined and can be
+ * anything, if the slot for {@code rowId} is null.
*/
public abstract short getShort(int rowId);
/**
- * Gets short type values from [rowId, rowId + count). The return values for the null slots
- * are undefined and can be anything.
+ * Gets short type values from {@code [rowId, rowId + count)}. The return values for the null
+ * slots are undefined and can be anything.
*/
public short[] getShorts(int rowId, int count) {
short[] res = new short[count];
@@ -134,13 +136,13 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the int type value for rowId. The return value is undefined and can be anything,
- * if the slot for rowId is null.
+ * Returns the int type value for {@code rowId}. The return value is undefined and can be
+ * anything, if the slot for {@code rowId} is null.
*/
public abstract int getInt(int rowId);
/**
- * Gets int type values from [rowId, rowId + count). The return values for the null slots
+ * Gets int type values from {@code [rowId, rowId + count)}. The return values for the null slots
* are undefined and can be anything.
*/
public int[] getInts(int rowId, int count) {
@@ -152,13 +154,13 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the long type value for rowId. The return value is undefined and can be anything,
- * if the slot for rowId is null.
+ * Returns the long type value for {@code rowId}. The return value is undefined and can be
+ * anything, if the slot for {@code rowId} is null.
*/
public abstract long getLong(int rowId);
/**
- * Gets long type values from [rowId, rowId + count). The return values for the null slots
+ * Gets long type values from {@code [rowId, rowId + count)}. The return values for the null slots
* are undefined and can be anything.
*/
public long[] getLongs(int rowId, int count) {
@@ -170,14 +172,14 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the float type value for rowId. The return value is undefined and can be anything,
- * if the slot for rowId is null.
+ * Returns the float type value for {@code rowId}. The return value is undefined and can be
+ * anything, if the slot for {@code rowId} is null.
*/
public abstract float getFloat(int rowId);
/**
- * Gets float type values from [rowId, rowId + count). The return values for the null slots
- * are undefined and can be anything.
+ * Gets float type values from {@code [rowId, rowId + count)}. The return values for the null
+ * slots are undefined and can be anything.
*/
public float[] getFloats(int rowId, int count) {
float[] res = new float[count];
@@ -188,14 +190,14 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the double type value for rowId. The return value is undefined and can be anything,
- * if the slot for rowId is null.
+ * Returns the double type value for {@code rowId}. The return value is undefined and can be
+ * anything, if the slot for {@code rowId} is null.
*/
public abstract double getDouble(int rowId);
/**
- * Gets double type values from [rowId, rowId + count). The return values for the null slots
- * are undefined and can be anything.
+ * Gets double type values from {@code [rowId, rowId + count)}. The return values for the null
+ * slots are undefined and can be anything.
*/
public double[] getDoubles(int rowId, int count) {
double[] res = new double[count];
@@ -206,8 +208,9 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the struct type value for rowId. If the slot for rowId is null, it should return null.
- *
+ * Returns the struct type value for {@code rowId}. If the slot for {@code rowId} is null, it
+ * should return null.
+ * <p>
* To support struct type, implementations must implement {@link #getChild(int)} and make this
* vector a tree structure. The number of child vectors must be same as the number of fields of
* the struct type, and each child vector is responsible to store the data for its corresponding
@@ -219,8 +222,9 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Returns the array type value for rowId. If the slot for rowId is null, it should return null.
- *
+ * Returns the array type value for {@code rowId}. If the slot for {@code rowId} is null, it
+ * should return null.
+ * <p>
* To support array type, implementations must construct an {@link ColumnarArray} and return it in
* this method. {@link ColumnarArray} requires a {@link ColumnVector} that stores the data of all
* the elements of all the arrays in this vector, and an offset and length which points to a range
@@ -232,12 +236,13 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract ColumnarArray getArray(int rowId);
/**
- * Returns the map type value for rowId. If the slot for rowId is null, it should return null.
- *
+ * Returns the map type value for {@code rowId}. If the slot for {@code rowId} is null, it
+ * should return null.
+ * <p>
* In Spark, map type value is basically a key data array and a value data array. A key from the
* key array with a index and a value from the value array with the same index contribute to
* an entry of this map type value.
- *
+ * <p>
* To support map type, implementations must construct a {@link ColumnarMap} and return it in
* this method. {@link ColumnarMap} requires a {@link ColumnVector} that stores the data of all
* the keys of all the maps in this vector, and another {@link ColumnVector} that stores the data
@@ -247,31 +252,35 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract ColumnarMap getMap(int ordinal);
/**
- * Returns the decimal type value for rowId. If the slot for rowId is null, it should return null.
+ * Returns the decimal type value for {@code rowId}. If the slot for {@code rowId} is null, it
+ * should return null.
*/
public abstract Decimal getDecimal(int rowId, int precision, int scale);
/**
- * Returns the string type value for rowId. If the slot for rowId is null, it should return null.
- * Note that the returned UTF8String may point to the data of this column vector, please copy it
- * if you want to keep it after this column vector is freed.
+ * Returns the string type value for {@code rowId}. If the slot for {@code rowId} is null, it
+ * should return null.
+ * <p>
+ * Note that the returned {@link UTF8String} may point to the data of this column vector,
+ * please copy it if you want to keep it after this column vector is freed.
*/
public abstract UTF8String getUTF8String(int rowId);
/**
- * Returns the binary type value for rowId. If the slot for rowId is null, it should return null.
+ * Returns the binary type value for {@code rowId}. If the slot for {@code rowId} is null, it
+ * should return null.
*/
public abstract byte[] getBinary(int rowId);
/**
- * Returns the calendar interval type value for rowId. If the slot for rowId is null, it should
- * return null.
- *
+ * Returns the calendar interval type value for {@code rowId}. If the slot for
+ * {@code rowId} is null, it should return null.
+ * <p>
* In Spark, calendar interval type value is basically two integer values representing the number
* of months and days in this interval, and a long value representing the number of microseconds
* in this interval. An interval type vector is the same as a struct type vector with 3 fields:
- * `months`, `days` and `microseconds`.
- *
+ * {@code months}, {@code days} and {@code microseconds}.
+ * <p>
* To support interval type, implementations must implement {@link #getChild(int)} and define 3
* child vectors: the first child vector is an int type vector, containing all the month values of
* all the interval values in this vector. The second child vector is an int type vector,
@@ -288,7 +297,7 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * @return child [[ColumnVector]] at the given ordinal.
+ * @return child {@link ColumnVector} at the given ordinal.
*/
public abstract ColumnVector getChild(int ordinal);
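Finally, a sketch of the read pattern the ColumnVector contract implies: check isNullAt() before trusting a value, since the get*() results are undefined for null slots. The helper class is illustrative:

```java
import org.apache.spark.sql.vectorized.ColumnVector;

// Consumes an int-typed ColumnVector the way the contract requires.
final class ColumnVectorSums {
  static long sumInts(ColumnVector vector, int numRows) {
    long sum = 0;
    for (int rowId = 0; rowId < numRows; rowId++) {
      if (!vector.isNullAt(rowId)) {
        sum += vector.getInt(rowId); // valid only because the vector's type is int
      }
    }
    return sum;
  }
}
```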