[SPARK-5472][SQL] Fix Scala code style

Fix Scala code style.

Author: Hung Lin <hung@zoomdata.com>

Closes #4464 from hunglin/SPARK-5472 and squashes the following commits:

ef7a3b3 [Hung Lin] SPARK-5472: fix scala style
commit 4575c5643a
parent 4396dfb37f
@@ -17,13 +17,10 @@

 package org.apache.spark.sql.jdbc

-import java.sql.{Connection, DatabaseMetaData, DriverManager, ResultSet, ResultSetMetaData, SQLException}
-import scala.collection.mutable.ArrayBuffer
+import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}

 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.NextIterator
-import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources._
@@ -100,7 +97,7 @@ private[sql] object JDBCRDD extends Logging {
     try {
       val rsmd = rs.getMetaData
       val ncols = rsmd.getColumnCount
-      var fields = new Array[StructField](ncols);
+      val fields = new Array[StructField](ncols)
       var i = 0
       while (i < ncols) {
         val columnName = rsmd.getColumnName(i + 1)
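The `var` → `val` fix above is representative of the whole commit: the array reference never changes after allocation, and the trailing semicolon is un-Scala. Below is a minimal standalone sketch of the same metadata-reading pattern, runnable against any reachable JDBC URL; `ColumnField` and `describeTable` are invented stand-ins for the `StructField` handling in Spark's `resolveTable`, not Spark API.

    import java.sql.{DriverManager, ResultSetMetaData}

    // Hypothetical stand-in for Spark's StructField, for illustration only.
    case class ColumnField(name: String, sqlType: Int, nullable: Boolean)

    def describeTable(url: String, table: String): Array[ColumnField] = {
      val conn = DriverManager.getConnection(url)
      try {
        // An empty result set is enough to read the column metadata.
        val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
        val rsmd = rs.getMetaData
        val ncols = rsmd.getColumnCount
        // val, not var: only the cells change, never the array reference,
        // and idiomatic Scala drops the trailing semicolon.
        val fields = new Array[ColumnField](ncols)
        var i = 0
        while (i < ncols) {
          fields(i) = ColumnField(
            rsmd.getColumnName(i + 1),
            rsmd.getColumnType(i + 1),
            rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls)
          i = i + 1
        }
        fields
      } finally {
        conn.close()
      }
    }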
@@ -176,23 +173,27 @@ private[sql] object JDBCRDD extends Logging {
    *
    * @return An RDD representing "SELECT requiredColumns FROM fqTable".
    */
-  def scanTable(sc: SparkContext,
-      schema: StructType,
-      driver: String,
-      url: String,
-      fqTable: String,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      parts: Array[Partition]): RDD[Row] = {
+  def scanTable(
+      sc: SparkContext,
+      schema: StructType,
+      driver: String,
+      url: String,
+      fqTable: String,
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      parts: Array[Partition]): RDD[Row] = {

     val prunedSchema = pruneSchema(schema, requiredColumns)

-    return new JDBCRDD(sc,
-        getConnector(driver, url),
-        prunedSchema,
-        fqTable,
-        requiredColumns,
-        filters,
-        parts)
+    return new
+      JDBCRDD(
+        sc,
+        getConnector(driver, url),
+        prunedSchema,
+        fqTable,
+        requiredColumns,
+        filters,
+        parts)
   }
 }
+
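The reshaped `scanTable` signature above is the Spark style rule for declarations that overflow one line: break directly after the opening parenthesis and give every parameter its own double-indented line, so the parameter list cannot be mistaken for the body. A toy sketch of the rule with invented names (`copyTable` is not Spark code), including the matching call-site shape:

    object SignatureStyle {
      // Declaration: break after `(`, one parameter per line, 4-space double indent.
      def copyTable(
          sourceUrl: String,
          sourceTable: String,
          targetUrl: String,
          targetTable: String,
          batchSize: Int): Long = {
        // A real implementation would stream rows between the two connections;
        // a constant row count keeps the sketch self-contained.
        0L
      }

      // Call site: once the call overflows, the same one-argument-per-line shape.
      val rowsCopied: Long = copyTable(
        "jdbc:h2:mem:src",
        "people",
        "jdbc:h2:mem:dst",
        "people_copy",
        1000)
    }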
@@ -412,6 +413,5 @@ private[sql] class JDBCRDD(
       gotNext = false
       nextValue
     }
-
   }
 }
@@ -96,7 +96,8 @@ private[sql] class DefaultSource extends RelationProvider {

     if (driver != null) Class.forName(driver)

-    if ( partitionColumn != null
+    if (
+      partitionColumn != null
       && (lowerBound == null || upperBound == null || numPartitions == null)) {
       sys.error("Partitioning incompletely specified")
     }
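The condition rewrite above extends the same overflow rule to a long boolean test: break after `if (` and indent each clause. Sketched here on an invented validator (not Spark code); the boxed `java.lang` types mirror the nullable options being checked in `DefaultSource`:

    object PartitionCheck {
      def checkPartitioning(
          partitionColumn: String,
          lowerBound: java.lang.Long,
          upperBound: java.lang.Long,
          numPartitions: Integer): Unit = {
        // Parentheses let the condition span lines without backslashes or vars.
        if (
          partitionColumn != null
            && (lowerBound == null || upperBound == null || numPartitions == null)) {
          sys.error("Partitioning incompletely specified")
        }
      }
    }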
@@ -104,30 +105,34 @@ private[sql] class DefaultSource extends RelationProvider {
     val partitionInfo = if (partitionColumn == null) {
       null
     } else {
-      JDBCPartitioningInfo(partitionColumn,
-          lowerBound.toLong, upperBound.toLong,
-          numPartitions.toInt)
+      JDBCPartitioningInfo(
+        partitionColumn,
+        lowerBound.toLong,
+        upperBound.toLong,
+        numPartitions.toInt)
     }
     val parts = JDBCRelation.columnPartition(partitionInfo)
     JDBCRelation(url, table, parts)(sqlContext)
   }
 }

-private[sql] case class JDBCRelation(url: String,
-    table: String,
-    parts: Array[Partition])(
-    @transient val sqlContext: SQLContext)
-  extends PrunedFilteredScan {
+private[sql] case class JDBCRelation(
+    url: String,
+    table: String,
+    parts: Array[Partition])(@transient val sqlContext: SQLContext) extends PrunedFilteredScan {

   override val schema = JDBCRDD.resolveTable(url, table)

   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = {
     val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName
-    JDBCRDD.scanTable(sqlContext.sparkContext,
-        schema,
-        driver, url,
-        table,
-        requiredColumns, filters,
-        parts)
+    JDBCRDD.scanTable(
+      sqlContext.sparkContext,
+      schema,
+      driver,
+      url,
+      table,
+      requiredColumns,
+      filters,
+      parts)
   }
 }
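The `JDBCRelation` rewrite above also shows how a case class with a curried parameter list is folded: the first list is broken one parameter per line, while the second list and the `extends` clause share the closing line. A compilable toy version of that shape, with `TableHandle` and `Catalog` as invented stand-ins for `JDBCRelation` and `SQLContext`:

    // Invented stand-in for SQLContext, only to give the curried list a type.
    class Catalog

    // First parameter list one-per-line; the curried @transient list and the
    // extends clause ride on the closing line, matching the diff above.
    case class TableHandle(
        url: String,
        table: String,
        partitions: Array[Int])(@transient val catalog: Catalog) extends Serializable {

      def describe: String = s"$table @ $url (${partitions.length} partitions)"
    }

    object TableHandleExample {
      val handle = TableHandle("jdbc:h2:mem:db", "people", Array(0, 1))(new Catalog)
    }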