Skip to content

Commit c0d7657

Browse files
committed
refactored some common code ; including dask-specific templates in this PR
1 parent 7b767be commit c0d7657

File tree

3 files changed

+643
-21
lines changed

3 files changed

+643
-21
lines changed

dask/dask.sh

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,20 @@ function set_proxy(){
348348
export NO_PROXY="${no_proxy}"
349349
}
350350

351+
# Report whether the /mnt/shm ramdisk is in use, caching the answer.
#
# Globals:
#   IS_RAMDISK (read, written) - cached answer, either "true" or "false"
# Arguments:
#   $1 - optional "-f": discard the cached answer and probe again
# Returns:
#   0 when /mnt/shm is a directory and appears in /proc/mounts, 1 otherwise
function is_ramdisk() {
  if [[ "${1:-}" == "-f" ]] ; then unset IS_RAMDISK ; fi

  # Match the cached value as a string.  The previous form,
  #   ( test -v IS_RAMDISK && "${IS_RAMDISK}" == "true" )
  # executed the cached value as a command instead of comparing it, so a
  # cached "false" was never honoured and every call re-read /proc/mounts.
  # ${IS_RAMDISK:-} also keeps this safe when the variable is unset under
  # `set -u`.
  case "${IS_RAMDISK:-}" in
    true)  return 0 ;;
    false) return 1 ;;
  esac

  # Nothing usable cached: probe the filesystem and remember the outcome.
  if [[ -d /mnt/shm ]] && grep -q /mnt/shm /proc/mounts ; then
    IS_RAMDISK="true"
    return 0
  fi

  IS_RAMDISK="false"
  return 1
}
364+
351365
function mount_ramdisk(){
352366
local free_mem
353367
free_mem="$(awk '/^MemFree/ {print $2}' /proc/meminfo)"
@@ -356,25 +370,19 @@ function mount_ramdisk(){
356370
# Write to a ramdisk instead of churning the persistent disk
357371

358372
tmpdir="/mnt/shm"
359-
mkdir -p "${tmpdir}"
373+
mkdir -p "${tmpdir}/pkgs_dirs"
360374
mount -t tmpfs tmpfs "${tmpdir}"
361375

362376
# Download conda packages to tmpfs
363-
/opt/conda/miniconda3/bin/conda config --add pkgs_dirs "${tmpdir}"
364-
365-
# Clear pip cache
366-
# TODO: make this conditional on which OSs have pip without cache purge
367-
pip cache purge || echo "unable to purge pip cache"
368-
369-
# Download pip packages to tmpfs
370-
pip config set global.cache-dir "${tmpdir}" || echo "unable to set global.cache-dir"
377+
/opt/conda/miniconda3/bin/conda config --add pkgs_dirs "${tmpdir}/pkgs_dirs"
371378

372379
# Download OS packages to tmpfs
373380
if is_debuntu ; then
374381
mount -t tmpfs tmpfs /var/cache/apt/archives
375382
else
376383
mount -t tmpfs tmpfs /var/cache/dnf
377384
fi
385+
is_ramdisk -f
378386
}
379387

380388
function check_os() {
@@ -582,6 +590,21 @@ function install_dependencies() {
582590
touch "${workdir}/complete/install-dependencies"
583591
}
584592

593+
# Activate a Python virtual environment under ${tmpdir} and prepare pip's cache.
#
# Globals:
#   tmpdir (read) - directory that holds the venv and, on a ramdisk, pip's cache
#
# Creates ${tmpdir}/python-venv when it does not exist yet, sources its
# activate script into the current shell, purges the pip cache and, only when
# is_ramdisk succeeds, points pip's global.cache-dir at ${tmpdir}/cache-dir.
function prepare_pip_env() {
  local venv_dir="${tmpdir}/python-venv"
  local pip_cache_dir="${tmpdir}/cache-dir"

  if [[ ! -d "${venv_dir}" ]] ; then
    python3 -m venv "${venv_dir}"
  fi
  . "${venv_dir}/bin/activate"

  # Clear pip cache
  # TODO: make this conditional on which OSs have pip without cache purge
  if ! pip cache purge ; then
    echo "unable to purge pip cache"
  fi

  # Without a ramdisk, pip keeps its default cache location.
  is_ramdisk || return 0

  # Download pip packages to tmpfs
  mkdir -p "${pip_cache_dir}"
  if ! pip config set global.cache-dir "${pip_cache_dir}" ; then
    echo "unable to set global.cache-dir"
  fi
}
606+
607+
585608
function prepare_common_env() {
586609
define_os_comparison_functions
587610

@@ -619,8 +642,6 @@ function prepare_common_env() {
619642

620643
# Knox config
621644
readonly KNOX_HOME=/usr/lib/knox
622-
readonly KNOX_DASK_DIR="${KNOX_HOME}/data/services/dask/0.1.0"
623-
readonly KNOX_DASKWS_DIR="${KNOX_HOME}/data/services/daskws/0.1.0"
624645

625646
mkdir -p "${workdir}/complete"
626647
set_proxy
@@ -665,13 +686,17 @@ function prepare_common_env() {
665686
touch "${workdir}/complete/prepare.common"
666687
}
667688

689+
# Exit-time counterpart of the pip cache setup: when is_ramdisk succeeds,
# drop pip's global.cache-dir setting.  A failure to unset it is only
# reported, not treated as an error.
function pip_exit_handler() {
  # Nothing to undo when no ramdisk is in use.
  is_ramdisk || return 0

  # remove the tmpfs pip cache-dir
  if ! pip config unset global.cache-dir ; then
    echo "unable to unset global pip cache"
  fi
}
695+
668696
function common_exit_handler() {
669697
set +ex
670698
echo "Exit handler invoked"
671699

672-
# Clear pip cache
673-
pip cache purge || echo "unable to purge pip cache"
674-
675700
# Restart YARN services if they are running already
676701
for svc in resourcemanager nodemanager; do
677702
if [[ "$(systemctl show hadoop-yarn-${svc}.service -p SubState --value)" == 'running' ]]; then
@@ -682,9 +707,6 @@ function common_exit_handler() {
682707

683708
# If system memory was sufficient to mount memory-backed filesystems
684709
if [[ "${tmpdir}" == "/mnt/shm" ]] ; then
685-
# remove the tmpfs pip cache-dir
686-
pip config unset global.cache-dir || echo "unable to unset global pip cache"
687-
688710
# Clean up shared memory mounts
689711
for shmdir in /var/cache/apt/archives /var/cache/dnf /mnt/shm /tmp ; do
690712
if ( grep -q "^tmpfs ${shmdir}" /proc/mounts && ! grep -q "^tmpfs ${shmdir}" /etc/fstab ) ; then
@@ -707,6 +729,7 @@ function common_exit_handler() {
707729
dnf clean all
708730
fi
709731

732+
# When creating image, print disk usage statistics, zero unused disk space
710733
if [[ -n "$(get_metadata_attribute creating-image)" ]]; then
711734
# print disk usage statistics for large components
712735
if is_ubuntu ; then
@@ -748,11 +771,12 @@ function common_exit_handler() {
748771
'@siz=( sort { $a => $b }
749772
map { (split)[2] =~ /^(\d+)/ }
750773
grep { m:^/: } <STDIN> );
751-
$max=$siz[0]; $min=$siz[-1]; $inc=$max-$min;
774+
$max=$siz[0]; $min=$siz[-1]; $starting="unknown"; $inc=q{$max-$starting};
752775
print( " samples-taken: ", scalar @siz, $/,
753-
"maximum-disk-used: $max", $/,
754-
"minimum-disk-used: $min", $/,
755-
" increased-by: $inc", $/ )' < "/run/disk-usage.log"
776+
"starting-disk-used: $starting", $/,
777+
"maximum-disk-used: $max", $/,
778+
"minimum-disk-used: $min", $/,
779+
" increased-by: $inc", $/ )' < "/run/disk-usage.log"
756780

757781

758782
# zero free disk space
@@ -1273,6 +1297,9 @@ function prepare_dask_env() {
12731297
readonly DASK_WORKER_SERVICE=dask-worker
12741298
readonly DASK_SCHEDULER_SERVICE=dask-scheduler
12751299
readonly DASK_CONDA_ENV="/opt/conda/miniconda3/envs/${conda_env}"
1300+
# Knox dask config
1301+
readonly KNOX_DASK_DIR="${KNOX_HOME}/data/services/dask/0.1.0"
1302+
readonly KNOX_DASKWS_DIR="${KNOX_HOME}/data/services/daskws/0.1.0"
12761303
}
12771304

12781305
function prepare_dask_rapids_env(){
@@ -1323,12 +1350,14 @@ function main() {
13231350
}
13241351

13251352
# Exit handler for this script: runs the pip-specific cleanup, then the
# shared cleanup, and finishes with an explicit status of 0.
function exit_handler() {
  local cleanup_step
  for cleanup_step in pip_exit_handler common_exit_handler ; do
    "${cleanup_step}"
  done
  return 0
}
13291357

13301358
function prepare_to_install(){
13311359
prepare_common_env
1360+
prepare_pip_env
13321361
conda_env="$(get_metadata_attribute conda-env 'dask')"
13331362
readonly conda_env
13341363
prepare_dask_env

templates/dask/dask.sh.in

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/bin/bash
2+
#
3+
[% INSERT legal/license_header %]
4+
#
5+
[% PROCESS common/template_disclaimer %]
6+
#
7+
# This initialization action script will install Dask and other relevant
# libraries on a Dataproc cluster. This is supported for either "yarn" or
# "standalone" runtimes. Please see dask.org and yarn.dask.org for more
# information.
11+
12+
set -euxo pipefail
13+
14+
[% INSERT common/util_functions %]
15+
16+
[% INSERT dask/util_functions %]
17+
18+
# Install Dask and configure it for the selected runtime.
#
# Globals:
#   DASK_RUNTIME (read) - "yarn" or "standalone"
# Exits with status 1 after printing a message for any other runtime.
function main() {
  # Install Dask
  install_dask

  # In "standalone" mode, Dask relies on a systemd unit to launch.
  # In "yarn" mode, it relies on a config.yaml file.
  case "${DASK_RUNTIME}" in
    yarn)
      # Create Dask YARN config file
      configure_dask_yarn
      ;;
    standalone)
      # Create Dask service
      install_systemd_dask_service
      start_systemd_dask_service

      configure_knox_for_dask

      # Cloud logging is opt-in; a failed metadata lookup counts as 'false'.
      local DASK_CLOUD_LOGGING
      DASK_CLOUD_LOGGING="$(get_metadata_attribute dask-cloud-logging || echo 'false')"
      if [[ "${DASK_CLOUD_LOGGING}" == "true" ]]; then
        configure_fluentd_for_dask
      fi
      ;;
    *)
      echo "Unsupported Dask Runtime: ${DASK_RUNTIME}"
      exit 1
      ;;
  esac

  echo "Dask for ${DASK_RUNTIME} successfully initialized."
}
45+
46+
# Exit handler for the generated script: runs the pip-specific cleanup, then
# the shared cleanup, and finishes with an explicit status of 0.
function exit_handler() {
  local cleanup_step
  for cleanup_step in pip_exit_handler common_exit_handler ; do
    "${cleanup_step}"
  done
  return 0
}
51+
52+
# Run the preparation steps in order, then register exit_handler for EXIT.
#
# Globals:
#   conda_env (written, then made readonly) - value of the "conda-env"
#     metadata attribute, requested with 'dask' as the second argument
function prepare_to_install(){
  # Shared environment first, then the pip environment
  prepare_common_env
  prepare_pip_env

  # Assign and freeze in two steps so a failed lookup is not masked
  conda_env="$(get_metadata_attribute conda-env 'dask')"
  readonly conda_env

  prepare_dask_env

  # The trap is only registered once every preparation step has run
  trap 'exit_handler' EXIT
}
60+
61+
prepare_to_install
62+
63+
main

0 commit comments

Comments
 (0)