Do not re-use objects in the EdgePartition/EdgeTriplet iterators.
This avoids a silent data corruption issue (https://spark-project.atlassian.net/browse/SPARK-1188) and has no performance impact by my measurements. It also simplifies the code. As far as I can tell the object re-use was nothing but premature optimization. I did actual benchmarks for all the included changes, and there is no performance difference. I am not sure where to put the benchmarks. Does Spark not have a benchmark suite? This is an example benchmark I did: test("benchmark") { val builder = new EdgePartitionBuilder[Int] for (i <- (1 to 10000000)) { builder.add(i.toLong, i.toLong, i) } val p = builder.toEdgePartition p.map(_.attr + 1).iterator.toList } It ran for 10 seconds both before and after this change. Author: Daniel Darabos <darabos.daniel@gmail.com> Closes #276 from darabos/spark-1188 and squashes the following commits: 574302b [Daniel Darabos] Restore "manual" copying in EdgePartition.map(Iterator). Add comment to discourage novices like myself from trying to simplify the code. 4117a64 [Daniel Darabos] Revert EdgePartitionSuite. 4955697 [Daniel Darabos] Create a copy of the Edge objects in EdgeRDD.compute(). This avoids exposing the object re-use, while still enables the more efficient behavior for internal code. 4ec77f8 [Daniel Darabos] Add comments about object re-use to the affected functions. 2da5e87 [Daniel Darabos] Restore object re-use in EdgePartition. 0182f2b [Daniel Darabos] Do not re-use objects in the EdgePartition/EdgeTriplet iterators. This avoids a silent data corruption issue (SPARK-1188) and has no performance impact in my measurements. It also simplifies the code. c55f52f [Daniel Darabos] Tests that reproduce the problems from SPARK-1188.
This commit is contained in:
parent
de8eefa804
commit
78236334e4
|
@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag](
|
||||||
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
|
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
|
||||||
|
|
||||||
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
|
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
|
||||||
firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
|
val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
|
||||||
|
p.next._2.iterator.map(_.copy())
|
||||||
}
|
}
|
||||||
|
|
||||||
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
|
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
|
||||||
|
|
|
@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
|
||||||
* Construct a new edge partition by applying the function f to all
|
* Construct a new edge partition by applying the function f to all
|
||||||
* edges in this partition.
|
* edges in this partition.
|
||||||
*
|
*
|
||||||
|
* Be careful not to keep references to the objects passed to `f`.
|
||||||
|
* To improve GC performance the same object is re-used for each call.
|
||||||
|
*
|
||||||
* @param f a function from an edge to a new attribute
|
* @param f a function from an edge to a new attribute
|
||||||
* @tparam ED2 the type of the new attribute
|
* @tparam ED2 the type of the new attribute
|
||||||
* @return a new edge partition with the result of the function `f`
|
* @return a new edge partition with the result of the function `f`
|
||||||
|
@ -84,12 +87,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
|
||||||
* order of the edges returned by `EdgePartition.iterator` and
|
* order of the edges returned by `EdgePartition.iterator` and
|
||||||
* should return attributes equal to the number of edges.
|
* should return attributes equal to the number of edges.
|
||||||
*
|
*
|
||||||
* @param f a function from an edge to a new attribute
|
* @param iter an iterator for the new attribute values
|
||||||
* @tparam ED2 the type of the new attribute
|
* @tparam ED2 the type of the new attribute
|
||||||
* @return a new edge partition with the result of the function `f`
|
* @return a new edge partition with the attribute values replaced
|
||||||
* applied to each edge
|
|
||||||
*/
|
*/
|
||||||
def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
|
def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
|
||||||
|
// Faster than iter.toArray, because the expected size is known.
|
||||||
val newData = new Array[ED2](data.size)
|
val newData = new Array[ED2](data.size)
|
||||||
var i = 0
|
var i = 0
|
||||||
while (iter.hasNext) {
|
while (iter.hasNext) {
|
||||||
|
@ -188,6 +191,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
|
||||||
/**
|
/**
|
||||||
* Get an iterator over the edges in this partition.
|
* Get an iterator over the edges in this partition.
|
||||||
*
|
*
|
||||||
|
* Be careful not to keep references to the objects from this iterator.
|
||||||
|
* To improve GC performance the same object is re-used in `next()`.
|
||||||
|
*
|
||||||
* @return an iterator over edges in the partition
|
* @return an iterator over edges in the partition
|
||||||
*/
|
*/
|
||||||
def iterator = new Iterator[Edge[ED]] {
|
def iterator = new Iterator[Edge[ED]] {
|
||||||
|
@ -216,6 +222,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
|
||||||
/**
|
/**
|
||||||
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
|
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
|
||||||
* cluster must start at position `index`.
|
* cluster must start at position `index`.
|
||||||
|
*
|
||||||
|
* Be careful not to keep references to the objects from this iterator. To improve GC performance
|
||||||
|
* the same object is re-used in `next()`.
|
||||||
*/
|
*/
|
||||||
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
|
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
|
||||||
private[this] val edge = new Edge[ED]
|
private[this] val edge = new Edge[ED]
|
||||||
|
|
|
@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
|
||||||
// Current position in the array.
|
// Current position in the array.
|
||||||
private var pos = 0
|
private var pos = 0
|
||||||
|
|
||||||
// A triplet object that this iterator.next() call returns. We reuse this object to avoid
|
|
||||||
// allocating too many temporary Java objects.
|
|
||||||
private val triplet = new EdgeTriplet[VD, ED]
|
|
||||||
|
|
||||||
private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
|
private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
|
||||||
|
|
||||||
override def hasNext: Boolean = pos < edgePartition.size
|
override def hasNext: Boolean = pos < edgePartition.size
|
||||||
|
|
||||||
override def next() = {
|
override def next() = {
|
||||||
|
val triplet = new EdgeTriplet[VD, ED]
|
||||||
triplet.srcId = edgePartition.srcIds(pos)
|
triplet.srcId = edgePartition.srcIds(pos)
|
||||||
// assert(vmap.containsKey(e.src.id))
|
|
||||||
triplet.srcAttr = vmap(triplet.srcId)
|
triplet.srcAttr = vmap(triplet.srcId)
|
||||||
triplet.dstId = edgePartition.dstIds(pos)
|
triplet.dstId = edgePartition.dstIds(pos)
|
||||||
// assert(vmap.containsKey(e.dst.id))
|
|
||||||
triplet.dstAttr = vmap(triplet.dstId)
|
triplet.dstAttr = vmap(triplet.dstId)
|
||||||
triplet.attr = edgePartition.data(pos)
|
triplet.attr = edgePartition.data(pos)
|
||||||
pos += 1
|
pos += 1
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.graphx.impl
|
||||||
|
|
||||||
|
import scala.reflect.ClassTag
|
||||||
|
import scala.util.Random
|
||||||
|
|
||||||
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
|
import org.apache.spark.graphx._
|
||||||
|
|
||||||
|
class EdgeTripletIteratorSuite extends FunSuite {
|
||||||
|
test("iterator.toList") {
|
||||||
|
val builder = new EdgePartitionBuilder[Int]
|
||||||
|
builder.add(1, 2, 0)
|
||||||
|
builder.add(1, 3, 0)
|
||||||
|
builder.add(1, 4, 0)
|
||||||
|
val vidmap = new VertexIdToIndexMap
|
||||||
|
vidmap.add(1)
|
||||||
|
vidmap.add(2)
|
||||||
|
vidmap.add(3)
|
||||||
|
vidmap.add(4)
|
||||||
|
val vs = Array.fill(vidmap.capacity)(0)
|
||||||
|
val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
|
||||||
|
val result = iter.toList.map(et => (et.srcId, et.dstId))
|
||||||
|
assert(result === Seq((1, 2), (1, 3), (1, 4)))
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue