[SPARK-9984] [SQL] Create local physical operator interface.

This pull request creates a new local operator interface modeled on traditional database query iterators (with open/close/next/get).

These local operators are not currently used anywhere, but will become the basis for SPARK-9983 (local physical operators for query execution).
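
As a rough usage sketch (not part of this PR, since nothing calls these operators yet), the three concrete nodes added below could be composed into a pipeline and drained with the open/next/get/close protocol. The inputs `attrs`, `rows`, `cond`, and `projectList` are placeholder names, not anything defined in this change:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression}
import org.apache.spark.sql.execution.local.{FilterNode, ProjectNode, SeqScanNode}

def runPipeline(
    attrs: Seq[Attribute],
    rows: Seq[InternalRow],
    cond: Expression,
    projectList: Seq[NamedExpression]): Seq[InternalRow] = {
  // Scan a local Seq, filter it, then project it.
  val node = ProjectNode(projectList, FilterNode(cond, SeqScanNode(attrs, rows)))
  val buffer = scala.collection.mutable.ArrayBuffer.empty[InternalRow]
  node.open()                      // must be called before next(); propagates to children
  while (node.next()) {            // advance; returns true while tuples remain
    buffer += node.get().copy()    // get() returns the current tuple
  }
  node.close()                     // releases resources in the whole operator tree
  buffer
}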

cc zsxwing

Author: Reynold Xin <rxin@databricks.com>

Closes #8212 from rxin/SPARK-9984.
Committed by Reynold Xin on 2015-08-14 21:12:11 -07:00
parent 6c4fdbec33 · commit 609ce3c07d
4 changed files with 224 additions and 0 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala (new file)

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate


case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode {

  private[this] var predicate: (InternalRow) => Boolean = _

  override def output: Seq[Attribute] = child.output

  override def open(): Unit = {
    child.open()
    predicate = GeneratePredicate.generate(condition, child.output)
  }

  override def next(): Boolean = {
    var found = false
    while (child.next() && !found) {
      found = predicate.apply(child.get())
    }
    found
  }

  override def get(): InternalRow = child.get()

  override def close(): Unit = child.close()
}

sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala (new file)

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.StructType

/**
 * A local physical operator, in the form of an iterator.
 *
 * Before consuming the iterator, the open function must be called.
 * After consuming the iterator, the close function must be called.
 */
abstract class LocalNode extends TreeNode[LocalNode] {

  def output: Seq[Attribute]

  /**
   * Initializes the iterator state. Must be called before calling `next()`.
   *
   * Implementations of this must also call the `open()` function of its children.
   */
  def open(): Unit

  /**
   * Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
   */
  def next(): Boolean

  /**
   * Returns the current tuple.
   */
  def get(): InternalRow

  /**
   * Closes the iterator and releases all resources.
   *
   * Implementations of this must also call the `close()` function of its children.
   */
  def close(): Unit

  /**
   * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
   */
  def collect(): Seq[Row] = {
    val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
    val result = new scala.collection.mutable.ArrayBuffer[Row]
    open()
    while (next()) {
      result += converter.apply(get()).asInstanceOf[Row]
    }
    close()
    result
  }
}


abstract class LeafLocalNode extends LocalNode {
  override def children: Seq[LocalNode] = Seq.empty
}


abstract class UnaryLocalNode extends LocalNode {

  def child: LocalNode

  override def children: Seq[LocalNode] = Seq(child)
}

sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala (new file)

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, NamedExpression}


case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode {

  private[this] var project: UnsafeProjection = _

  override def output: Seq[Attribute] = projectList.map(_.toAttribute)

  override def open(): Unit = {
    project = UnsafeProjection.create(projectList, child.output)
    child.open()
  }

  override def next(): Boolean = child.next()

  override def get(): InternalRow = {
    project.apply(child.get())
  }

  override def close(): Unit = child.close()
}

sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala (new file)

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

/**
 * An operator that scans some local data collection in the form of Scala Seq.
 */
case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode {

  private[this] var iterator: Iterator[InternalRow] = _
  private[this] var currentRow: InternalRow = _

  override def open(): Unit = {
    iterator = data.iterator
  }

  override def next(): Boolean = {
    if (iterator.hasNext) {
      currentRow = iterator.next()
      true
    } else {
      false
    }
  }

  override def get(): InternalRow = currentRow

  override def close(): Unit = {
    // Do nothing
  }
}
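
To illustrate how further operators would slot into this interface, here is a rough sketch (not part of this commit) of a hypothetical LimitNode built on UnaryLocalNode. It follows the same contract as the nodes above: open() and close() are delegated to the child, while next()/get() drive the iteration.

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical example, not in this PR: stops producing tuples
// after `limit` rows have been returned from the child.
case class LimitNode(limit: Int, child: LocalNode) extends UnaryLocalNode {

  private[this] var count = 0

  override def output: Seq[Attribute] = child.output

  override def open(): Unit = child.open()    // propagate open() to the child

  override def next(): Boolean = {
    if (count < limit && child.next()) {
      count += 1
      true
    } else {
      false
    }
  }

  override def get(): InternalRow = child.get()

  override def close(): Unit = child.close()  // propagate close() to the child
}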