SPARK-1786: Edge Partition Serialization
This appears to address the issue with edge partition serialization. The solution appears to be simply registering the `PrimitiveKeyOpenHashMap` with Kryo. However, I noticed that we appear to have forked that code in GraphX but retained the same name (which is confusing), so I also renamed our local copy to `GraphXPrimitiveKeyOpenHashMap`. We should consider dropping the local copy and using the one in Spark if possible. Author: Ankur Dave <ankurdave@gmail.com> Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> Closes #724 from jegonzal/edge_partition_serialization and squashes the following commits: b0a525a [Ankur Dave] Disable reference tracking to fix serialization test bb7f548 [Ankur Dave] Add failing test for EdgePartition Kryo serialization 67dac22 [Joseph E. Gonzalez] Making EdgePartition serializable.
This commit is contained in:
parent
f938a155b2
commit
a6b02fb748
|
@ -24,6 +24,9 @@ import org.apache.spark.util.BoundedPriorityQueue
|
||||||
import org.apache.spark.util.collection.BitSet
|
import org.apache.spark.util.collection.BitSet
|
||||||
|
|
||||||
import org.apache.spark.graphx.impl._
|
import org.apache.spark.graphx.impl._
|
||||||
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
import org.apache.spark.util.collection.OpenHashSet
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers GraphX classes with Kryo for improved performance.
|
* Registers GraphX classes with Kryo for improved performance.
|
||||||
|
@ -43,8 +46,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
|
||||||
kryo.register(classOf[PartitionStrategy])
|
kryo.register(classOf[PartitionStrategy])
|
||||||
kryo.register(classOf[BoundedPriorityQueue[Object]])
|
kryo.register(classOf[BoundedPriorityQueue[Object]])
|
||||||
kryo.register(classOf[EdgeDirection])
|
kryo.register(classOf[EdgeDirection])
|
||||||
|
kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
|
||||||
// This avoids a large number of hash table lookups.
|
kryo.register(classOf[OpenHashSet[Int]])
|
||||||
kryo.setReferences(false)
|
kryo.register(classOf[OpenHashSet[Long]])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
|
||||||
import scala.reflect.{classTag, ClassTag}
|
import scala.reflect.{classTag, ClassTag}
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
|
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
|
||||||
|
@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
||||||
private[graphx]
|
private[graphx]
|
||||||
class EdgePartition[
|
class EdgePartition[
|
||||||
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
|
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
|
||||||
@transient val srcIds: Array[VertexId],
|
val srcIds: Array[VertexId] = null,
|
||||||
@transient val dstIds: Array[VertexId],
|
val dstIds: Array[VertexId] = null,
|
||||||
@transient val data: Array[ED],
|
val data: Array[ED] = null,
|
||||||
@transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
|
val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
|
||||||
@transient val vertices: VertexPartition[VD],
|
val vertices: VertexPartition[VD] = null,
|
||||||
@transient val activeSet: Option[VertexSet] = None
|
val activeSet: Option[VertexSet] = None
|
||||||
) extends Serializable {
|
) extends Serializable {
|
||||||
|
|
||||||
/** Return a new `EdgePartition` with the specified edge data. */
|
/** Return a new `EdgePartition` with the specified edge data. */
|
||||||
|
|
|
@ -23,7 +23,7 @@ import scala.util.Sorting
|
||||||
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
|
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
private[graphx]
|
private[graphx]
|
||||||
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
|
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
|
||||||
|
@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
|
||||||
val srcIds = new Array[VertexId](edgeArray.size)
|
val srcIds = new Array[VertexId](edgeArray.size)
|
||||||
val dstIds = new Array[VertexId](edgeArray.size)
|
val dstIds = new Array[VertexId](edgeArray.size)
|
||||||
val data = new Array[ED](edgeArray.size)
|
val data = new Array[ED](edgeArray.size)
|
||||||
val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
|
val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
|
||||||
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
|
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
|
||||||
// adding them to the index
|
// adding them to the index
|
||||||
if (edgeArray.length > 0) {
|
if (edgeArray.length > 0) {
|
||||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in
|
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
|
||||||
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
|
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
|
* A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
|
||||||
|
@ -69,7 +69,7 @@ object RoutingTablePartition {
|
||||||
: Iterator[RoutingTableMessage] = {
|
: Iterator[RoutingTableMessage] = {
|
||||||
// Determine which positions each vertex id appears in using a map where the low 2 bits
|
// Determine which positions each vertex id appears in using a map where the low 2 bits
|
||||||
// represent src and dst
|
// represent src and dst
|
||||||
val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
|
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
|
||||||
edgePartition.srcIds.iterator.foreach { srcId =>
|
edgePartition.srcIds.iterator.foreach { srcId =>
|
||||||
map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
|
map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ import scala.reflect.ClassTag
|
||||||
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
|
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
/** Stores vertex attributes to ship to an edge partition. */
|
/** Stores vertex attributes to ship to an edge partition. */
|
||||||
private[graphx]
|
private[graphx]
|
||||||
|
|
|
@ -22,7 +22,7 @@ import scala.reflect.ClassTag
|
||||||
import org.apache.spark.util.collection.BitSet
|
import org.apache.spark.util.collection.BitSet
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
private[graphx] object VertexPartition {
|
private[graphx] object VertexPartition {
|
||||||
/** Construct a `VertexPartition` from the given vertices. */
|
/** Construct a `VertexPartition` from the given vertices. */
|
||||||
|
|
|
@ -23,7 +23,7 @@ import scala.reflect.ClassTag
|
||||||
import org.apache.spark.util.collection.BitSet
|
import org.apache.spark.util.collection.BitSet
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
private[graphx] object VertexPartitionBase {
|
private[graphx] object VertexPartitionBase {
|
||||||
/**
|
/**
|
||||||
|
@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
|
||||||
*/
|
*/
|
||||||
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
|
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
|
||||||
: (VertexIdToIndexMap, Array[VD], BitSet) = {
|
: (VertexIdToIndexMap, Array[VD], BitSet) = {
|
||||||
val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
|
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
|
||||||
iter.foreach { pair =>
|
iter.foreach { pair =>
|
||||||
map(pair._1) = pair._2
|
map(pair._1) = pair._2
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
|
||||||
*/
|
*/
|
||||||
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
|
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
|
||||||
: (VertexIdToIndexMap, Array[VD], BitSet) = {
|
: (VertexIdToIndexMap, Array[VD], BitSet) = {
|
||||||
val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
|
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
|
||||||
iter.foreach { pair =>
|
iter.foreach { pair =>
|
||||||
map.setMerge(pair._1, pair._2, mergeFunc)
|
map.setMerge(pair._1, pair._2, mergeFunc)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.spark.Logging
|
||||||
import org.apache.spark.util.collection.BitSet
|
import org.apache.spark.util.collection.BitSet
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
|
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An class containing additional operations for subclasses of VertexPartitionBase that provide
|
* An class containing additional operations for subclasses of VertexPartitionBase that provide
|
||||||
|
@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
|
||||||
* Construct a new VertexPartition whose index contains only the vertices in the mask.
|
* Construct a new VertexPartition whose index contains only the vertices in the mask.
|
||||||
*/
|
*/
|
||||||
def reindex(): Self[VD] = {
|
def reindex(): Self[VD] = {
|
||||||
val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
|
val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
|
||||||
val arbitraryMerge = (a: VD, b: VD) => a
|
val arbitraryMerge = (a: VD, b: VD) => a
|
||||||
for ((k, v) <- self.iterator) {
|
for ((k, v) <- self.iterator) {
|
||||||
hashMap.setMerge(k, v, arbitraryMerge)
|
hashMap.setMerge(k, v, arbitraryMerge)
|
||||||
|
|
|
@ -29,7 +29,7 @@ import scala.reflect._
|
||||||
* Under the hood, it uses our OpenHashSet implementation.
|
* Under the hood, it uses our OpenHashSet implementation.
|
||||||
*/
|
*/
|
||||||
private[graphx]
|
private[graphx]
|
||||||
class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
|
class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
|
||||||
@specialized(Long, Int, Double) V: ClassTag](
|
@specialized(Long, Int, Double) V: ClassTag](
|
||||||
val keySet: OpenHashSet[K], var _values: Array[V])
|
val keySet: OpenHashSet[K], var _values: Array[V])
|
||||||
extends Iterable[(K, V)]
|
extends Iterable[(K, V)]
|
|
@ -22,6 +22,9 @@ import scala.util.Random
|
||||||
|
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.serializer.KryoSerializer
|
||||||
|
|
||||||
import org.apache.spark.graphx._
|
import org.apache.spark.graphx._
|
||||||
|
|
||||||
class EdgePartitionSuite extends FunSuite {
|
class EdgePartitionSuite extends FunSuite {
|
||||||
|
@ -120,4 +123,19 @@ class EdgePartitionSuite extends FunSuite {
|
||||||
assert(!ep.isActive(-1))
|
assert(!ep.isActive(-1))
|
||||||
assert(ep.numActives == Some(2))
|
assert(ep.numActives == Some(2))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Kryo serialization") {
|
||||||
|
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
|
||||||
|
val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
|
||||||
|
val conf = new SparkConf()
|
||||||
|
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||||
|
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
|
||||||
|
val s = new KryoSerializer(conf).newInstance()
|
||||||
|
val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
|
||||||
|
assert(aSer.srcIds.toList === a.srcIds.toList)
|
||||||
|
assert(aSer.dstIds.toList === a.dstIds.toList)
|
||||||
|
assert(aSer.data.toList === a.data.toList)
|
||||||
|
assert(aSer.index != null)
|
||||||
|
assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue