Revert "[SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache"
### What changes were proposed in this pull request? This pr revert the change of SPARK-34309, includes: - https://github.com/apache/spark/pull/31517 - https://github.com/apache/spark/pull/33772 ### Why are the changes needed? 1. No really performance improvement in Spark 2. Added an additional dependency ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass the Jenkins or GitHub Action Closes #33784 from LuciferYang/revert-caffeine. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
parent
90cbf9ca3e
commit
1ccb06ca8c
|
@ -58,10 +58,6 @@
|
|||
<artifactId>slf4j-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
|
|
|
@ -20,7 +20,10 @@ package org.apache.spark.network.shuffle;
|
|||
import java.io.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
|
@ -29,10 +32,11 @@ import org.apache.commons.lang3.tuple.Pair;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import com.github.benmanes.caffeine.cache.Weigher;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.cache.Weigher;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.iq80.leveldb.DB;
|
||||
import org.iq80.leveldb.DBIterator;
|
||||
|
@ -108,10 +112,16 @@ public class ExternalShuffleBlockResolver {
|
|||
Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
|
||||
this.registeredExecutorFile = registeredExecutorFile;
|
||||
String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
|
||||
shuffleIndexCache = Caffeine.newBuilder()
|
||||
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
|
||||
new CacheLoader<File, ShuffleIndexInformation>() {
|
||||
public ShuffleIndexInformation load(File file) throws IOException {
|
||||
return new ShuffleIndexInformation(file);
|
||||
}
|
||||
};
|
||||
shuffleIndexCache = CacheBuilder.newBuilder()
|
||||
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
|
||||
.weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
|
||||
.build(ShuffleIndexInformation::new);
|
||||
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
|
||||
.build(indexCacheLoader);
|
||||
db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
|
||||
if (db != null) {
|
||||
executors = reloadRegisteredExecutors(db);
|
||||
|
@ -303,7 +313,7 @@ public class ExternalShuffleBlockResolver {
|
|||
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
|
||||
shuffleIndexRecord.getOffset(),
|
||||
shuffleIndexRecord.getLength());
|
||||
} catch (CompletionException e) {
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException("Failed to open file: " + indexFile, e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,22 +31,23 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import com.github.benmanes.caffeine.cache.Weigher;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.cache.Weigher;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.roaringbitmap.RoaringBitmap;
|
||||
|
@ -121,10 +122,16 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
|
||||
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
|
||||
this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
|
||||
indexCache = Caffeine.newBuilder()
|
||||
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
|
||||
new CacheLoader<File, ShuffleIndexInformation>() {
|
||||
public ShuffleIndexInformation load(File file) throws IOException {
|
||||
return new ShuffleIndexInformation(file);
|
||||
}
|
||||
};
|
||||
indexCache = CacheBuilder.newBuilder()
|
||||
.maximumWeight(conf.mergedIndexCacheSize())
|
||||
.weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
|
||||
.build(ShuffleIndexInformation::new);
|
||||
.build(indexCacheLoader);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -315,7 +322,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId);
|
||||
return new FileSegmentManagedBuffer(
|
||||
conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength());
|
||||
} catch (CompletionException e) {
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(String.format(
|
||||
"Failed to open merged shuffle index file %s", indexFile.getPath()), e);
|
||||
}
|
||||
|
|
|
@ -1,12 +0,0 @@
|
|||
================================================================================================
|
||||
Loading Cache
|
||||
================================================================================================
|
||||
|
||||
OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.7
|
||||
Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
|
||||
Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
--------------------------------------------------------------------------------------------------------------------------
|
||||
Guava Cache 5 6 1 15.9 62.8 1.0X
|
||||
Caffeine 2 2 0 46.1 21.7 2.9X
|
||||
|
||||
|
|
@ -1,12 +0,0 @@
|
|||
================================================================================================
|
||||
Loading Cache
|
||||
================================================================================================
|
||||
|
||||
OpenJDK 64-Bit Server VM 1.8.0_232-b18 on Mac OS X 10.15.7
|
||||
Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
|
||||
Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
--------------------------------------------------------------------------------------------------------------------------
|
||||
Guava Cache 5 5 0 16.7 60.0 1.0X
|
||||
Caffeine 2 2 1 44.3 22.6 2.7X
|
||||
|
||||
|
|
@ -47,10 +47,6 @@
|
|||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.twitter</groupId>
|
||||
<artifactId>chill_${scala.binary.version}</artifactId>
|
||||
|
|
|
@ -26,7 +26,7 @@ import scala.collection.concurrent
|
|||
import scala.collection.mutable
|
||||
import scala.util.Properties
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
|
@ -77,7 +77,7 @@ class SparkEnv (
|
|||
// A general, soft-reference map for metadata needed during HadoopRDD split computation
|
||||
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
|
||||
private[spark] val hadoopJobMetadata =
|
||||
Caffeine.newBuilder().softValues().build[String, AnyRef]().asMap()
|
||||
CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()
|
||||
|
||||
private[spark] var driverTmpDir: Option[String] = None
|
||||
|
||||
|
|
|
@ -18,14 +18,15 @@
|
|||
package org.apache.spark.deploy.history
|
||||
|
||||
import java.util.NoSuchElementException
|
||||
import java.util.concurrent.CompletionException
|
||||
import java.util.concurrent.ExecutionException
|
||||
import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
|
||||
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import com.codahale.metrics.{Counter, MetricRegistry, Timer}
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache, RemovalCause, RemovalListener}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
|
||||
import com.google.common.util.concurrent.UncheckedExecutionException
|
||||
import org.eclipse.jetty.servlet.FilterHolder
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
|
@ -61,27 +62,21 @@ private[history] class ApplicationCache(
|
|||
|
||||
/**
|
||||
* Removal event notifies the provider to detach the UI.
|
||||
* @param key removal key
|
||||
* @param value removal value
|
||||
* @param cause the reason why a `CacheEntry` was removed, it should
|
||||
* always be `SIZE` because `appCache` configured with
|
||||
* `maximumSize` eviction strategy
|
||||
* @param rm removal notification
|
||||
*/
|
||||
override def onRemoval(key: CacheKey, value: CacheEntry, cause: RemovalCause): Unit = {
|
||||
override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
|
||||
metrics.evictionCount.inc()
|
||||
logDebug(s"Evicting entry $key")
|
||||
operations.detachSparkUI(key.appId, key.attemptId, value.loadedUI.ui)
|
||||
val key = rm.getKey
|
||||
logDebug(s"Evicting entry ${key}")
|
||||
operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui)
|
||||
}
|
||||
}
|
||||
|
||||
private val appCache: LoadingCache[CacheKey, CacheEntry] = {
|
||||
val builder = Caffeine.newBuilder()
|
||||
.maximumSize(retainedApplications)
|
||||
.removalListener(removalListener)
|
||||
// SPARK-34309: Use custom Executor to compatible with
|
||||
// the data eviction behavior of Guava cache
|
||||
.executor((command: Runnable) => command.run())
|
||||
builder.build[CacheKey, CacheEntry](appLoader)
|
||||
CacheBuilder.newBuilder()
|
||||
.maximumSize(retainedApplications)
|
||||
.removalListener(removalListener)
|
||||
.build(appLoader)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -91,9 +86,9 @@ private[history] class ApplicationCache(
|
|||
|
||||
def get(appId: String, attemptId: Option[String] = None): CacheEntry = {
|
||||
try {
|
||||
appCache.get(CacheKey(appId, attemptId))
|
||||
appCache.get(new CacheKey(appId, attemptId))
|
||||
} catch {
|
||||
case e @ (_: CompletionException | _: RuntimeException) =>
|
||||
case e @ (_: ExecutionException | _: UncheckedExecutionException) =>
|
||||
throw Option(e.getCause()).getOrElse(e)
|
||||
}
|
||||
}
|
||||
|
@ -132,7 +127,7 @@ private[history] class ApplicationCache(
|
|||
}
|
||||
|
||||
/** @return Number of cached UIs. */
|
||||
def size(): Long = appCache.estimatedSize()
|
||||
def size(): Long = appCache.size()
|
||||
|
||||
private def time[T](t: Timer)(f: => T): T = {
|
||||
val timeCtx = t.time()
|
||||
|
@ -202,7 +197,7 @@ private[history] class ApplicationCache(
|
|||
val sb = new StringBuilder(s"ApplicationCache(" +
|
||||
s" retainedApplications= $retainedApplications)")
|
||||
sb.append(s"; time= ${clock.getTimeMillis()}")
|
||||
sb.append(s"; entry count= ${appCache.estimatedSize()}\n")
|
||||
sb.append(s"; entry count= ${appCache.size()}\n")
|
||||
sb.append("----\n")
|
||||
appCache.asMap().asScala.foreach {
|
||||
case(key, entry) => sb.append(s" $key -> $entry\n")
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
|
|||
import scala.reflect.ClassTag
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader}
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
import org.apache.spark._
|
||||
|
@ -85,18 +85,16 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
|
|||
}
|
||||
|
||||
// Cache of preferred locations of checkpointed files.
|
||||
@transient private[spark] lazy val cachedPreferredLocations = {
|
||||
val builder = Caffeine.newBuilder()
|
||||
.expireAfterWrite(
|
||||
SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
|
||||
TimeUnit.MINUTES)
|
||||
val loader = new CacheLoader[Partition, Seq[String]]() {
|
||||
override def load(split: Partition): Seq[String] = {
|
||||
getPartitionBlockLocations(split)
|
||||
}
|
||||
}
|
||||
builder.build[Partition, Seq[String]](loader)
|
||||
}
|
||||
@transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(
|
||||
SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
|
||||
TimeUnit.MINUTES)
|
||||
.build(
|
||||
new CacheLoader[Partition, Seq[String]]() {
|
||||
override def load(split: Partition): Seq[String] = {
|
||||
getPartitionBlockLocations(split)
|
||||
}
|
||||
})
|
||||
|
||||
// Returns the block locations of given partition on file system.
|
||||
private def getPartitionBlockLocations(split: Partition): Seq[String] = {
|
||||
|
|
|
@ -34,7 +34,7 @@ import scala.util.{Failure, Random, Success, Try}
|
|||
import scala.util.control.NonFatal
|
||||
|
||||
import com.codahale.metrics.{MetricRegistry, MetricSet}
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.google.common.cache.CacheBuilder
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.errors.SparkCoreErrors
|
||||
|
@ -123,7 +123,7 @@ private[spark] class HostLocalDirManager(
|
|||
blockStoreClient: BlockStoreClient) extends Logging {
|
||||
|
||||
private val executorIdToLocalDirsCache =
|
||||
Caffeine
|
||||
CacheBuilder
|
||||
.newBuilder()
|
||||
.maximumSize(cacheSize)
|
||||
.build[String, Array[String]]()
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.spark.storage
|
|||
|
||||
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader}
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
|
@ -136,14 +136,11 @@ private[spark] object BlockManagerId {
|
|||
* The max cache size is hardcoded to 10000, since the size of a BlockManagerId
|
||||
* object is about 48B, the total memory cost should be below 1MB which is feasible.
|
||||
*/
|
||||
val blockManagerIdCache = {
|
||||
Caffeine.newBuilder()
|
||||
.maximumSize(10000)
|
||||
.build[BlockManagerId, BlockManagerId](
|
||||
new CacheLoader[BlockManagerId, BlockManagerId]() {
|
||||
override def load(id: BlockManagerId): BlockManagerId = id
|
||||
})
|
||||
}
|
||||
val blockManagerIdCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(10000)
|
||||
.build(new CacheLoader[BlockManagerId, BlockManagerId]() {
|
||||
override def load(id: BlockManagerId) = id
|
||||
})
|
||||
|
||||
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
|
||||
blockManagerIdCache.get(id)
|
||||
|
|
|
@ -27,7 +27,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
|
|||
import scala.util.Random
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.google.common.cache.CacheBuilder
|
||||
|
||||
import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
|
@ -56,7 +56,7 @@ class BlockManagerMasterEndpoint(
|
|||
|
||||
// Mapping from executor id to the block manager's local disk directories.
|
||||
private val executorIdToLocalDirs =
|
||||
Caffeine
|
||||
CacheBuilder
|
||||
.newBuilder()
|
||||
.maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE))
|
||||
.build[String, Array[String]]()
|
||||
|
|
|
@ -44,7 +44,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
|
|||
import scala.util.matching.Regex
|
||||
|
||||
import _root_.io.netty.channel.unix.Errors.NativeIoException
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
|
||||
import com.google.common.collect.Interners
|
||||
import com.google.common.io.{ByteStreams, Files => GFiles}
|
||||
import com.google.common.net.InetAddresses
|
||||
|
@ -1616,16 +1616,13 @@ private[spark] object Utils extends Logging {
|
|||
if (compressedLogFileLengthCache == null) {
|
||||
val compressedLogFileLengthCacheSize = sparkConf.get(
|
||||
UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF)
|
||||
compressedLogFileLengthCache = {
|
||||
val builder = Caffeine.newBuilder()
|
||||
.maximumSize(compressedLogFileLengthCacheSize)
|
||||
builder.build[String, java.lang.Long](
|
||||
new CacheLoader[String, java.lang.Long]() {
|
||||
override def load(path: String): java.lang.Long = {
|
||||
Utils.getCompressedFileLength(new File(path))
|
||||
}
|
||||
})
|
||||
}
|
||||
compressedLogFileLengthCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(compressedLogFileLengthCacheSize)
|
||||
.build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
|
||||
override def load(path: String): java.lang.Long = {
|
||||
Utils.getCompressedFileLength(new File(path))
|
||||
}
|
||||
})
|
||||
}
|
||||
compressedLogFileLengthCache
|
||||
}
|
||||
|
|
|
@ -1,93 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark
|
||||
|
||||
import java.util.concurrent.Callable
|
||||
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.util.Random
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader => CaffeineCacheLoader, Caffeine}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader}
|
||||
|
||||
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
|
||||
import org.apache.spark.util.ThreadUtils
|
||||
|
||||
/**
|
||||
* Benchmark for Guava Cache vs Caffeine.
|
||||
* To run this benchmark:
|
||||
* {{{
|
||||
* 1. without sbt:
|
||||
* bin/spark-submit --class <this class> --jars <spark core test jar>
|
||||
* 2. build/sbt "core/test:runMain <this class>"
|
||||
* 3. generate result:
|
||||
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
|
||||
* Results will be written to "benchmarks/LocalCacheBenchmark-results.txt".
|
||||
* }}}
|
||||
*/
|
||||
object LocalCacheBenchmark extends BenchmarkBase {
|
||||
|
||||
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
|
||||
runBenchmark("Loading Cache") {
|
||||
val size = 10000
|
||||
val parallelism = 8
|
||||
val guavaCacheConcurrencyLevel = 8
|
||||
val dataset = (1 to parallelism)
|
||||
.map(_ => Random.shuffle(List.range(0, size)))
|
||||
.map(list => list.map(i => TestData(i)))
|
||||
val executor = ThreadUtils.newDaemonFixedThreadPool(parallelism, "Loading Cache Test Pool")
|
||||
val guavaCacheLoader = new CacheLoader[TestData, TestData]() {
|
||||
override def load(id: TestData): TestData = {
|
||||
id
|
||||
}
|
||||
}
|
||||
val caffeineCacheLoader = new CaffeineCacheLoader[TestData, TestData]() {
|
||||
override def load(id: TestData): TestData = {
|
||||
id
|
||||
}
|
||||
}
|
||||
|
||||
val benchmark = new Benchmark("Loading Cache", size * parallelism, 3, output = output)
|
||||
benchmark.addCase("Guava Cache") { _ =>
|
||||
val cache = CacheBuilder.newBuilder()
|
||||
.concurrencyLevel(guavaCacheConcurrencyLevel).build[TestData, TestData](guavaCacheLoader)
|
||||
dataset.map(dataList => executor.submit(new Callable[Unit] {
|
||||
override def call(): Unit = {
|
||||
dataList.foreach(key => cache.get(key))
|
||||
}
|
||||
})).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf))
|
||||
cache.cleanUp()
|
||||
}
|
||||
|
||||
benchmark.addCase("Caffeine") { _ =>
|
||||
val cache = Caffeine.newBuilder().build[TestData, TestData](caffeineCacheLoader)
|
||||
dataset.map(dataList => executor.submit(new Callable[Unit] {
|
||||
override def call(): Unit = {
|
||||
dataList.foreach(key => cache.get(key))
|
||||
}
|
||||
})).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf))
|
||||
cache.cleanUp()
|
||||
}
|
||||
|
||||
benchmark.run()
|
||||
}
|
||||
}
|
||||
|
||||
case class TestData(content: Int)
|
||||
}
|
||||
|
|
@ -29,7 +29,7 @@ import scala.collection.immutable
|
|||
import scala.collection.mutable.{ArrayBuffer, Map}
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader}
|
||||
import org.mockito.ArgumentCaptor
|
||||
import org.mockito.ArgumentMatchers.{any, eq => meq}
|
||||
import org.mockito.Mockito.{inOrder, verify, when}
|
||||
|
@ -467,9 +467,9 @@ class ExecutorSuite extends SparkFunSuite
|
|||
}
|
||||
}
|
||||
|
||||
def errorInCaffeine(e: => Throwable): Throwable = {
|
||||
val cache = Caffeine.newBuilder().build[String, String](
|
||||
new CacheLoader[String, String] {
|
||||
def errorInGuavaCache(e: => Throwable): Throwable = {
|
||||
val cache = CacheBuilder.newBuilder()
|
||||
.build(new CacheLoader[String, String] {
|
||||
override def load(key: String): String = throw e
|
||||
})
|
||||
intercept[Throwable] {
|
||||
|
@ -484,18 +484,18 @@ class ExecutorSuite extends SparkFunSuite
|
|||
import Executor.isFatalError
|
||||
// `e`'s depth is 1 so `depthToCheck` needs to be at least 3 to detect fatal errors.
|
||||
assert(isFatalError(e, depthToCheck) == (depthToCheck >= 1 && isFatal))
|
||||
assert(isFatalError(errorInCaffeine(e), depthToCheck) == (depthToCheck >= 1 && isFatal))
|
||||
// `e`'s depth is 2 so `depthToCheck` needs to be at least 3 to detect fatal errors.
|
||||
assert(isFatalError(errorInThreadPool(e), depthToCheck) == (depthToCheck >= 2 && isFatal))
|
||||
assert(isFatalError(errorInGuavaCache(e), depthToCheck) == (depthToCheck >= 2 && isFatal))
|
||||
assert(isFatalError(
|
||||
new SparkException("foo", e),
|
||||
depthToCheck) == (depthToCheck >= 2 && isFatal))
|
||||
assert(isFatalError(
|
||||
errorInThreadPool(errorInCaffeine(e)),
|
||||
depthToCheck) == (depthToCheck >= 2 && isFatal))
|
||||
// `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors.
|
||||
assert(isFatalError(
|
||||
errorInCaffeine(errorInThreadPool(e)),
|
||||
errorInThreadPool(errorInGuavaCache(e)),
|
||||
depthToCheck) == (depthToCheck >= 3 && isFatal))
|
||||
assert(isFatalError(
|
||||
errorInGuavaCache(errorInThreadPool(e)),
|
||||
depthToCheck) == (depthToCheck >= 3 && isFatal))
|
||||
assert(isFatalError(
|
||||
new SparkException("foo", new SparkException("foo", e)),
|
||||
|
|
|
@ -30,9 +30,7 @@ blas/2.2.0//blas-2.2.0.jar
|
|||
bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar
|
||||
breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar
|
||||
breeze_2.12/1.2//breeze_2.12-1.2.jar
|
||||
caffeine/2.9.2//caffeine-2.9.2.jar
|
||||
cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
|
||||
checker-qual/3.10.0//checker-qual-3.10.0.jar
|
||||
chill-java/0.10.0//chill-java-0.10.0.jar
|
||||
chill_2.12/0.10.0//chill_2.12-0.10.0.jar
|
||||
commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar
|
||||
|
@ -64,7 +62,6 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
|
|||
datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
|
||||
derby/10.14.2.0//derby-10.14.2.0.jar
|
||||
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
|
||||
error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar
|
||||
flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
|
||||
generex/1.0.2//generex-1.0.2.jar
|
||||
gson/2.2.4//gson-2.2.4.jar
|
||||
|
|
|
@ -25,9 +25,7 @@ blas/2.2.0//blas-2.2.0.jar
|
|||
bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar
|
||||
breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar
|
||||
breeze_2.12/1.2//breeze_2.12-1.2.jar
|
||||
caffeine/2.9.2//caffeine-2.9.2.jar
|
||||
cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
|
||||
checker-qual/3.10.0//checker-qual-3.10.0.jar
|
||||
chill-java/0.10.0//chill-java-0.10.0.jar
|
||||
chill_2.12/0.10.0//chill_2.12-0.10.0.jar
|
||||
commons-cli/1.2//commons-cli-1.2.jar
|
||||
|
@ -55,7 +53,6 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
|
|||
datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
|
||||
derby/10.14.2.0//derby-10.14.2.0.jar
|
||||
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
|
||||
error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar
|
||||
flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
|
||||
generex/1.0.2//generex-1.0.2.jar
|
||||
gson/2.2.4//gson-2.2.4.jar
|
||||
|
|
6
pom.xml
6
pom.xml
|
@ -182,7 +182,6 @@
|
|||
<commons-pool2.version>2.6.2</commons-pool2.version>
|
||||
<datanucleus-core.version>4.1.17</datanucleus-core.version>
|
||||
<guava.version>14.0.1</guava.version>
|
||||
<caffeine.version>2.9.2</caffeine.version>
|
||||
<janino.version>3.0.16</janino.version>
|
||||
<jersey.version>2.34</jersey.version>
|
||||
<joda.version>2.10.10</joda.version>
|
||||
|
@ -494,11 +493,6 @@
|
|||
<version>${guava.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
<version>${caffeine.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jpmml</groupId>
|
||||
<artifactId>pmml-model</artifactId>
|
||||
|
|
|
@ -52,11 +52,6 @@
|
|||
<type>test-jar</type>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.fabric8</groupId>
|
||||
<artifactId>kubernetes-client</artifactId>
|
||||
|
|
|
@ -18,12 +18,11 @@ package org.apache.spark.scheduler.cluster.k8s
|
|||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.deploy.k8s.Config._
|
||||
|
@ -48,7 +47,7 @@ private[spark] class ExecutorPodsLifecycleManager(
|
|||
// to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
|
||||
// bounds.
|
||||
private lazy val removedExecutorsCache =
|
||||
Caffeine.newBuilder()
|
||||
CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(3, TimeUnit.MINUTES)
|
||||
.build[java.lang.Long, java.lang.Long]()
|
||||
|
||||
|
|
|
@ -92,10 +92,6 @@
|
|||
<artifactId>scalacheck_${scala.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.janino</groupId>
|
||||
<artifactId>janino</artifactId>
|
||||
|
|
|
@ -26,7 +26,7 @@ import javax.annotation.concurrent.GuardedBy
|
|||
import scala.collection.mutable
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
|
||||
import com.google.common.cache.{Cache, CacheBuilder}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
|
@ -159,18 +159,19 @@ class SessionCatalog(
|
|||
}
|
||||
|
||||
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
|
||||
var builder = Caffeine.newBuilder()
|
||||
var builder = CacheBuilder.newBuilder()
|
||||
.maximumSize(cacheSize)
|
||||
|
||||
if (cacheTTL > 0) {
|
||||
builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
|
||||
}
|
||||
builder.build()
|
||||
|
||||
builder.build[QualifiedTableName, LogicalPlan]()
|
||||
}
|
||||
|
||||
/** This method provides a way to get a cached plan. */
|
||||
def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
|
||||
tableRelationCache.get(t, (_: QualifiedTableName) => c.call())
|
||||
tableRelationCache.get(t, c)
|
||||
}
|
||||
|
||||
/** This method provides a way to get a cached plan if the key exists. */
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions
|
|||
|
||||
import java.util.IdentityHashMap
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
|
||||
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
|
||||
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
|
@ -38,17 +38,14 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) {
|
|||
// won't be use by multi-threads so we don't need to consider concurrency here.
|
||||
private var proxyExpressionCurrentId = 0
|
||||
|
||||
private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] =
|
||||
Caffeine.newBuilder().maximumSize(cacheMaxEntries)
|
||||
// SPARK-34309: Use custom Executor to compatible with
|
||||
// the data eviction behavior of Guava cache
|
||||
.executor((command: Runnable) => command.run())
|
||||
.build[ExpressionProxy, ResultProxy](
|
||||
new CacheLoader[ExpressionProxy, ResultProxy]() {
|
||||
override def load(expr: ExpressionProxy): ResultProxy = {
|
||||
ResultProxy(expr.proxyEval(currentInput))
|
||||
}
|
||||
})
|
||||
private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder()
|
||||
.maximumSize(cacheMaxEntries)
|
||||
.build(
|
||||
new CacheLoader[ExpressionProxy, ResultProxy]() {
|
||||
override def load(expr: ExpressionProxy): ResultProxy = {
|
||||
ResultProxy(expr.proxyEval(currentInput))
|
||||
}
|
||||
})
|
||||
|
||||
private var currentInput: InternalRow = null
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import scala.collection.mutable
|
|||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader}
|
||||
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
|
||||
import org.codehaus.commons.compiler.CompileException
|
||||
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler}
|
||||
|
@ -1577,9 +1577,9 @@ object CodeGenerator extends Logging {
|
|||
* automatically, in order to constrain its memory footprint. Note that this cache does not use
|
||||
* weak keys/values and thus does not respond to memory pressure.
|
||||
*/
|
||||
private val cache = Caffeine.newBuilder()
|
||||
private val cache = CacheBuilder.newBuilder()
|
||||
.maximumSize(SQLConf.get.codegenCacheMaxEntries)
|
||||
.build[CodeAndComment, (GeneratedClass, ByteCodeStats)](
|
||||
.build(
|
||||
new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
|
||||
override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
|
||||
val startTime = System.nanoTime()
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverSt
|
|||
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
|
||||
import java.util.{Date, Locale}
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.google.common.cache.CacheBuilder
|
||||
|
||||
import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
|
||||
import org.apache.spark.sql.errors.QueryExecutionErrors
|
||||
|
@ -194,7 +194,7 @@ trait DateTimeFormatterHelper {
|
|||
}
|
||||
|
||||
private object DateTimeFormatterHelper {
|
||||
val cache = Caffeine.newBuilder()
|
||||
val cache = CacheBuilder.newBuilder()
|
||||
.maximumSize(128)
|
||||
.build[(String, Locale, Boolean), DateTimeFormatter]()
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.sql.catalyst.expressions
|
||||
|
||||
import java.util.concurrent.CompletionException
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
|
@ -83,7 +83,7 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
|
|||
}
|
||||
|
||||
test("codegen failures in the CODEGEN_ONLY mode") {
|
||||
val errMsg = intercept[CompletionException] {
|
||||
val errMsg = intercept[ExecutionException] {
|
||||
val input = Seq(BoundReference(0, IntegerType, nullable = true))
|
||||
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
|
||||
FailedCodegenProjection.createObject(input)
|
||||
|
|
|
@ -23,47 +23,47 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite {
|
|||
test("Evaluate ExpressionProxy should create cached result") {
|
||||
val runtime = new SubExprEvaluationRuntime(1)
|
||||
val proxy = ExpressionProxy(Literal(1), 0, runtime)
|
||||
assert(runtime.cache.estimatedSize() == 0)
|
||||
assert(runtime.cache.size() == 0)
|
||||
proxy.eval()
|
||||
assert(runtime.cache.estimatedSize() == 1)
|
||||
assert(runtime.cache.size() == 1)
|
||||
assert(runtime.cache.get(proxy) == ResultProxy(1))
|
||||
}
|
||||
|
||||
test("SubExprEvaluationRuntime cannot exceed configured max entries") {
|
||||
val runtime = new SubExprEvaluationRuntime(2)
|
||||
assert(runtime.cache.estimatedSize() == 0)
|
||||
assert(runtime.cache.size() == 0)
|
||||
|
||||
val proxy1 = ExpressionProxy(Literal(1), 0, runtime)
|
||||
proxy1.eval()
|
||||
assert(runtime.cache.estimatedSize() == 1)
|
||||
assert(runtime.cache.size() == 1)
|
||||
assert(runtime.cache.get(proxy1) == ResultProxy(1))
|
||||
|
||||
val proxy2 = ExpressionProxy(Literal(2), 1, runtime)
|
||||
proxy2.eval()
|
||||
assert(runtime.cache.estimatedSize() == 2)
|
||||
assert(runtime.cache.size() == 2)
|
||||
assert(runtime.cache.get(proxy2) == ResultProxy(2))
|
||||
|
||||
val proxy3 = ExpressionProxy(Literal(3), 2, runtime)
|
||||
proxy3.eval()
|
||||
assert(runtime.cache.estimatedSize() == 2)
|
||||
assert(runtime.cache.size() == 2)
|
||||
assert(runtime.cache.get(proxy3) == ResultProxy(3))
|
||||
}
|
||||
|
||||
test("setInput should empty cached result") {
|
||||
val runtime = new SubExprEvaluationRuntime(2)
|
||||
val proxy1 = ExpressionProxy(Literal(1), 0, runtime)
|
||||
assert(runtime.cache.estimatedSize() == 0)
|
||||
assert(runtime.cache.size() == 0)
|
||||
proxy1.eval()
|
||||
assert(runtime.cache.estimatedSize() == 1)
|
||||
assert(runtime.cache.size() == 1)
|
||||
assert(runtime.cache.get(proxy1) == ResultProxy(1))
|
||||
|
||||
val proxy2 = ExpressionProxy(Literal(2), 1, runtime)
|
||||
proxy2.eval()
|
||||
assert(runtime.cache.estimatedSize() == 2)
|
||||
assert(runtime.cache.size() == 2)
|
||||
assert(runtime.cache.get(proxy2) == ResultProxy(2))
|
||||
|
||||
runtime.setInput()
|
||||
assert(runtime.cache.estimatedSize() == 0)
|
||||
assert(runtime.cache.size() == 0)
|
||||
}
|
||||
|
||||
test("Wrap ExpressionProxy on subexpressions") {
|
||||
|
|
|
@ -89,10 +89,7 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.orc</groupId>
|
||||
<artifactId>orc-core</artifactId>
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
|
|||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener, Weigher}
|
||||
import com.google.common.cache._
|
||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
|
@ -119,10 +119,11 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
|
|||
}
|
||||
}
|
||||
}
|
||||
val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]] {
|
||||
override def onRemoval(key: (ClientId, Path), value: Array[FileStatus],
|
||||
cause: RemovalCause): Unit = {
|
||||
if (cause == RemovalCause.SIZE &&
|
||||
val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
|
||||
override def onRemoval(
|
||||
removed: RemovalNotification[(ClientId, Path),
|
||||
Array[FileStatus]]): Unit = {
|
||||
if (removed.getCause == RemovalCause.SIZE &&
|
||||
warnedAboutEviction.compareAndSet(false, true)) {
|
||||
logWarning(
|
||||
"Evicting cached table partition metadata from memory due to size constraints " +
|
||||
|
@ -132,7 +133,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
|
|||
}
|
||||
}
|
||||
|
||||
var builder = Caffeine.newBuilder()
|
||||
var builder = CacheBuilder.newBuilder()
|
||||
.weigher(weigher)
|
||||
.removalListener(removalListener)
|
||||
.maximumWeight(maxSizeInBytes / weightScale)
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.{Arrays, Locale}
|
|||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
|
||||
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.scheduler.AccumulableInfo
|
||||
|
@ -97,7 +97,7 @@ object SQLMetrics {
|
|||
val cachedSQLAccumIdentifier = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
|
||||
|
||||
private val metricsCache: LoadingCache[String, Option[String]] =
|
||||
Caffeine.newBuilder().maximumSize(10000)
|
||||
CacheBuilder.newBuilder().maximumSize(10000)
|
||||
.build(new CacheLoader[String, Option[String]] {
|
||||
override def load(name: String): Option[String] = {
|
||||
Option(name)
|
||||
|
|
Loading…
Reference in a new issue