From d1371a2dadd9152e286fa62c9b7aafedac6c9e16 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Wed, 5 Jun 2019 19:44:53 -0700
Subject: [PATCH] [SPARK-27964][SQL] Move v2 catalog update methods to CatalogV2Util

## What changes were proposed in this pull request?

Move methods that implement v2 catalog operations to CatalogV2Util so they can be used in #24768.

## How was this patch tested?

Behavior is validated by existing tests.

Closes #24813 from rdblue/SPARK-27964-add-catalog-v2-util.

Authored-by: Ryan Blue
Signed-off-by: Dongjoon Hyun
---
 project/SparkBuild.scala                      |   1 +
 .../sql/catalog/v2/utils/CatalogV2Util.scala  | 152 ++++++++++++++++++
 .../sql/catalog/v2/TestTableCatalog.scala     | 125 +-------------
 .../sources/v2/TestInMemoryTableCatalog.scala |   7 +-
 4 files changed, 161 insertions(+), 124 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 83fe904cc1..7a2b5969bc 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -823,6 +823,7 @@ object Unidoc {
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/utils")))
   }
 
   private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
new file mode 100644
index 0000000000..a00bcab602
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.utils
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalog.v2.TableChange
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+object CatalogV2Util {
+  /**
+   * Apply properties changes to a map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: Map[String, String],
+      changes: Seq[TableChange]): Map[String, String] = {
+    applyPropertiesChanges(properties.asJava, changes).asScala.toMap
+  }
+
+  /**
+   * Apply properties changes to a Java map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: util.Map[String, String],
+      changes: Seq[TableChange]): util.Map[String, String] = {
+    val newProperties = new util.HashMap[String, String](properties)
+
+    changes.foreach {
+      case set: SetProperty =>
+        newProperties.put(set.property, set.value)
+
+      case unset: RemoveProperty =>
+        newProperties.remove(unset.property)
+
+      case _ =>
+        // ignore non-property changes
+    }
+
+    Collections.unmodifiableMap(newProperties)
+  }
+
+  /**
+   * Apply schema changes to a schema and return the result.
+   */
+  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+    changes.foldLeft(schema) { (schema, change) =>
+      change match {
+        case add: AddColumn =>
+          add.fieldNames match {
+            case Array(name) =>
+              val newField = StructField(name, add.dataType, nullable = add.isNullable)
+              Option(add.comment) match {
+                case Some(comment) =>
+                  schema.add(newField.withComment(comment))
+                case _ =>
+                  schema.add(newField)
+              }
+
+            case names =>
+              replace(schema, names.init, parent => parent.dataType match {
+                case parentType: StructType =>
+                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+                  val newParentType = Option(add.comment) match {
+                    case Some(comment) =>
+                      parentType.add(field.withComment(comment))
+                    case None =>
+                      parentType.add(field)
+                  }
+
+                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+
+                case _ =>
+                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+              })
+          }
+
+        case rename: RenameColumn =>
+          replace(schema, rename.fieldNames, field =>
+            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+        case update: UpdateColumnType =>
+          replace(schema, update.fieldNames, field => {
+            if (!update.isNullable && field.nullable) {
+              throw new IllegalArgumentException(
+                s"Cannot change optional column to required: ${field.name}")
+            }
+            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
+          })
+
+        case update: UpdateColumnComment =>
+          replace(schema, update.fieldNames, field =>
+            Some(field.withComment(update.newComment)))
+
+        case delete: DeleteColumn =>
+          replace(schema, delete.fieldNames, _ => None)
+
+        case _ =>
+          // ignore non-schema changes
+          schema
+      }
+    }
+  }
+
+  private def replace(
+      struct: StructType,
+      fieldNames: Seq[String],
+      update: StructField => Option[StructField]): StructType = {
+
+    val pos = struct.getFieldIndex(fieldNames.head)
+      .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}"))
+    val field = struct.fields(pos)
+    val replacement: Option[StructField] = if (fieldNames.tail.isEmpty) {
+      update(field)
+    } else {
+      field.dataType match {
+        case nestedStruct: StructType =>
+          val updatedType: StructType = replace(nestedStruct, fieldNames.tail, update)
+          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+        case _ =>
+          throw new IllegalArgumentException(s"Not a struct: ${fieldNames.head}")
+      }
+    }
+
+    val newFields = struct.fields.zipWithIndex.flatMap {
+      case (_, index) if pos == index =>
+        replacement
+      case (other, _) =>
+        Some(other)
+    }
+
+    new StructType(newFields)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index 78b4763484..6ba140fa5d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -18,16 +18,15 @@
 package org.apache.spark.sql.catalog.v2
 
 import java.util
-import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.{Table, TableCapability}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class TestTableCatalog extends TableCatalog {
@@ -79,8 +78,8 @@ class TestTableCatalog extends TableCatalog {
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident)
-    val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-    val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+    val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+    val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
     val newTable = InMemoryTable(table.name, schema, properties)
 
     tables.put(ident, newTable)
@@ -91,122 +90,6 @@ class TestTableCatalog extends TableCatalog {
 
   override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
 }
 
-object TestTableCatalog {
-  /**
-   * Apply properties changes to a map and return the result.
-   */
-  def applyPropertiesChanges(
-      properties: util.Map[String, String],
-      changes: Seq[TableChange]): util.Map[String, String] = {
-    val newProperties = new util.HashMap[String, String](properties)
-
-    changes.foreach {
-      case set: SetProperty =>
-        newProperties.put(set.property, set.value)
-
-      case unset: RemoveProperty =>
-        newProperties.remove(unset.property)
-
-      case _ =>
-        // ignore non-property changes
-    }
-
-    Collections.unmodifiableMap(newProperties)
-  }
-
-  /**
-   * Apply schema changes to a schema and return the result.
-   */
-  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
-    changes.foldLeft(schema) { (schema, change) =>
-      change match {
-        case add: AddColumn =>
-          add.fieldNames match {
-            case Array(name) =>
-              val newField = StructField(name, add.dataType, nullable = add.isNullable)
-              Option(add.comment) match {
-                case Some(comment) =>
-                  schema.add(newField.withComment(comment))
-                case _ =>
-                  schema.add(newField)
-              }
-
-            case names =>
-              replace(schema, names.init, parent => parent.dataType match {
-                case parentType: StructType =>
-                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
-                  val newParentType = Option(add.comment) match {
-                    case Some(comment) =>
-                      parentType.add(field.withComment(comment))
-                    case None =>
-                      parentType.add(field)
-                  }
-
-                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
-
-                case _ =>
-                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
-              })
-          }
-
-        case rename: RenameColumn =>
-          replace(schema, rename.fieldNames, field =>
-            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
-
-        case update: UpdateColumnType =>
-          replace(schema, update.fieldNames, field => {
-            if (!update.isNullable && field.nullable) {
-              throw new IllegalArgumentException(
-                s"Cannot change optional column to required: $field.name")
-            }
-            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
-          })
-
-        case update: UpdateColumnComment =>
-          replace(schema, update.fieldNames, field =>
-            Some(field.withComment(update.newComment)))
-
-        case delete: DeleteColumn =>
-          replace(schema, delete.fieldNames, _ => None)
-
-        case _ =>
-          // ignore non-schema changes
-          schema
-      }
-    }
-  }
-
-  private def replace(
-      struct: StructType,
-      path: Seq[String],
-      update: StructField => Option[StructField]): StructType = {
-
-    val pos = struct.getFieldIndex(path.head)
-      .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
-    val field = struct.fields(pos)
-    val replacement: Option[StructField] = if (path.tail.isEmpty) {
-      update(field)
-    } else {
-      field.dataType match {
-        case nestedStruct: StructType =>
-          val updatedType: StructType = replace(nestedStruct, path.tail, update)
-          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
-        case _ =>
-          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
-      }
-    }
-
-    val newFields = struct.fields.zipWithIndex.flatMap {
-      case (_, index) if pos == index =>
-        replacement
-      case (other, _) =>
-        Some(other)
-    }
-
-    new StructType(newFields)
-  }
-}
-
 case class InMemoryTable(
     name: String,
     schema: StructType,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 2ecf1c2f18..4e9f961016 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -23,8 +23,9 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange, TestTableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
@@ -85,8 +86,8 @@ class TestInMemoryTableCatalog extends TableCatalog {
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     Option(tables.get(ident)) match {
       case Some(table) =>
-        val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-        val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+        val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+        val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
         val newTable = new InMemoryTable(table.name, schema, properties, table.data)
 
         tables.put(ident, newTable)
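
Usage sketch (illustrative, not part of the patch): the helpers are driven with TableChange values, as in the alterTable implementations above. This assumes the TableChange factory methods (setProperty, addColumn) from the v2 TableChange API; the schema and property values below are invented for the example.

    import org.apache.spark.sql.catalog.v2.TableChange
    import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    // Start from a one-column schema and no table properties.
    val schema = new StructType().add("id", LongType, nullable = false)
    val properties = Map.empty[String, String]

    // Describe an ALTER TABLE as a sequence of TableChange values.
    val changes = Seq(
      TableChange.setProperty("provider", "parquet"),
      TableChange.addColumn(Array("data"), StringType))

    // Each helper applies only the changes it understands and ignores the
    // rest, so the same change list can be passed to both.
    val newProperties = CatalogV2Util.applyPropertiesChanges(properties, changes)
    val newSchema = CatalogV2Util.applySchemaChanges(schema, changes)
    // newProperties == Map("provider" -> "parquet")
    // newSchema fields: id (required long), data (nullable string)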