[SPARK-7469] [SQL] DAG visualization: show SQL query operators
The DAG visualization currently displays only low-level Spark primitives (e.g. `map`, `reduceByKey`, `filter` etc.). For SQL, these aren't particularly useful. Instead, we should display higher level physical operators (e.g. `Filter`, `Exchange`, `ShuffleHashJoin`). cc marmbrus
-----------------
**Before**
<img src="https://issues.apache.org/jira/secure/attachment/12731586/before.png" width="600px"/>
-----------------
**After** (note the operator names)
<img src="https://issues.apache.org/jira/secure/attachment/12731587/after.png" width="600px"/>
-----------------
Author: Andrew Or <andrew@databricks.com>
Closes #5999 from andrewor14/dag-viz-sql and squashes the following commits:
0db23a4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
1e211db [Andrew Or] Update comment
0d49fd6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
ffd237a [Andrew Or] Fix style
202dac1 [Andrew Or] Make ignoreParent false by default
e61b1ab [Andrew Or] Visualize SQL operators, not low-level Spark primitives
569034a [Andrew Or] Add a flag to ignore parent settings and scopes
(cherry picked from commit bd61f07039)
Signed-off-by: Andrew Or <andrew@databricks.com>
parent 1eae47620d
commit cafffd0c29
```diff
@@ -102,16 +102,21 @@ private[spark] object RDDOperationScope {
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
    *
-   * If nesting is allowed, this concatenates the previous scope with the new one in a way that
-   * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
-   * this method executed in the body will have no effect.
+   * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
+   * child scopes that are nested within our scope. Otherwise, these calls will take no effect.
+   *
+   * Additionally, the caller of this method may optionally ignore the configurations and scopes
+   * set by the higher level caller. In this case, this method will ignore the parent caller's
+   * intention to disallow nesting, and the new scope instantiated will not have a parent. This
+   * is useful for scoping physical operations in Spark SQL, for instance.
    *
    * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
       name: String,
-      allowNesting: Boolean)(body: => T): T = {
+      allowNesting: Boolean,
+      ignoreParent: Boolean = false)(body: => T): T = {
     // Save the old scope to restore it later
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
@@ -119,8 +124,11 @@ private[spark] object RDDOperationScope {
     val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
     val oldNoOverride = sc.getLocalProperty(noOverrideKey)
     try {
-      // Set the scope only if the higher level caller allows us to do so
-      if (sc.getLocalProperty(noOverrideKey) == null) {
+      if (ignoreParent) {
+        // Ignore all parent settings and scopes and start afresh with our own root scope
+        sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
+      } else if (sc.getLocalProperty(noOverrideKey) == null) {
+        // Otherwise, set the scope only if the higher level caller allows us to do so
         sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
       }
       // Optionally disallow the child body to override our scope
```
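For orientation, here is a minimal sketch of how a caller might use the new `ignoreParent` flag. The caller itself is hypothetical: `withScope` is `private[spark]`, so code like this only compiles from within the `org.apache.spark` package.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{RDD, RDDOperationScope}

// Hypothetical caller, for illustration only.
def scopedFilter(sc: SparkContext): RDD[Int] = {
  // ignoreParent = true discards any scope or no-override setting inherited
  // from the calling RDD operation and starts a fresh root scope, so the
  // DAG visualization shows a node named "Filter" rather than the
  // low-level primitive that created the RDDs.
  RDDOperationScope.withScope(sc, "Filter", allowNesting = false, ignoreParent = true) {
    // Every RDD created in this body is tagged with the "Filter" scope.
    sc.parallelize(1 to 100).filter(_ % 2 == 0)
  }
}
```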
```diff
@@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (enableAccumulators) {
       readPartitions.setValue(0)
       readBatches.setValue(0)
```

```diff
@@ -121,7 +121,7 @@ case class Aggregate(
     }
   }
 
-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()
```

```diff
@@ -109,7 +109,7 @@ case class Exchange(
       serializer
     }
 
-  override def execute(): RDD[Row] = attachTree(this , "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
```

```diff
@@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon
 
 /** Physical plan node for scanning data from an RDD. */
 private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd
 }
 
 /** Logical plan node for scanning data from a local collection. */
```

```diff
@@ -43,7 +43,7 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>
       // TODO Move out projection objects creation and transfer to
       // workers via closure. However we can't assume the Projection
```

```diff
@@ -46,7 +46,7 @@ case class Generate(
 
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (join) {
       child.execute().mapPartitions { iter =>
         val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))
```

```diff
@@ -66,7 +66,7 @@ case class GeneratedAggregate(
 
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val aggregatesToCompute = aggregateExpressions.flatMap { a =>
       a.collect { case agg: AggregateExpression => agg}
     }
```

```diff
@@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd
 
 
   override def executeCollect(): Array[Row] = {
```

```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
 import org.apache.spark.sql.catalyst.expressions._
```
```diff
@@ -79,14 +79,25 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
   /**
-   * Runs this query returning the result as an RDD.
+   * Returns the result of this query as an RDD[Row] by delegating to doExecute
+   * after adding query plan information to created RDDs for visualization.
+   * Concrete implementations of SparkPlan should override doExecute instead.
    */
-  def execute(): RDD[Row]
+  final def execute(): RDD[Row] = {
+    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
+      doExecute()
+    }
+  }
+
+  /**
+   * Overridden by concrete implementations of SparkPlan.
+   * Produces the result of the query as an RDD[Row]
+   */
+  protected def doExecute(): RDD[Row]
 
   /**
    * Runs this query returning the result as an array.
    */
-
   def executeCollect(): Array[Row] = {
     execute().mapPartitions { iter =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
```
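This hunk is the core of the change: `execute()` becomes a final template method that wraps `doExecute()` in a scope named after the operator, so every physical operator shows up under its own name in the DAG visualization without per-operator work. The remaining hunks below are the mechanical rename from `execute()` to `doExecute()` in each operator. A sketch of what a concrete operator looks like after the refactoring (the `PassThrough` operator is hypothetical, assuming the imports used elsewhere in this diff):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}

// Hypothetical operator illustrating the new contract: subclasses implement
// doExecute(); callers keep invoking execute(), which is now final and tags
// all RDDs created below with this node's name before delegating here.
case class PassThrough(child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  // Attempting to `override def execute()` no longer compiles.
  protected override def doExecute(): RDD[Row] = child.execute()
}
```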
```diff
@@ -112,7 +112,7 @@ case class Window(
     }
   }
 
-  def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().mapPartitions { iter =>
       new Iterator[Row] {
 
```

```diff
@@ -37,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
 
   @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
 
-  override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
+  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
     val resuableProjection = buildProjection()
     iter.map(resuableProjection)
   }
```

```diff
@@ -54,7 +54,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
 
   @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)
 
-  override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
+  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
     iter.filter(conditionEvaluator)
   }
 
```

```diff
@@ -83,7 +83,7 @@ case class Sample(
   override def output: Seq[Attribute] = child.output
 
   // TODO: How to pick seed?
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (withReplacement) {
       child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed)
     } else {
```

```diff
@@ -99,7 +99,7 @@ case class Sample(
 case class Union(children: Seq[SparkPlan]) extends SparkPlan {
   // TODO: attributes output by union should be distinct for nullability purposes
   override def output: Seq[Attribute] = children.head.output
-  override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
+  protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
 }
 
 /**
```

```diff
@@ -124,7 +124,7 @@ case class Limit(limit: Int, child: SparkPlan)
 
   override def executeCollect(): Array[Row] = child.executeTake(limit)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
       child.execute().mapPartitions { iter =>
         iter.take(limit).map(row => (false, row.copy()))
```

```diff
@@ -166,7 +166,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
+  protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
```

```diff
@@ -186,7 +186,7 @@ case class Sort(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  override def execute(): RDD[Row] = attachTree(this, "sort") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       iterator.map(_.copy()).toArray.sorted(ordering).iterator
```

```diff
@@ -214,7 +214,7 @@ case class ExternalSort(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  override def execute(): RDD[Row] = attachTree(this, "sort") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
```

```diff
@@ -244,7 +244,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
   override def requiredChildDistribution: Seq[Distribution] =
     if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().mapPartitions { iter =>
       val hashSet = new scala.collection.mutable.HashSet[Row]()
 
```

```diff
@@ -270,7 +270,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
   extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
   }
 }
```

```diff
@@ -285,7 +285,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
 case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
   }
 }
```

```diff
@@ -299,7 +299,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
 case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = children.head.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
```

```diff
@@ -314,5 +314,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
 case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
   def children: Seq[SparkPlan] = child :: Nil
 
-  def execute(): RDD[Row] = child.execute()
+  protected override def doExecute(): RDD[Row] = child.execute()
 }
```
```diff
@@ -64,7 +64,7 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
 
   override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val converted = sideEffectResult.map(r =>
       CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row])
     sqlContext.sparkContext.parallelize(converted, 1)
```

```diff
@@ -125,7 +125,7 @@ package object debug {
       }
     }
 
-    def execute(): RDD[Row] = {
+    protected override def doExecute(): RDD[Row] = {
       child.execute().mapPartitions { iter =>
         new Iterator[Row] {
           def hasNext: Boolean = iter.hasNext
```

```diff
@@ -193,7 +193,7 @@ package object debug {
 
     def children: List[SparkPlan] = child :: Nil
 
-    def execute(): RDD[Row] = {
+    protected override def doExecute(): RDD[Row] = {
       child.execute().map { row =>
         try typeCheck(row, child.schema) catch {
           case e: Exception =>
```
```diff
@@ -66,7 +66,7 @@ case class BroadcastHashJoin(
     sparkContext.broadcast(hashed)
   }
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val broadcastRelation = Await.result(broadcastFuture, timeout)
 
     streamedPlan.execute().mapPartitions { streamedIter =>
```

```diff
@@ -38,7 +38,7 @@ case class BroadcastLeftSemiJoinHash(
 
   override def output: Seq[Attribute] = left.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
     val hashSet = new java.util.HashSet[Row]()
     var currentRow: Row = null
```

```diff
@@ -61,7 +61,7 @@ case class BroadcastNestedLoopJoin(
   @transient private lazy val boundCondition =
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val broadcastedRelation =
       sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
 
```

```diff
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 
```

```diff
@@ -183,7 +183,7 @@ case class HashOuterJoin(
     hashTable
   }
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val joinedRow = new JoinedRow()
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       // TODO this probably can be replaced by external sort (sort merged join?)
```

```diff
@@ -47,7 +47,7 @@ case class LeftSemiJoinBNL(
   @transient private lazy val boundCondition =
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val broadcastedRelation =
       sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
 
```

```diff
@@ -42,7 +42,7 @@ case class LeftSemiJoinHash(
 
   override def output: Seq[Attribute] = left.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
       val hashSet = new java.util.HashSet[Row]()
       var currentRow: Row = null
```

```diff
@@ -43,7 +43,7 @@ case class ShuffledHashJoin(
   override def requiredChildDistribution: Seq[ClusteredDistribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
       val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
       hashJoin(streamIter, hashed)
```

```diff
@@ -60,7 +60,7 @@ case class SortMergeJoin(
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
     keys.map(SortOrder(_, Ascending))
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 
```
```diff
@@ -228,7 +228,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
 
   def children: Seq[SparkPlan] = child :: Nil
 
-  def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val childResults = child.execute().map(_.copy())
 
     val parent = childResults.mapPartitions { iter =>
```

```diff
@@ -77,7 +77,7 @@ private[sql] case class ParquetTableScan(
     }
   }.toArray
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
 
     val sc = sqlContext.sparkContext
```

```diff
@@ -255,7 +255,7 @@ private[sql] case class InsertIntoParquetTable(
   /**
    * Inserts all rows into the Parquet file.
    */
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     // TODO: currently we do not check whether the "schema"s are compatible
     // That means if one first creates a table and then INSERTs data with
     // and incompatible schema the execution will fail. It would be nice
```

```diff
@@ -129,7 +129,7 @@ case class HiveTableScan(
     }
   }
 
-  override def execute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) {
+  protected override def doExecute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
   } else {
     hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
```

```diff
@@ -258,5 +258,7 @@ case class InsertIntoHiveTable(
 
   override def executeCollect(): Array[Row] = sideEffectResult.toArray
 
-  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  protected override def doExecute(): RDD[Row] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
 }
```

```diff
@@ -54,7 +54,7 @@ case class ScriptTransformation(
 
   override def otherCopyArgs: Seq[HiveContext] = sc :: Nil
 
-  def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)
```