[SPARK-20265][MLLIB] Improve Prefix'span pre-processing efficiency
## What changes were proposed in this pull request? Improve PrefixSpan pre-processing efficency by preventing sequences of zero in the cleaned database. The efficiency gain is reflected in the following graph : https://postimg.org/image/9x6ireuvn/ ## How was this patch tested? Using MLlib's PrefixSpan existing tests and tests of my own on the 8 datasets shown in the graph. All result obtained were stricly the same as the original implementation (without this change). dev/run-tests was also runned, no error were found. Author : Cyril de Vogelaere <cyril.devogelaeregmail.com> Author: Syrux <pokcyril@hotmail.com> Closes #17575 from Syrux/SPARK-20265.
This commit is contained in:
parent
ec68d8f8cf
commit
095d1cb3aa
|
@ -144,45 +144,13 @@ class PrefixSpan private (
|
|||
logInfo(s"minimum count for a frequent pattern: $minCount")
|
||||
|
||||
// Find frequent items.
|
||||
val freqItemAndCounts = data.flatMap { itemsets =>
|
||||
val uniqItems = mutable.Set.empty[Item]
|
||||
itemsets.foreach { _.foreach { item =>
|
||||
uniqItems += item
|
||||
}}
|
||||
uniqItems.toIterator.map((_, 1L))
|
||||
}.reduceByKey(_ + _)
|
||||
.filter { case (_, count) =>
|
||||
count >= minCount
|
||||
}.collect()
|
||||
val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1)
|
||||
val freqItems = findFrequentItems(data, minCount)
|
||||
logInfo(s"number of frequent items: ${freqItems.length}")
|
||||
|
||||
// Keep only frequent items from input sequences and convert them to internal storage.
|
||||
val itemToInt = freqItems.zipWithIndex.toMap
|
||||
val dataInternalRepr = data.flatMap { itemsets =>
|
||||
val allItems = mutable.ArrayBuilder.make[Int]
|
||||
var containsFreqItems = false
|
||||
allItems += 0
|
||||
itemsets.foreach { itemsets =>
|
||||
val items = mutable.ArrayBuilder.make[Int]
|
||||
itemsets.foreach { item =>
|
||||
if (itemToInt.contains(item)) {
|
||||
items += itemToInt(item) + 1 // using 1-indexing in internal format
|
||||
}
|
||||
}
|
||||
val result = items.result()
|
||||
if (result.nonEmpty) {
|
||||
containsFreqItems = true
|
||||
allItems ++= result.sorted
|
||||
}
|
||||
allItems += 0
|
||||
}
|
||||
if (containsFreqItems) {
|
||||
Iterator.single(allItems.result())
|
||||
} else {
|
||||
Iterator.empty
|
||||
}
|
||||
}.persist(StorageLevel.MEMORY_AND_DISK)
|
||||
val dataInternalRepr = toDatabaseInternalRepr(data, itemToInt)
|
||||
.persist(StorageLevel.MEMORY_AND_DISK)
|
||||
|
||||
val results = genFreqPatterns(dataInternalRepr, minCount, maxPatternLength, maxLocalProjDBSize)
|
||||
|
||||
|
@ -231,6 +199,67 @@ class PrefixSpan private (
|
|||
@Since("1.5.0")
|
||||
object PrefixSpan extends Logging {
|
||||
|
||||
/**
|
||||
* This methods finds all frequent items in a input dataset.
|
||||
*
|
||||
* @param data Sequences of itemsets.
|
||||
* @param minCount The minimal number of sequence an item should be present in to be frequent
|
||||
*
|
||||
* @return An array of Item containing only frequent items.
|
||||
*/
|
||||
private[fpm] def findFrequentItems[Item: ClassTag](
|
||||
data: RDD[Array[Array[Item]]],
|
||||
minCount: Long): Array[Item] = {
|
||||
|
||||
data.flatMap { itemsets =>
|
||||
val uniqItems = mutable.Set.empty[Item]
|
||||
itemsets.foreach(set => uniqItems ++= set)
|
||||
uniqItems.toIterator.map((_, 1L))
|
||||
}.reduceByKey(_ + _).filter { case (_, count) =>
|
||||
count >= minCount
|
||||
}.sortBy(-_._2).map(_._1).collect()
|
||||
}
|
||||
|
||||
/**
|
||||
* This methods cleans the input dataset from un-frequent items, and translate it's item
|
||||
* to their corresponding Int identifier.
|
||||
*
|
||||
* @param data Sequences of itemsets.
|
||||
* @param itemToInt A map allowing translation of frequent Items to their Int Identifier.
|
||||
* The map should only contain frequent item.
|
||||
*
|
||||
* @return The internal repr of the inputted dataset. With properly placed zero delimiter.
|
||||
*/
|
||||
private[fpm] def toDatabaseInternalRepr[Item: ClassTag](
|
||||
data: RDD[Array[Array[Item]]],
|
||||
itemToInt: Map[Item, Int]): RDD[Array[Int]] = {
|
||||
|
||||
data.flatMap { itemsets =>
|
||||
val allItems = mutable.ArrayBuilder.make[Int]
|
||||
var containsFreqItems = false
|
||||
allItems += 0
|
||||
itemsets.foreach { itemsets =>
|
||||
val items = mutable.ArrayBuilder.make[Int]
|
||||
itemsets.foreach { item =>
|
||||
if (itemToInt.contains(item)) {
|
||||
items += itemToInt(item) + 1 // using 1-indexing in internal format
|
||||
}
|
||||
}
|
||||
val result = items.result()
|
||||
if (result.nonEmpty) {
|
||||
containsFreqItems = true
|
||||
allItems ++= result.sorted
|
||||
allItems += 0
|
||||
}
|
||||
}
|
||||
if (containsFreqItems) {
|
||||
Iterator.single(allItems.result())
|
||||
} else {
|
||||
Iterator.empty
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the complete set of frequent sequential patterns in the input sequences.
|
||||
* @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int],
|
||||
|
|
|
@ -360,6 +360,49 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
|
|||
compareResults(expected, model.freqSequences.collect())
|
||||
}
|
||||
|
||||
test("PrefixSpan pre-processing's cleaning test") {
|
||||
|
||||
// One item per itemSet
|
||||
val itemToInt1 = (4 to 5).zipWithIndex.toMap
|
||||
val sequences1 = Seq(
|
||||
Array(Array(4), Array(1), Array(2), Array(5), Array(2), Array(4), Array(5)),
|
||||
Array(Array(6), Array(7), Array(8)))
|
||||
val rdd1 = sc.parallelize(sequences1, 2).cache()
|
||||
|
||||
val cleanedSequence1 = PrefixSpan.toDatabaseInternalRepr(rdd1, itemToInt1).collect()
|
||||
|
||||
val expected1 = Array(Array(0, 4, 0, 5, 0, 4, 0, 5, 0))
|
||||
.map(_.map(x => if (x == 0) 0 else itemToInt1(x) + 1))
|
||||
|
||||
compareInternalSequences(expected1, cleanedSequence1)
|
||||
|
||||
// Multi-item sequence
|
||||
val itemToInt2 = (4 to 6).zipWithIndex.toMap
|
||||
val sequences2 = Seq(
|
||||
Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
|
||||
Array(Array(8, 9), Array(1, 2)))
|
||||
val rdd2 = sc.parallelize(sequences2, 2).cache()
|
||||
|
||||
val cleanedSequence2 = PrefixSpan.toDatabaseInternalRepr(rdd2, itemToInt2).collect()
|
||||
|
||||
val expected2 = Array(Array(0, 4, 5, 0, 6, 0, 5, 0, 4, 0, 5, 6, 0))
|
||||
.map(_.map(x => if (x == 0) 0 else itemToInt2(x) + 1))
|
||||
|
||||
compareInternalSequences(expected2, cleanedSequence2)
|
||||
|
||||
// Emptied sequence
|
||||
val itemToInt3 = (10 to 10).zipWithIndex.toMap
|
||||
val sequences3 = Seq(
|
||||
Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
|
||||
Array(Array(8, 9), Array(1, 2)))
|
||||
val rdd3 = sc.parallelize(sequences3, 2).cache()
|
||||
|
||||
val cleanedSequence3 = PrefixSpan.toDatabaseInternalRepr(rdd3, itemToInt3).collect()
|
||||
val expected3 = Array[Array[Int]]()
|
||||
|
||||
compareInternalSequences(expected3, cleanedSequence3)
|
||||
}
|
||||
|
||||
test("model save/load") {
|
||||
val sequences = Seq(
|
||||
Array(Array(1, 2), Array(3)),
|
||||
|
@ -409,4 +452,12 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
|
|||
val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
|
||||
assert(expectedSet === actualSet)
|
||||
}
|
||||
|
||||
private def compareInternalSequences(
|
||||
expectedValue: Array[Array[Int]],
|
||||
actualValue: Array[Array[Int]]): Unit = {
|
||||
val expectedSet = expectedValue.map(x => x.toSeq).toSet
|
||||
val actualSet = actualValue.map(x => x.toSeq).toSet
|
||||
assert(expectedSet === actualSet)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue