[SPARK-21401][ML][MLLIB] add poll function for BoundedPriorityQueue

## What changes were proposed in this pull request?
The most of BoundedPriorityQueue usages in ML/MLLIB are:
Get the value of BoundedPriorityQueue, then sort it.
For example, in Word2Vec: pq.toSeq.sortBy(-_._2)
in ALS, pq.toArray.sorted()

The test results show using pq.poll is much faster than sort the value.
It is good to add the poll function for BoundedPriorityQueue.

## How was this patch tested?
The existing UT

Author: Peng <peng.meng@intel.com>
Author: Peng Meng <peng.meng@intel.com>

Closes #18620 from mpjlu/add-poll.
This commit is contained in:
Peng 2017-07-19 09:56:48 +01:00 committed by Sean Owen
parent ae253e5a87
commit 46307b2cd3
2 changed files with 55 additions and 0 deletions

View file

@ -51,6 +51,10 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
this
}
def poll(): A = {
underlying.poll()
}
override def +=(elem1: A, elem2: A, elems: A*): this.type = {
this += elem1 += elem2 ++= elems
}

View file

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util
import org.apache.spark.SparkFunSuite
class BoundedPriorityQueueSuite extends SparkFunSuite {
test("BoundedPriorityQueue poll test") {
val pq = new BoundedPriorityQueue[Double](4)
pq += 0.1
pq += 1.5
pq += 1.0
pq += 0.3
pq += 0.01
assert(pq.isEmpty == false)
assert(pq.poll() == 0.1)
assert(pq.poll() == 0.3)
assert(pq.poll() == 1.0)
assert(pq.poll() == 1.5)
assert(pq.isEmpty == true)
val pq2 = new BoundedPriorityQueue[(Int, Double)](4)(Ordering.by(_._2))
pq2 += 1 -> 0.5
pq2 += 5 -> 0.1
pq2 += 3 -> 0.3
pq2 += 4 -> 0.2
pq2 += 1 -> 0.4
assert(pq2.poll()._2 == 0.2)
assert(pq2.poll()._2 == 0.3)
assert(pq2.poll()._2 == 0.4)
assert(pq2.poll()._2 == 0.5)
}
}