[SPARK-30773][ML] Support NativeBlas for level-1 routines

### What changes were proposed in this pull request?
Change BLAS for part of level-1 routines(axpy, dot, scal(double, denseVector)) from java implementation to NativeBLAS when vector size>256

### Why are the changes needed?
In current ML BLAS.scala, all level-1 routines are fixed to use java
implementation. But NativeBLAS(intel MKL, OpenBLAS) can bring up to 11X
performance improvement based on performance test which apply direct
calls against these methods. We should provide a way to allow user take
advantage of NativeBLAS for level-1 routines. Here we do it through
switching to NativeBLAS for these methods from f2jBLAS.

### Does this PR introduce any user-facing change?
 Yes, methods axpy, dot, scal in level-1 routines will switch to NativeBLAS when it has more than nativeL1Threshold(fixed value 256) elements and will fallback to f2jBLAS if native BLAS is not properly configured in system.

### How was this patch tested?
Perf test direct calls level-1 routines

Closes #27546 from yma11/SPARK-30773.

Lead-authored-by: yan ma <yan.ma@intel.com>
Co-authored-by: Ma Yan <yan.ma@intel.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
This commit is contained in:
yan ma 2020-03-20 10:32:58 -05:00 committed by Sean Owen
parent 9a990133f6
commit fae981e5f3
5 changed files with 49 additions and 19 deletions

View file

@ -78,7 +78,7 @@ The most popular native BLAS such as [Intel MKL](https://software.intel.com/en-u
Configuring these BLAS implementations to use a single thread for operations may actually improve performance (see [SPARK-21305](https://issues.apache.org/jira/browse/SPARK-21305)). It is usually optimal to match this to the number of cores each Spark task is configured to use, which is 1 by default and typically left at 1.
Please refer to resources like the following to understand how to configure the number of threads these BLAS implementations use: [Intel MKL](https://software.intel.com/en-us/articles/recommended-settings-for-calling-intel-mkl-routines-from-multi-threaded-applications) and [OpenBLAS](https://github.com/xianyi/OpenBLAS/wiki/faq#multi-threaded).
Please refer to resources like the following to understand how to configure the number of threads these BLAS implementations use: [Intel MKL](https://software.intel.com/en-us/articles/recommended-settings-for-calling-intel-mkl-routines-from-multi-threaded-applications) or [Intel oneMKL](https://software.intel.com/en-us/onemkl-linux-developer-guide-improving-performance-with-threading) and [OpenBLAS](https://github.com/xianyi/OpenBLAS/wiki/faq#multi-threaded). Note that if nativeBLAS is not properly configured in system, java implementation(f2jBLAS) will be used as fallback option.
To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 1.4 or newer.

View file

@ -27,8 +27,9 @@ private[spark] object BLAS extends Serializable {
@transient private var _f2jBLAS: NetlibBLAS = _
@transient private var _nativeBLAS: NetlibBLAS = _
private val nativeL1Threshold: Int = 256
// For level-1 routines, we use Java implementation.
// For level-1 function dspmv, use f2jBLAS for better performance.
private[ml] def f2jBLAS: NetlibBLAS = {
if (_f2jBLAS == null) {
_f2jBLAS = new F2jBLAS
@ -36,6 +37,14 @@ private[spark] object BLAS extends Serializable {
_f2jBLAS
}
private[ml] def getBLAS(vectorSize: Int): NetlibBLAS = {
if (vectorSize < nativeL1Threshold) {
f2jBLAS
} else {
nativeBLAS
}
}
/**
* y += a * x
*/
@ -63,7 +72,7 @@ private[spark] object BLAS extends Serializable {
*/
private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = {
val n = x.size
f2jBLAS.daxpy(n, a, x.values, 1, y.values, 1)
getBLAS(n).daxpy(n, a, x.values, 1, y.values, 1)
}
/**
@ -94,7 +103,7 @@ private[spark] object BLAS extends Serializable {
private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = {
require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension mismatch: " +
s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, Y.numCols)}.")
f2jBLAS.daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1)
getBLAS(X.values.length).daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1)
}
/**
@ -123,7 +132,7 @@ private[spark] object BLAS extends Serializable {
*/
private def dot(x: DenseVector, y: DenseVector): Double = {
val n = x.size
f2jBLAS.ddot(n, x.values, 1, y.values, 1)
getBLAS(n).ddot(n, x.values, 1, y.values, 1)
}
/**
@ -218,16 +227,16 @@ private[spark] object BLAS extends Serializable {
def scal(a: Double, x: Vector): Unit = {
x match {
case sx: SparseVector =>
f2jBLAS.dscal(sx.values.length, a, sx.values, 1)
getBLAS(sx.values.length).dscal(sx.values.length, a, sx.values, 1)
case dx: DenseVector =>
f2jBLAS.dscal(dx.values.length, a, dx.values, 1)
getBLAS(dx.size).dscal(dx.values.length, a, dx.values, 1)
case _ =>
throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.")
}
}
// For level-3 routines, we use the native BLAS.
private def nativeBLAS: NetlibBLAS = {
private[ml] def nativeBLAS: NetlibBLAS = {
if (_nativeBLAS == null) {
_nativeBLAS = NativeBLAS
}
@ -374,7 +383,7 @@ private[spark] object BLAS extends Serializable {
// gemm: alpha is equal to 0 and beta is equal to 1. Returning C.
return
} else if (alpha == 0.0) {
f2jBLAS.dscal(C.values.length, beta, C.values, 1)
getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
} else {
A match {
case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C)
@ -480,7 +489,7 @@ private[spark] object BLAS extends Serializable {
} else {
// Scale matrix first if `beta` is not equal to 1.0
if (beta != 1.0) {
f2jBLAS.dscal(C.values.length, beta, C.values, 1)
getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
}
// Perform matrix multiplication and add to C. The rows of A are multiplied by the columns of
// B, and added to C.

View file

@ -23,6 +23,12 @@ import org.apache.spark.ml.util.TestingUtils._
class BLASSuite extends SparkMLFunSuite {
test("nativeL1Threshold") {
assert(getBLAS(128) == BLAS.f2jBLAS)
assert(getBLAS(256) == BLAS.nativeBLAS)
assert(getBLAS(512) == BLAS.nativeBLAS)
}
test("copy") {
val sx = Vectors.sparse(4, Array(0, 2), Array(1.0, -2.0))
val dx = Vectors.dense(1.0, 0.0, -2.0, 0.0)

View file

@ -29,8 +29,9 @@ private[spark] object BLAS extends Serializable with Logging {
@transient private var _f2jBLAS: NetlibBLAS = _
@transient private var _nativeBLAS: NetlibBLAS = _
private val nativeL1Threshold: Int = 256
// For level-1 routines, we use Java implementation.
// For level-1 function dspmv, use f2jBLAS for better performance.
private[mllib] def f2jBLAS: NetlibBLAS = {
if (_f2jBLAS == null) {
_f2jBLAS = new F2jBLAS
@ -38,6 +39,14 @@ private[spark] object BLAS extends Serializable with Logging {
_f2jBLAS
}
private[mllib] def getBLAS(vectorSize: Int): NetlibBLAS = {
if (vectorSize < nativeL1Threshold) {
f2jBLAS
} else {
nativeBLAS
}
}
/**
* y += a * x
*/
@ -65,7 +74,7 @@ private[spark] object BLAS extends Serializable with Logging {
*/
private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = {
val n = x.size
f2jBLAS.daxpy(n, a, x.values, 1, y.values, 1)
getBLAS(n).daxpy(n, a, x.values, 1, y.values, 1)
}
/**
@ -96,7 +105,7 @@ private[spark] object BLAS extends Serializable with Logging {
private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = {
require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension mismatch: " +
s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, Y.numCols)}.")
f2jBLAS.daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1)
getBLAS(X.values.length).daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1)
}
/**
@ -125,7 +134,7 @@ private[spark] object BLAS extends Serializable with Logging {
*/
private def dot(x: DenseVector, y: DenseVector): Double = {
val n = x.size
f2jBLAS.ddot(n, x.values, 1, y.values, 1)
getBLAS(n).ddot(n, x.values, 1, y.values, 1)
}
/**
@ -220,16 +229,16 @@ private[spark] object BLAS extends Serializable with Logging {
def scal(a: Double, x: Vector): Unit = {
x match {
case sx: SparseVector =>
f2jBLAS.dscal(sx.values.length, a, sx.values, 1)
getBLAS(sx.values.length).dscal(sx.values.length, a, sx.values, 1)
case dx: DenseVector =>
f2jBLAS.dscal(dx.values.length, a, dx.values, 1)
getBLAS(dx.size).dscal(dx.values.length, a, dx.values, 1)
case _ =>
throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.")
}
}
// For level-3 routines, we use the native BLAS.
private def nativeBLAS: NetlibBLAS = {
private[mllib] def nativeBLAS: NetlibBLAS = {
if (_nativeBLAS == null) {
_nativeBLAS = NativeBLAS
}
@ -356,7 +365,7 @@ private[spark] object BLAS extends Serializable with Logging {
if (alpha == 0.0 && beta == 1.0) {
logDebug("gemm: alpha is equal to 0 and beta is equal to 1. Returning C.")
} else if (alpha == 0.0) {
f2jBLAS.dscal(C.values.length, beta, C.values, 1)
getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
} else {
A match {
case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C)
@ -462,7 +471,7 @@ private[spark] object BLAS extends Serializable with Logging {
} else {
// Scale matrix first if `beta` is not equal to 1.0
if (beta != 1.0) {
f2jBLAS.dscal(C.values.length, beta, C.values, 1)
getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
}
// Perform matrix multiplication and add to C. The rows of A are multiplied by the columns of
// B, and added to C.

View file

@ -23,6 +23,12 @@ import org.apache.spark.mllib.util.TestingUtils._
class BLASSuite extends SparkFunSuite {
test("nativeL1Threshold") {
assert(getBLAS(128) == BLAS.f2jBLAS)
assert(getBLAS(256) == BLAS.nativeBLAS)
assert(getBLAS(512) == BLAS.nativeBLAS)
}
test("copy") {
val sx = Vectors.sparse(4, Array(0, 2), Array(1.0, -2.0))
val dx = Vectors.dense(1.0, 0.0, -2.0, 0.0)