[SPARK-36420][GRAPHX] Use isEmpty to improve performance in Pregel‘s superstep

### What changes were proposed in this pull request?

When recived active-messages in Pregel,  we only need an action operator here and active-messages are not empty, so we don’t need to use count, it’s better to use isEmpty.

### Why are the changes needed?

In the case of 10+ billions of vertices and edges, it will make the speed faster.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass existed tests.

Closes #33648 from StefanXiepj/SPARK-36420.

Authored-by: xiepengjie <xiepengjie@kuaishou.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
xiepengjie 2021-08-06 12:20:47 +09:00 committed by Hyukjin Kwon
parent cd070f1b9c
commit 8230a2a700

View file

@ -137,12 +137,12 @@ object Pregel extends Logging {
val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
checkpointInterval, graph.vertices.sparkContext)
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
var activeMessages = messages.count()
var isActiveMessagesNonEmpty = !messages.isEmpty()
// Loop
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
while (isActiveMessagesNonEmpty && i < maxIterations) {
// Receive the messages and update the vertices.
prevG = g
g = g.joinVertices(messages)(vprog)
@ -158,7 +158,7 @@ object Pregel extends Logging {
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
activeMessages = messages.count()
isActiveMessagesNonEmpty = !messages.isEmpty()
logInfo("Pregel finished iteration " + i)