[SPARK-32660][SQL][DOC] Show Avro related API in documentation

### What changes were proposed in this pull request?

Currently, the Avro-related APIs are missing from the official API documentation at https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html. This PR is to:
1. Mark internal Avro-related classes as private
2. Show the Avro-related APIs in the Spark official API documentation (see the usage sketch below)
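
For reference, here is a minimal sketch of the kind of user-facing Avro API that should now appear in the published Scaladoc; the column names and schema string are illustrative only, not taken from this patch:

```scala
// Sketch only: assumes a SparkSession named `spark` and the spark-avro module on the classpath.
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.struct
import spark.implicits._

// Wrap a column in a struct and encode it as Avro binary.
val df = spark.range(5).select(struct($"id").as("value"))
val encoded = df.select(to_avro($"value").as("avro"))

// Decode it back; from_avro takes the Avro schema as a JSON string.
val avroSchema =
  """{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":"long"}]}"""
val decoded = encoded.select(from_avro($"avro", avroSchema).as("value"))
```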

### Why are the changes needed?

Better documentation.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Built the documentation and previewed the generated pages:
![image](https://user-images.githubusercontent.com/1097932/90623042-d156ee00-e1ca-11ea-9edd-2c45b3001fd8.png)

![image](https://user-images.githubusercontent.com/1097932/90623047-d451de80-e1ca-11ea-94ba-02921b64d6f1.png)

![image](https://user-images.githubusercontent.com/1097932/90623058-d6b43880-e1ca-11ea-849a-b9ea9efe6527.png)

Closes #29476 from gengliangwang/avroAPIDoc.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
Commit: de141a3271 (parent: 8b119f1663)
Date: 2020-08-21 13:12:43 +08:00
9 changed files with 30 additions and 14 deletions

File: SparkAvroKeyOutputFormat.java

@@ -35,8 +35,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 // A variant of `AvroKeyOutputFormat`, which is used to inject the custom `RecordWriterFactory` so
 // that we can set avro file metadata.
-public class SparkAvroKeyOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
-  public SparkAvroKeyOutputFormat(Map<String, String> metadata) {
+class SparkAvroKeyOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
+  SparkAvroKeyOutputFormat(Map<String, String> metadata) {
     super(new SparkRecordWriterFactory(metadata));
   }

File: AvroDataToCatalyst.scala

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
 import org.apache.spark.sql.types._
-case class AvroDataToCatalyst(
+private[avro] case class AvroDataToCatalyst(
     child: Expression,
     jsonFormatSchema: String,
     options: Map[String, String])

File: AvroDeserializer.scala

@@ -42,7 +42,7 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(
+private[sql] class AvroDeserializer(
     rootAvroType: Schema,
     rootCatalystType: DataType,
     datetimeRebaseMode: LegacyBehaviorPolicy.Value,

File: AvroOptions.scala

@@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
  */
-class AvroOptions(
+private[sql] class AvroOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     @transient val conf: Configuration) extends Logging with Serializable {
@@ -95,7 +95,7 @@ class AvroOptions(
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
 }
-object AvroOptions {
+private[sql] object AvroOptions {
   def apply(parameters: Map[String, String]): AvroOptions = {
     val hadoopConf = SparkSession
       .getActiveSession
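
Since `AvroOptions` becomes `private[sql]`, option keys such as `avroSchema` stay reachable only through the public reader/writer API. A rough illustration, with a made-up path and schema string:

```scala
// Illustration only: assumes a SparkSession `spark`; the option key "avroSchema" is parsed
// internally by AvroOptions, and the input path and schema below are hypothetical.
val userSchemaJson =
  """{"type":"record","name":"user","fields":[{"name":"name","type":"string"}]}"""
val users = spark.read
  .format("avro")
  .option("avroSchema", userSchemaJson)
  .load("/tmp/users.avro")
```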

File: AvroSerializer.scala

@@ -43,7 +43,7 @@ import org.apache.spark.sql.types._
 /**
  * A serializer to serialize data in catalyst format to data in avro format.
  */
-class AvroSerializer(
+private[sql] class AvroSerializer(
     rootCatalystType: DataType,
     rootAvroType: Schema,
     nullable: Boolean,

File: AvroUtils.scala

@@ -38,7 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
-object AvroUtils extends Logging {
+private[sql] object AvroUtils extends Logging {
   def inferSchema(
       spark: SparkSession,
       options: Map[String, String],

File: CatalystDataToAvro.scala

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types.{BinaryType, DataType}
-case class CatalystDataToAvro(
+private[avro] case class CatalystDataToAvro(
     child: Expression,
     jsonFormatSchema: Option[String]) extends UnaryExpression {

File: SchemaConverters.scala

@@ -24,6 +24,7 @@ import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.avro.LogicalTypes.{Date, Decimal, TimestampMicros, TimestampMillis}
 import org.apache.avro.Schema.Type._
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.Decimal.{maxPrecisionForBytes, minBytesForPrecision}
@@ -32,21 +33,29 @@ import org.apache.spark.sql.types.Decimal.{maxPrecisionForBytes, minBytesForPrecision}
  * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice
  * versa.
  */
+@DeveloperApi
 object SchemaConverters {
   private lazy val uuidGenerator = RandomUUIDGenerator(new Random().nextLong())
   private lazy val nullSchema = Schema.create(Schema.Type.NULL)
+  /**
+   * Internal wrapper for SQL data type and nullability.
+   *
+   * @since 2.4.0
+   */
   case class SchemaType(dataType: DataType, nullable: Boolean)
   /**
-   * This function takes an avro schema and returns a sql schema.
+   * Converts an Avro schema to a corresponding Spark SQL schema.
+   *
+   * @since 2.4.0
    */
   def toSqlType(avroSchema: Schema): SchemaType = {
     toSqlTypeHelper(avroSchema, Set.empty)
   }
-  def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
+  private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
     avroSchema.getType match {
       case INT => avroSchema.getLogicalType match {
         case _: Date => SchemaType(DateType, nullable = false)
@@ -133,6 +142,11 @@ object SchemaConverters {
     }
   }
+  /**
+   * Converts a Spark SQL schema to a corresponding Avro schema.
+   *
+   * @since 2.4.0
+   */
   def toAvroType(
       catalystType: DataType,
       nullable: Boolean = false,
@@ -192,4 +206,5 @@ object SchemaConverters {
   }
 }
-class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+private[avro] class IncompatibleSchemaException(
+  msg: String, ex: Throwable = null) extends Exception(msg, ex)
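
To show what the newly annotated `SchemaConverters` surface looks like in use, here is a rough sketch; the Avro schema and field names are invented for illustration:

```scala
// Sketch only: converts between an Avro schema and a Spark SQL schema.
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val avroSchema = new Schema.Parser().parse(
  """{"type":"record","name":"user","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":["int","null"]}
    |]}""".stripMargin)

// Avro -> Spark SQL: yields SchemaType(dataType, nullable)
val sqlType = SchemaConverters.toSqlType(avroSchema)

// Spark SQL -> Avro: the remaining parameters (record name, namespace) keep their defaults here.
val backToAvro = SchemaConverters.toAvroType(
  StructType(Seq(StructField("name", StringType), StructField("age", IntegerType))),
  nullable = false)
```

`toSqlType` wraps its result in the `SchemaType` case class documented above, carrying both the Catalyst `DataType` and a nullability flag.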

File: project/SparkBuild.scala

@@ -852,6 +852,7 @@ object Unidoc {
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/utils")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/hive")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/v2/avro")))
   }
   private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = {
@@ -867,10 +868,10 @@ object Unidoc {
     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
       inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
-        yarn, tags, streamingKafka010, sqlKafka010, avro),
+        yarn, tags, streamingKafka010, sqlKafka010),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
       inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
-        yarn, tags, streamingKafka010, sqlKafka010, avro),
+        yarn, tags, streamingKafka010, sqlKafka010),
     unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
       ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)