prevent mysql driver from pulling entire resultset into memory. explicitly close resultset and statement.

This commit is contained in:
koeninger 2013-04-22 21:12:52 -05:00
parent b2a3f24dde
commit dfac0aa5c2

View file

@ -15,7 +15,7 @@ import spark.util.NextIterator
@param lowerBound the minimum value of the first placeholder
@param upperBound the maximum value of the second placeholder
The lower and upper bounds are inclusive.
@param numPartitions the amount of parallelism.
@param numPartitions the number of partitions.
Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
the query would be executed twice, once with (1, 10) and once with (11, 20)
@param mapRow a function from a ResultSet to a single row of the desired result type(s).
@ -40,10 +40,15 @@ class JdbcRDD[T: ClassManifest](
toArray
override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
context.addOnCompleteCallback{ () => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
context.addOnCompleteCallback{ () => closeIfNeeded() }
val stmt = conn.prepareStatement(sql)
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
// force mysql driver to stream rather than pull entire resultset into memory
if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
stmt.setFetchSize(Integer.MIN_VALUE)
logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
}
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()
@ -59,8 +64,18 @@ class JdbcRDD[T: ClassManifest](
override def close() {
try {
logInfo("closing connection")
conn.close()
if (null != rs && ! rs.isClosed()) rs.close()
} catch {
case e: Exception => logWarning("Exception closing resultset", e)
}
try {
if (null != stmt && ! stmt.isClosed()) stmt.close()
} catch {
case e: Exception => logWarning("Exception closing statement", e)
}
try {
if (null != conn && ! stmt.isClosed()) conn.close()
logInfo("closed connection")
} catch {
case e: Exception => logWarning("Exception closing connection", e)
}