[SPARK-30809][SQL] Review and fix issues in SQL API docs

### What changes were proposed in this pull request?
- Add the missing `since` annotations.
- Don't show classes under the `org.apache.spark.sql.dynamicpruning` package in the API docs.
- Fix the scope of the `xxxExactNumeric` objects so they no longer appear in the API docs.
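
As a quick orientation for the diffs below, the fixes follow two recurring patterns. The snippet is a condensed sketch assembled from the hunks that follow (the exact-numeric excerpt is abridged), not an additional change:

```scala
// Pattern 1: add the missing `since` information so it shows up in the generated
// API docs, either as a doc tag on Java/Scala APIs ...
//     @since 3.0.0
// ... or as the `since` field of an ExpressionDescription:
//     since = "3.0.0"

// Pattern 2: tighten visibility so internal helpers no longer leak into the docs,
// e.g. the exact-numeric helpers become private to the org.apache.spark.sql package
// (condensed from the exact-numeric hunks near the end of this diff):
private[sql] object IntegerExactNumeric extends IntIsIntegral with Ordering.IntOrdering {
  override def plus(x: Int, y: Int): Int = Math.addExact(x, y)
  override def minus(x: Int, y: Int): Int = Math.subtractExact(x, y)
}
```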

### Why are the changes needed?
Avoid leaking APIs unintentionally in Spark 3.0.0.

### Does this PR introduce any user-facing change?
No. All these changes are to avoid leaking APIs unintentionally in Spark 3.0.0.

### How was this patch tested?
Manually generated the API docs and verified the above issues have been fixed.

Closes #27560 from xuanyuanking/SPARK-30809.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit a5efbb284e (parent abe0821ee9), authored by Yuanjian Li on 2020-02-21 17:03:22 +08:00 and committed by Wenchen Fan.
61 changed files with 273 additions and 169 deletions

@@ -108,7 +108,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
/**
* An exception thrown if the RPC is aborted.
*/
class RpcAbortException(message: String) extends Exception(message)
private[spark] class RpcAbortException(message: String) extends Exception(message)
/**
* A wrapper for [[Future]] but add abort method.

@@ -823,6 +823,7 @@ object Unidoc {
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/internal")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/memory")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/network")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/rpc")))
.map(_.filterNot(f =>
f.getCanonicalPath.contains("org/apache/spark/shuffle") &&
!f.getCanonicalPath.contains("org/apache/spark/shuffle/api")))
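
For context, this is the Unidoc source filter in the sbt build (SparkBuild.scala): each `filterNot` drops source files whose canonical path contains an internal package, so the new `org/apache/spark/rpc` line keeps classes such as `RpcAbortException` out of the generated docs. A minimal, illustrative sketch of the idiom — the helper name and package list here are simplified, not the build's actual code:

```scala
import java.io.File

// Drop every source file that lives under one of the internal package prefixes
// before handing the source set to Scaladoc/Javadoc generation.
def ignoreUndocumentedPackages(sources: Seq[Seq[File]]): Seq[Seq[File]] = {
  val internalPrefixes = Seq(
    "org/apache/spark/internal",
    "org/apache/spark/rpc" // newly excluded by this patch
  )
  sources.map(_.filterNot(f =>
    internalPrefixes.exists(p => f.getCanonicalPath.contains(p))))
}
```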

@@ -27,6 +27,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* cores for Solr and so on.
* <p>
* This interface will be instantiated when end users call `SparkSession#executeCommand`.
*
* @since 3.0.0
*/
@Unstable
public interface ExternalCommandRunner {

@@ -26,6 +26,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* some custom logic and call the built-in session catalog at the end. For example, they can
* implement {@code createTable}, do something else before calling {@code createTable} of the
* built-in session catalog.
*
* @since 3.0.0
*/
@Experimental
public interface CatalogExtension extends TableCatalog, SupportsNamespaces {

@@ -38,6 +38,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
* string map of options in initialization with the prefix removed.
* {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".
*
* @since 3.0.0
*/
@Experimental
public interface CatalogPlugin {
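
A minimal sketch of what a `CatalogPlugin` implementation and its configuration look like; the class and option names are made up for illustration and are not part of this commit:

```scala
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ExampleCatalog extends CatalogPlugin {
  private var catalogName: String = _

  // Spark calls this once with the catalog's name and the options taken from
  // spark.sql.catalog.<name>.<key>=<value> entries, prefix removed.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    val url = options.get("url") // e.g. from spark.sql.catalog.example.url=...
  }

  override def name(): String = catalogName
}
```

Such a catalog would be registered with `spark.sql.catalog.example=com.example.ExampleCatalog`.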

@@ -1,127 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog;
import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static scala.collection.JavaConverters.mapAsJavaMapConverter;
@Private
public class Catalogs {
private Catalogs() {
}
/**
* Load and configure a catalog by name.
* <p>
* This loads, instantiates, and initializes the catalog plugin for each call; it does not cache
* or reuse instances.
*
* @param name a String catalog name
* @param conf a SQLConf
* @return an initialized CatalogPlugin
* @throws CatalogNotFoundException if the plugin class cannot be found
* @throws SparkException if the plugin class cannot be instantiated
*/
public static CatalogPlugin load(String name, SQLConf conf)
throws CatalogNotFoundException, SparkException {
String pluginClassName;
try {
pluginClassName = conf.getConfString("spark.sql.catalog." + name);
} catch (NoSuchElementException e){
throw new CatalogNotFoundException(String.format(
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
}
ClassLoader loader = Utils.getContextOrSparkClassLoader();
try {
Class<?> pluginClass = loader.loadClass(pluginClassName);
if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) {
throw new SparkException(String.format(
"Plugin class for catalog '%s' does not implement CatalogPlugin: %s",
name, pluginClassName));
}
CatalogPlugin plugin =
CatalogPlugin.class.cast(pluginClass.getDeclaredConstructor().newInstance());
plugin.initialize(name, catalogOptions(name, conf));
return plugin;
} catch (ClassNotFoundException e) {
throw new SparkException(String.format(
"Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName));
} catch (NoSuchMethodException e) {
throw new SparkException(String.format(
"Failed to find public no-arg constructor for catalog '%s': %s", name, pluginClassName),
e);
} catch (IllegalAccessException e) {
throw new SparkException(String.format(
"Failed to call public no-arg constructor for catalog '%s': %s", name, pluginClassName),
e);
} catch (InstantiationException e) {
throw new SparkException(String.format(
"Cannot instantiate abstract catalog plugin class for catalog '%s': %s", name,
pluginClassName), e.getCause());
} catch (InvocationTargetException e) {
throw new SparkException(String.format(
"Failed during instantiating constructor for catalog '%s': %s", name, pluginClassName),
e.getCause());
}
}
/**
* Extracts a named catalog's configuration from a SQLConf.
*
* @param name a catalog name
* @param conf a SQLConf
* @return a case insensitive string map of options starting with spark.sql.catalog.(name).
*/
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
HashMap<String, String> options = new HashMap<>();
for (Map.Entry<String, String> entry : allConfs.entrySet()) {
Matcher matcher = prefix.matcher(entry.getKey());
if (matcher.matches() && matcher.groupCount() > 0) {
options.put(matcher.group(1), entry.getValue());
}
}
return new CaseInsensitiveStringMap(options);
}
}

@@ -33,6 +33,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* by calling the built-in session catalog directly. This is created for convenience, so that users
* only need to override some methods where they want to apply custom logic. For example, they can
* override {@code createTable}, do something else before calling {@code super.createTable}.
*
* @since 3.0.0
*/
@Experimental
public abstract class DelegatingCatalogExtension implements CatalogExtension {

@@ -21,6 +21,8 @@ import org.apache.spark.annotation.Experimental;
/**
* Identifies an object in a catalog.
*
* @since 3.0.0
*/
@Experimental
public interface Identifier {

@@ -30,6 +30,8 @@ import org.apache.spark.annotation.Experimental;
* removeProperty("other_prop")
* )
* </pre>
*
* @since 3.0.0
*/
@Experimental
public interface NamespaceChange {

@@ -23,6 +23,8 @@ import org.apache.spark.annotation.Evolving;
* A mix-in interface for {@link TableProvider}. Data sources can implement this interface to
* propagate session configs with the specified key-prefix to all data source operations in this
* session.
*
* @since 3.0.0
*/
@Evolving
public interface SessionConfigSupport extends TableProvider {
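
A hedged sketch of how a source opts into session-config propagation: with a key prefix of `example`, session configs such as `spark.datasource.example.fetchSize=100` are expected to be forwarded (prefix stripped) into the options of every read/write in the session. The names below are illustrative and the `???` bodies stand in for a real implementation:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ExampleSource extends TableProvider with SessionConfigSupport {
  // Session configs named spark.datasource.example.<key> are propagated as <key>.
  override def keyPrefix(): String = "example"

  // The TableProvider contract still has to be implemented by a real source.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???
  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = ???
}
```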

@@ -36,6 +36,8 @@ import org.apache.spark.sql.types.StructType;
* write will be committed. The job concludes with a call to {@link #commitStagedChanges()}, at
* which point implementations are expected to commit the table's metadata into the metastore along
* with the data that was written by the writes from the write builder this table created.
*
* @since 3.0.0
*/
@Experimental
public interface StagedTable extends Table {

@@ -48,6 +48,8 @@ import org.apache.spark.sql.types.StructType;
* {@link BatchWrite#commit(WriterCommitMessage[])} is called,
* {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can
* complete both the data write and the metadata swap operation atomically.
*
* @since 3.0.0
*/
@Experimental
public interface StagingTableCatalog extends TableCatalog {

@@ -26,6 +26,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* sources use the `DataFrameWriter.save(path)` method, the option `path` can translate to a
* PathIdentifier. A catalog can then use this PathIdentifier to check the existence of a table, or
* whether a table can be created at a given directory.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsCatalogOptions extends TableProvider {

@@ -23,6 +23,8 @@ import org.apache.spark.sql.sources.Filter;
/**
* A mix-in interface for {@link Table} delete support. Data sources can implement this
* interface to provide the ability to delete data from tables that matches filter expressions.
*
* @since 3.0.0
*/
@Experimental
public interface SupportsDelete {

@@ -35,6 +35,8 @@ import java.util.Map;
* and uses Java packages as namespaces is not required to support the methods to create, alter, or
* drop a namespace. Implementations are allowed to discover the existence of objects or namespaces
* without throwing {@link NoSuchNamespaceException} when no namespace is found.
*
* @since 3.0.0
*/
@Experimental
public interface SupportsNamespaces extends CatalogPlugin {

@@ -26,6 +26,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* A mix-in interface of {@link Table}, to indicate that it's readable. This adds
* {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan for batch,
* micro-batch, or continuous processing.
*
* @since 3.0.0
*/
@Experimental
public interface SupportsRead extends Table {

@@ -26,6 +26,8 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
* A mix-in interface of {@link Table}, to indicate that it's writable. This adds
* {@link #newWriteBuilder(LogicalWriteInfo)} that is used to create a
* write for batch or streaming.
*
* @since 3.0.0
*/
@Experimental
public interface SupportsWrite extends Table {

@@ -36,6 +36,8 @@ import java.util.Set;
* The default implementation of {@link #partitioning()} returns an empty array of partitions, and
* the default implementation of {@link #properties()} returns an empty map. These should be
* overridden by implementations that support partitioning and table properties.
*
* @since 3.0.0
*/
@Evolving
public interface Table {

@@ -25,6 +25,8 @@ import org.apache.spark.annotation.Experimental;
* Tables use {@link Table#capabilities()} to return a set of capabilities. Each capability signals
* to Spark that the table supports a feature identified by the capability. For example, returning
* {@link #BATCH_READ} allows Spark to read from the table using a batch scan.
*
* @since 3.0.0
*/
@Experimental
public enum TableCapability {

@@ -34,6 +34,8 @@ import java.util.Map;
* {@link #alterTable(Identifier, TableChange...)} will be normalized to match the case used in the
* table schema when updating, renaming, or dropping existing columns when catalyst analysis is case
* insensitive.
*
* @since 3.0.0
*/
@Experimental
public interface TableCatalog extends CatalogPlugin {

@@ -36,6 +36,8 @@ import org.apache.spark.sql.types.DataType;
* deleteColumn("c")
* )
* </pre>
*
* @since 3.0.0
*/
@Experimental
public interface TableChange {

@@ -34,6 +34,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* <p>
* The major responsibility of this interface is to return a {@link Table} for read/write.
* </p>
*
* @since 3.0.0
*/
@Evolving
public interface TableProvider {

@@ -21,6 +21,8 @@ import org.apache.spark.annotation.Experimental;
/**
* Base class of the public logical expression API.
*
* @since 3.0.0
*/
@Experimental
public interface Expression {

@@ -26,6 +26,8 @@ import org.apache.spark.sql.types.DataType;
/**
* Helper methods to create logical transforms to pass into Spark.
*
* @since 3.0.0
*/
@Experimental
public class Expressions {

@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.DataType;
* the literal's {@link DataType SQL data type}.
*
* @param <T> the JVM type of a value held by the literal
* @since 3.0.0
*/
@Experimental
public interface Literal<T> extends Expression {

@@ -21,6 +21,8 @@ import org.apache.spark.annotation.Experimental;
/**
* Represents a field or column reference in the public logical expression API.
*
* @since 3.0.0
*/
@Experimental
public interface NamedReference extends Expression {

@@ -24,6 +24,8 @@ import org.apache.spark.annotation.Experimental;
* <p>
* For example, the transform date(ts) is used to derive a date value from a timestamp column. The
* transform name is "date" and its argument is a reference to the "ts" column.
*
* @since 3.0.0
*/
@Experimental
public interface Transform extends Expression {

@@ -23,6 +23,8 @@ import org.apache.spark.annotation.Evolving;
* A physical representation of a data source scan for batch queries. This interface is used to
* provide physical information, like how many partitions the scanned data has, and how to read
* records from the partitions.
*
* @since 3.0.0
*/
@Evolving
public interface Batch {

@@ -31,6 +31,8 @@ import org.apache.spark.annotation.Evolving;
* {@link PartitionReaderFactory#createColumnarReader(InputPartition)} on executors to do
* the actual reading. So {@link InputPartition} must be serializable while {@link PartitionReader}
* doesn't need to be.
*
* @since 3.0.0
*/
@Evolving
public interface InputPartition extends Serializable {

@@ -31,6 +31,8 @@ import org.apache.spark.annotation.Evolving;
* for normal data sources, or {@link org.apache.spark.sql.vectorized.ColumnarBatch} for columnar
* data sources(whose {@link PartitionReaderFactory#supportColumnarReads(InputPartition)}
* returns true).
*
* @since 3.0.0
*/
@Evolving
public interface PartitionReader<T> extends Closeable {

@@ -29,6 +29,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
* If Spark fails to execute any methods in the implementations of this interface or in the returned
* {@link PartitionReader} (by throwing an exception), corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
*
* @since 3.0.0
*/
@Evolving
public interface PartitionReaderFactory extends Serializable {

@@ -34,6 +34,8 @@ import org.apache.spark.sql.connector.catalog.TableCapability;
* implemented, if the {@link Table} that creates this {@link Scan} returns
* {@link TableCapability#BATCH_READ} support in its {@link Table#capabilities()}.
* </p>
*
* @since 3.0.0
*/
@Evolving
public interface Scan {

@@ -23,6 +23,8 @@ import org.apache.spark.annotation.Evolving;
* An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ
* interfaces to do operator pushdown, and keep the operator pushdown result in the returned
* {@link Scan}.
*
* @since 3.0.0
*/
@Evolving
public interface ScanBuilder {

@@ -24,6 +24,8 @@ import org.apache.spark.annotation.Evolving;
/**
* An interface to represent statistics for a data source, which is returned by
* {@link SupportsReportStatistics#estimateStatistics()}.
*
* @since 3.0.0
*/
@Evolving
public interface Statistics {

@@ -23,6 +23,8 @@ import org.apache.spark.sql.sources.Filter;
/**
* A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
* push down filters to the data source and reduce the size of the data to be read.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsPushDownFilters extends ScanBuilder {
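
A minimal sketch of the contract, assuming a hypothetical source that can only evaluate equality predicates; everything it cannot push is returned so Spark evaluates it after the scan (the class name and partitioning logic are illustrative):

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.sources.{EqualTo, Filter}

class ExampleScanBuilder extends ScanBuilder with SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition(_.isInstanceOf[EqualTo])
    pushed = supported
    unsupported // Spark re-evaluates these on top of the scan output
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = ??? // build a Scan that applies `pushed` at the source
}
```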

@@ -24,6 +24,8 @@ import org.apache.spark.sql.types.StructType;
* A mix-in interface for {@link ScanBuilder}. Data sources can implement this
* interface to push down required columns to the data source and only read these columns during
* scan to reduce the size of the data to be read.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsPushDownRequiredColumns extends ScanBuilder {

@@ -26,6 +26,8 @@ import org.apache.spark.sql.connector.read.partitioning.Partitioning;
*
* Note that, when a {@link Scan} implementation creates exactly one {@link InputPartition},
* Spark may avoid adding a shuffle even if the reader does not implement this interface.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsReportPartitioning extends Scan {

@@ -26,6 +26,8 @@ import org.apache.spark.annotation.Evolving;
* As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
* data source. Implementations that return more accurate statistics based on pushed operators will
* not improve query performance until the planner can push operators before getting stats.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsReportStatistics extends Scan {

@@ -24,6 +24,8 @@ import org.apache.spark.sql.connector.read.PartitionReader;
* A concrete implementation of {@link Distribution}. Represents a distribution where records that
* share the same values for the {@link #clusteredColumns} will be produced by the same
* {@link PartitionReader}.
*
* @since 3.0.0
*/
@Evolving
public class ClusteredDistribution implements Distribution {

@@ -36,6 +36,8 @@ import org.apache.spark.sql.connector.read.PartitionReader;
* <ul>
* <li>{@link ClusteredDistribution}</li>
* </ul>
*
* @since 3.0.0
*/
@Evolving
public interface Distribution {}

@@ -26,6 +26,8 @@ import org.apache.spark.sql.connector.read.SupportsReportPartitioning;
* {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work
* like a snapshot. Once created, it should be deterministic and always report the same number of
* partitions and the same "satisfy" result for a certain distribution.
*
* @since 3.0.0
*/
@Evolving
public interface Partitioning {

@@ -36,6 +36,8 @@ import org.apache.spark.annotation.Evolving;
* do it manually in their Spark applications if they want to retry.
*
* Please refer to the documentation of commit/abort methods for detailed specifications.
*
* @since 3.0.0
*/
@Evolving
public interface BatchWrite {

@@ -56,6 +56,8 @@ import org.apache.spark.annotation.Evolving;
* Spark only accepts the commit message that arrives first and ignore others.
*
* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}.
*
* @since 3.0.0
*/
@Evolving
public interface DataWriter<T> extends Closeable {

@@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
* Note that, the writer factory will be serialized and sent to executors, then the data writer
* will be created on executors and do the actual writing. So this interface must be
* serializable and {@link DataWriter} doesn't need to be.
*
* @since 3.0.0
*/
@Evolving
public interface DataWriterFactory extends Serializable {

@@ -24,6 +24,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* This interface contains logical write information that data sources can use when generating a
* {@link WriteBuilder}.
*
* @since 3.0.0
*/
@Evolving
public interface LogicalWriteInfo {

@@ -23,6 +23,8 @@ import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
/**
* This interface contains physical write information that data sources can use when
* generating a {@link DataWriterFactory} or a {@link StreamingDataWriterFactory}.
*
* @since 3.0.0
*/
@Evolving
public interface PhysicalWriteInfo {

@@ -26,6 +26,8 @@ package org.apache.spark.sql.connector.write;
* <p>
* This is provided to implement SQL compatible with Hive table operations but is not recommended.
* Instead, use the {@link SupportsOverwrite overwrite by filter API} to explicitly replace data.
*
* @since 3.0.0
*/
public interface SupportsDynamicOverwrite extends WriteBuilder {
/**

@@ -25,6 +25,8 @@ import org.apache.spark.sql.sources.Filter;
* <p>
* Overwriting data by filter will delete any data that matches the filter and replace it with data
* that is committed in the write.
*
* @since 3.0.0
*/
public interface SupportsOverwrite extends WriteBuilder, SupportsTruncate {
/**

@@ -21,6 +21,8 @@ package org.apache.spark.sql.connector.write;
* Write builder trait for tables that support truncation.
* <p>
* Truncation removes all data in a table and replaces it with data that is committed in the write.
*
* @since 3.0.0
*/
public interface SupportsTruncate extends WriteBuilder {
/**

@@ -28,6 +28,8 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
*
* Unless modified by a mixin interface, the {@link BatchWrite} configured by this builder is to
* append data without affecting existing data.
*
* @since 3.0.0
*/
@Evolving
public interface WriteBuilder {

@@ -29,6 +29,8 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
*
* This is an empty interface, data sources should define their own message class and use it when
* generating messages at executor side and handling the messages at driver side.
*
* @since 3.0.0
*/
@Evolving
public interface WriterCommitMessage extends Serializable {}

@@ -36,6 +36,8 @@ import java.util.Set;
* <p>
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
* keys converted to lower case. This map doesn't allow null key.
*
* @since 3.0.0
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {
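
A short usage sketch of the case-insensitive behavior described above (the values are arbitrary):

```scala
import java.util.{HashMap => JHashMap}

import org.apache.spark.sql.util.CaseInsensitiveStringMap

val raw = new JHashMap[String, String]()
raw.put("Path", "/tmp/data")

val options = new CaseInsensitiveStringMap(raw)
options.get("path")         // "/tmp/data" -- lookups ignore case
options.containsKey("PATH") // true
options.keySet()            // contains "path": keys are returned lower-cased
```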

@@ -162,7 +162,8 @@ case class BitwiseNot(child: Expression) extends UnaryExpression with ExpectsInputTypes
Examples:
> SELECT _FUNC_(0);
0
""")
""",
since = "3.0.0")
case class BitwiseCount(child: Expression) extends UnaryExpression with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(IntegralType, BooleanType))

@@ -601,7 +601,8 @@ object Murmur3HashFunction extends InterpretedHashFunction {
Examples:
> SELECT _FUNC_('Spark', array(123), 2);
5602566077635097486
""")
""",
since = "3.0.0")
case class XxHash64(children: Seq[Expression], seed: Long) extends HashExpression[Long] {
def this(arguments: Seq[Expression]) = this(arguments, 42L)

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.util
import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonSerializer, SerializerProvider}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.{JValueDeserializer, JValueSerializer}
import org.apache.spark.sql.types.DataType
object DataTypeJsonUtils {
/**
* Jackson serializer for [[DataType]]. Internally this delegates to json4s based serialization.
*/
class DataTypeJsonSerializer extends JsonSerializer[DataType] {
private val delegate = new JValueSerializer
override def serialize(
value: DataType,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
delegate.serialize(value.jsonValue, gen, provider)
}
}
/**
* Jackson deserializer for [[DataType]]. Internally this delegates to json4s based
* deserialization.
*/
class DataTypeJsonDeserializer extends JsonDeserializer[DataType] {
private val delegate = new JValueDeserializer(classOf[Any])
override def deserialize(
jsonParser: JsonParser,
deserializationContext: DeserializationContext): DataType = {
val json = delegate.deserialize(jsonParser, deserializationContext)
DataType.parseDataType(json.asInstanceOf[JValue])
}
}
}
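
As a hedged standalone sketch (illustrative usage only, not how Spark itself wires them up), these (de)serializers can be attached to a Jackson `ObjectMapper` through a `SimpleModule`:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule

import org.apache.spark.sql.catalyst.util.DataTypeJsonUtils.{DataTypeJsonDeserializer, DataTypeJsonSerializer}
import org.apache.spark.sql.types.{DataType, IntegerType}

val module = new SimpleModule()
  .addSerializer(classOf[DataType], new DataTypeJsonSerializer)
  .addDeserializer(classOf[DataType], new DataTypeJsonDeserializer)
val mapper = new ObjectMapper().registerModule(module)

val json = mapper.writeValueAsString(IntegerType)    // "integer"
val back = mapper.readValue(json, classOf[DataType]) // IntegerType
```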

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog
import java.lang.reflect.InvocationTargetException
import java.util
import java.util.NoSuchElementException
import java.util.regex.Pattern
import org.apache.spark.SparkException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
private[sql] object Catalogs {
/**
* Load and configure a catalog by name.
* <p>
* This loads, instantiates, and initializes the catalog plugin for each call; it does not cache
* or reuse instances.
*
* @param name a String catalog name
* @param conf a SQLConf
* @return an initialized CatalogPlugin
* @throws CatalogNotFoundException if the plugin class cannot be found
* @throws org.apache.spark.SparkException if the plugin class cannot be instantiated
*/
@throws[CatalogNotFoundException]
@throws[SparkException]
def load(name: String, conf: SQLConf): CatalogPlugin = {
val pluginClassName = try {
conf.getConfString("spark.sql.catalog." + name)
} catch {
case _: NoSuchElementException =>
throw new CatalogNotFoundException(
s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined")
}
val loader = Utils.getContextOrSparkClassLoader
try {
val pluginClass = loader.loadClass(pluginClassName)
if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {
throw new SparkException(
s"Plugin class for catalog '$name' does not implement CatalogPlugin: $pluginClassName")
}
val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]
plugin.initialize(name, catalogOptions(name, conf))
plugin
} catch {
case _: ClassNotFoundException =>
throw new SparkException(
s"Cannot find catalog plugin class for catalog '$name': $pluginClassName")
case e: NoSuchMethodException =>
throw new SparkException(
s"Failed to find public no-arg constructor for catalog '$name': $pluginClassName)", e)
case e: IllegalAccessException =>
throw new SparkException(
s"Failed to call public no-arg constructor for catalog '$name': $pluginClassName)", e)
case e: InstantiationException =>
throw new SparkException("Cannot instantiate abstract catalog plugin class for " +
s"catalog '$name': $pluginClassName", e.getCause)
case e: InvocationTargetException =>
throw new SparkException("Failed during instantiating constructor for catalog " +
s"'$name': $pluginClassName", e.getCause)
}
}
/**
* Extracts a named catalog's configuration from a SQLConf.
*
* @param name a catalog name
* @param conf a SQLConf
* @return a case insensitive string map of options starting with spark.sql.catalog.(name).
*/
private def catalogOptions(name: String, conf: SQLConf) = {
val prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)")
val options = new util.HashMap[String, String]
conf.getAllConfs.foreach {
case (key, value) =>
val matcher = prefix.matcher(key)
if (matcher.matches && matcher.groupCount > 0) options.put(matcher.group(1), value)
}
new CaseInsensitiveStringMap(options)
}
}

@@ -64,7 +64,7 @@ private[sql] object LogicalExpressions {
/**
* Allows Spark to rewrite the given references of the transform during analysis.
*/
sealed trait RewritableTransform extends Transform {
private[sql] sealed trait RewritableTransform extends Transform {
/** Creates a copy of this transform with the new analyzed references. */
def withReferences(newReferences: Seq[NamedReference]): Transform
}

@@ -221,6 +221,8 @@ case class StringContains(attribute: String, value: String) extends Filter {
/**
* A filter that always evaluates to `true`.
*
* @since 3.0.0
*/
@Evolving
case class AlwaysTrue() extends Filter {
@@ -233,6 +235,8 @@ object AlwaysTrue extends AlwaysTrue {
/**
* A filter that always evaluates to `false`.
*
* @since 3.0.0
*/
@Evolving
case class AlwaysFalse() extends Filter {

@@ -21,20 +21,17 @@ import java.util.Locale
import scala.util.control.NonFatal
import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonSerializer, SerializerProvider}
import com.fasterxml.jackson.databind.`type`.TypeFactory
import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.{JValueDeserializer, JValueSerializer}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.DataTypeJsonUtils.{DataTypeJsonDeserializer, DataTypeJsonSerializer}
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
@@ -485,30 +482,3 @@ object DataType {
}
}
}
/**
* Jackson serializer for [[DataType]]. Internally this delegates to json4s based serialization.
*/
class DataTypeJsonSerializer extends JsonSerializer[DataType] {
private val delegate = new JValueSerializer
override def serialize(
value: DataType,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
delegate.serialize(value.jsonValue, gen, provider)
}
}
/**
* Jackson deserializer for [[DataType]]. Internally this delegates to json4s based deserialization.
*/
class DataTypeJsonDeserializer extends JsonDeserializer[DataType] {
private val delegate = new JValueDeserializer(classOf[Any])
override def deserialize(
jsonParser: JsonParser,
deserializationContext: DeserializationContext): DataType = {
val json = delegate.deserialize(jsonParser, deserializationContext)
DataType.parseDataType(json.asInstanceOf[JValue])
}
}

@@ -22,7 +22,7 @@ import scala.math.Ordering
import org.apache.spark.sql.types.Decimal.DecimalIsConflicted
object ByteExactNumeric extends ByteIsIntegral with Ordering.ByteOrdering {
private[sql] object ByteExactNumeric extends ByteIsIntegral with Ordering.ByteOrdering {
private def checkOverflow(res: Int, x: Byte, y: Byte, op: String): Unit = {
if (res > Byte.MaxValue || res < Byte.MinValue) {
throw new ArithmeticException(s"$x $op $y caused overflow.")
@@ -56,7 +56,7 @@ object ByteExactNumeric extends ByteIsIntegral with Ordering.ByteOrdering {
}
object ShortExactNumeric extends ShortIsIntegral with Ordering.ShortOrdering {
private[sql] object ShortExactNumeric extends ShortIsIntegral with Ordering.ShortOrdering {
private def checkOverflow(res: Int, x: Short, y: Short, op: String): Unit = {
if (res > Short.MaxValue || res < Short.MinValue) {
throw new ArithmeticException(s"$x $op $y caused overflow.")
@@ -90,7 +90,7 @@ object ShortExactNumeric extends ShortIsIntegral with Ordering.ShortOrdering {
}
object IntegerExactNumeric extends IntIsIntegral with Ordering.IntOrdering {
private[sql] object IntegerExactNumeric extends IntIsIntegral with Ordering.IntOrdering {
override def plus(x: Int, y: Int): Int = Math.addExact(x, y)
override def minus(x: Int, y: Int): Int = Math.subtractExact(x, y)
@@ -100,7 +100,7 @@ object IntegerExactNumeric extends IntIsIntegral with Ordering.IntOrdering {
override def negate(x: Int): Int = Math.negateExact(x)
}
object LongExactNumeric extends LongIsIntegral with Ordering.LongOrdering {
private[sql] object LongExactNumeric extends LongIsIntegral with Ordering.LongOrdering {
override def plus(x: Long, y: Long): Long = Math.addExact(x, y)
override def minus(x: Long, y: Long): Long = Math.subtractExact(x, y)
@@ -117,7 +117,7 @@ object LongExactNumeric extends LongIsIntegral with Ordering.LongOrdering {
}
}
object FloatExactNumeric extends FloatIsFractional {
private[sql] object FloatExactNumeric extends FloatIsFractional {
private def overflowException(x: Float, dataType: String) =
throw new ArithmeticException(s"Casting $x to $dataType causes overflow")
@@ -151,7 +151,7 @@ object FloatExactNumeric extends FloatIsFractional {
override def compare(x: Float, y: Float): Int = java.lang.Float.compare(x, y)
}
object DoubleExactNumeric extends DoubleIsFractional {
private[sql] object DoubleExactNumeric extends DoubleIsFractional {
private def overflowException(x: Double, dataType: String) =
throw new ArithmeticException(s"Casting $x to $dataType causes overflow")
@@ -179,7 +179,7 @@ object DoubleExactNumeric extends DoubleIsFractional {
override def compare(x: Double, y: Double): Int = java.lang.Double.compare(x, y)
}
object DecimalExactNumeric extends DecimalIsConflicted {
private[sql] object DecimalExactNumeric extends DecimalIsConflicted {
override def toInt(x: Decimal): Int = x.roundToInt()
override def toLong(x: Decimal): Long = x.roundToLong()
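
These `*ExactNumeric` objects are internal helpers for overflow-checked arithmetic, which is why they are being hidden rather than documented. A tiny illustration of the behavioral difference they encapsulate (not part of this commit):

```scala
import scala.util.Try

// The default Numeric[Long] wraps around silently on overflow ...
val wrapped = implicitly[Numeric[Long]].plus(Long.MaxValue, 1L) // -9223372036854775808
// ... whereas the exact variants delegate to java.lang.Math and fail loudly.
val exact = Try(Math.addExact(Long.MaxValue, 1L)) // Failure(ArithmeticException: long overflow)
```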

@@ -246,6 +246,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
/**
* Configuration methods common to create/replace operations and insert/overwrite operations.
* @tparam R builder type to return
* @since 3.0.0
*/
trait WriteConfigMethods[R] {
/**
@@ -293,6 +294,8 @@ trait WriteConfigMethods[R] {
/**
* Trait to restrict calls to create and replace operations.
*
* @since 3.0.0
*/
trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] {
/**
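
To close, a hedged usage sketch of the v2 writer API these traits shape; the catalog, table, and column names are illustrative:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, days}

def writeEvents(df: DataFrame): Unit = {
  df.writeTo("my_catalog.db.events")       // DataFrameWriterV2
    .using("parquet")                      // CreateTableWriter
    .tableProperty("owner", "data-eng")    // WriteConfigMethods
    .partitionedBy(days(col("event_ts")))  // CreateTableWriter
    .createOrReplace()                     // CreateTableWriter
}
```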