[SPARK-34945][DOC] Fix Javadoc for classes in catalyst module

### What changes were proposed in this pull request?

Use proper Javadoc format for the Java classes in the `catalyst` module.

### Why are the changes needed?

Many Java classes in `catalyst`, especially those for DataSource V2, do not use proper Javadoc format. Fixing the format improves the readability of the generated docs.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #32038 from sunchao/javadoc.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
parent 65da9287bc
commit f1d42bb68d
31 changed files with 201 additions and 172 deletions


@ -25,7 +25,7 @@ import org.apache.spark.sql.connector.read.PartitionReader;
* at the executor side. During query execution, Spark will collect the task metrics per partition
* by {@link PartitionReader} and update internal metrics based on collected metric values.
* For streaming query, Spark will collect and combine metrics for a final result per micro batch.
*
* <p>
* The metrics will be gathered during query execution back to the driver and then combined. How
* the task metrics are combined is defined by corresponding {@link CustomMetric} with same metric
* name. The final result will be shown up in the data source scan operator in Spark UI.


@ -30,15 +30,16 @@ import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
* <p>
* These APIs are used to modify table partition or partition metadata,
* they will change the table data as well.
* ${@link #createPartitions}:
* add an array of partitions and any data they contain to the table
* ${@link #dropPartitions}:
* remove an array of partitions and any data they contain from the table
* ${@link #purgePartitions}:
* remove an array of partitions and any data they contain from the table by skipping
* a trash even if it is supported
* ${@link #truncatePartitions}:
* truncate an array of partitions by removing partitions data
* <ul>
* <li>{@link #createPartitions}: add an array of partitions and any data they contain to the
* table</li>
* <li>{@link #dropPartitions}: remove an array of partitions and any data they contain from
* the table</li>
* <li>{@link #purgePartitions}: remove an array of partitions and any data they contain from
* the table by skipping a trash even if it is supported</li>
* <li>{@link #truncatePartitions}: truncate an array of partitions by removing partitions
* data</li>
* </ul>
*
* @since 3.1.0
*/


@ -32,17 +32,15 @@ import org.apache.spark.sql.types.StructType;
* <p>
* These APIs are used to modify table partition identifier or partition metadata.
* In some cases, they will change the table data as well.
* ${@link #createPartition}:
* add a partition and any data it contains to the table
* ${@link #dropPartition}:
* remove a partition and any data it contains from the table
* ${@link #purgePartition}:
* remove a partition and any data it contains from the table by skipping a trash
* even if it is supported.
* ${@link #replacePartitionMetadata}:
* point a partition to a new location, which will swap one location's data for the other
* ${@link #truncatePartition}:
* remove partition data from the table
* <ul>
* <li>{@link #createPartition}: add a partition and any data it contains to the table</li>
* <li>{@link #dropPartition}: remove a partition and any data it contains from the table</li>
* <li>{@link #purgePartition}: remove a partition and any data it contains from the table by
* skipping a trash even if it is supported.</li>
* <li>{@link #replacePartitionMetadata}: point a partition to a new location, which will swap
* one location's data for the other</li>
* <li>{@link #truncatePartition}: remove partition data from the table</li>
* </ul>
*
* @since 3.1.0
*/
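
To make these partition operations concrete, here is a toy, in-memory analogue of the operations listed above (all names here are hypothetical; the real interfaces take `InternalRow` partition identifiers and throw the catalyst analysis exceptions):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: partition identifiers are plain strings and the
// "data" is a list of rows, only to illustrate the semantics of the methods.
final class ToyPartitionedTable {
  private final Map<String, List<String>> partitions = new HashMap<>();
  private final Map<String, Map<String, String>> metadata = new HashMap<>();

  // createPartition: add a partition and any data it contains to the table.
  void createPartition(String ident, Map<String, String> props) {
    if (partitions.containsKey(ident)) {
      throw new IllegalStateException("partition already exists: " + ident);
    }
    partitions.put(ident, new java.util.ArrayList<>());
    metadata.put(ident, new HashMap<>(props));
  }

  // dropPartition: remove the partition and its data
  // (a real source would typically move the data to a trash location).
  boolean dropPartition(String ident) {
    metadata.remove(ident);
    return partitions.remove(ident) != null;
  }

  // purgePartition: like drop, but skips the trash entirely.
  boolean purgePartition(String ident) {
    return dropPartition(ident); // this toy store has no trash to skip
  }

  // truncatePartition: keep the partition, remove its data.
  boolean truncatePartition(String ident) {
    List<String> rows = partitions.get(ident);
    if (rows == null) return false;
    rows.clear();
    return true;
  }
}
```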


@ -78,14 +78,17 @@ public interface TableProvider {
/**
* Returns true if the source has the ability of accepting external table metadata when getting
* tables. The external table metadata includes:
* 1. For table reader: user-specified schema from `DataFrameReader`/`DataStreamReader` and
* schema/partitioning stored in Spark catalog.
* 2. For table writer: the schema of the input `Dataframe` of
* `DataframeWriter`/`DataStreamWriter`.
* <ol>
* <li>For table reader: user-specified schema from {@code DataFrameReader}/{@code
* DataStreamReader} and schema/partitioning stored in Spark catalog.</li>
* <li>For table writer: the schema of the input {@code Dataframe} of
* {@code DataframeWriter}/{@code DataStreamWriter}.</li>
* </ol>
* <p>
* By default this method returns false, which means the schema and partitioning passed to
* `getTable` are from the infer methods. Please override it if this source has expensive
* schema/partitioning inference and wants external table metadata to avoid inference.
* {@link #getTable(StructType, Transform[], Map)} are from the infer methods. Please override it
* if this source has expensive schema/partitioning inference and wants external table metadata
* to avoid inference.
*/
default boolean supportsExternalMetadata() {
return false;
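
A sketch of a provider that opts in to external table metadata (hypothetical `MyTableProvider`; the actual table construction is elided):

```java
import java.util.Map;

import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// A source whose schema inference is expensive, so it accepts externally
// provided schema/partitioning whenever Spark already has them.
public class MyTableProvider implements TableProvider {

  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    // Only reached when no external metadata is supplied, e.g. a read with no
    // user-specified schema and no catalog entry.
    return expensiveSchemaScan(options);
  }

  @Override
  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
    // schema/partitioning may come from DataFrameReader, the catalog, or the infer methods.
    throw new UnsupportedOperationException(
        "table construction omitted in this sketch; a real source builds its Table from these arguments");
  }

  @Override
  public boolean supportsExternalMetadata() {
    return true; // skip inference whenever Spark already knows the table metadata
  }

  private StructType expensiveSchemaScan(CaseInsensitiveStringMap options) {
    return new StructType().add("id", "long").add("value", "string");
  }
}
```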


@ -36,10 +36,8 @@ public interface Batch {
* <p>
* If the {@link Scan} supports filter pushdown, this Batch is likely configured with a filter
* and is responsible for creating splits for that filter, which is not a full scan.
* </p>
* <p>
* This method will be called only once during a data source scan, to launch one Spark job.
* </p>
*/
InputPartition[] planInputPartitions();


@ -24,7 +24,7 @@ import org.apache.spark.annotation.Evolving;
/**
* A serializable representation of an input partition returned by
* {@link Batch#planInputPartitions()} and the corresponding ones in streaming .
*
* <p>
* Note that {@link InputPartition} will be serialized and sent to executors, then
* {@link PartitionReader} will be created by
* {@link PartitionReaderFactory#createReader(InputPartition)} or
@ -42,11 +42,11 @@ public interface InputPartition extends Serializable {
* faster, but Spark does not guarantee to run the input partition reader on these locations.
* The implementations should make sure that it can be run on any location.
* The location is a string representing the host name.
*
* <p>
* Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
* the returned locations. The default return value is empty string array, which means this
* input partition's reader has no location preference.
*
* <p>
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*/
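
A minimal sketch of an input partition that reports preferred locations (hypothetical `FileSplitInputPartition`):

```java
import org.apache.spark.sql.connector.read.InputPartition;

// A partition that reads one file split; the host names hint where the split's
// data blocks live, but Spark may still schedule the reader elsewhere.
public class FileSplitInputPartition implements InputPartition {
  private final String path;
  private final long start;
  private final long length;
  private final String[] hosts;

  public FileSplitInputPartition(String path, long start, long length, String[] hosts) {
    this.path = path;
    this.start = start;
    this.length = length;
    this.hosts = hosts;
  }

  @Override
  public String[] preferredLocations() {
    // Returning an empty array (the default) would mean "no preference".
    return hosts;
  }
}
```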


@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.CustomTaskMetric;
* A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
* {@link PartitionReaderFactory#createColumnarReader(InputPartition)}. It's responsible for
* outputting data for a RDD partition.
*
* <p>
* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
* for normal data sources, or {@link org.apache.spark.sql.vectorized.ColumnarBatch} for columnar
* data sources(whose {@link PartitionReaderFactory#supportColumnarReads(InputPartition)}


@ -25,7 +25,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
/**
* A factory used to create {@link PartitionReader} instances.
*
* <p>
* If Spark fails to execute any methods in the implementations of this interface or in the returned
* {@link PartitionReader} (by throwing an exception), corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
@ -37,7 +37,7 @@ public interface PartitionReaderFactory extends Serializable {
/**
* Returns a row-based partition reader to read data from the given {@link InputPartition}.
*
* <p>
* Implementations probably need to cast the input partition to the concrete
* {@link InputPartition} class defined for the data source.
*/
@ -45,7 +45,7 @@ public interface PartitionReaderFactory extends Serializable {
/**
* Returns a columnar partition reader to read data from the given {@link InputPartition}.
*
* <p>
* Implementations probably need to cast the input partition to the concrete
* {@link InputPartition} class defined for the data source.
*/
@ -57,7 +57,7 @@ public interface PartitionReaderFactory extends Serializable {
* Returns true if the given {@link InputPartition} should be read by Spark in a columnar way.
* This means, implementations must also implement {@link #createColumnarReader(InputPartition)}
* for the input partitions that this method returns true.
*
* <p>
* As of Spark 2.4, Spark can only read all input partition in a columnar way, or none of them.
* Data source can't mix columnar and row-based partitions. This may be relaxed in future
* versions.
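
For reference, a row-based factory and reader for a hypothetical range source (the class names are illustrative; only the `PartitionReaderFactory`/`PartitionReader` methods documented above are used):

```java
import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

// Row-based factory: executors deserialize it and call createReader() per partition.
public class RangeReaderFactory implements PartitionReaderFactory {

  // Hypothetical partition planned by the driver; InputPartition is Serializable.
  public static class RangeInputPartition implements InputPartition {
    final long start;
    final long end;

    public RangeInputPartition(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  @Override
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    // Cast to the concrete partition class this source planned on the driver.
    RangeInputPartition p = (RangeInputPartition) partition;
    return new RangeReader(p.start, p.end);
  }

  private static class RangeReader implements PartitionReader<InternalRow> {
    private final long end;
    private long current;

    RangeReader(long start, long end) {
      this.current = start - 1;
      this.end = end;
    }

    @Override
    public boolean next() {
      current += 1;
      return current < end;
    }

    @Override
    public InternalRow get() {
      return new GenericInternalRow(new Object[]{current});
    }

    @Override
    public void close() throws IOException {
      // nothing to release for an in-memory range
    }
  }
}
```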


@ -39,14 +39,17 @@ public interface SupportsPushDownFilters extends ScanBuilder {
/**
* Returns the filters that are pushed to the data source via {@link #pushFilters(Filter[])}.
*
* <p>
* There are 3 kinds of filters:
* 1. pushable filters which don't need to be evaluated again after scanning.
* 2. pushable filters which still need to be evaluated after scanning, e.g. parquet
* row group filter.
* 3. non-pushable filters.
* <ol>
* <li>pushable filters which don't need to be evaluated again after scanning.</li>
* <li>pushable filters which still need to be evaluated after scanning, e.g. parquet row
* group filter.</li>
* <li>non-pushable filters.</li>
* </ol>
* <p>
* Both case 1 and 2 should be considered as pushed filters and should be returned by this method.
*
* <p>
* It's possible that there is no filters in the query and {@link #pushFilters(Filter[])}
* is never called, empty array should be returned for this case.
*/
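
A sketch of a scan builder that pushes simple comparison filters and hands everything else back to Spark (hypothetical `MyScanBuilder`; which filters a real source can evaluate depends on the source):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;

public class MyScanBuilder implements SupportsPushDownFilters {
  private Filter[] pushed = new Filter[0];

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    List<Filter> pushable = new ArrayList<>();
    List<Filter> postScan = new ArrayList<>();
    for (Filter f : filters) {
      // Pretend this source can fully evaluate simple comparisons on its own (case 1).
      if (f instanceof EqualTo || f instanceof GreaterThan) {
        pushable.add(f);
      } else {
        postScan.add(f); // non-pushable filters (case 3)
      }
    }
    pushed = pushable.toArray(new Filter[0]);
    // Return the filters Spark must still evaluate after the scan.
    return postScan.toArray(new Filter[0]);
  }

  @Override
  public Filter[] pushedFilters() {
    // Everything this source will apply itself, even if only partially (cases 1 and 2).
    return pushed;
  }

  @Override
  public Scan build() {
    throw new UnsupportedOperationException("scan construction omitted in this sketch");
  }
}
```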


@ -32,11 +32,11 @@ public interface SupportsPushDownRequiredColumns extends ScanBuilder {
/**
* Applies column pruning w.r.t. the given requiredSchema.
*
* <p>
* Implementation should try its best to prune the unnecessary columns or nested fields, but it's
* also OK to do the pruning partially, e.g., a data source may not be able to prune nested
* fields, and only prune top-level columns.
*
* <p>
* Note that, {@link Scan#readSchema()} implementation should take care of the column
* pruning applied here.
*/
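
A sketch of a scan builder that adopts the required schema and reports it back through `Scan#readSchema()` (hypothetical `PruningScanBuilder` with a hard-coded full schema):

```java
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

public class PruningScanBuilder implements SupportsPushDownRequiredColumns {
  // Starts as the full table schema; becomes the schema the scan will actually read.
  private StructType readSchema =
      new StructType().add("id", "long").add("name", "string").add("payload", "string");

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // Best effort: this toy source can prune top-level columns, so simply adopt
    // the required schema.
    this.readSchema = requiredSchema;
  }

  @Override
  public Scan build() {
    StructType schema = readSchema;
    // The built Scan must report the pruned schema back to Spark.
    return new Scan() {
      @Override
      public StructType readSchema() {
        return schema;
      }
    };
  }
}
```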


@ -23,7 +23,7 @@ import org.apache.spark.sql.connector.read.partitioning.Partitioning;
/**
* A mix in interface for {@link Scan}. Data sources can implement this interface to
* report data partitioning and try to avoid shuffle at Spark side.
*
* <p>
* Note that, when a {@link Scan} implementation creates exactly one {@link InputPartition},
* Spark may avoid adding a shuffle even if the reader does not implement this interface.
*


@ -22,7 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* A mix in interface for {@link Scan}. Data sources can implement this interface to
* report statistics to Spark.
*
* <p>
* As of Spark 3.0, statistics are reported to the optimizer after operators are pushed to the
* data source. Implementations may return more accurate statistics based on pushed operators
* which may improve query performance by providing better information to the optimizer.
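
A minimal sketch of a scan reporting fixed statistics (hypothetical `MyScan`; a real source would derive the numbers from its metadata and any pushed operators):

```java
import java.util.OptionalLong;

import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.types.StructType;

public class MyScan implements SupportsReportStatistics {

  @Override
  public StructType readSchema() {
    return new StructType().add("id", "long");
  }

  @Override
  public Statistics estimateStatistics() {
    // Called after operator pushdown, so pushed filters can be taken into account here.
    return new Statistics() {
      @Override
      public OptionalLong sizeInBytes() {
        return OptionalLong.of(10L * 1024 * 1024);
      }

      @Override
      public OptionalLong numRows() {
        return OptionalLong.of(100_000L);
      }
    };
  }
}
```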


@ -24,14 +24,15 @@ import org.apache.spark.sql.connector.read.PartitionReader;
* An interface to represent data distribution requirement, which specifies how the records should
* be distributed among the data partitions (one {@link PartitionReader} outputs data for one
* partition).
* <p>
* Note that this interface has nothing to do with the data ordering inside one
* partition(the output records of a single {@link PartitionReader}).
*
* <p>
* The instance of this interface is created and provided by Spark, then consumed by
* {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to
* implement this interface, but need to catch as more concrete implementations of this interface
* as possible in {@link Partitioning#satisfy(Distribution)}.
*
* <p>
* Concrete implementations until now:
* <ul>
* <li>{@link ClusteredDistribution}</li>


@ -40,7 +40,7 @@ public interface Partitioning {
/**
* Returns true if this partitioning can satisfy the given distribution, which means Spark does
* not need to shuffle the output data of this data source for some certain operations.
*
* <p>
* Note that, Spark may add new concrete implementations of {@link Distribution} in new releases.
* This method should be aware of it and always return false for unrecognized distributions. It's
* recommended to check every Spark new release and support new distributions if possible, to
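
A minimal sketch of a partitioning that recognizes only `ClusteredDistribution` and stays conservative about everything else (hypothetical `HashPartitioning` class):

```java
import org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution;
import org.apache.spark.sql.connector.read.partitioning.Distribution;
import org.apache.spark.sql.connector.read.partitioning.Partitioning;

public class HashPartitioning implements Partitioning {
  private final int numPartitions;

  public HashPartitioning(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    if (distribution instanceof ClusteredDistribution) {
      // A real implementation would also check that the clustering columns match
      // the columns this source hashes by.
      return true;
    }
    // Unknown distributions, possibly added in newer Spark releases: be conservative.
    return false;
  }
}
```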


@ -30,7 +30,7 @@ public interface ContinuousPartitionReader<T> extends PartitionReader<T> {
/**
* Get the offset of the current record, or the start offset if no records have been read.
*
* <p>
* The execution engine will call this method along with get() to keep track of the current
* offset. When an epoch ends, the offset of the previous record in each partition will be saved
* as a restart checkpoint.


@ -62,7 +62,7 @@ public interface ContinuousStream extends SparkDataStream {
* The execution engine will call this method in every epoch to determine if new input
* partitions need to be generated, which may be required if for example the underlying
* source system has had partitions added or removed.
*
* <p>
* If true, the Spark job to scan this continuous data stream will be interrupted and Spark will
* launch it again with a new list of {@link InputPartition input partitions}.
*/


@ -22,6 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* An abstract representation of progress through a {@link MicroBatchStream} or
* {@link ContinuousStream}.
* <p>
* During execution, offsets provided by the data source implementation will be logged and used as
* restart checkpoints. Each source should provide an offset implementation which the source can use
* to reconstruct a position in the stream up to which data has been seen/processed.
@ -33,6 +34,7 @@ public abstract class Offset {
/**
* A JSON-serialized representation of an Offset that is
* used for saving offsets to the offset log.
* <p>
* Note: We assume that equivalent/equal offsets serialize to
* identical JSON strings.
*


@ -24,7 +24,7 @@ import org.apache.spark.annotation.Evolving;
/**
* Used for per-partition offsets in continuous processing. ContinuousReader implementations will
* provide a method to merge these into a global Offset.
*
* <p>
* These offsets must be serializable.
*
* @since 3.0.0


@ -22,7 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan all the data
* available at the streaming source. This is meant to be a hard specification as being able
* to return all available data is necessary for Trigger.Once() to work correctly.
* to return all available data is necessary for {@code Trigger.Once()} to work correctly.
* If a source is unable to scan all available data, then it must throw an error.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)


@ -22,7 +22,7 @@ import org.apache.spark.annotation.Evolving;
/**
* The base interface representing a readable data stream in a Spark streaming query. It's
* responsible to manage the offsets of the streaming source in the streaming query.
*
* <p>
* Data sources should implement concrete data stream interfaces:
* {@link MicroBatchStream} and {@link ContinuousStream}.
*


@ -25,7 +25,7 @@ import org.apache.spark.annotation.Evolving;
* contract of triggers, e.g. Trigger.Once() requires that a micro-batch process all data
* available to the system at the start of the micro-batch. Alternatively, sources can decide to
* limit ingest through data source options.
*
* <p>
* Through this interface, a MicroBatchStream should be able to return the next offset that it will
* process until given a {@link ReadLimit}.
*
@ -44,11 +44,11 @@ public interface SupportsAdmissionControl extends SparkDataStream {
* Returns the most recent offset available given a read limit. The start offset can be used
* to figure out how much new data should be read given the limit. Users should implement this
* method instead of latestOffset for a MicroBatchStream or getOffset for Source.
*
* <p>
* When this method is called on a `Source`, the source can return `null` if there is no
* data to process. In addition, for the very first micro-batch, the `startOffset` will be
* null as well.
*
* <p>
* When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
* for the very first micro-batch. The source can return `null` if there is no data to process.
*/
@ -56,7 +56,7 @@ public interface SupportsAdmissionControl extends SparkDataStream {
/**
* Returns the most recent offset available.
*
* <p>
* The source can return `null`, if there is no data to process or the source does not support
* to this method.
*/
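
As a schematic illustration of the admission-control arithmetic, a plain-Java sketch with hypothetical names (not the real `SupportsAdmissionControl`/`ReadLimit` types): the end offset of the next micro-batch is the minimum of what is available and the start offset plus the configured limit.

```java
// Offsets are plain longs here; a real source would wrap them in its own
// Offset subclass and honor whichever ReadLimit subtype Spark passes in.
final class AdmissionControlSketch {
  private final long latestAvailable; // newest offset the source currently has

  AdmissionControlSketch(long latestAvailable) {
    this.latestAvailable = latestAvailable;
  }

  // Analogue of latestOffset(startOffset, limit) with a max-rows style limit:
  // never admit more than maxRows records into the next micro-batch.
  long latestOffset(Long startOffset, long maxRows) {
    long start = (startOffset == null) ? 0L : startOffset; // null start = very first micro-batch
    return Math.min(latestAvailable, start + maxRows);
  }
}
```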


@ -21,20 +21,22 @@ import org.apache.spark.annotation.Evolving;
/**
* An interface that defines how to write the data to data source for batch processing.
*
* <p>
* The writing procedure is:
* 1. Create a writer factory by {@link #createBatchWriterFactory(PhysicalWriteInfo)}, serialize
* and send it to all the partitions of the input data(RDD).
* 2. For each partition, create the data writer, and write the data of the partition with this
* writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
* exception happens during the writing, call {@link DataWriter#abort()}.
* 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
* some writers are aborted, or the job failed with an unknown reason, call
* {@link #abort(WriterCommitMessage[])}.
*
* <ol>
* <li>Create a writer factory by {@link #createBatchWriterFactory(PhysicalWriteInfo)}, serialize
* and send it to all the partitions of the input data(RDD).</li>
* <li>For each partition, create the data writer, and write the data of the partition with this
* writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
* exception happens during the writing, call {@link DataWriter#abort()}.</li>
* <li>If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
* some writers are aborted, or the job failed with an unknown reason, call
* {@link #abort(WriterCommitMessage[])}.</li>
* </ol>
* <p>
* While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
* do it manually in their Spark applications if they want to retry.
*
* <p>
* Please refer to the documentation of commit/abort methods for detailed specifications.
*
* @since 3.0.0
@ -44,7 +46,7 @@ public interface BatchWrite {
/**
* Creates a writer factory which will be serialized and sent to executors.
*
* <p>
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*


@ -25,17 +25,17 @@ import org.apache.spark.annotation.Evolving;
/**
* A data writer returned by {@link DataWriterFactory#createWriter(int, long)} and is
* responsible for writing data for an input RDD partition.
*
* <p>
* One Spark task has one exclusive data writer, so there is no thread-safe concern.
*
* <p>
* {@link #write(Object)} is called for each record in the input RDD partition. If one record fails
* the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
* not be processed. If all records are successfully written, {@link #commit()} is called.
*
* <p>
* Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, Spark will
* call {@link #close()} to let DataWriter doing resource cleanup. After calling {@link #close()},
* its lifecycle is over and Spark will not use it again.
*
* <p>
* If this data writer succeeds(all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
* {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data
@ -44,7 +44,7 @@ import org.apache.spark.annotation.Evolving;
* In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
* different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])}
* when the configured number of retries is exhausted.
*
* <p>
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
* takes too long to finish. Different from retried tasks, which are launched one by one after the
* previous one fails, speculative tasks are running simultaneously. It's possible that one input
@ -54,8 +54,9 @@ import org.apache.spark.annotation.Evolving;
* these data writers can commit successfully. Or implementations can allow all of them to commit
* successfully, and have a way to revert committed data writers without the commit message, because
* Spark only accepts the commit message that arrives first and ignore others.
*
* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}.
* <p>
* Note that, Currently the type {@code T} can only be
* {@link org.apache.spark.sql.catalyst.InternalRow}.
*
* @since 3.0.0
*/
@ -64,7 +65,7 @@ public interface DataWriter<T> extends Closeable {
/**
* Writes one record.
*
* <p>
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
* data writer is considered to have been failed.
*
@ -76,12 +77,12 @@ public interface DataWriter<T> extends Closeable {
* Commits this writer after all records are written successfully, returns a commit message which
* will be sent back to driver side and passed to
* {@link BatchWrite#commit(WriterCommitMessage[])}.
*
* <p>
* The written data should only be visible to data source readers after
* {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link BatchWrite} at driver side to
* do the final commit via {@link WriterCommitMessage}.
*
* <p>
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
* data writer is considered to have been failed.
*
@ -92,10 +93,10 @@ public interface DataWriter<T> extends Closeable {
/**
* Aborts this writer if it is failed. Implementations should clean up the data for already
* written records.
*
* <p>
* This method will only be called if there is one record failed to write, or {@link #commit()}
* failed.
*
* <p>
* If this method fails(by throwing an exception), the underlying data source may have garbage
* that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually,
* but these garbage should not be visible to data source readers.
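
Putting the batch write procedure together, a minimal sketch (the class names are hypothetical; this writer just buffers rows and the job-level commit only logs, where a real sink would publish the staged data atomically):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

public class LoggingBatchWrite implements BatchWrite {

  @Override
  public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
    // Serialized and sent to every input partition (step 1 of the procedure above).
    return new LoggingWriterFactory();
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // All tasks committed (step 3): publish the staged data here.
    System.out.println("job committed with " + messages.length + " task messages");
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Some tasks failed or the job failed: clean up whatever the tasks staged.
    System.out.println("job aborted");
  }

  static class LoggingWriterFactory implements DataWriterFactory {
    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
      // Called on executors; retried or speculative tasks get a different taskId.
      return new BufferingWriter(partitionId, taskId);
    }
  }

  static class BufferingWriter implements DataWriter<InternalRow> {
    private final int partitionId;
    private final long taskId;
    private final List<InternalRow> buffer = new ArrayList<>();

    BufferingWriter(int partitionId, long taskId) {
      this.partitionId = partitionId;
      this.taskId = taskId;
    }

    @Override
    public void write(InternalRow record) {
      // Step 2: called once per record; copy because Spark may reuse the row instance.
      buffer.add(record.copy());
    }

    @Override
    public WriterCommitMessage commit() {
      // Data must stay invisible to readers until BatchWrite#commit runs on the driver.
      return new TaskCommit(partitionId, taskId, buffer.size());
    }

    @Override
    public void abort() {
      buffer.clear(); // drop anything already staged for this task
    }

    @Override
    public void close() throws IOException {
      // release buffers, file handles, connections, ...
    }
  }

  static class TaskCommit implements WriterCommitMessage {
    final int partitionId;
    final long taskId;
    final int rowCount;

    TaskCommit(int partitionId, long taskId, int rowCount) {
      this.partitionId = partitionId;
      this.taskId = taskId;
      this.rowCount = rowCount;
    }
  }
}
```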


@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow;
* A factory of {@link DataWriter} returned by
* {@link BatchWrite#createBatchWriterFactory(PhysicalWriteInfo)}, which is responsible for
* creating and initializing the actual data writer at executor side.
*
* <p>
* Note that, the writer factory will be serialized and sent to executors, then the data writer
* will be created on executors and do the actual writing. So this interface must be
* serializable and {@link DataWriter} doesn't need to be.
@ -42,7 +42,7 @@ public interface DataWriterFactory extends Serializable {
* object instance when sending data to the data writer, for better performance. Data writers
* are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
* list.
*
* <p>
* If this method fails (by throwing an exception), the corresponding Spark write task would fail
* and get retried until hitting the maximum retry times.
*


@ -35,7 +35,7 @@ public interface LogicalWriteInfo {
CaseInsensitiveStringMap options();
/**
* `queryId` is a unique string of the query. It's possible that there are many queries
* {@code queryId} is a unique string of the query. It's possible that there are many queries
* running at the same time, or a query is restarted and resumed. {@link BatchWrite} can use
* this id to identify the query.
*/


@ -23,7 +23,7 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
/**
* An interface for building the {@link Write}. Implementations can mix in some interfaces to
* support different ways to write data to data sources.
*
* <p>
* Unless modified by a mixin interface, the {@link Write} configured by this builder is to
* append data without affecting existing data.
*


@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
* A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
* as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
* {@link StreamingWrite#commit(long, WriterCommitMessage[])}.
*
* <p>
* This is an empty interface, data sources should define their own message class and use it when
* generating messages at executor side and handling the messages at driver side.
*


@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
* A factory of {@link DataWriter} returned by
* {@link StreamingWrite#createStreamingWriterFactory(PhysicalWriteInfo)}, which is responsible for
* creating and initializing the actual data writer at executor side.
*
* <p>
* Note that, the writer factory will be serialized and sent to executors, then the data writer
* will be created on executors and do the actual writing. So this interface must be
* serializable and {@link DataWriter} doesn't need to be.
@ -44,7 +44,7 @@ public interface StreamingDataWriterFactory extends Serializable {
* object instance when sending data to the data writer, for better performance. Data writers
* are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
* list.
*
* <p>
* If this method fails (by throwing an exception), the corresponding Spark write task would fail
* and get retried until hitting the maximum retry times.
*


@ -26,19 +26,21 @@ import org.apache.spark.sql.connector.write.WriterCommitMessage;
* An interface that defines how to write the data to data source in streaming queries.
*
* The writing procedure is:
* 1. Create a writer factory by {@link #createStreamingWriterFactory(PhysicalWriteInfo)},
* serialize and send it to all the partitions of the input data(RDD).
* 2. For each epoch in each partition, create the data writer, and write the data of the epoch in
* the partition with this writer. If all the data are written successfully, call
* {@link DataWriter#commit()}. If exception happens during the writing, call
* {@link DataWriter#abort()}.
* 3. If writers in all partitions of one epoch are successfully committed, call
* {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed
* with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.
*
* <ol>
* <li>Create a writer factory by {@link #createStreamingWriterFactory(PhysicalWriteInfo)},
* serialize and send it to all the partitions of the input data(RDD).</li>
* <li>For each epoch in each partition, create the data writer, and write the data of the
* epoch in the partition with this writer. If all the data are written successfully, call
* {@link DataWriter#commit()}. If exception happens during the writing, call
* {@link DataWriter#abort()}.</li>
* <li>If writers in all partitions of one epoch are successfully committed, call
* {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed
* with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.</li>
* </ol>
* <p>
* While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
* do it manually in their Spark applications if they want to retry.
*
* <p>
* Please refer to the documentation of commit/abort methods for detailed specifications.
*
* @since 3.0.0
@ -48,7 +50,7 @@ public interface StreamingWrite {
/**
* Creates a writer factory which will be serialized and sent to executors.
*
* <p>
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*
@ -60,14 +62,14 @@ public interface StreamingWrite {
* Commits this writing job for the specified epoch with a list of commit messages. The commit
* messages are collected from successful data writers and are produced by
* {@link DataWriter#commit()}.
*
* <p>
* If this method fails (by throwing an exception), this writing job is considered to have been
* failed, and the execution engine will attempt to call
* {@link #abort(long, WriterCommitMessage[])}.
*
* The execution engine may call `commit` multiple times for the same epoch in some circumstances.
* To support exactly-once data semantics, implementations must ensure that multiple commits for
* the same epoch are idempotent.
* <p>
* The execution engine may call {@code commit} multiple times for the same epoch in some
* circumstances. To support exactly-once data semantics, implementations must ensure that
* multiple commits for the same epoch are idempotent.
*/
void commit(long epochId, WriterCommitMessage[] messages);
@ -75,10 +77,10 @@ public interface StreamingWrite {
* Aborts this writing job because some data writers are failed and keep failing when retried, or
* the Spark job fails with some unknown reasons, or {@link #commit(long, WriterCommitMessage[])}
* fails.
*
* <p>
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
*
* <p>
* Unless the abort is triggered by the failure of commit, the given messages will have some
* null slots, as there may be only a few data writers that were committed before the abort
* happens, or some data writers were committed but their commit messages haven't reached the
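
A sketch of the idempotent-epoch-commit requirement (hypothetical `IdempotentStreamingWrite`; the writer factory is elided and the epoch bookkeeping is simplified to an in-memory set):

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

public class IdempotentStreamingWrite implements StreamingWrite {
  // In practice this bookkeeping must survive driver restarts (e.g. live in the
  // sink itself); an in-memory set is only for illustration.
  private final Set<Long> committedEpochs = new HashSet<>();

  @Override
  public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
    throw new UnsupportedOperationException("writer factory omitted in this sketch");
  }

  @Override
  public void commit(long epochId, WriterCommitMessage[] messages) {
    // Spark may call commit more than once for the same epoch; make the second
    // call a no-op so the epoch's data is not published twice.
    if (!committedEpochs.add(epochId)) {
      return;
    }
    publish(epochId, messages);
  }

  @Override
  public void abort(long epochId, WriterCommitMessage[] messages) {
    // messages may contain null slots for writers that never committed.
    discardStagedData(epochId);
  }

  private void publish(long epochId, WriterCommitMessage[] messages) { /* ... */ }

  private void discardStagedData(long epochId) { /* ... */ }
}
```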


@ -23,8 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.*;
/**
* Represents the type of timeouts possible for the Dataset operations
* `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
* `GroupState` for more details.
* {@code mapGroupsWithState} and {@code flatMapGroupsWithState}.
* <p>
* See documentation on {@code GroupState} for more details.
*
* @since 2.2.0
*/
@ -33,21 +34,29 @@ import org.apache.spark.sql.catalyst.plans.logical.*;
public class GroupStateTimeout {
/**
* Timeout based on processing time. The duration of timeout can be set for each group in
* `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutDuration()`. See documentation
* on `GroupState` for more details.
* Timeout based on processing time.
* <p>
* The duration of timeout can be set for each group in
* {@code map/flatMapGroupsWithState} by calling {@code GroupState.setTimeoutDuration()}.
* <p>
* See documentation on {@code GroupState} for more details.
*/
public static GroupStateTimeout ProcessingTimeTimeout() {
return ProcessingTimeTimeout$.MODULE$;
}
/**
* Timeout based on event-time. The event-time timestamp for timeout can be set for each
* group in `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutTimestamp()`.
* In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
* Timeout based on event-time.
* <p>
* The event-time timestamp for timeout can be set for each
* group in {@code map/flatMapGroupsWithState} by calling
* {@code GroupState.setTimeoutTimestamp()}.
* In addition, you have to define the watermark in the query using
* {@code Dataset.withWatermark}.
* When the watermark advances beyond the set timestamp of a group and the group has not
* received any data, then the group times out. See documentation on
* `GroupState` for more details.
* received any data, then the group times out.
* <p>
* See documentation on {@code GroupState} for more details.
*/
public static GroupStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }


@ -26,22 +26,23 @@ import org.apache.spark.unsafe.types.UTF8String;
* An interface representing in-memory columnar data in Spark. This interface defines the main APIs
* to access the data, as well as their batched versions. The batched versions are considered to be
* faster and preferable whenever possible.
*
* <p>
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in this ColumnVector.
*
* Spark only calls specific `get` method according to the data type of this {@link ColumnVector},
* <p>
* Spark only calls specific {@code get} method according to the data type of this
* {@link ColumnVector},
* e.g. if it's int type, Spark is guaranteed to only call {@link #getInt(int)} or
* {@link #getInts(int, int)}.
*
* <p>
* ColumnVector supports all the data types including nested types. To handle nested types,
* ColumnVector can have children and is a tree structure. Please refer to {@link #getStruct(int)},
* {@link #getArray(int)} and {@link #getMap(int)} for the details about how to implement nested
* types.
*
* <p>
* ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
* memory again and again.
*
* <p>
* ColumnVector is meant to maximize CPU efficiency but not to minimize storage footprint.
* Implementations should prefer computing efficiency over storage efficiency when design the
* format. Since it is expected to reuse the ColumnVector instance while loading data, the storage
@ -57,9 +58,10 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* Cleans up memory for this column vector. The column vector is not usable after this.
*
* This overwrites `AutoCloseable.close` to remove the `throws` clause, as column vector is
* in-memory and we don't expect any exception to happen during closing.
* <p>
* This overwrites {@link AutoCloseable#close} to remove the
* {@code throws} clause, as column vector is in-memory and we don't expect any exception to
* happen during closing.
*/
@Override
public abstract void close();
@ -75,19 +77,19 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract int numNulls();
/**
* Returns whether the value at rowId is NULL.
* Returns whether the value at {@code rowId} is NULL.
*/
public abstract boolean isNullAt(int rowId);
/**
* Returns the boolean type value for rowId. The return value is undefined and can be anything,
* if the slot for rowId is null.
* Returns the boolean type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*/
public abstract boolean getBoolean(int rowId);
/**
* Gets boolean type values from [rowId, rowId + count). The return values for the null slots
* are undefined and can be anything.
* Gets boolean type values from {@code [rowId, rowId + count)}. The return values for the null
* slots are undefined and can be anything.
*/
public boolean[] getBooleans(int rowId, int count) {
boolean[] res = new boolean[count];
@ -98,13 +100,13 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the byte type value for rowId. The return value is undefined and can be anything,
* if the slot for rowId is null.
* Returns the byte type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*/
public abstract byte getByte(int rowId);
/**
* Gets byte type values from [rowId, rowId + count). The return values for the null slots
* Gets byte type values from {@code [rowId, rowId + count)}. The return values for the null slots
* are undefined and can be anything.
*/
public byte[] getBytes(int rowId, int count) {
@ -116,14 +118,14 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the short type value for rowId. The return value is undefined and can be anything,
* if the slot for rowId is null.
* Returns the short type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*/
public abstract short getShort(int rowId);
/**
* Gets short type values from [rowId, rowId + count). The return values for the null slots
* are undefined and can be anything.
* Gets short type values from {@code [rowId, rowId + count)}. The return values for the null
* slots are undefined and can be anything.
*/
public short[] getShorts(int rowId, int count) {
short[] res = new short[count];
@ -134,13 +136,13 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the int type value for rowId. The return value is undefined and can be anything,
* if the slot for rowId is null.
* Returns the int type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*/
public abstract int getInt(int rowId);
/**
* Gets int type values from [rowId, rowId + count). The return values for the null slots
* Gets int type values from {@code [rowId, rowId + count)}. The return values for the null slots
* are undefined and can be anything.
*/
public int[] getInts(int rowId, int count) {
@ -152,13 +154,13 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the long type value for rowId. The return value is undefined and can be anything,
* if the slot for rowId is null.
* Returns the long type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*/
public abstract long getLong(int rowId);
/**
* Gets long type values from [rowId, rowId + count). The return values for the null slots
* Gets long type values from {@code [rowId, rowId + count)}. The return values for the null slots
* are undefined and can be anything.
*/
public long[] getLongs(int rowId, int count) {
@ -170,14 +172,14 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the float type value for rowId. The return value is undefined and can be anything,
* if the slot for rowId is null.
* Returns the float type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*/
public abstract float getFloat(int rowId);
/**
* Gets float type values from [rowId, rowId + count). The return values for the null slots
* are undefined and can be anything.
* Gets float type values from {@code [rowId, rowId + count)}. The return values for the null
* slots are undefined and can be anything.
*/
public float[] getFloats(int rowId, int count) {
float[] res = new float[count];
@ -188,14 +190,14 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the double type value for rowId. The return value is undefined and can be anything,
* if the slot for rowId is null.
* Returns the double type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*/
public abstract double getDouble(int rowId);
/**
* Gets double type values from [rowId, rowId + count). The return values for the null slots
* are undefined and can be anything.
* Gets double type values from {@code [rowId, rowId + count)}. The return values for the null
* slots are undefined and can be anything.
*/
public double[] getDoubles(int rowId, int count) {
double[] res = new double[count];
@ -206,8 +208,9 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the struct type value for rowId. If the slot for rowId is null, it should return null.
*
* Returns the struct type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
* <p>
* To support struct type, implementations must implement {@link #getChild(int)} and make this
* vector a tree structure. The number of child vectors must be same as the number of fields of
* the struct type, and each child vector is responsible to store the data for its corresponding
@ -219,8 +222,9 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* Returns the array type value for rowId. If the slot for rowId is null, it should return null.
*
* Returns the array type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
* <p>
* To support array type, implementations must construct an {@link ColumnarArray} and return it in
* this method. {@link ColumnarArray} requires a {@link ColumnVector} that stores the data of all
* the elements of all the arrays in this vector, and an offset and length which points to a range
@ -232,12 +236,13 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract ColumnarArray getArray(int rowId);
/**
* Returns the map type value for rowId. If the slot for rowId is null, it should return null.
*
* Returns the map type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
* <p>
* In Spark, map type value is basically a key data array and a value data array. A key from the
* key array with a index and a value from the value array with the same index contribute to
* an entry of this map type value.
*
* <p>
* To support map type, implementations must construct a {@link ColumnarMap} and return it in
* this method. {@link ColumnarMap} requires a {@link ColumnVector} that stores the data of all
* the keys of all the maps in this vector, and another {@link ColumnVector} that stores the data
@ -247,31 +252,35 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract ColumnarMap getMap(int ordinal);
/**
* Returns the decimal type value for rowId. If the slot for rowId is null, it should return null.
* Returns the decimal type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
*/
public abstract Decimal getDecimal(int rowId, int precision, int scale);
/**
* Returns the string type value for rowId. If the slot for rowId is null, it should return null.
* Note that the returned UTF8String may point to the data of this column vector, please copy it
* if you want to keep it after this column vector is freed.
* Returns the string type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
* <p>
* Note that the returned {@link UTF8String} may point to the data of this column vector,
* please copy it if you want to keep it after this column vector is freed.
*/
public abstract UTF8String getUTF8String(int rowId);
/**
* Returns the binary type value for rowId. If the slot for rowId is null, it should return null.
* Returns the binary type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
*/
public abstract byte[] getBinary(int rowId);
/**
* Returns the calendar interval type value for rowId. If the slot for rowId is null, it should
* return null.
*
* Returns the calendar interval type value for {@code rowId}. If the slot for
* {@code rowId} is null, it should return null.
* <p>
* In Spark, calendar interval type value is basically two integer values representing the number
* of months and days in this interval, and a long value representing the number of microseconds
* in this interval. An interval type vector is the same as a struct type vector with 3 fields:
* `months`, `days` and `microseconds`.
*
* {@code months}, {@code days} and {@code microseconds}.
* <p>
* To support interval type, implementations must implement {@link #getChild(int)} and define 3
* child vectors: the first child vector is an int type vector, containing all the month values of
* all the interval values in this vector. The second child vector is an int type vector,
@ -288,7 +297,7 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
* @return child [[ColumnVector]] at the given ordinal.
* @return child {@link ColumnVector} at the given ordinal.
*/
public abstract ColumnVector getChild(int ordinal);
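
To illustrate the null-slot contract of the typed getters, a small consumer-side sketch (hypothetical helper class; it relies only on `ColumnarBatch#numRows`, `ColumnVector#isNullAt` and `ColumnVector#getInt`):

```java
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public final class ColumnVectorExample {

  // Sums an int column, checking isNullAt before each typed get because the
  // typed getters return undefined values for null slots.
  public static long sumIntColumn(ColumnarBatch batch, int columnIndex) {
    ColumnVector col = batch.column(columnIndex);
    long sum = 0;
    for (int rowId = 0; rowId < batch.numRows(); rowId++) {
      if (!col.isNullAt(rowId)) {
        sum += col.getInt(rowId);
      }
    }
    return sum;
  }

  private ColumnVectorExample() {}
}
```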