[SPARK-3263][GraphX] Fix changes made to GraphGenerator.logNormalGraph in PR #720
PR #720 made multiple changes to GraphGenerators.logNormalGraph, including:
* Replacing the calls to the functions that generate random vertices and edges with in-line implementations that use different equations. Based on reading the Pregel paper, I believe the in-line implementations are incorrect.
* Hard-coding the RNG seeds, so that the method now generates the same graph for a given number of vertices, edges, mu, and sigma -- the user is not able to override the seed or specify that the seed should be randomly generated.
* Making a backwards-incompatible change to the logNormalGraph signature by introducing a new required parameter.
* Failing to update the Scaladoc and programming guide for the API changes.
* Adding a synthetic benchmark to the examples.
This PR:
* Removes the in-line calls and calls original vertex / edge generation functions again
* Adds an optional seed parameter for deterministic behavior (when desired)
* Keeps the number of partitions parameter that was added.
* Keeps compatibility with the synthetic benchmark example
* Maintains backwards-compatible API
Author: RJ Nowling <rnowling@gmail.com>
Author: Ankur Dave <ankurdave@gmail.com>
Closes #2168 from rnowling/graphgenrand and squashes the following commits:
f1cd79f [Ankur Dave] Style fixes
e11918e [RJ Nowling] Fix bad comparisons in unit tests
785ac70 [RJ Nowling] Fix style error
c70868d [RJ Nowling] Fix logNormalGraph scala doc for seed
41fd1f8 [RJ Nowling] Fix logNormalGraph scala doc for seed
799f002 [RJ Nowling] Added test for different seeds for sampleLogNormal
43949ad [RJ Nowling] Added test for different seeds for generateRandomEdges
2faf75f [RJ Nowling] Added unit test for logNormalGraph
82f22397 [RJ Nowling] Add unit test for sampleLogNormal
b99cba9 [RJ Nowling] Make sampleLogNormal private to Spark (vs private) for unit testing
6803da1 [RJ Nowling] Add GraphGeneratorsSuite with test for generateRandomEdges
1c8fc44 [RJ Nowling] Connected components part of SynthBenchmark was failing to call count on RDD before printing
dfbb6dd [RJ Nowling] Fix parameter name in SynthBenchmark docs
b5eeb80 [RJ Nowling] Add optional seed parameter to SynthBenchmark and set default to randomly generate a seed
1ff8d30 [RJ Nowling] Fix bug in generateRandomEdges where numVertices instead of numEdges was used to control number of edges to generate
98bb73c [RJ Nowling] Add documentation for logNormalGraph parameters
d40141a [RJ Nowling] Fix style error
684804d [RJ Nowling] revert PR #720 which introduce errors in logNormalGraph and messed up seeding of RNGs. Add user-defined optional seed for deterministic behavior
c183136 [RJ Nowling] Fix to deterministic GraphGenerators.logNormalGraph that allows generating graphs randomly using optional seed.
015010c [RJ Nowling] Fixed GraphGenerator logNormalGraph API to make backward-incompatible change in commit 894ecde04
This commit is contained in:
parent
6481d27425
commit
e5d376801d
|
@ -38,12 +38,13 @@ object SynthBenchmark {
|
|||
* Options:
|
||||
* -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
|
||||
* -niters the number of iterations of pagerank to use (Default: 10)
|
||||
* -numVertices the number of vertices in the graph (Default: 1000000)
|
||||
* -nverts the number of vertices in the graph (Default: 1000000)
|
||||
* -numEPart the number of edge partitions in the graph (Default: number of cores)
|
||||
* -partStrategy the graph partitioning strategy to use
|
||||
* -mu the mean parameter for the log-normal graph (Default: 4.0)
|
||||
* -sigma the stdev parameter for the log-normal graph (Default: 1.3)
|
||||
* -degFile the local file to save the degree information (Default: Empty)
|
||||
* -seed seed to use for RNGs (Default: -1, picks seed randomly)
|
||||
*/
|
||||
def main(args: Array[String]) {
|
||||
val options = args.map {
|
||||
|
@ -62,6 +63,7 @@ object SynthBenchmark {
|
|||
var mu: Double = 4.0
|
||||
var sigma: Double = 1.3
|
||||
var degFile: String = ""
|
||||
var seed: Int = -1
|
||||
|
||||
options.foreach {
|
||||
case ("app", v) => app = v
|
||||
|
@ -72,6 +74,7 @@ object SynthBenchmark {
|
|||
case ("mu", v) => mu = v.toDouble
|
||||
case ("sigma", v) => sigma = v.toDouble
|
||||
case ("degFile", v) => degFile = v
|
||||
case ("seed", v) => seed = v.toInt
|
||||
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
|
||||
}
|
||||
|
||||
|
@ -85,7 +88,7 @@ object SynthBenchmark {
|
|||
// Create the graph
|
||||
println(s"Creating graph...")
|
||||
val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
|
||||
numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
|
||||
numEPart.getOrElse(sc.defaultParallelism), mu, sigma, seed)
|
||||
// Repartition the graph
|
||||
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
|
||||
|
||||
|
@ -113,7 +116,7 @@ object SynthBenchmark {
|
|||
println(s"Total PageRank = $totalPR")
|
||||
} else if (app == "cc") {
|
||||
println("Running Connected Components")
|
||||
val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
|
||||
val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count()
|
||||
println(s"Number of components = $numComponents")
|
||||
}
|
||||
val runTime = System.currentTimeMillis() - startTime
|
||||
|
|
|
@ -40,7 +40,7 @@ object GraphGenerators {
|
|||
val RMATd = 0.25
|
||||
|
||||
/**
|
||||
* Generate a graph whose vertex out degree is log normal.
|
||||
* Generate a graph whose vertex out degree distribution is log normal.
|
||||
*
|
||||
* The default values for mu and sigma are taken from the Pregel paper:
|
||||
*
|
||||
|
@ -48,33 +48,36 @@ object GraphGenerators {
|
|||
* Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
|
||||
* Pregel: a system for large-scale graph processing. SIGMOD '10.
|
||||
*
|
||||
* @param sc
|
||||
* @param numVertices
|
||||
* @param mu
|
||||
* @param sigma
|
||||
* @return
|
||||
* If the seed is -1 (default), a random seed is chosen. Otherwise, use
|
||||
* the user-specified seed.
|
||||
*
|
||||
* @param sc Spark Context
|
||||
* @param numVertices number of vertices in generated graph
|
||||
* @param numEParts (optional) number of partitions
|
||||
* @param mu (optional, default: 4.0) mean of out-degree distribution
|
||||
* @param sigma (optional, default: 1.3) standard deviation of out-degree distribution
|
||||
* @param seed (optional, default: -1) seed for RNGs, -1 causes a random seed to be chosen
|
||||
* @return Graph object
|
||||
*/
|
||||
def logNormalGraph(
    sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0,
    sigma: Double = 1.3, seed: Long = -1): Graph[Long, Int] = {

  // numEParts == 0 means "not specified": fall back to the context's default parallelism.
  val evalNumEParts = if (numEParts == 0) sc.defaultParallelism else numEParts

  // Enable deterministic seeding: both base seeds are drawn on the driver from one RNG,
  // which is itself seeded by the caller unless the caller passed -1.
  val seedRand = if (seed == -1) new Random() else new Random(seed)
  val seed1 = seedRand.nextInt()
  val seed2 = seedRand.nextInt()

  // Each vertex carries its sampled out-degree as its attribute.
  val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, evalNumEParts).map {
    src => (src, sampleLogNormal(mu, sigma, numVertices, seed = perVertexSeed(seed1, src)))
  }

  // Emit `degree` random out-edges for every vertex.
  val edges = vertices.flatMap { case (src, degree) =>
    generateRandomEdges(src.toInt, degree.toInt, numVertices, seed = perVertexSeed(seed2, src))
  }

  Graph(vertices, edges, 0)
}

/**
 * Combine a base seed with a vertex id to obtain the RNG seed used for that vertex.
 *
 * `sampleLogNormal` and `generateRandomEdges` interpret a seed of -1 as "choose a random
 * seed". A plain XOR can produce exactly -1 for one vertex id, which would make that vertex
 * non-reproducible even though the caller supplied a seed. That single value is remapped to a
 * constant outside the Int range, so it cannot collide with the seed of any other vertex
 * (vertex ids here are non-negative Ints, hence every other XOR result fits in an Int).
 *
 * @param baseSeed seed drawn on the driver for this stage (vertices or edges)
 * @param vertexId id of the vertex the seed is derived for
 * @return `baseSeed ^ vertexId`, or a fixed substitute when that value would be -1
 */
private def perVertexSeed(baseSeed: Int, vertexId: Long): Long = {
  val mixed = baseSeed ^ vertexId
  if (mixed == -1L) Int.MaxValue.toLong + 1 else mixed
}
|
||||
|
||||
|
@ -82,9 +85,10 @@ object GraphGenerators {
|
|||
// the edge data is the weight (default 1)
|
||||
val RMATc = 0.15
|
||||
|
||||
/**
 * Generate `numEdges` edges that all start at `src`.
 *
 * Every destination is drawn with `rand.nextInt(maxVertexId)`, and every edge gets the
 * attribute 1.
 *
 * @param src source vertex id shared by all generated edges
 * @param numEdges number of edges to generate
 * @param maxVertexId exclusive upper bound on the destination vertex id
 * @param seed (optional, default: -1) seed for the RNG, -1 causes a random seed to be chosen
 * @return array holding the generated edges
 */
def generateRandomEdges(
    src: Int, numEdges: Int, maxVertexId: Int, seed: Long = -1): Array[Edge[Int]] = {
  val rand = if (seed == -1) new Random() else new Random(seed)
  // The array length is the requested edge count, not the vertex-id bound.
  Array.fill(numEdges) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
}
|
||||
|
||||
/**
|
||||
|
@ -97,9 +101,12 @@ object GraphGenerators {
|
|||
* @param mu the mean of the normal distribution
|
||||
* @param sigma the standard deviation of the normal distribution
|
||||
* @param maxVal exclusive upper bound on the value of the sample
|
||||
* @param seed optional seed
|
||||
*/
|
||||
private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
|
||||
val rand = new Random()
|
||||
private[spark] def sampleLogNormal(
|
||||
mu: Double, sigma: Double, maxVal: Int, seed: Long = -1): Int = {
|
||||
val rand = if (seed == -1) new Random() else new Random(seed)
|
||||
|
||||
val sigmaSq = sigma * sigma
|
||||
val m = math.exp(mu + sigmaSq / 2.0)
|
||||
// expm1 is exp(m)-1 with better accuracy for tiny m
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.graphx.util
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.graphx.LocalSparkContext
|
||||
|
||||
/**
 * Tests for the random graph helpers in `GraphGenerators`.
 *
 * Edge arrays are compared as sequences of (srcId, dstId, attr) tuples so that a difference
 * in length is detected as well as a difference in content.
 */
class GraphGeneratorsSuite extends FunSuite with LocalSparkContext {

  test("GraphGenerators.generateRandomEdges") {
    val src = 5
    val numEdges10 = 10
    val numEdges20 = 20
    val maxVertexId = 100

    val edges10 = GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId)
    assert(edges10.length == numEdges10)

    val correctSrc = edges10.forall(e => e.srcId == src)
    assert(correctSrc)

    val correctWeight = edges10.forall(e => e.attr == 1)
    assert(correctWeight)

    // maxVertexId is an exclusive bound on the destination id.
    val correctRange = edges10.forall(e => e.dstId >= 0 && e.dstId < maxVertexId)
    assert(correctRange)

    val edges20 = GraphGenerators.generateRandomEdges(src, numEdges20, maxVertexId)
    assert(edges20.length == numEdges20)

    // Same seed => identical edges.
    val edges10_round1 =
      GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345)
    val edges10_round2 =
      GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345)
    val round1Tuples = edges10_round1.toSeq.map(e => (e.srcId, e.dstId, e.attr))
    val round2Tuples = edges10_round2.toSeq.map(e => (e.srcId, e.dstId, e.attr))
    assert(round1Tuples == round2Tuples)

    // Different seed => different edges.
    val edges10_round3 =
      GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 3467)
    val round3Tuples = edges10_round3.toSeq.map(e => (e.srcId, e.dstId, e.attr))
    assert(round1Tuples != round3Tuples)
  }

  test("GraphGenerators.sampleLogNormal") {
    val mu = 4.0
    val sigma = 1.3
    val maxVal = 100

    val dstId = GraphGenerators.sampleLogNormal(mu, sigma, maxVal)
    assert(dstId < maxVal)

    // Same seed => identical sample.
    val dstId_round1 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345)
    val dstId_round2 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345)
    assert(dstId_round1 == dstId_round2)

    // Different seed => different sample (for these two fixed seeds).
    val dstId_round3 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 789)
    assert(dstId_round1 != dstId_round3)
  }

  test("GraphGenerators.logNormalGraph") {
    withSpark { sc =>
      val mu = 4.0
      val sigma = 1.3
      val numVertices100 = 100

      val graph = GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma)
      assert(graph.vertices.count() == numVertices100)

      // Same seed => identical edge list.
      val graph_round1 =
        GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345)
      val graph_round2 =
        GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345)

      val graph_round1_edges =
        graph_round1.edges.collect().toSeq.map(e => (e.srcId, e.dstId, e.attr))
      val graph_round2_edges =
        graph_round2.edges.collect().toSeq.map(e => (e.srcId, e.dstId, e.attr))

      assert(graph_round1_edges == graph_round2_edges)

      // Different seed => different edge list (content or length).
      val graph_round3 =
        GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 567)

      val graph_round3_edges =
        graph_round3.edges.collect().toSeq.map(e => (e.srcId, e.dstId, e.attr))

      assert(graph_round1_edges != graph_round3_edges)
    }
  }

}
|
Loading…
Reference in a new issue