Commit 5941375a authored by William Ellis Warriner

documentation

parent bd2ab20e
# DMTCP Checkpointing Tutorial
Tutorial code for checkpointing SLURM jobs using DMTCP on Cheaha.
## Workflow Explanation
DMTCP is a state-based checkpointing system intended for single-node tasks (MPI checkpointing is possible but still in development, see: https://github.com/mpickpt/mana). A very simple explanation of its operation is that it records the state of the job (i.e. its in-memory representation) to disk at a user-specified interval or on request to a coordinator. DMTCP waits until it can "break in" to the job's execution, pauses it, records its state, and then allows the job to resume.
The sample task contained here is a simple for-loop over a function which sleeps for a random time and returns a random value. These values are formatted and printed to `stdout`. The loop is run 100 times with a median wait time of 0.5 seconds per iteration. Checkpoints are made every 5 seconds.
For more details on the specifics of each component of the execution, please see comments in each code file.
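As a rough sanity check on the numbers above, the following sketch estimates the total sleep time of the sample task and how many checkpoint intervals fit into it. It is only an illustration: the seed and the names `STEPS`, `CHECKPOINT_INTERVAL_S`, and `interval_count` are assumptions made for this example, and the estimate ignores the time DMTCP itself spends writing checkpoints.

```python
import random

STEPS = 100                # loop iterations, as in the sample task
CHECKPOINT_INTERVAL_S = 5  # value passed to `dmtcp_launch -i`

random.seed(0)  # arbitrary seed, used here only to make the sketch repeatable

# Draw sleep times from the same triangular distribution used in task.py.
sleep_times = [
    random.triangular(low=0.0, high=1.0, mode=0.5) for _ in range(STEPS)
]
total_sleep_s = sum(sleep_times)

# Number of whole checkpoint intervals that elapse during the run.
interval_count = int(total_sleep_s // CHECKPOINT_INTERVAL_S)

print(f"total sleep time: {total_sleep_s:.1f} s")
print(f"checkpoint intervals elapsed: {interval_count}")
```

With a mean sleep of about 0.5 seconds per step, the run should last on the order of 50 seconds, so roughly ten interval checkpoints can be expected.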
## Usage
Use the command `sbatch job.sh` to run the job. When the job is complete, try running `sbatch restart_job.sh`. You should see `job.log` and `job-restart.log` files. Compare the last few lines of `job.log` with the contents of `job-restart.log`. They should be identical because the pseudo-random number generator state has been saved to disk as part of the checkpoint, leading to the precise repetition of the last part of the task.
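The reason the two logs match can be illustrated without DMTCP at all. The sketch below is an analogy, not how DMTCP works internally: it uses `random.getstate()` and `random.setstate()` from the Python standard library to stand in for the checkpoint and the restart. The seed and the helper `draw()` are assumptions introduced for this example.

```python
import random


def draw(n):
    """Draw n values from the same lognormal distribution used in task.py."""
    return [random.lognormvariate(mu=0.0, sigma=1.0) for _ in range(n)]


random.seed(2021)  # arbitrary seed, only so the sketch is repeatable

draw(5)                          # work done before the "checkpoint"
saved_state = random.getstate()  # stands in for the checkpoint image
original_tail = draw(3)          # work done after the checkpoint

random.setstate(saved_state)     # stands in for the restart
replayed_tail = draw(3)          # the restarted job repeats the same work

print(original_tail == replayed_tail)
```

Because the generator state is restored exactly, the values drawn after the restore repeat the values drawn after the save. DMTCP achieves the same effect by restoring the whole process memory, of which the generator state is one part.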
......@@ -8,7 +8,19 @@
#SBATCH --output=%x.log
#SBATCH --error=%x.log
# Example of how to run a single-node DMTCP checkpointed task in SLURM on
# Cheaha.
# Load necessary modules for this example.
module load DMTCP/2.5.0
module load Anaconda3
module load Anaconda3/2020.11
# To run this you will need to build the environment from env.yml. To do so
# please run: `conda env create --file env.yml`
conda activate dmtcp-tutorial
dmtcp_launch -i 5 "python -u task.py 100" # -u means unbuffered, we get logging in real-time
# Launches a dmtcp checkpointed task with example computations. The flag -i
# accepts a checkpoint interval in seconds. The python flag -u requests
# unbuffered streams, so logging is real-time. The value 100 is the number of
# steps to run in the loop in `task.py`.
dmtcp_launch -i 5 "python -u task.py 100"
......@@ -8,8 +8,22 @@
#SBATCH --output=%x.log
#SBATCH --error=%x.log
export DMTCP_COORD_HOST=localhost
# Example of how to restart a DMTCP checkpointed task in SLURM on Cheaha.
# Load necessary modules for this example.
module load DMTCP/2.5.0
module load Anaconda3
# Activate the environment.
conda activate dmtcp-tutorial
# This export is required because, by default, DMTCP records the actual hostname
# of the node it runs on. On Cheaha, this will be something like `c0038`, and it
# isn't guaranteed you will be able to get back on that same node. Setting the
# DMTCP host to `localhost` ensures the script will run on whatever node the
# restart job ends up on.
export DMTCP_COORD_HOST=localhost
# The script below is created automatically as part of the checkpointing
# process.
./dmtcp_restart_script.sh
from typing import Dict
"""
task.py
Written by William Warriner 2021
wwarr@uab.edu
Contains example process code which we want to checkpoint. The general workflow
is to loop through a sample computation which sleeps a random amount of time and
returns a random float. Both values are returned in a dict and printed to
stdout. This process is repeated for the requested number of steps.
"""
import argparse
import random
import time
from mpi4py import MPI
import sys
import time
from typing import Dict
VALUE = "value"
WAIT = "wait"
SLEEP_TIME = "sleep_time"
def print_step(current_step: int, data: Dict[str, float]):
print(f"Step {current_step+1: >4}: {data[VALUE]: >6.2f} ({data[WAIT]: >6.2f}s)")
def print_step(current_step: int, data: Dict[str, float]) -> None:
"""
Prints the data dict containing the value and the sleep time, with an
iteration count. We flush the buffer each time a line is printed to ensure
real-time updating. We can (and do) use the `-u` argument, i.e. unbuffered
streams, with the Python interpreter for the same effect.
"""
print(
f"Step {current_step+1: >4}: {data[VALUE]: >6.2f} ({data[SLEEP_TIME]: >6.2f}s)"
)
sys.stdout.flush()
def step() -> Dict[str, float]:
"""
The kernel computation of interest which is run once per loop step. Computes
a value using a lognormal distribution. Computes a sleep time using a
triangular distribution to ensure non-negative values with a central
tendency, and then sleeps for that amount of time. A dict is returned
containing both.
"""
value = random.lognormvariate(mu=0.0, sigma=1.0)
wait = abs(random.triangular(low=0.0, high=1, mode=0.5))
time.sleep(wait)
return {VALUE: value, WAIT: wait}
sleep_time = abs(random.triangular(low=0.0, high=1, mode=0.5))
time.sleep(sleep_time)
return {VALUE: value, SLEEP_TIME: sleep_time}
def task(step_count: int) -> None:
"""
The primary kernel loop. Runs the kernel `step()` function, then reports the
values using `print_step()`. The only input is a positive integer which
limits how many times the loop is executed.
"""
assert 0 < step_count
for current_step in range(step_count):
data = step()
print_step(current_step, data)
def task_mpi(step_count: int) -> None:
# TODO import trace
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print_rank = size
compute_size = size - 1
DATA_TAG = 1
for current_step in range(step_count):
print(f"rank: {rank}, step: {current_step}")
if rank == print_rank:
print(f"rank: {rank}, receiving...")
req = comm.irecv(source=comm.MPI_ANY_SOURCE, tag=DATA_TAG)
print(f"rank: {rank}, waiting for recv...")
data = req.wait()
print(f"rank: {rank}, received!")
print_step(**data)
else:
if current_step % compute_size != rank:
continue
print(f"rank: {rank}, computing...")
data = step()
print(f"rank: {rank}, sending...")
req = comm.isend(
{"current_step": current_step, "data": data}, dest=0, tag=DATA_TAG
)
print(f"rank: {rank}, waiting for send...")
req.wait()
print(f"rank: {rank}, sent!")
def interface() -> None:
parser = argparse.ArgumentParser(description="Run steps.")
"""
The external interface. Prepares an argument parser accepting a positive
integer steps. Steps is clipped to 100 for sanity.
"""
parser = argparse.ArgumentParser(
description="Run kernel function and report values. Intended for use as part of DMTCP checkpointing with SLURM tutorial on Cheaha."
)
parser.add_argument(
"steps", metavar="N", nargs=1, type=int, help="number of steps to run"
"steps",
metavar="N",
nargs="?",
default=100,
type=_check_positive,
help="positive integer number of steps to run, clipped to 100",
)
parser.add_argument("--mpi", metavar="m", action="store_true", help="use mpi?")
args = parser.parse_args()
task(args.steps[0])
steps = args.steps  # with nargs="?", argparse stores a single int, not a list
steps = min(steps, 100)
task(steps)
def _check_positive(value: str) -> int:
"""
Checks if a string supplied to argparse is a positive integer. Raises
argparse.ArgumentTypeError if not.
"""
try:
ivalue = int(value)
except ValueError as e:
raise argparse.ArgumentTypeError(f"{value} must be an integer") from e
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"{value} must be positive")
return ivalue
if __name__ == "__main__":
......