[SPARK-33888][SQL] JDBC SQL TIME type represents incorrectly as TimestampType, it should be physical Int in millis
### What changes were proposed in this pull request? The JDBC SQL TIME type is represented incorrectly as TimestampType; we change it to be a physical Int in millis for now. ### Why are the changes needed? Currently, for JDBC, the SQL TIME type is represented incorrectly as Spark's TimestampType. It should be represented as a physical int in millis: it models a time of day, with no reference to a particular calendar, time zone or date, with a precision of one millisecond, storing the number of milliseconds after midnight, 00:00:00.000. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests added in JDBCSuite ("SPARK-33888: test TIME types"). Closes #30902 from saikocat/SPARK-33888. Lead-authored-by: Hoa <hoameomu@gmail.com> Co-authored-by: Hoa <saikocatz@gmail.com> Co-authored-by: Duc Hoa, Nguyen <hoa.nd@teko.vn> Co-authored-by: Duc Hoa, Nguyen <hoameomu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
adac633f93
commit
0b647fe69c
|
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc
|
|||
|
||||
import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException}
|
||||
import java.util.Locale
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.util.Try
|
||||
import scala.util.control.NonFatal
|
||||
|
@@ -226,7 +227,7 @@ object JdbcUtils extends Logging {
|
|||
case java.sql.Types.SMALLINT => IntegerType
|
||||
case java.sql.Types.SQLXML => StringType
|
||||
case java.sql.Types.STRUCT => StringType
|
||||
case java.sql.Types.TIME => TimestampType
|
||||
case java.sql.Types.TIME => IntegerType
|
||||
case java.sql.Types.TIME_WITH_TIMEZONE
|
||||
=> null
|
||||
case java.sql.Types.TIMESTAMP => TimestampType
|
||||
|
@@ -303,11 +304,23 @@ object JdbcUtils extends Logging {
|
|||
} else {
|
||||
rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
|
||||
}
|
||||
val metadata = new MetadataBuilder().putLong("scale", fieldScale)
|
||||
val metadata = new MetadataBuilder()
|
||||
// SPARK-33888
|
||||
// - include scale in metadata for only DECIMAL & NUMERIC
|
||||
// - include TIME type metadata
|
||||
// - always build the metadata
|
||||
dataType match {
|
||||
// scalastyle:off
|
||||
case java.sql.Types.NUMERIC => metadata.putLong("scale", fieldScale)
|
||||
case java.sql.Types.DECIMAL => metadata.putLong("scale", fieldScale)
|
||||
case java.sql.Types.TIME => metadata.putBoolean("logical_time_type", true)
|
||||
case _ =>
|
||||
// scalastyle:on
|
||||
}
|
||||
val columnType =
|
||||
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
|
||||
getCatalystType(dataType, fieldSize, fieldScale, isSigned))
|
||||
fields(i) = StructField(columnName, columnType, nullable)
|
||||
fields(i) = StructField(columnName, columnType, nullable, metadata.build())
|
||||
i = i + 1
|
||||
}
|
||||
new StructType(fields)
|
||||
|
@@ -408,6 +421,23 @@ object JdbcUtils extends Logging {
|
|||
(rs: ResultSet, row: InternalRow, pos: Int) =>
|
||||
row.setFloat(pos, rs.getFloat(pos + 1))
|
||||
|
||||
|
||||
// SPARK-33888 - sql TIME type represents as physical int in millis
|
||||
// Represents a time of day, with no reference to a particular calendar,
|
||||
// time zone or date, with a precision of one millisecond.
|
||||
// It stores the number of milliseconds after midnight, 00:00:00.000.
|
||||
case IntegerType if metadata.contains("logical_time_type") =>
|
||||
(rs: ResultSet, row: InternalRow, pos: Int) => {
|
||||
val rawTime = rs.getTime(pos + 1)
|
||||
if (rawTime != null) {
|
||||
val rawTimeInNano = rawTime.toLocalTime().toNanoOfDay()
|
||||
val timeInMillis = Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(rawTimeInNano))
|
||||
row.setInt(pos, timeInMillis)
|
||||
} else {
|
||||
row.update(pos, null)
|
||||
}
|
||||
}
|
||||
|
||||
case IntegerType =>
|
||||
(rs: ResultSet, row: InternalRow, pos: Int) =>
|
||||
row.setInt(pos, rs.getInt(pos + 1))
|
||||
|
|
|
@@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc
|
|||
import java.math.BigDecimal
|
||||
import java.sql.{Date, DriverManager, SQLException, Timestamp}
|
||||
import java.util.{Calendar, GregorianCalendar, Properties}
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
|
@@ -610,7 +611,13 @@ class JDBCSuite extends QueryTest
|
|||
test("H2 time types") {
|
||||
val rows = sql("SELECT * FROM timetypes").collect()
|
||||
val cal = new GregorianCalendar(java.util.Locale.ROOT)
|
||||
cal.setTime(rows(0).getAs[java.sql.Timestamp](0))
|
||||
val epochMillis = java.time.LocalTime.ofNanoOfDay(
|
||||
TimeUnit.MILLISECONDS.toNanos(rows(0).getAs[Int](0)))
|
||||
.atDate(java.time.LocalDate.ofEpochDay(0))
|
||||
.atZone(java.time.ZoneId.systemDefault())
|
||||
.toInstant()
|
||||
.toEpochMilli()
|
||||
cal.setTime(new Date(epochMillis))
|
||||
assert(cal.get(Calendar.HOUR_OF_DAY) === 12)
|
||||
assert(cal.get(Calendar.MINUTE) === 34)
|
||||
assert(cal.get(Calendar.SECOND) === 56)
|
||||
|
@@ -625,9 +632,26 @@ class JDBCSuite extends QueryTest
|
|||
assert(cal.get(Calendar.HOUR) === 11)
|
||||
assert(cal.get(Calendar.MINUTE) === 22)
|
||||
assert(cal.get(Calendar.SECOND) === 33)
|
||||
assert(cal.get(Calendar.MILLISECOND) === 543)
|
||||
assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000)
|
||||
}
|
||||
|
||||
test("SPARK-33888: test TIME types") {
|
||||
val rows = spark.read.jdbc(
|
||||
urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
|
||||
val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
|
||||
.cache().collect()
|
||||
val expectedTimeRaw = java.sql.Time.valueOf("12:34:56")
|
||||
val expectedTimeMillis = Math.toIntExact(
|
||||
java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(
|
||||
expectedTimeRaw.toLocalTime().toNanoOfDay()
|
||||
)
|
||||
)
|
||||
assert(rows(0).getAs[Int](0) === expectedTimeMillis)
|
||||
assert(rows(1).getAs[Int](0) === expectedTimeMillis)
|
||||
assert(cachedRows(0).getAs[Int](0) === expectedTimeMillis)
|
||||
}
|
||||
|
||||
test("test DATE types") {
|
||||
val rows = spark.read.jdbc(
|
||||
urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
|
||||
|
|
Loading…
Reference in a new issue