diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a6101c6fe7..0552e807ef 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -24,6 +24,8 @@ license: |
 
 ## Upgrading from Spark SQL 3.1 to 3.2
 
+  - Since Spark 3.2, all supported JDBC dialects use StringType for ROWID. In Spark 3.1 or earlier, the Oracle dialect uses StringType and the other dialects use LongType.
+  - In Spark 3.2, the PostgreSQL JDBC dialect uses StringType for MONEY, and MONEY[] is not supported because the JDBC driver for PostgreSQL cannot handle those types properly. In Spark 3.1 or earlier, DoubleType and ArrayType of DoubleType are used respectively.
   - In Spark 3.2, `spark.sql.adaptive.enabled` is enabled by default. To restore the behavior before Spark 3.2, you can set `spark.sql.adaptive.enabled` to `false`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 2ba8bed440..76099aa48b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -226,7 +226,7 @@ object JdbcUtils extends Logging {
       case java.sql.Types.REAL => DoubleType
       case java.sql.Types.REF => StringType
       case java.sql.Types.REF_CURSOR => null
-      case java.sql.Types.ROWID => LongType
+      case java.sql.Types.ROWID => StringType
       case java.sql.Types.SMALLINT => IntegerType
       case java.sql.Types.SQLXML => StringType
       case java.sql.Types.STRUCT => StringType
@@ -310,11 +310,15 @@ object JdbcUtils extends Logging {
       val metadata = new MetadataBuilder()
       metadata.putLong("scale", fieldScale)
 
-      // SPARK-33888
-      // - include TIME type metadata
-      // - always build the metadata
-      if (dataType == java.sql.Types.TIME) {
-        metadata.putBoolean("logical_time_type", true)
+      dataType match {
+        case java.sql.Types.TIME =>
+          // SPARK-33888
+          // - include TIME type metadata
+          // - always build the metadata
+          metadata.putBoolean("logical_time_type", true)
+        case java.sql.Types.ROWID =>
+          metadata.putBoolean("rowid", true)
+        case _ =>
       }
 
       val columnType =
@@ -448,6 +452,10 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setByte(pos, rs.getByte(pos + 1))
 
+    case StringType if metadata.contains("rowid") =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, UTF8String.fromString(rs.getRowId(pos + 1).toString))
+
     case StringType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 491b6e29ec..b741ece8dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -64,12 +64,6 @@ private case object OracleDialect extends JdbcDialect {
         => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
       case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
-      // scalastyle:off line.size.limit
-      // According to the documentation for Oracle Database 19c:
-      // "Values of the ROWID pseudocolumn are strings representing the address of each row."
-      // https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/Data-Types.html#GUID-AEF1FE4C-2DE5-4BE7-BB53-83AD8F1E34EF
-      // scalastyle:on line.size.limit
-      case Types.ROWID => Some(StringType)
       case _ => None
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index cc2721149c..ff9adc9502 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -25,6 +25,8 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 import scala.collection.JavaConverters._
 
 import org.h2.jdbc.JdbcSQLException
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkException
@@ -1781,4 +1783,28 @@ class JDBCSuite extends QueryTest
     assert(options.asProperties.get("url") == url)
     assert(options.asProperties.get("dbtable") == "table3")
   }
+
+  test("SPARK-34379: Map JDBC RowID to StringType rather than LongType") {
+    val mockRsmd = mock(classOf[java.sql.ResultSetMetaData])
+    when(mockRsmd.getColumnCount).thenReturn(1)
+    when(mockRsmd.getColumnLabel(anyInt())).thenReturn("rowid")
+    when(mockRsmd.getColumnType(anyInt())).thenReturn(java.sql.Types.ROWID)
+    when(mockRsmd.getColumnTypeName(anyInt())).thenReturn("rowid")
+    when(mockRsmd.getPrecision(anyInt())).thenReturn(0)
+    when(mockRsmd.getScale(anyInt())).thenReturn(0)
+    when(mockRsmd.isSigned(anyInt())).thenReturn(false)
+    when(mockRsmd.isNullable(anyInt())).thenReturn(java.sql.ResultSetMetaData.columnNoNulls)
+
+    val mockRs = mock(classOf[java.sql.ResultSet])
+    when(mockRs.getMetaData).thenReturn(mockRsmd)
+
+    val mockDialect = mock(classOf[JdbcDialect])
+    when(mockDialect.getCatalystType(anyInt(), anyString(), anyInt(), any[MetadataBuilder]))
+      .thenReturn(None)
+
+    val schema = JdbcUtils.getSchema(mockRs, mockDialect)
+    val fields = schema.fields
+    assert(fields.length === 1)
+    assert(fields(0).dataType === StringType)
+  }
 }
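
A small self-contained sketch (not part of the patch) of the Metadata mechanism the JdbcUtils change leans on: getSchema now tags a ROWID field with a boolean "rowid" entry, and makeGetter checks that flag to choose the getRowId-based reader over getString. Only the public Metadata API is exercised here; the field name is made up for illustration.

import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StringType, StructField}

object RowIdMetadataSketch {
  def main(args: Array[String]): Unit = {
    // getSchema side: when the column's JDBC type is java.sql.Types.ROWID,
    // the field metadata carries a boolean "rowid" flag in addition to "scale".
    val metadata: Metadata = new MetadataBuilder()
      .putLong("scale", 0)
      .putBoolean("rowid", true)
      .build()

    // Hypothetical field name; ROWID columns are mapped to StringType since this change.
    val field = StructField("ROW_ID", StringType, nullable = false, metadata)

    // makeGetter side: the new case fires on StringType plus the "rowid" flag,
    // so the value is read via ResultSet.getRowId(...).toString rather than getString.
    val usesRowIdReader = field.dataType == StringType && field.metadata.contains("rowid")
    println(s"use getRowId-based reader: $usesRowIdReader")
  }
}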
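
For reviewers, a minimal end-to-end sketch of how the new mapping surfaces when reading a ROWID column over JDBC. The connection URL, table name, and credentials are placeholder assumptions, and the example is not part of the patch; any source whose driver reports java.sql.Types.ROWID should behave the same way.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

object RowIdReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rowid-as-string")
      .master("local[*]")
      .getOrCreate()

    // Expose the ROWID pseudocolumn through a dbtable subquery so it appears as a column.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/XEPDB1") // placeholder URL
      .option("dbtable", "(SELECT ROWID AS row_id, t.* FROM some_table t)") // placeholder table
      .option("user", "scott") // placeholder credentials
      .option("password", "tiger")
      .load()

    // Since Spark 3.2 every dialect maps java.sql.Types.ROWID to StringType;
    // in 3.1 and earlier only the Oracle dialect did, the rest used LongType.
    assert(df.schema.head.dataType == StringType)

    spark.stop()
  }
}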