From 0b647fe69cf201b4dcbc0f4dfc0eb504a523571d Mon Sep 17 00:00:00 2001
From: Hoa
Date: Mon, 4 Jan 2021 06:53:12 +0000
Subject: [PATCH] [SPARK-33888][SQL] JDBC SQL TIME type represents incorrectly
 as TimestampType, it should be physical Int in millis

### What changes were proposed in this pull request?

The JDBC SQL TIME type is incorrectly represented as TimestampType; we change it to a physical Int in millis for now.

### Why are the changes needed?

Currently, for JDBC, the SQL TIME type is incorrectly represented as Spark TimestampType. It should be represented as a physical int in millis: a time of day, with no reference to a particular calendar, time zone or date, with a precision of one millisecond. It stores the number of milliseconds after midnight, 00:00:00.000.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests in `JDBCSuite`: the updated `H2 time types` test and the new `SPARK-33888: test TIME types` test below.

Closes #30902 from saikocat/SPARK-33888.

Lead-authored-by: Hoa
Co-authored-by: Duc Hoa, Nguyen
Signed-off-by: Wenchen Fan
---
 .../datasources/jdbc/JdbcUtils.scala          | 36 +++++++++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 26 +++++++++++++-
 2 files changed, 58 insertions(+), 4 deletions(-)
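[Reviewer note, not part of the patch] A minimal standalone sketch of the
conversion the new TIME getter performs; the helper name timeToMillisOfDay
and the sample value are illustrative, everything else uses only standard
java.sql/java.util APIs and runs in a Scala REPL:

    import java.util.concurrent.TimeUnit

    // Convert a java.sql.Time to the number of milliseconds after midnight,
    // mirroring the IntegerType getter added in JdbcUtils below.
    def timeToMillisOfDay(t: java.sql.Time): Int = {
      val nanosOfDay = t.toLocalTime().toNanoOfDay()
      // The largest possible value, 23:59:59.999, is 86399999 ms,
      // so the narrowing via Math.toIntExact can never overflow.
      Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(nanosOfDay))
    }

    // Example: 12:34:56 is 12 * 3600 + 34 * 60 + 56 = 45296 seconds,
    // i.e. 45296000 ms after midnight.
    assert(timeToMillisOfDay(java.sql.Time.valueOf("12:34:56")) == 45296000)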
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index f997e57b23..85a05f42c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException}
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 import scala.util.Try
 import scala.util.control.NonFatal
@@ -226,7 +227,7 @@ object JdbcUtils extends Logging {
       case java.sql.Types.SMALLINT => IntegerType
       case java.sql.Types.SQLXML => StringType
       case java.sql.Types.STRUCT => StringType
-      case java.sql.Types.TIME => TimestampType
+      case java.sql.Types.TIME => IntegerType
       case java.sql.Types.TIME_WITH_TIMEZONE
            => null
       case java.sql.Types.TIMESTAMP => TimestampType
@@ -303,11 +304,23 @@ object JdbcUtils extends Logging {
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
-      val metadata = new MetadataBuilder().putLong("scale", fieldScale)
+      val metadata = new MetadataBuilder()
+      // SPARK-33888
+      // - include scale in metadata for only DECIMAL & NUMERIC
+      // - include TIME type metadata
+      // - always build the metadata
+      dataType match {
+        // scalastyle:off
+        case java.sql.Types.NUMERIC => metadata.putLong("scale", fieldScale)
+        case java.sql.Types.DECIMAL => metadata.putLong("scale", fieldScale)
+        case java.sql.Types.TIME => metadata.putBoolean("logical_time_type", true)
+        case _ =>
+        // scalastyle:on
+      }
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
           getCatalystType(dataType, fieldSize, fieldScale, isSigned))
-      fields(i) = StructField(columnName, columnType, nullable)
+      fields(i) = StructField(columnName, columnType, nullable, metadata.build())
       i = i + 1
     }
     new StructType(fields)
@@ -408,6 +421,23 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setFloat(pos, rs.getFloat(pos + 1))
 
+
+    // SPARK-33888 - sql TIME type represents as physical int in millis
+    // Represents a time of day, with no reference to a particular calendar,
+    // time zone or date, with a precision of one millisecond.
+    // It stores the number of milliseconds after midnight, 00:00:00.000.
+    case IntegerType if metadata.contains("logical_time_type") =>
+      (rs: ResultSet, row: InternalRow, pos: Int) => {
+        val rawTime = rs.getTime(pos + 1)
+        if (rawTime != null) {
+          val rawTimeInNano = rawTime.toLocalTime().toNanoOfDay()
+          val timeInMillis = Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(rawTimeInNano))
+          row.setInt(pos, timeInMillis)
+        } else {
+          row.update(pos, null)
+        }
+      }
+
     case IntegerType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setInt(pos, rs.getInt(pos + 1))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index ede5fe538a..639fd0e6fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc
 import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 
@@ -610,7 +611,13 @@ class JDBCSuite extends QueryTest
   test("H2 time types") {
     val rows = sql("SELECT * FROM timetypes").collect()
     val cal = new GregorianCalendar(java.util.Locale.ROOT)
-    cal.setTime(rows(0).getAs[java.sql.Timestamp](0))
+    val epochMillis = java.time.LocalTime.ofNanoOfDay(
+        TimeUnit.MILLISECONDS.toNanos(rows(0).getAs[Int](0)))
+      .atDate(java.time.LocalDate.ofEpochDay(0))
+      .atZone(java.time.ZoneId.systemDefault())
+      .toInstant()
+      .toEpochMilli()
+    cal.setTime(new Date(epochMillis))
     assert(cal.get(Calendar.HOUR_OF_DAY) === 12)
     assert(cal.get(Calendar.MINUTE) === 34)
     assert(cal.get(Calendar.SECOND) === 56)
@@ -625,9 +632,26 @@ class JDBCSuite extends QueryTest
     assert(cal.get(Calendar.HOUR) === 11)
     assert(cal.get(Calendar.MINUTE) === 22)
     assert(cal.get(Calendar.SECOND) === 33)
+    assert(cal.get(Calendar.MILLISECOND) === 543)
     assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000)
   }
 
+  test("SPARK-33888: test TIME types") {
+    val rows = spark.read.jdbc(
+      urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+    val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
+      .cache().collect()
+    val expectedTimeRaw = java.sql.Time.valueOf("12:34:56")
+    val expectedTimeMillis = Math.toIntExact(
+      java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(
+        expectedTimeRaw.toLocalTime().toNanoOfDay()
+      )
+    )
+    assert(rows(0).getAs[Int](0) === expectedTimeMillis)
+    assert(rows(1).getAs[Int](0) === expectedTimeMillis)
+    assert(cachedRows(0).getAs[Int](0) === expectedTimeMillis)
+  }
+
   test("test DATE types") {
     val rows = spark.read.jdbc(
       urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
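[Reviewer note, not part of the patch] Because the TIME column now surfaces
as a plain Int, callers that want a wall-clock value must rebuild it
themselves; a sketch of the reverse conversion (the helper name
millisOfDayToLocalTime is illustrative), matching what the updated
"H2 time types" test above does via java.time:

    import java.time.LocalTime
    import java.util.concurrent.TimeUnit

    // Rebuild the time of day from the millis-after-midnight Int in the row.
    def millisOfDayToLocalTime(millis: Int): LocalTime =
      LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millis))

    // Example: 45296000 ms after midnight is 12:34:56.
    assert(millisOfDayToLocalTime(45296000) == LocalTime.of(12, 34, 56))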