spark-instrumented-optimizer/dev/test-dependencies.sh
Prakhar Jain c560428fe0 [SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned
### What changes were proposed in this pull request?
After changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks on it. We should also decommission the corresponding blockmanagers in the same way. i.e. Move the cached RDD blocks from those executors to other active executors.

### Why are the changes needed?
We need to gracefully decommission the block managers so that the underlying RDD cache blocks are not lost in case the executors are taken away forcefully after some timeout (because of spotloss/pre-emptible VM etc). Its good to save as much cache data as possible.

Also In future once the decommissioning signal comes from Cluster Manager (say YARN/Mesos etc), dynamic allocation + this change gives us opportunity to downscale the executors faster by making the executors free of cache data.

Note that this is a best effort approach. We try to move cache blocks from decommissioning executors to active executors. If the active executors don't have free resources available on them for caching, then the decommissioning executors will keep the cache block which it was not able to move and it will still be able to serve them.

Current overall Flow:

1. CoarseGrainedSchedulerBackend receives a signal to decommissionExecutor. On receiving the signal, it do 2 things - Stop assigning new tasks (SPARK-20628), Send another message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.

2. BlockManagerMasterEndpoint receives "DecommissionBlockManagers" message. On receiving this, it moves the corresponding block managers to "decommissioning" state. All decommissioning BMs are excluded from the getPeers RPC call which is used for replication. All these decommissioning BMs are also sent message from BlockManagerMasterEndpoint to start decommissioning process on themselves.

3. BlockManager on worker (say BM-x) receives the "DecommissionBlockManager" message. Now it will start BlockManagerDecommissionManager thread to offload all the RDD cached blocks. This thread can make multiple reattempts to decommission the existing cache blocks (multiple reattempts might be needed as there might not be sufficient space in other active BMs initially).

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
Added UTs.

Closes #28370 from prakharjain09/SPARK-20732-rddcache-1.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-05-18 11:37:53 -07:00

137 lines
5.2 KiB
Bash
Executable file

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -ex
FWDIR="$(cd "`dirname $0`"/..; pwd)"
cd "$FWDIR"
# Explicitly set locale in order to make `sort` output consistent across machines.
# See https://stackoverflow.com/questions/28881 for more details.
export LC_ALL=C
# TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.
# NOTE: These should match those in the release publishing script
HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkubernetes -Pyarn -Phive"
MVN="build/mvn"
HADOOP_HIVE_PROFILES=(
hadoop-2.7-hive-1.2
hadoop-2.7-hive-2.3
hadoop-3.2-hive-2.3
)
# We'll switch the version to a temp. one, publish POMs using that new version, then switch back to
# the old version. We need to do this because the `dependency:build-classpath` task needs to
# resolve Spark's internal submodule dependencies.
# From http://stackoverflow.com/a/26514030
set +e
OLD_VERSION=$($MVN -q \
-Dexec.executable="echo" \
-Dexec.args='${project.version}' \
--non-recursive \
org.codehaus.mojo:exec-maven-plugin:1.6.0:exec | grep -E '[0-9]+\.[0-9]+\.[0-9]+')
if [ $? != 0 ]; then
echo -e "Error while getting version string from Maven:\n$OLD_VERSION"
exit 1
fi
set -e
TEMP_VERSION="spark-$(python -S -c "import random; print(random.randrange(100000, 999999))")"
function reset_version {
# Delete the temporary POMs that we wrote to the local Maven repo:
find "$HOME/.m2/" | grep "$TEMP_VERSION" | xargs rm -rf
# Restore the original version number:
$MVN -q versions:set -DnewVersion=$OLD_VERSION -DgenerateBackupPoms=false > /dev/null
}
trap reset_version EXIT
$MVN -q versions:set -DnewVersion=$TEMP_VERSION -DgenerateBackupPoms=false > /dev/null
# Generate manifests for each Hadoop profile:
for HADOOP_HIVE_PROFILE in "${HADOOP_HIVE_PROFILES[@]}"; do
if [[ $HADOOP_HIVE_PROFILE == **hadoop-3.2-hive-2.3** ]]; then
HADOOP_PROFILE=hadoop-3.2
HIVE_PROFILE=hive-2.3
elif [[ $HADOOP_HIVE_PROFILE == **hadoop-2.7-hive-2.3** ]]; then
HADOOP_PROFILE=hadoop-2.7
HIVE_PROFILE=hive-2.3
else
HADOOP_PROFILE=hadoop-2.7
HIVE_PROFILE=hive-1.2
fi
echo "Performing Maven install for $HADOOP_HIVE_PROFILE"
$MVN $HADOOP2_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE jar:jar jar:test-jar install:install clean -q
echo "Performing Maven validate for $HADOOP_HIVE_PROFILE"
$MVN $HADOOP2_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE validate -q
echo "Generating dependency manifest for $HADOOP_HIVE_PROFILE"
mkdir -p dev/pr-deps
$MVN $HADOOP2_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE dependency:build-classpath -pl assembly -am \
| grep "Dependencies classpath:" -A 1 \
| tail -n 1 | tr ":" "\n" | awk -F '/' '{
# For each dependency classpath, we fetch the last three parts split by "/": artifact id, version, and jar name.
# Since classifier, if exists, always sits between "artifact_id-version-" and ".jar" suffix in the jar name,
# we extract classifier and put it right before the jar name explicitly.
# For example, `orc-core/1.5.5/nohive/orc-core-1.5.5-nohive.jar`
# ^^^^^^
# extracted classifier
# `okio/1.15.0//okio-1.15.0.jar`
# ^
# empty for dependencies without classifier
artifact_id=$(NF-2);
version=$(NF-1);
jar_name=$NF;
classifier_start_index=length(artifact_id"-"version"-") + 1;
classifier_end_index=index(jar_name, ".jar") - 1;
classifier=substr(jar_name, classifier_start_index, classifier_end_index - classifier_start_index + 1);
print artifact_id"/"version"/"classifier"/"jar_name
}' | sort | grep -v spark > dev/pr-deps/spark-deps-$HADOOP_HIVE_PROFILE
done
if [[ $@ == **replace-manifest** ]]; then
echo "Replacing manifests and creating new files at dev/deps"
rm -rf dev/deps
mv dev/pr-deps dev/deps
exit 0
fi
for HADOOP_HIVE_PROFILE in "${HADOOP_HIVE_PROFILES[@]}"; do
set +e
dep_diff="$(
git diff \
--no-index \
dev/deps/spark-deps-$HADOOP_HIVE_PROFILE \
dev/pr-deps/spark-deps-$HADOOP_HIVE_PROFILE \
)"
set -e
if [ "$dep_diff" != "" ]; then
echo "Spark's published dependencies DO NOT MATCH the manifest file (dev/spark-deps)."
echo "To update the manifest file, run './dev/test-dependencies.sh --replace-manifest'."
echo "$dep_diff"
rm -rf dev/pr-deps
exit 1
fi
done
exit 0