[SPARK-30503][ML] OnlineLDAOptimizer does not handle persistance correctly

### What changes were proposed in this pull request?
unpersist graph outside checkpointer, like what Pregel does

### Why are the changes needed?
Shown in [SPARK-30503](https://issues.apache.org/jira/browse/SPARK-30503), intermediate edges are not unpersisted

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
existing testsuites and manual test

Closes #27261 from zhengruifeng/lda_checkpointer.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
This commit is contained in:
zhengruifeng 2020-01-22 08:24:11 -06:00 committed by Sean Owen
parent b8cb52a8a7
commit 1c46bd9e60

View file

@ -142,7 +142,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
// For each document, create an edge (Document -> Term) for each unique term in the document.
val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
// Add edges for terms with non-zero counts.
termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
termCounts.nonZeroIterator.map { case (term, cnt) =>
Edge(docID, term2index(term), cnt)
}
}
@ -211,11 +211,14 @@ final class EMLDAOptimizer extends LDAOptimizer {
val docTopicDistributions: VertexRDD[TopicCounts] =
graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg)
.mapValues(_._2)
val prevGraph = graph
// Update the vertex descriptors with the new counts.
val newGraph = Graph(docTopicDistributions, graph.edges)
graph = newGraph
graphCheckpointer.update(newGraph)
globalTopicTotals = computeGlobalTopicTotals()
prevGraph.unpersistVertices()
prevGraph.edges.unpersist()
this
}