[SPARK-3614][MLLIB] Add minimumOccurence filtering to IDF

This PR for [SPARK-3614](https://issues.apache.org/jira/browse/SPARK-3614) adds functionality for filtering out terms which do not appear in at least a minimum number of documents.

This is implemented using a minimumOccurence parameter (default 0; renamed to minDocFreq in the final code). When a term's document frequency is less than minimumOccurence, its IDF is set to 0. As a result, the term's TF-IDF is 0, as if the term were not present in the documents.
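
Combined with the formula given in the IDF class docs of this change, the filtering rule can be restated as the piecewise definition below, where `m` is the total number of documents and `d(t)` is the number of documents containing term `t`. This is only a restatement for illustration; it uses the final parameter name `minDocFreq`.

```latex
\mathrm{idf}(t) =
\begin{cases}
  \log\dfrac{m + 1}{d(t) + 1} & \text{if } d(t) \ge \mathtt{minDocFreq} \\[1ex]
  0 & \text{otherwise}
\end{cases}
```

For example, with `m = 3` and `minDocFreq = 2`, a term with `d(t) = 1` gets an IDF of 0, while a term with `d(t) = 2` keeps `log(4/3) ≈ 0.288`.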

This PR makes the following changes:
* Add a minimumOccurence parameter to the IDF and DocumentFrequencyAggregator classes.
* Create a parameter-less constructor for IDF with a default minimumOccurence value of 0 to remain backwards-compatible with the original IDF API.
* Set the IDFs to 0 for terms whose DFs are less than minimumOccurence
* Add tests to the Spark IDFSuite and Java JavaTfIdfSuite test suites
* Update the MLlib Feature Extraction programming guide to describe the new feature
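
The per-term rule behind these changes can be sketched without Spark. The names below (`IdfFilterSketch`, `idfVector`) are hypothetical and only mirror the loop in `DocumentFrequencyAggregator.idf()`; the inputs reuse the document frequencies from the new IDFSuite test.

```scala
// Minimal sketch, not the PR's code: IdfFilterSketch and idfVector are hypothetical names.
object IdfFilterSketch {
  /** IDF per term; terms with a document frequency below minDocFreq get 0.0. */
  def idfVector(numDocs: Long, docFreqs: Array[Long], minDocFreq: Int): Array[Double] =
    docFreqs.map { df =>
      if (df >= minDocFreq) math.log((numDocs + 1.0) / (df + 1.0)) else 0.0
    }

  def main(args: Array[String]): Unit = {
    // Document frequencies 0, 3, 1, 2 over m = 3 documents, as in the IDFSuite test.
    val idfs = idfVector(numDocs = 3L, docFreqs = Array(0L, 3L, 1L, 2L), minDocFreq = 1)
    // Term 0 appears in no document (0 < 1), so its IDF stays 0.0.
    idfs.zipWithIndex.foreach { case (idf, j) => println(s"term $j: idf = $idf") }
  }
}
```

With the default `minDocFreq` of 0 the condition is always true, so every term keeps its formula value and the behaviour matches the original IDF.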

Author: RJ Nowling <rnowling@gmail.com>

Closes #2494 from rnowling/spark-3614-idf-filter and squashes the following commits:

0aa3c63 [RJ Nowling] Fix identation
e6523a8 [RJ Nowling] Remove unnecessary toDouble's from IDFSuite
bfa82ec [RJ Nowling] Add space after if
30d20b3 [RJ Nowling] Add spaces around equals signs
9013447 [RJ Nowling] Add space before division operator
79978fc [RJ Nowling] Remove unnecessary semi-colon
40fd70c [RJ Nowling] Change minimumOccurence to minDocFreq in code and docs
47850ab [RJ Nowling] Changed minimumOccurence to Int from Long
9fb4093 [RJ Nowling] Remove unnecessary lines from IDF class docs
1fc09d8 [RJ Nowling] Add backwards-compatible constructor to DocumentFrequencyAggregator
1801fd2 [RJ Nowling] Fix style errors in IDF.scala
6897252 [RJ Nowling] Preface minimumOccurence members with val to make them final and immutable
a200bab [RJ Nowling] Remove unnecessary else statement
4b974f5 [RJ Nowling] Remove accidentally-added import from testing
c0cc643 [RJ Nowling] Add minimumOccurence filtering to IDF
RJ Nowling 2014-09-26 09:58:47 -07:00 committed by Xiangrui Meng
parent d16e161d74
commit ec9df6a765
4 changed files with 103 additions and 5 deletions

@@ -82,6 +82,21 @@ tf.cache()
 val idf = new IDF().fit(tf)
 val tfidf: RDD[Vector] = idf.transform(tf)
 {% endhighlight %}
+MLLib's IDF implementation provides an option for ignoring terms which occur in less than a
+minimum number of documents. In such cases, the IDF for these terms is set to 0. This feature
+can be used by passing the `minDocFreq` value to the IDF constructor.
+{% highlight scala %}
+import org.apache.spark.mllib.feature.IDF
+// ... continue from the previous example
+tf.cache()
+val idf = new IDF(minDocFreq = 2).fit(tf)
+val tfidf: RDD[Vector] = idf.transform(tf)
+{% endhighlight %}
 </div>
 </div>

@@ -30,9 +30,18 @@ import org.apache.spark.rdd.RDD
  * Inverse document frequency (IDF).
  * The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`, where `m` is the total
  * number of documents and `d(t)` is the number of documents that contain term `t`.
+ *
+ * This implementation supports filtering out terms which do not appear in a minimum number
+ * of documents (controlled by the variable `minDocFreq`). For terms that are not in
+ * at least `minDocFreq` documents, the IDF is found as 0, resulting in TF-IDFs of 0.
+ *
+ * @param minDocFreq minimum of documents in which a term
+ *                   should appear for filtering
  */
 @Experimental
-class IDF {
+class IDF(val minDocFreq: Int) {
+  def this() = this(0)
   // TODO: Allow different IDF formulations.
@@ -41,7 +50,8 @@ class IDF {
    * @param dataset an RDD of term frequency vectors
    */
   def fit(dataset: RDD[Vector]): IDFModel = {
-    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
+    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
+          minDocFreq = minDocFreq))(
       seqOp = (df, v) => df.add(v),
       combOp = (df1, df2) => df1.merge(df2)
     ).idf()
@@ -60,13 +70,16 @@ class IDF {
 private object IDF {
   /** Document frequency aggregator. */
-  class DocumentFrequencyAggregator extends Serializable {
+  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {
     /** number of documents */
     private var m = 0L
     /** document frequency vector */
     private var df: BDV[Long] = _
+    def this() = this(0)
     /** Adds a new document. */
     def add(doc: Vector): this.type = {
       if (isEmpty) {
@@ -123,7 +136,18 @@ private object IDF {
       val inv = new Array[Double](n)
       var j = 0
       while (j < n) {
-        inv(j) = math.log((m + 1.0)/ (df(j) + 1.0))
+        /*
+         * If the term is not present in the minimum
+         * number of documents, set IDF to 0. This
+         * will cause multiplication in IDFModel to
+         * set TF-IDF to 0.
+         *
+         * Since arrays are initialized to 0 by default,
+         * we just omit changing those entries.
+         */
+        if(df(j) >= minDocFreq) {
+          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
+        }
         j += 1
       }
       Vectors.dense(inv)
@@ -140,6 +164,11 @@ class IDFModel private[mllib] (val idf: Vector) extends Serializable {
   /**
    * Transforms term frequency (TF) vectors to TF-IDF vectors.
+   *
+   * If `minDocFreq` was set for the IDF calculation,
+   * the terms which occur in fewer than `minDocFreq`
+   * documents will have an entry of 0.
+   *
    * @param dataset an RDD of term frequency vectors
    * @return an RDD of TF-IDF vectors
    */

@@ -63,4 +63,24 @@ public class JavaTfIdfSuite implements Serializable {
       Assert.assertEquals(0.0, v.apply(indexOfThis), 1e-15);
     }
   }
+  @Test
+  public void tfIdfMinimumDocumentFrequency() {
+    // The tests are to check Java compatibility.
+    HashingTF tf = new HashingTF();
+    JavaRDD<ArrayList<String>> documents = sc.parallelize(Lists.newArrayList(
+      Lists.newArrayList("this is a sentence".split(" ")),
+      Lists.newArrayList("this is another sentence".split(" ")),
+      Lists.newArrayList("this is still a sentence".split(" "))), 2);
+    JavaRDD<Vector> termFreqs = tf.transform(documents);
+    termFreqs.collect();
+    IDF idf = new IDF(2);
+    JavaRDD<Vector> tfIdfs = idf.fit(termFreqs).transform(termFreqs);
+    List<Vector> localTfIdfs = tfIdfs.collect();
+    int indexOfThis = tf.indexOf("this");
+    for (Vector v: localTfIdfs) {
+      Assert.assertEquals(0.0, v.apply(indexOfThis), 1e-15);
+    }
+  }
 }

@@ -38,7 +38,7 @@ class IDFSuite extends FunSuite with LocalSparkContext {
     val idf = new IDF
     val model = idf.fit(termFrequencies)
     val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
-      math.log((m.toDouble + 1.0) / (x + 1.0))
+      math.log((m + 1.0) / (x + 1.0))
     })
     assert(model.idf ~== expected absTol 1e-12)
     val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
@@ -54,4 +54,38 @@ class IDFSuite extends FunSuite with LocalSparkContext {
     assert(tfidf2.indices === Array(1))
     assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
   }
+  test("idf minimum document frequency filtering") {
+    val n = 4
+    val localTermFrequencies = Seq(
+      Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)),
+      Vectors.dense(0.0, 1.0, 2.0, 3.0),
+      Vectors.sparse(n, Array(1), Array(1.0))
+    )
+    val m = localTermFrequencies.size
+    val termFrequencies = sc.parallelize(localTermFrequencies, 2)
+    val idf = new IDF(minDocFreq = 1)
+    val model = idf.fit(termFrequencies)
+    val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
+      if (x > 0) {
+        math.log((m + 1.0) / (x + 1.0))
+      } else {
+        0
+      }
+    })
+    assert(model.idf ~== expected absTol 1e-12)
+    val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
+    assert(tfidf.size === 3)
+    val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
+    assert(tfidf0.indices === Array(1, 3))
+    assert(Vectors.dense(tfidf0.values) ~==
+      Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
+    val tfidf1 = tfidf(1L).asInstanceOf[DenseVector]
+    assert(Vectors.dense(tfidf1.values) ~==
+      Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
+    val tfidf2 = tfidf(2L).asInstanceOf[SparseVector]
+    assert(tfidf2.indices === Array(1))
+    assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+  }
 }