In-Mem Hash Join, Plan Optimizer
parent
ad573b0af6
commit
e44a8db702
|
@ -5,6 +5,7 @@ sealed trait Expression
|
|||
def eval(scope: Map[Variable, Constant]): Constant
|
||||
def printString: String
|
||||
|
||||
/** Every variable referenced below this node: the union of the `vars` of all children. */
def vars: Set[Variable] =
  children.flatMap { _.vars }.toSet
|
||||
def children: Seq[Expression]
|
||||
def rebuild(c: Seq[Expression]): Expression
|
||||
|
||||
|
@ -13,6 +14,8 @@ sealed trait Expression
|
|||
val v = rebuild(children.map { _.transformUp(op) })
|
||||
op.lift(v).getOrElse(v)
|
||||
}
|
||||
|
||||
/**
 * This expression viewed as a list of atoms.  The default treats the
 * expression as a single atom and returns it alone; subclasses may override
 * this to return several.
 */
def conjunction: Seq[Expression] = Seq(this)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
|
@ -46,6 +49,7 @@ case class Variable(name: String) extends Expression
|
|||
)
|
||||
}
|
||||
|
||||
/** A variable references exactly itself. */
override def vars: Set[Variable] = Set(this)
|
||||
def printString = name
|
||||
def children: Seq[Expression] = Seq.empty
|
||||
def rebuild(c: Seq[Expression]): Expression = this
|
||||
|
@ -62,6 +66,11 @@ case class Variable(name: String) extends Expression
|
|||
///////////////////////////////////////////
|
||||
|
||||
object Expression {
|
||||
/**
 * Combine `atoms` into a single left-nested chain of `And` nodes.
 *
 * An empty collection yields the constant `true`; a single atom is
 * returned unchanged.
 */
def and(atoms: Iterable[Expression]): Expression =
  atoms
    .reduceOption { And(_, _) }
    .getOrElse { Constant.Boolean(true) }
|
||||
|
||||
|
||||
case class Add(a: Expression, b: Expression) extends Expression
|
||||
|
@ -195,6 +204,8 @@ object Expression {
|
|||
override def printString = s"${a.printString} AND ${b.printString}"
|
||||
def children: Seq[Expression] = Seq(a, b)
|
||||
def rebuild(c: Seq[Expression]): Expression = copy(a = c(0), b = c(1))
|
||||
/** Flattens nested ANDs: the atoms of the left operand followed by those of the right. */
override def conjunction: Seq[Expression] =
  Seq(a, b).flatMap { _.conjunction }
|
||||
}
|
||||
|
||||
case class Or(a: Expression, b: Expression) extends Expression
|
||||
|
|
|
@ -5,7 +5,7 @@ case class Filter(predicate: Expression, child: Plan) extends Plan
|
|||
def schema = child.schema
|
||||
|
||||
lazy val vars: Seq[(Variable, Type)] =
|
||||
child.schema.map { x => (Variable(x._1), x._2) }
|
||||
child.schema
|
||||
|
||||
def read: Iterator[Seq[Constant]] =
|
||||
child.read.filter { row =>
|
||||
|
|
|
@ -35,5 +35,6 @@ object HackDB
|
|||
query("SELECT a, b, a+b FROM test WHERE a = 1 OR a = 2")
|
||||
query("SELECT a, b, f FROM test, best")
|
||||
query("SELECT a, b, f FROM test, best WHERE b = e")
|
||||
query("SELECT a, b, f FROM test, best WHERE e = b And a = 1")
|
||||
}
|
||||
}
|
|
@ -6,7 +6,13 @@ case class Join(
|
|||
predicate: Option[(Variable, Variable)] = None
|
||||
) extends Plan
|
||||
{
|
||||
def schema: Seq[(String, Type)] = left.schema ++ right.schema
|
||||
def schema: Seq[(Variable, Type)] = left.schema ++ right.schema
|
||||
|
||||
/** Column position of each variable of `left.schema`, in schema order. */
lazy val varIdxLeft: Map[Variable, Int] =
  left.schema.zipWithIndex.map { case ((variable, _), idx) => variable -> idx }.toMap
|
||||
/** Column position of each variable of `right.schema`, in schema order. */
lazy val varIdxRight: Map[Variable, Int] =
  right.schema.zipWithIndex.map { case ((variable, _), idx) => variable -> idx }.toMap
|
||||
|
||||
def read: Iterator[Seq[Constant]] =
|
||||
{
|
||||
predicate match {
|
||||
|
@ -21,7 +27,18 @@ case class Join(
|
|||
}
|
||||
case Some(a, b) =>
{
  // In-memory hash join: build a table over the left input keyed on the
  // join column, then stream the right input through it.
  val aIdx: Int = varIdxLeft(a)
  val bIdx: Int = varIdxRight(b)
  val leftBuffer: Map[Constant, Seq[Seq[Constant]]] =
    left.read.toSeq.groupBy { _(aIdx) }

  // Every left row stored under key `k` holds `k` in column `aIdx`, so each
  // emitted row already satisfies `a = b`.  No post-filter is applied: in the
  // concatenated row the right-hand column sits at `leftRow.size + bIdx`, so
  // indexing the output with the bare `bIdx` would compare the wrong column
  // and discard valid matches.
  right.read.flatMap { rightRow =>
    leftBuffer.getOrElse(rightRow(bIdx), Seq.empty).map { leftRow =>
      leftRow ++ rightRow
    }
  }
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
package net.okennedy.hackdb
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
/**
 * Rule-based plan optimizer.
 *
 * `Optimizer(plan)` runs every rule in `PLAN_RULES` over the plan, pass after
 * pass, until a complete pass produces a plan equal to the one it started
 * from.
 */
object Optimizer
{
  /**
   * A plan-to-plan rewrite.  The fixed-point loop in `Optimizer.apply` stops
   * only once every rule returns a plan equal to its input, so a rule must
   * eventually stop changing the plan.
   */
  trait PlanRule
  {
    def apply(plan: Plan): Plan
  }

  /** The rewrite rules, in the order they are applied within one pass. */
  val PLAN_RULES = Seq[PlanRule](
    PushDownSelection
  )

  /**
   * Optimize `plan` by iterating the rules to a fixed point.
   *
   * @param plan the plan to rewrite
   * @return the result of the first pass over `PLAN_RULES` that left the plan unchanged
   */
  def apply(plan: Plan): Plan =
  {
    // One pass: thread the plan through every rule, in order.
    def pass(p: Plan): Plan =
      PLAN_RULES.foldLeft(p) { (acc, rule) => rule(acc) }

    @scala.annotation.tailrec
    def fixpoint(current: Plan): Plan =
    {
      val next = pass(current)
      if(next == current) { next } else { fixpoint(next) }
    }

    fixpoint(plan)
  }

  /**
   * Pushes the conjuncts of a `Filter` that sits directly on a predicate-less
   * `Join` as far down as they can go:
   *
   *  - an equality between two variables, one from each join input, becomes
   *    the join predicate (only the first such equality; any further ones
   *    stay in a filter above the join);
   *  - an atom whose variables all belong to one input, and none of which
   *    also appears in the other input, becomes a filter on that input (an
   *    atom without variables therefore goes to the left input);
   *  - everything else stays in a filter above the join.
   *
   * A filter for which none of the first two cases applies is returned
   * untouched.
   */
  object PushDownSelection extends PlanRule
  {
    def apply(plan: Plan): Plan =
    {
      plan.transformDown {
        case f@Filter(predicate, Join(a, b, None)) =>
          val aVars: Set[Variable] = a.schema.map { _._1 }.toSet
          val bVars: Set[Variable] = b.schema.map { _._1 }.toSet

          val pushDownA      = mutable.Buffer[Expression]()
          val pushDownB      = mutable.Buffer[Expression]()
          val joinPredicates = mutable.Buffer[(Variable, Variable)]()
          val rest           = mutable.Buffer[Expression]()

          // Classify every atom of the conjunction.  Join predicates are
          // always stored as (variable of a, variable of b).
          predicate.conjunction.foreach {
            case Expression.Eq(leftVar: Variable, rightVar: Variable)
              if aVars.contains(leftVar) && bVars.contains(rightVar) =>
                joinPredicates.append( leftVar -> rightVar )
            case Expression.Eq(leftVar: Variable, rightVar: Variable)
              if aVars.contains(rightVar) && bVars.contains(leftVar) =>
                joinPredicates.append( rightVar -> leftVar )
            case e if e.vars.subsetOf(aVars) && e.vars.intersect(bVars).isEmpty =>
              pushDownA.append(e)
            case e if e.vars.subsetOf(bVars) && e.vars.intersect(aVars).isEmpty =>
              pushDownB.append(e)
            case e =>
              rest.append(e)
          }

          if(pushDownA.isEmpty && pushDownB.isEmpty && joinPredicates.isEmpty){
            /* return */ f
          } else {
            val newLeft: Plan =
              if(pushDownA.isEmpty){ a }
              else { Filter(Expression.and(pushDownA), a) }

            val newRight: Plan =
              if(pushDownB.isEmpty){ b }
              else { Filter(Expression.and(pushDownB), b) }

            // The join takes a single equality; any additional ones are
            // turned back into expressions and kept above the join, after
            // the atoms that could not be pushed at all.
            val residual =
              rest ++ joinPredicates.drop(1).map { case (l, r) => Expression.Eq(l, r) }

            val joined: Plan = Join(newLeft, newRight, joinPredicates.headOption)

            /* return */
            if(residual.isEmpty){ joined }
            else { Filter(Expression.and(residual), joined) }
          }
      }
    }
  }
}
|
|
@ -2,7 +2,7 @@ package net.okennedy.hackdb
|
|||
|
||||
trait Plan
|
||||
{
|
||||
def schema: Seq[(String, Type)]
|
||||
def schema: Seq[(Variable, Type)]
|
||||
def read: Iterator[Seq[Constant]]
|
||||
|
||||
def children: Seq[Plan]
|
||||
|
|
|
@ -2,10 +2,10 @@ package net.okennedy.hackdb
|
|||
|
||||
case class Project(exprs: Seq[Expression], child: Plan) extends Plan
|
||||
{
|
||||
def schema = exprs.map { e => (e.printString, Type.of(e, vars.toMap)) }
|
||||
def schema = exprs.map { e => (Variable(e.printString), Type.of(e, vars.toMap)) }
|
||||
|
||||
lazy val vars: Seq[(Variable, Type)] =
|
||||
child.schema.map { x => (Variable(x._1), x._2) }
|
||||
child.schema
|
||||
|
||||
def read: Iterator[Seq[Constant]] =
|
||||
child.read.map { row =>
|
||||
|
|
|
@ -22,6 +22,12 @@ case class Query(
|
|||
ret = Project(exprs, ret)
|
||||
|
||||
|
||||
println("--- Original Plan ---")
|
||||
println(ret.toString)
|
||||
|
||||
ret = Optimizer(ret)
|
||||
|
||||
println("--- Optimized Plan ---")
|
||||
println(ret.toString)
|
||||
|
||||
/* return */ ret
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package net.okennedy.hackdb
|
||||
|
||||
case class Result(header: Seq[(String, Type)], rows: Seq[Seq[Constant]])
|
||||
case class Result(header: Seq[(Variable, Type)], rows: Seq[Seq[Constant]])
|
||||
{
|
||||
lazy val widths =
|
||||
rows.foldLeft( header.toSeq.map { _._1.size } )
|
||||
rows.foldLeft( header.toSeq.map { _._1.name.size } )
|
||||
{ (accum, row) =>
|
||||
val len = Math.max(accum.size, row.size)
|
||||
row.map { _.printString.length }
|
||||
|
@ -22,7 +22,7 @@ case class Result(header: Seq[(String, Type)], rows: Seq[Seq[Constant]])
|
|||
.mkString(" | ")
|
||||
|
||||
override def toString =
|
||||
" " + formatRow(header.map { _._1 }) + "\n" +
|
||||
" " + formatRow(header.map { _._1.name }) + "\n" +
|
||||
"-" + widths.map { w => "-"*w }.mkString("-+-") + "-\n" +
|
||||
rows.map { row => " " + formatRow(row.map { _.printString }) }
|
||||
.mkString("\n")
|
||||
|
|
|
@ -2,12 +2,12 @@ package net.okennedy.hackdb
|
|||
|
||||
import scala.io.Source
|
||||
|
||||
case class Table(schema: Seq[(String, Type)], data: Seq[Seq[Constant]], name: String = "unnamed_table") extends Plan
|
||||
case class Table(schema: Seq[(Variable, Type)], data: Seq[Seq[Constant]], name: String = "unnamed_table") extends Plan
|
||||
{
|
||||
def read: Iterator[Seq[Constant]] =
|
||||
data.iterator
|
||||
|
||||
override def planParameterString: String = name + " => " + schema.map { (name, t) => s"$name: $t" }.mkString(", ")
|
||||
override def planParameterString: String = name + " => " + schema.map { (name, t) => s"${name.name}: $t" }.mkString(", ")
|
||||
|
||||
def children: Seq[Plan] = Seq()
|
||||
def rebuild(c: Seq[Plan]): Plan = this
|
||||
|
@ -22,7 +22,7 @@ object Table
|
|||
.getLines()
|
||||
.map { _.split(",") }
|
||||
|
||||
val header = contents.next()
|
||||
val header = contents.next().map { Variable(_) }
|
||||
val data = contents.toSeq
|
||||
|
||||
val types = data.foldLeft(header.map { _ => Type.Int:Type }) {
|
||||
|
|
Loading…
Reference in New Issue