[SPARK-6743] [SQL] Fix empty projections of cached data
Author: Michael Armbrust <michael@databricks.com> Closes #6165 from marmbrus/wrongColumn and squashes the following commits: 4fad158 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into wrongColumn aad7eab [Michael Armbrust] rxins comments f1e8df1 [Michael Armbrust] [SPARK-6743][SQL] Fix empty projections of cached data
This commit is contained in:
parent
4e5220c317
commit
3b68cb0430
|
@ -324,6 +324,7 @@ object Hive {
|
||||||
|import org.apache.spark.sql.functions._
|
|import org.apache.spark.sql.functions._
|
||||||
|import org.apache.spark.sql.hive._
|
|import org.apache.spark.sql.hive._
|
||||||
|import org.apache.spark.sql.hive.test.TestHive._
|
|import org.apache.spark.sql.hive.test.TestHive._
|
||||||
|
|import org.apache.spark.sql.hive.test.TestHive.implicits._
|
||||||
|import org.apache.spark.sql.types._""".stripMargin,
|
|import org.apache.spark.sql.types._""".stripMargin,
|
||||||
cleanupCommands in console := "sparkContext.stop()",
|
cleanupCommands in console := "sparkContext.stop()",
|
||||||
// Some of our log4j jars make it impossible to submit jobs from this JVM to Hive Map/Reduce
|
// Some of our log4j jars make it impossible to submit jobs from this JVM to Hive Map/Reduce
|
||||||
|
|
|
@ -55,6 +55,9 @@ object Row {
|
||||||
// TODO: Improve the performance of this if used in performance critical part.
|
// TODO: Improve the performance of this if used in performance critical part.
|
||||||
new GenericRow(rows.flatMap(_.toSeq).toArray)
|
new GenericRow(rows.flatMap(_.toSeq).toArray)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Returns an empty row. */
|
||||||
|
val empty = apply()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -314,7 +314,7 @@ private[sql] case class InMemoryColumnarTableScan(
|
||||||
columnAccessors(i).extractTo(nextRow, i)
|
columnAccessors(i).extractTo(nextRow, i)
|
||||||
i += 1
|
i += 1
|
||||||
}
|
}
|
||||||
nextRow
|
if (attributes.isEmpty) Row.empty else nextRow
|
||||||
}
|
}
|
||||||
|
|
||||||
override def hasNext: Boolean = columnAccessors(0).hasNext
|
override def hasNext: Boolean = columnAccessors(0).hasNext
|
||||||
|
|
|
@ -39,6 +39,19 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
|
||||||
import org.apache.spark.sql.test.TestSQLContext.implicits._
|
import org.apache.spark.sql.test.TestSQLContext.implicits._
|
||||||
val sqlCtx = TestSQLContext
|
val sqlCtx = TestSQLContext
|
||||||
|
|
||||||
|
test("SPARK-6743: no columns from cache") {
|
||||||
|
Seq(
|
||||||
|
(83, 0, 38),
|
||||||
|
(26, 0, 79),
|
||||||
|
(43, 81, 24)
|
||||||
|
).toDF("a", "b", "c").registerTempTable("cachedData")
|
||||||
|
|
||||||
|
cacheTable("cachedData")
|
||||||
|
checkAnswer(
|
||||||
|
sql("SELECT t1.b FROM cachedData, cachedData t1 GROUP BY t1.b"),
|
||||||
|
Row(0) :: Row(81) :: Nil)
|
||||||
|
}
|
||||||
|
|
||||||
test("self join with aliases") {
|
test("self join with aliases") {
|
||||||
Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str").registerTempTable("df")
|
Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str").registerTempTable("df")
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue