[SPARK-32350][CORE] Add batch-write on LevelDB to improve performance of HybridStore

### What changes were proposed in this pull request?
This PR improves the performance of HybridStore by adding batch-write support to LevelDB. HybridStore, introduced in #28412, first writes data to InMemoryStore and then uses a background thread to dump that data to LevelDB once writing to InMemoryStore is complete. In the comments of #28412, mridulm pointed out that batch writing could speed up this dumping process and sketched the writeAll() code.
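For context, a LevelDB write batch stages many key/value puts and commits them with a single write() call instead of issuing one write per record. Below is a minimal sketch of that difference using the org.iq80.leveldb / leveldbjni API that Spark's LevelDB wrapper builds on; the database path, keys, and values are made up for illustration and are not part of this patch.

```java
import java.io.File;
import java.nio.charset.StandardCharsets;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;

public class BatchWriteSketch {
  public static void main(String[] args) throws Exception {
    Options options = new Options().createIfMissing(true);
    try (DB db = JniDBFactory.factory.open(new File("/tmp/leveldb-sketch"), options)) {
      // One-by-one writes: each put() is a separate LevelDB write operation.
      for (int i = 0; i < 1000; i++) {
        db.put(key(i), value(i));
      }

      // Batch write: all puts are staged in a WriteBatch and committed with a
      // single write() call, which is essentially what the new writeAll() does
      // for serialized kvstore objects.
      try (WriteBatch batch = db.createWriteBatch()) {
        for (int i = 0; i < 1000; i++) {
          batch.put(key(i), value(i));
        }
        db.write(batch);
      }
    }
  }

  private static byte[] key(int i) {
    return ("key-" + i).getBytes(StandardCharsets.UTF_8);
  }

  private static byte[] value(int i) {
    return ("value-" + i).getBytes(StandardCharsets.UTF_8);
  }
}
```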

### Why are the changes needed?
I compared the HybridStore switching time between one-by-one write() and batch writeAll() on an HDD. When the disk is idle, batch write gives roughly a 25% improvement; when the disk is 100% busy, it gives a 7x-10x improvement.

When the disk is at 0% utilization:
| Log size, jobs, and tasks per job    | Switching time with write() (original) | Switching time with writeAll() |
| ------------------------------------ | --------------------------------------- | ------------------------------ |
| 133 MB, 400 jobs, 100 tasks per job  | 16s                                      | 13s                            |
| 265 MB, 400 jobs, 200 tasks per job  | 30s                                      | 23s                            |
| 1.3 GB, 1000 jobs, 400 tasks per job | 136s                                     | 108s                           |

When the disk is at 100% utilization:
| Log size, jobs, and tasks per job   | Switching time with write() (original) | Switching time with writeAll() |
| ----------------------------------- | --------------------------------------- | ------------------------------ |
| 133 MB, 400 jobs, 100 tasks per job | 116s                                     | 17s                            |
| 265 MB, 400 jobs, 200 tasks per job | 251s                                     | 26s                            |

I also ran the write-related benchmarks in LevelDBBenchmark.java, measuring the total time to write 1024 objects. These tests were conducted with the disk at 0% utilization.

| Benchmark test           | Time with write() (ms) | Time with writeAll() (ms) |
| ------------------------ | ---------------------- | ------------------------- |
| randomUpdatesIndexed     | 213.06           | 157.356             |
| randomUpdatesNoIndex     | 57.869           | 35.439              |
| randomWritesIndexed      | 298.854          | 229.274             |
| randomWritesNoIndex      | 66.764           | 38.361              |
| sequentialUpdatesIndexed | 87.019           | 56.219              |
| sequentialUpdatesNoIndex | 61.851           | 41.942              |
| sequentialWritesIndexed  | 94.044           | 56.534              |
| sequentialWritesNoIndex  | 118.345          | 66.483              |

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested.

Closes #29149 from baohe-zhang/SPARK-32350.

Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Commit 7b9d7551a6 (parent 39181ff209), committed 2020-07-22 13:27:34 +09:00. 2 changed files with 67 additions and 17 deletions.


LevelDB.java:
```diff
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -153,24 +154,72 @@ public class LevelDB implements KVStore {
     try (WriteBatch batch = db().createWriteBatch()) {
       byte[] data = serializer.serialize(value);
       synchronized (ti) {
-        Object existing;
-        try {
-          existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
-        } catch (NoSuchElementException e) {
-          existing = null;
-        }
-
-        PrefixCache cache = new PrefixCache(value);
-        byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
-        for (LevelDBTypeInfo.Index idx : ti.indices()) {
-          byte[] prefix = cache.getPrefix(idx);
-          idx.add(batch, value, existing, data, naturalKey, prefix);
-        }
+        updateBatch(batch, value, data, value.getClass(), ti.naturalIndex(), ti.indices());
         db().write(batch);
       }
     }
   }
 
+  public void writeAll(List<?> values) throws Exception {
+    Preconditions.checkArgument(values != null && !values.isEmpty(),
+      "Non-empty values required.");
+
+    // Group by class, in case there are values from different classes in the values
+    // Typical usecase is for this to be a single class.
+    // A NullPointerException will be thrown if values contain null object.
+    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
+        values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
+
+      final Iterator<?> valueIter = entry.getValue().iterator();
+      final Iterator<byte[]> serializedValueIter;
+
+      // Deserialize outside synchronized block
+      List<byte[]> list = new ArrayList<>(entry.getValue().size());
+      for (Object value : values) {
+        list.add(serializer.serialize(value));
+      }
+      serializedValueIter = list.iterator();
+
+      final Class<?> klass = entry.getKey();
+      final LevelDBTypeInfo ti = getTypeInfo(klass);
+
+      synchronized (ti) {
+        final LevelDBTypeInfo.Index naturalIndex = ti.naturalIndex();
+        final Collection<LevelDBTypeInfo.Index> indices = ti.indices();
+
+        try (WriteBatch batch = db().createWriteBatch()) {
+          while (valueIter.hasNext()) {
+            updateBatch(batch, valueIter.next(), serializedValueIter.next(), klass,
+              naturalIndex, indices);
+          }
+          db().write(batch);
+        }
+      }
+    }
+  }
+
+  private void updateBatch(
+      WriteBatch batch,
+      Object value,
+      byte[] data,
+      Class<?> klass,
+      LevelDBTypeInfo.Index naturalIndex,
+      Collection<LevelDBTypeInfo.Index> indices) throws Exception {
+    Object existing;
+    try {
+      existing = get(naturalIndex.entityKey(null, value), klass);
+    } catch (NoSuchElementException e) {
+      existing = null;
+    }
+
+    PrefixCache cache = new PrefixCache(value);
+    byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
+    for (LevelDBTypeInfo.Index idx : indices) {
+      byte[] prefix = cache.getPrefix(idx);
+      idx.add(batch, value, existing, data, naturalKey, prefix);
+    }
+  }
+
   @Override
   public void delete(Class<?> type, Object naturalKey) throws Exception {
     Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
```
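To show the new API from a caller's point of view (the HybridStore change below is the actual caller in this patch), here is a hedged sketch against the org.apache.spark.util.kvstore API; the TaskSummary class and the on-disk path are hypothetical.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.util.kvstore.KVIndex;
import org.apache.spark.util.kvstore.LevelDB;

public class WriteAllCallerSketch {
  // Hypothetical value type; the @KVIndex field is the natural key kvstore indexes on.
  public static class TaskSummary {
    @KVIndex public String id;
    public long durationMs;

    public TaskSummary() { }

    public TaskSummary(String id, long durationMs) {
      this.id = id;
      this.durationMs = durationMs;
    }
  }

  public static void main(String[] args) throws Exception {
    List<TaskSummary> tasks = Arrays.asList(
      new TaskSummary("task-1", 10L),
      new TaskSummary("task-2", 20L));

    try (LevelDB store = new LevelDB(new File("/tmp/kvstore-sketch"))) {
      // Before this patch: one WriteBatch (and one LevelDB write) per object.
      for (TaskSummary t : tasks) {
        store.write(t);
      }

      // With this patch: all objects of the same class go into a single WriteBatch.
      store.writeAll(tasks);
    }
  }
}
```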


HybridStore.scala:
```diff
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
+import com.google.common.collect.Lists
+
 import org.apache.spark.util.kvstore._
 
 /**
@@ -144,10 +146,9 @@ private[history] class HybridStore extends KVStore {
     backgroundThread = new Thread(() => {
       try {
         for (klass <- klassMap.keys().asScala) {
-          val it = inMemoryStore.view(klass).closeableIterator()
-          while (it.hasNext()) {
-            levelDB.write(it.next())
-          }
+          val values = Lists.newArrayList(
+            inMemoryStore.view(klass).closeableIterator())
+          levelDB.writeAll(values)
         }
 
         listener.onSwitchToLevelDBSuccess()
         shouldUseInMemoryStore.set(false)
```