[SPARK-14536][SQL] fix to handle null value in array type column for postgres.

## What changes were proposed in this pull request?

JDBC  read  is failing with  NPE due to missing null value check for array data type if the source table has null values in the array type column.  For null values Resultset.getArray()  returns null.
This PR adds null safe check to the Resultset.getArray() value before invoking method on the Array object.
## How was this patch tested?

Updated the PostgresIntegration test suite to test null values. Ran docker integration tests on my laptop.

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #15192 from sureshthalamati/jdbc_array_null_fix-SPARK-14536.
This commit is contained in:
sureshthalamati 2017-01-20 19:23:20 -08:00 committed by gatorsmile
parent 54268b42dc
commit f174cdc747
2 changed files with 13 additions and 5 deletions

View file

@ -51,12 +51,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
+ "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16', "
+ """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{0.11, 0.22}', 'd1', 1.01, 1)"""
).executeUpdate()
conn.prepareStatement("INSERT INTO bar VALUES (null, null, null, null, null, "
+ "null, null, null, null, null, "
+ "null, null, null, null, null, null, null)"
).executeUpdate()
}
test("Type mapping for various types") {
val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val rows = df.collect().sortBy(_.toString())
assert(rows.length == 2)
// Test the types, and values using the first row.
val types = rows(0).toSeq.map(x => x.getClass)
assert(types.length == 17)
assert(classOf[String].isAssignableFrom(types(0)))
@ -96,6 +101,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(rows(0).getString(14) == "d1")
assert(rows(0).getFloat(15) == 1.01f)
assert(rows(0).getShort(16) == 1)
// Test reading null values using the second row.
assert(0.until(16).forall(rows(1).isNullAt(_)))
}
test("Basic write test") {

View file

@ -465,9 +465,9 @@ object JdbcUtils extends Logging {
}
(rs: ResultSet, row: InternalRow, pos: Int) =>
val array = nullSafeConvert[Object](
rs.getArray(pos + 1).getArray,
array => new GenericArrayData(elementConversion.apply(array)))
val array = nullSafeConvert[java.sql.Array](
input = rs.getArray(pos + 1),
array => new GenericArrayData(elementConversion.apply(array.getArray)))
row.update(pos, array)
case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")