[SPARK-27964][SQL] Move v2 catalog update methods to CatalogV2Util

## What changes were proposed in this pull request?

Move methods that implement v2 catalog operations to CatalogV2Util so they can be used in #24768.
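For context, here is a minimal sketch (not part of this commit) of how the two moved helpers are meant to be called; it assumes `TableChange`'s factory methods `setProperty` and `addColumn` from the same package:

```scala
import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.types.{StringType, StructType}

val changes = Seq(
  TableChange.setProperty("provider", "parquet"),   // a property change
  TableChange.addColumn(Array("data"), StringType)) // a schema change

// Each helper applies only the changes it understands and ignores the rest,
// so one change list from alterTable can be passed to both.
val newProperties = CatalogV2Util.applyPropertiesChanges(Map.empty[String, String], changes)
val newSchema = CatalogV2Util.applySchemaChanges(new StructType().add("id", "long"), changes)
```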

## How was this patch tested?

Behavior is validated by existing tests.

Closes #24813 from rdblue/SPARK-27964-add-catalog-v2-util.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Committed: 2019-06-05 19:44:53 -07:00
commit d1371a2dad (parent 20e8843350)
4 changed files with 161 additions and 124 deletions

project/SparkBuild.scala

@@ -823,6 +823,7 @@ object Unidoc {
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/utils")))
   }
 
   private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = {

CatalogV2Util.scala (new file)

@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2.utils

import java.util
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.types.{StructField, StructType}

object CatalogV2Util {
  /**
   * Apply properties changes to a map and return the result.
   */
  def applyPropertiesChanges(
      properties: Map[String, String],
      changes: Seq[TableChange]): Map[String, String] = {
    applyPropertiesChanges(properties.asJava, changes).asScala.toMap
  }

  /**
   * Apply properties changes to a Java map and return the result.
   */
  def applyPropertiesChanges(
      properties: util.Map[String, String],
      changes: Seq[TableChange]): util.Map[String, String] = {
    val newProperties = new util.HashMap[String, String](properties)

    changes.foreach {
      case set: SetProperty =>
        newProperties.put(set.property, set.value)

      case unset: RemoveProperty =>
        newProperties.remove(unset.property)

      case _ =>
        // ignore non-property changes
    }

    Collections.unmodifiableMap(newProperties)
  }

  /**
   * Apply schema changes to a schema and return the result.
   */
  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
    changes.foldLeft(schema) { (schema, change) =>
      change match {
        case add: AddColumn =>
          add.fieldNames match {
            case Array(name) =>
              val newField = StructField(name, add.dataType, nullable = add.isNullable)
              Option(add.comment) match {
                case Some(comment) =>
                  schema.add(newField.withComment(comment))
                case _ =>
                  schema.add(newField)
              }

            case names =>
              replace(schema, names.init, parent => parent.dataType match {
                case parentType: StructType =>
                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
                  val newParentType = Option(add.comment) match {
                    case Some(comment) =>
                      parentType.add(field.withComment(comment))
                    case None =>
                      parentType.add(field)
                  }

                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))

                case _ =>
                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
              })
          }

        case rename: RenameColumn =>
          replace(schema, rename.fieldNames, field =>
            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))

        case update: UpdateColumnType =>
          replace(schema, update.fieldNames, field => {
            if (!update.isNullable && field.nullable) {
              throw new IllegalArgumentException(
                s"Cannot change optional column to required: ${field.name}")
            }
            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
          })

        case update: UpdateColumnComment =>
          replace(schema, update.fieldNames, field =>
            Some(field.withComment(update.newComment)))

        case delete: DeleteColumn =>
          replace(schema, delete.fieldNames, _ => None)

        case _ =>
          // ignore non-schema changes
          schema
      }
    }
  }

  private def replace(
      struct: StructType,
      fieldNames: Seq[String],
      update: StructField => Option[StructField]): StructType = {
    val pos = struct.getFieldIndex(fieldNames.head)
        .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}"))
    val field = struct.fields(pos)

    val replacement: Option[StructField] = if (fieldNames.tail.isEmpty) {
      update(field)
    } else {
      field.dataType match {
        case nestedStruct: StructType =>
          val updatedType: StructType = replace(nestedStruct, fieldNames.tail, update)
          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
        case _ =>
          throw new IllegalArgumentException(s"Not a struct: ${fieldNames.head}")
      }
    }

    val newFields = struct.fields.zipWithIndex.flatMap {
      case (_, index) if pos == index =>
        replacement
      case (other, _) =>
        Some(other)
    }

    new StructType(newFields)
  }
}

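The recursive `replace` helper above is what lets a change address nested columns: it walks the field path, recursing into each intermediate struct, and rebuilds every enclosing `StructField` on the way back out. A hedged sketch (assuming the `TableChange.renameColumn` factory method from the same package):

```scala
import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.types.StructType

// A two-level schema: "point" is itself a struct.
val schema = new StructType()
  .add("id", "long")
  .add("point", new StructType().add("x", "double").add("y", "double"))

// Rename the nested column point.x; replace() recurses into "point"
// and rebuilds the parent StructField around the updated child.
val renamed = CatalogV2Util.applySchemaChanges(
  schema, Seq(TableChange.renameColumn(Array("point", "x"), "xCoord")))
// renamed("point").dataType is now struct<xCoord:double, y:double>
```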
TestTableCatalog.scala

@@ -18,16 +18,15 @@
 package org.apache.spark.sql.catalog.v2
 
 import java.util
-import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.{Table, TableCapability}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class TestTableCatalog extends TableCatalog {
@@ -79,8 +78,8 @@ class TestTableCatalog extends TableCatalog {
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident)
 
-    val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-    val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+    val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+    val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
     val newTable = InMemoryTable(table.name, schema, properties)
 
     tables.put(ident, newTable)
@@ -91,122 +90,6 @@ class TestTableCatalog extends TableCatalog {
   override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
 }
 
-object TestTableCatalog {
-  /**
-   * Apply properties changes to a map and return the result.
-   */
-  def applyPropertiesChanges(
-      properties: util.Map[String, String],
-      changes: Seq[TableChange]): util.Map[String, String] = {
-    val newProperties = new util.HashMap[String, String](properties)
-
-    changes.foreach {
-      case set: SetProperty =>
-        newProperties.put(set.property, set.value)
-
-      case unset: RemoveProperty =>
-        newProperties.remove(unset.property)
-
-      case _ =>
-        // ignore non-property changes
-    }
-
-    Collections.unmodifiableMap(newProperties)
-  }
-
-  /**
-   * Apply schema changes to a schema and return the result.
-   */
-  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
-    changes.foldLeft(schema) { (schema, change) =>
-      change match {
-        case add: AddColumn =>
-          add.fieldNames match {
-            case Array(name) =>
-              val newField = StructField(name, add.dataType, nullable = add.isNullable)
-              Option(add.comment) match {
-                case Some(comment) =>
-                  schema.add(newField.withComment(comment))
-                case _ =>
-                  schema.add(newField)
-              }
-
-            case names =>
-              replace(schema, names.init, parent => parent.dataType match {
-                case parentType: StructType =>
-                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
-                  val newParentType = Option(add.comment) match {
-                    case Some(comment) =>
-                      parentType.add(field.withComment(comment))
-                    case None =>
-                      parentType.add(field)
-                  }
-
-                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
-
-                case _ =>
-                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
-              })
-          }
-
-        case rename: RenameColumn =>
-          replace(schema, rename.fieldNames, field =>
-            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
-
-        case update: UpdateColumnType =>
-          replace(schema, update.fieldNames, field => {
-            if (!update.isNullable && field.nullable) {
-              throw new IllegalArgumentException(
-                s"Cannot change optional column to required: $field.name")
-            }
-            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
-          })
-
-        case update: UpdateColumnComment =>
-          replace(schema, update.fieldNames, field =>
-            Some(field.withComment(update.newComment)))
-
-        case delete: DeleteColumn =>
-          replace(schema, delete.fieldNames, _ => None)
-
-        case _ =>
-          // ignore non-schema changes
-          schema
-      }
-    }
-  }
-
-  private def replace(
-      struct: StructType,
-      path: Seq[String],
-      update: StructField => Option[StructField]): StructType = {
-    val pos = struct.getFieldIndex(path.head)
-        .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
-    val field = struct.fields(pos)
-
-    val replacement: Option[StructField] = if (path.tail.isEmpty) {
-      update(field)
-    } else {
-      field.dataType match {
-        case nestedStruct: StructType =>
-          val updatedType: StructType = replace(nestedStruct, path.tail, update)
-          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
-        case _ =>
-          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
-      }
-    }
-
-    val newFields = struct.fields.zipWithIndex.flatMap {
-      case (_, index) if pos == index =>
-        replacement
-      case (other, _) =>
-        Some(other)
-    }
-
-    new StructType(newFields)
-  }
-}
 
 case class InMemoryTable(
     name: String,
     schema: StructType,

TestInMemoryTableCatalog.scala

@@ -23,8 +23,9 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange, TestTableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
@@ -85,8 +86,8 @@ class TestInMemoryTableCatalog extends TableCatalog {
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     Option(tables.get(ident)) match {
       case Some(table) =>
-        val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-        val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+        val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+        val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
         val newTable = new InMemoryTable(table.name, schema, properties, table.data)
 
         tables.put(ident, newTable)
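
End to end, both test catalogs now route `alterTable` through `CatalogV2Util`. A rough usage sketch against `TestTableCatalog` (illustrative only; it assumes `initialize`, `createTable`, `Identifier.of`, and `TableChange.setProperty` carry the signatures shown in the imports above):

```scala
import java.util.Collections

import org.apache.spark.sql.catalog.v2.{Identifier, TableChange, TestTableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val catalog = new TestTableCatalog
catalog.initialize("test", CaseInsensitiveStringMap.empty())

// Create an unpartitioned table, then alter it.
val ident = Identifier.of(Array("db"), "t")
catalog.createTable(ident, new StructType().add("id", "long"),
  Array.empty[Transform], Collections.emptyMap[String, String]())

// alterTable folds every TableChange through CatalogV2Util's helpers.
val altered = catalog.alterTable(ident, TableChange.setProperty("owner", "spark"))
```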