From d161caa6eb42b2c399b7f0878bd5aea3978febcf Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Fri, 13 Dec 2013 16:35:37 -0800 Subject: [PATCH] Expose srcStale and dstStale --- .../src/main/scala/org/apache/spark/graph/EdgeTriplet.scala | 3 +++ .../main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala index 28efb5d1c1..76768489ee 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala @@ -28,6 +28,9 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { */ var dstAttr: VD = _ //nullValue[VD] + var srcStale: Boolean = false + var dstStale: Boolean = false + /** * Set the edge properties of this triplet. */ diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 7816a1d5e8..8ba90fd9bb 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -226,8 +226,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val et = new EdgeTriplet[VD, ED] val filteredEdges = edgePartition.iterator.flatMap { e => // Ensure that the edge meets the requirements of skipStaleSrc and skipStaleDst - val skipDueToSrc = skipStaleSrc && vertexPartition.isStale(e.srcId) - val skipDueToDst = skipStaleDst && vertexPartition.isStale(e.dstId) + et.srcStale = vertexPartition.isStale(e.srcId) + et.dstStale = vertexPartition.isStale(e.dstId) + val skipDueToSrc = skipStaleSrc && et.srcStale + val skipDueToDst = skipStaleDst && et.dstStale if (!skipDueToSrc && !skipDueToDst) { et.set(e) if (mapUsesSrcAttr) {