[SPARK-34379][SQL] Map JDBC RowID to StringType rather than LongType
### What changes were proposed in this pull request? This PR fixes an issue where `java.sql.RowId` is mapped to `LongType`; it should prefer `StringType`. In the current implementation, the JDBC RowID type is mapped to `LongType` except in `OracleDialect`, but there is no guarantee that a RowID can be converted to a long. `java.sql.RowId` declares `toString`, and the specification of `java.sql.RowId` says > _all methods on the RowId interface must be fully implemented if the JDBC driver supports the data type_ (https://docs.oracle.com/javase/8/docs/api/java/sql/RowId.html) So, we should prefer StringType to LongType. ### Why are the changes needed? This seems to be a potential bug. ### Does this PR introduce _any_ user-facing change? Yes. RowID is mapped to StringType rather than LongType. ### How was this patch tested? A new test and the existing test case `SPARK-32992: map Oracle's ROWID type to StringType` in `OracleIntegrationSuite` pass. Closes #31491 from sarutak/rowid-type. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
This commit is contained in:
parent
f78466dca6
commit
82b33a3041
|
@ -24,6 +24,8 @@ license: |
|
||||||
|
|
||||||
## Upgrading from Spark SQL 3.1 to 3.2
|
## Upgrading from Spark SQL 3.1 to 3.2
|
||||||
|
|
||||||
|
- Since Spark 3.2, all the supported JDBC dialects use StringType for ROWID. In Spark 3.1 or earlier, Oracle dialect uses StringType and the other dialects use LongType.
|
||||||
|
|
||||||
- In Spark 3.2, PostgreSQL JDBC dialect uses StringType for MONEY and MONEY[] is not supported due to the JDBC driver for PostgreSQL can't handle those types properly. In Spark 3.1 or earlier, DoubleType and ArrayType of DoubleType are used respectively.
|
- In Spark 3.2, PostgreSQL JDBC dialect uses StringType for MONEY and MONEY[] is not supported due to the JDBC driver for PostgreSQL can't handle those types properly. In Spark 3.1 or earlier, DoubleType and ArrayType of DoubleType are used respectively.
|
||||||
|
|
||||||
- In Spark 3.2, `spark.sql.adaptive.enabled` is enabled by default. To restore the behavior before Spark 3.2, you can set `spark.sql.adaptive.enabled` to `false`.
|
- In Spark 3.2, `spark.sql.adaptive.enabled` is enabled by default. To restore the behavior before Spark 3.2, you can set `spark.sql.adaptive.enabled` to `false`.
|
||||||
|
|
|
@ -226,7 +226,7 @@ object JdbcUtils extends Logging {
|
||||||
case java.sql.Types.REAL => DoubleType
|
case java.sql.Types.REAL => DoubleType
|
||||||
case java.sql.Types.REF => StringType
|
case java.sql.Types.REF => StringType
|
||||||
case java.sql.Types.REF_CURSOR => null
|
case java.sql.Types.REF_CURSOR => null
|
||||||
case java.sql.Types.ROWID => LongType
|
case java.sql.Types.ROWID => StringType
|
||||||
case java.sql.Types.SMALLINT => IntegerType
|
case java.sql.Types.SMALLINT => IntegerType
|
||||||
case java.sql.Types.SQLXML => StringType
|
case java.sql.Types.SQLXML => StringType
|
||||||
case java.sql.Types.STRUCT => StringType
|
case java.sql.Types.STRUCT => StringType
|
||||||
|
@ -310,11 +310,15 @@ object JdbcUtils extends Logging {
|
||||||
val metadata = new MetadataBuilder()
|
val metadata = new MetadataBuilder()
|
||||||
metadata.putLong("scale", fieldScale)
|
metadata.putLong("scale", fieldScale)
|
||||||
|
|
||||||
// SPARK-33888
|
dataType match {
|
||||||
// - include TIME type metadata
|
case java.sql.Types.TIME =>
|
||||||
// - always build the metadata
|
// SPARK-33888
|
||||||
if (dataType == java.sql.Types.TIME) {
|
// - include TIME type metadata
|
||||||
metadata.putBoolean("logical_time_type", true)
|
// - always build the metadata
|
||||||
|
metadata.putBoolean("logical_time_type", true)
|
||||||
|
case java.sql.Types.ROWID =>
|
||||||
|
metadata.putBoolean("rowid", true)
|
||||||
|
case _ =>
|
||||||
}
|
}
|
||||||
|
|
||||||
val columnType =
|
val columnType =
|
||||||
|
@ -448,6 +452,10 @@ object JdbcUtils extends Logging {
|
||||||
(rs: ResultSet, row: InternalRow, pos: Int) =>
|
(rs: ResultSet, row: InternalRow, pos: Int) =>
|
||||||
row.setByte(pos, rs.getByte(pos + 1))
|
row.setByte(pos, rs.getByte(pos + 1))
|
||||||
|
|
||||||
|
case StringType if metadata.contains("rowid") =>
|
||||||
|
(rs: ResultSet, row: InternalRow, pos: Int) =>
|
||||||
|
row.update(pos, UTF8String.fromString(rs.getRowId(pos + 1).toString))
|
||||||
|
|
||||||
case StringType =>
|
case StringType =>
|
||||||
(rs: ResultSet, row: InternalRow, pos: Int) =>
|
(rs: ResultSet, row: InternalRow, pos: Int) =>
|
||||||
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
|
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
|
||||||
|
|
|
@ -64,12 +64,6 @@ private case object OracleDialect extends JdbcDialect {
|
||||||
=> Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
|
=> Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
|
||||||
case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
|
case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
|
||||||
case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
|
case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
|
||||||
// scalastyle:off line.size.limit
|
|
||||||
// According to the documentation for Oracle Database 19c:
|
|
||||||
// "Values of the ROWID pseudocolumn are strings representing the address of each row."
|
|
||||||
// https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/Data-Types.html#GUID-AEF1FE4C-2DE5-4BE7-BB53-83AD8F1E34EF
|
|
||||||
// scalastyle:on line.size.limit
|
|
||||||
case Types.ROWID => Some(StringType)
|
|
||||||
case _ => None
|
case _ => None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,8 @@ import java.util.{Calendar, GregorianCalendar, Properties}
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
import org.h2.jdbc.JdbcSQLException
|
import org.h2.jdbc.JdbcSQLException
|
||||||
|
import org.mockito.ArgumentMatchers._
|
||||||
|
import org.mockito.Mockito._
|
||||||
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
|
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
|
||||||
|
|
||||||
import org.apache.spark.SparkException
|
import org.apache.spark.SparkException
|
||||||
|
@ -1781,4 +1783,28 @@ class JDBCSuite extends QueryTest
|
||||||
assert(options.asProperties.get("url") == url)
|
assert(options.asProperties.get("url") == url)
|
||||||
assert(options.asProperties.get("dbtable") == "table3")
|
assert(options.asProperties.get("dbtable") == "table3")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-34379: Map JDBC RowID to StringType rather than LongType") {
|
||||||
|
val mockRsmd = mock(classOf[java.sql.ResultSetMetaData])
|
||||||
|
when(mockRsmd.getColumnCount).thenReturn(1)
|
||||||
|
when(mockRsmd.getColumnLabel(anyInt())).thenReturn("rowid")
|
||||||
|
when(mockRsmd.getColumnType(anyInt())).thenReturn(java.sql.Types.ROWID)
|
||||||
|
when(mockRsmd.getColumnTypeName(anyInt())).thenReturn("rowid")
|
||||||
|
when(mockRsmd.getPrecision(anyInt())).thenReturn(0)
|
||||||
|
when(mockRsmd.getScale(anyInt())).thenReturn(0)
|
||||||
|
when(mockRsmd.isSigned(anyInt())).thenReturn(false)
|
||||||
|
when(mockRsmd.isNullable(anyInt())).thenReturn(java.sql.ResultSetMetaData.columnNoNulls)
|
||||||
|
|
||||||
|
val mockRs = mock(classOf[java.sql.ResultSet])
|
||||||
|
when(mockRs.getMetaData).thenReturn(mockRsmd)
|
||||||
|
|
||||||
|
val mockDialect = mock(classOf[JdbcDialect])
|
||||||
|
when(mockDialect.getCatalystType(anyInt(), anyString(), anyInt(), any[MetadataBuilder]))
|
||||||
|
.thenReturn(None)
|
||||||
|
|
||||||
|
val schema = JdbcUtils.getSchema(mockRs, mockDialect)
|
||||||
|
val fields = schema.fields
|
||||||
|
assert(fields.length === 1)
|
||||||
|
assert(fields(0).dataType === StringType)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue