[SPARK-16478] graphX (added graph caching in strongly connected components)

## What changes were proposed in this pull request?

I added caching in every iteration for sccGraph that is returned in strongly connected components. Without this cache strongly connected components returned graph that needed to be computed from scratch when some intermediary caches didn't existed anymore.

## How was this patch tested?
I tested it by running code similar to the one  [on databrics](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4889410027417133/3634650767364730/3117184429335832/latest.html). Basically I generated large graph  and computed strongly connected components with changed code, than simply run count on vertices and edges. Count after this update takes few seconds instead 20 minutes.

# statement
contribution is my original work and I license the work to the project under the project's open source license.

Author: Michał Wesołowski <michal.wesolowski@bzwbk.pl>

Closes #14137 from wesolowskim/SPARK-16478.
This commit is contained in:
Michał Wesołowski 2016-07-19 12:18:42 +01:00 committed by Sean Owen
parent 6c4b9f4be6
commit 5d92326be7

View file

@ -44,6 +44,9 @@ object StronglyConnectedComponents {
// graph we are going to work with in our iterations
var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }.cache()
// helper variables to unpersist cached graphs
var prevSccGraph = sccGraph
var numVertices = sccWorkGraph.numVertices
var iter = 0
while (sccWorkGraph.numVertices > 0 && iter < numIter) {
@ -64,48 +67,59 @@ object StronglyConnectedComponents {
// write values to sccGraph
sccGraph = sccGraph.outerJoinVertices(finalVertices) {
(vid, scc, opt) => opt.getOrElse(scc)
}
}.cache()
// materialize vertices and edges
sccGraph.vertices.count()
sccGraph.edges.count()
// sccGraph materialized so, unpersist can be done on previous
prevSccGraph.unpersist(blocking = false)
prevSccGraph = sccGraph
// only keep vertices that are not final
sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache()
} while (sccWorkGraph.numVertices < numVertices)
sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) }
// if iter < numIter at this point sccGraph that is returned
// will not be recomputed and pregel executions are pointless
if (iter < numIter) {
sccWorkGraph = sccWorkGraph.mapVertices { case (vid, (color, isFinal)) => (vid, isFinal) }
// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
(vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
e => {
if (e.srcAttr._1 < e.dstAttr._1) {
Iterator((e.dstId, e.srcAttr._1))
} else {
Iterator()
}
},
(vid1, vid2) => math.min(vid1, vid2))
// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
(vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
e => {
if (e.srcAttr._1 < e.dstAttr._1) {
Iterator((e.dstId, e.srcAttr._1))
} else {
Iterator()
}
},
(vid1, vid2) => math.min(vid1, vid2))
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
sccWorkGraph, false, activeDirection = EdgeDirection.In)(
// vertex is final if it is the root of a color
// or it has the same color as a neighbor that is final
(vid, myScc, existsSameColorFinalNeighbor) => {
val isColorRoot = vid == myScc._1
(myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
},
// activate neighbor if they are not final, you are, and you have the same color
e => {
val sameColor = e.dstAttr._1 == e.srcAttr._1
val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
if (sameColor && onlyDstIsFinal) {
Iterator((e.srcId, e.dstAttr._2))
} else {
Iterator()
}
},
(final1, final2) => final1 || final2)
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
sccWorkGraph, false, activeDirection = EdgeDirection.In)(
// vertex is final if it is the root of a color
// or it has the same color as a neighbor that is final
(vid, myScc, existsSameColorFinalNeighbor) => {
val isColorRoot = vid == myScc._1
(myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
},
// activate neighbor if they are not final, you are, and you have the same color
e => {
val sameColor = e.dstAttr._1 == e.srcAttr._1
val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
if (sameColor && onlyDstIsFinal) {
Iterator((e.srcId, e.dstAttr._2))
} else {
Iterator()
}
},
(final1, final2) => final1 || final2)
}
}
sccGraph
}