From fae981e5f32e0ddb86591a616829423dfafb4ed0 Mon Sep 17 00:00:00 2001 From: yan ma Date: Fri, 20 Mar 2020 10:32:58 -0500 Subject: [PATCH] [SPARK-30773][ML] Support NativeBlas for level-1 routines ### What changes were proposed in this pull request? Change BLAS for part of level-1 routines(axpy, dot, scal(double, denseVector)) from java implementation to NativeBLAS when vector size>256 ### Why are the changes needed? In current ML BLAS.scala, all level-1 routines are fixed to use java implementation. But NativeBLAS(intel MKL, OpenBLAS) can bring up to 11X performance improvement based on performance test which apply direct calls against these methods. We should provide a way to allow user take advantage of NativeBLAS for level-1 routines. Here we do it through switching to NativeBLAS for these methods from f2jBLAS. ### Does this PR introduce any user-facing change? Yes, methods axpy, dot, scal in level-1 routines will switch to NativeBLAS when it has more than nativeL1Threshold(fixed value 256) elements and will fallback to f2jBLAS if native BLAS is not properly configured in system. ### How was this patch tested? Perf test direct calls level-1 routines Closes #27546 from yma11/SPARK-30773. Lead-authored-by: yan ma Co-authored-by: Ma Yan Signed-off-by: Sean Owen --- docs/ml-guide.md | 2 +- .../org/apache/spark/ml/linalg/BLAS.scala | 27 ++++++++++++------- .../apache/spark/ml/linalg/BLASSuite.scala | 6 +++++ .../org/apache/spark/mllib/linalg/BLAS.scala | 27 ++++++++++++------- .../apache/spark/mllib/linalg/BLASSuite.scala | 6 +++++ 5 files changed, 49 insertions(+), 19 deletions(-) diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 2037285f3f..5ce6b4f58c 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -78,7 +78,7 @@ The most popular native BLAS such as [Intel MKL](https://software.intel.com/en-u Configuring these BLAS implementations to use a single thread for operations may actually improve performance (see [SPARK-21305](https://issues.apache.org/jira/browse/SPARK-21305)). It is usually optimal to match this to the number of cores each Spark task is configured to use, which is 1 by default and typically left at 1. -Please refer to resources like the following to understand how to configure the number of threads these BLAS implementations use: [Intel MKL](https://software.intel.com/en-us/articles/recommended-settings-for-calling-intel-mkl-routines-from-multi-threaded-applications) and [OpenBLAS](https://github.com/xianyi/OpenBLAS/wiki/faq#multi-threaded). +Please refer to resources like the following to understand how to configure the number of threads these BLAS implementations use: [Intel MKL](https://software.intel.com/en-us/articles/recommended-settings-for-calling-intel-mkl-routines-from-multi-threaded-applications) or [Intel oneMKL](https://software.intel.com/en-us/onemkl-linux-developer-guide-improving-performance-with-threading) and [OpenBLAS](https://github.com/xianyi/OpenBLAS/wiki/faq#multi-threaded). Note that if nativeBLAS is not properly configured in system, java implementation(f2jBLAS) will be used as fallback option. To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 1.4 or newer. diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index ea1219165b..3d3e7a22e5 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -27,8 +27,9 @@ private[spark] object BLAS extends Serializable { @transient private var _f2jBLAS: NetlibBLAS = _ @transient private var _nativeBLAS: NetlibBLAS = _ + private val nativeL1Threshold: Int = 256 - // For level-1 routines, we use Java implementation. + // For level-1 function dspmv, use f2jBLAS for better performance. private[ml] def f2jBLAS: NetlibBLAS = { if (_f2jBLAS == null) { _f2jBLAS = new F2jBLAS @@ -36,6 +37,14 @@ private[spark] object BLAS extends Serializable { _f2jBLAS } + private[ml] def getBLAS(vectorSize: Int): NetlibBLAS = { + if (vectorSize < nativeL1Threshold) { + f2jBLAS + } else { + nativeBLAS + } + } + /** * y += a * x */ @@ -63,7 +72,7 @@ private[spark] object BLAS extends Serializable { */ private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = { val n = x.size - f2jBLAS.daxpy(n, a, x.values, 1, y.values, 1) + getBLAS(n).daxpy(n, a, x.values, 1, y.values, 1) } /** @@ -94,7 +103,7 @@ private[spark] object BLAS extends Serializable { private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = { require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension mismatch: " + s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, Y.numCols)}.") - f2jBLAS.daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1) + getBLAS(X.values.length).daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1) } /** @@ -123,7 +132,7 @@ private[spark] object BLAS extends Serializable { */ private def dot(x: DenseVector, y: DenseVector): Double = { val n = x.size - f2jBLAS.ddot(n, x.values, 1, y.values, 1) + getBLAS(n).ddot(n, x.values, 1, y.values, 1) } /** @@ -218,16 +227,16 @@ private[spark] object BLAS extends Serializable { def scal(a: Double, x: Vector): Unit = { x match { case sx: SparseVector => - f2jBLAS.dscal(sx.values.length, a, sx.values, 1) + getBLAS(sx.values.length).dscal(sx.values.length, a, sx.values, 1) case dx: DenseVector => - f2jBLAS.dscal(dx.values.length, a, dx.values, 1) + getBLAS(dx.size).dscal(dx.values.length, a, dx.values, 1) case _ => throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.") } } // For level-3 routines, we use the native BLAS. - private def nativeBLAS: NetlibBLAS = { + private[ml] def nativeBLAS: NetlibBLAS = { if (_nativeBLAS == null) { _nativeBLAS = NativeBLAS } @@ -374,7 +383,7 @@ private[spark] object BLAS extends Serializable { // gemm: alpha is equal to 0 and beta is equal to 1. Returning C. return } else if (alpha == 0.0) { - f2jBLAS.dscal(C.values.length, beta, C.values, 1) + getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1) } else { A match { case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C) @@ -480,7 +489,7 @@ private[spark] object BLAS extends Serializable { } else { // Scale matrix first if `beta` is not equal to 1.0 if (beta != 1.0) { - f2jBLAS.dscal(C.values.length, beta, C.values, 1) + getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1) } // Perform matrix multiplication and add to C. The rows of A are multiplied by the columns of // B, and added to C. diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala index 877ac68983..781f3da313 100644 --- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala +++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala @@ -23,6 +23,12 @@ import org.apache.spark.ml.util.TestingUtils._ class BLASSuite extends SparkMLFunSuite { + test("nativeL1Threshold") { + assert(getBLAS(128) == BLAS.f2jBLAS) + assert(getBLAS(256) == BLAS.nativeBLAS) + assert(getBLAS(512) == BLAS.nativeBLAS) + } + test("copy") { val sx = Vectors.sparse(4, Array(0, 2), Array(1.0, -2.0)) val dx = Vectors.dense(1.0, 0.0, -2.0, 0.0) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index 08086ceff9..da486010cf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -29,8 +29,9 @@ private[spark] object BLAS extends Serializable with Logging { @transient private var _f2jBLAS: NetlibBLAS = _ @transient private var _nativeBLAS: NetlibBLAS = _ + private val nativeL1Threshold: Int = 256 - // For level-1 routines, we use Java implementation. + // For level-1 function dspmv, use f2jBLAS for better performance. private[mllib] def f2jBLAS: NetlibBLAS = { if (_f2jBLAS == null) { _f2jBLAS = new F2jBLAS @@ -38,6 +39,14 @@ private[spark] object BLAS extends Serializable with Logging { _f2jBLAS } + private[mllib] def getBLAS(vectorSize: Int): NetlibBLAS = { + if (vectorSize < nativeL1Threshold) { + f2jBLAS + } else { + nativeBLAS + } + } + /** * y += a * x */ @@ -65,7 +74,7 @@ private[spark] object BLAS extends Serializable with Logging { */ private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = { val n = x.size - f2jBLAS.daxpy(n, a, x.values, 1, y.values, 1) + getBLAS(n).daxpy(n, a, x.values, 1, y.values, 1) } /** @@ -96,7 +105,7 @@ private[spark] object BLAS extends Serializable with Logging { private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = { require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension mismatch: " + s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, Y.numCols)}.") - f2jBLAS.daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1) + getBLAS(X.values.length).daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1) } /** @@ -125,7 +134,7 @@ private[spark] object BLAS extends Serializable with Logging { */ private def dot(x: DenseVector, y: DenseVector): Double = { val n = x.size - f2jBLAS.ddot(n, x.values, 1, y.values, 1) + getBLAS(n).ddot(n, x.values, 1, y.values, 1) } /** @@ -220,16 +229,16 @@ private[spark] object BLAS extends Serializable with Logging { def scal(a: Double, x: Vector): Unit = { x match { case sx: SparseVector => - f2jBLAS.dscal(sx.values.length, a, sx.values, 1) + getBLAS(sx.values.length).dscal(sx.values.length, a, sx.values, 1) case dx: DenseVector => - f2jBLAS.dscal(dx.values.length, a, dx.values, 1) + getBLAS(dx.size).dscal(dx.values.length, a, dx.values, 1) case _ => throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.") } } // For level-3 routines, we use the native BLAS. - private def nativeBLAS: NetlibBLAS = { + private[mllib] def nativeBLAS: NetlibBLAS = { if (_nativeBLAS == null) { _nativeBLAS = NativeBLAS } @@ -356,7 +365,7 @@ private[spark] object BLAS extends Serializable with Logging { if (alpha == 0.0 && beta == 1.0) { logDebug("gemm: alpha is equal to 0 and beta is equal to 1. Returning C.") } else if (alpha == 0.0) { - f2jBLAS.dscal(C.values.length, beta, C.values, 1) + getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1) } else { A match { case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C) @@ -462,7 +471,7 @@ private[spark] object BLAS extends Serializable with Logging { } else { // Scale matrix first if `beta` is not equal to 1.0 if (beta != 1.0) { - f2jBLAS.dscal(C.values.length, beta, C.values, 1) + getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1) } // Perform matrix multiplication and add to C. The rows of A are multiplied by the columns of // B, and added to C. diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala index 6e68c1c9d3..12ab2ac3cc 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala @@ -23,6 +23,12 @@ import org.apache.spark.mllib.util.TestingUtils._ class BLASSuite extends SparkFunSuite { + test("nativeL1Threshold") { + assert(getBLAS(128) == BLAS.f2jBLAS) + assert(getBLAS(256) == BLAS.nativeBLAS) + assert(getBLAS(512) == BLAS.nativeBLAS) + } + test("copy") { val sx = Vectors.sparse(4, Array(0, 2), Array(1.0, -2.0)) val dx = Vectors.dense(1.0, 0.0, -2.0, 0.0)