Skip to content
Snippets Groups Projects
Commit 42bebdd5 authored by Manavalan Gajapathy's avatar Manavalan Gajapathy
Browse files

adds wrapper tool

parent 08ce5c03
No related branches found
No related tags found
1 merge request!1QuaC - First major review
......@@ -18,3 +18,18 @@
important ones you would care about.
* Pedigree file specific to the project is required. Should be stored as `data/raw/ped/<project_name>.ped`.
* See the header of `workflow/Snakefile` for usage instructions on how to run the workflow
```sh
module reset
module load Anaconda3/2020.02
# create conda environment. Needed only the first time.
conda env create --file configs/env/quac.yaml
# if you need to update existing environment
conda env update --file configs/env/quac.yaml
# activate conda environment
conda activate quac
```
# Conda environment spec for QuaC dev tooling.
# Create/update with: conda env create|update --file configs/env/quac.yaml
name: quac
channels:
  - conda-forge
  - bioconda
dependencies:
  - python==3.6.13
  # code formatting / linting tools
  - black==20.8b1
  - pylint==2.7.2
  - bioconda::snakefmt==0.4.0
  # workflow engine
  - bioconda::snakemake==6.0.5
  - pip
  - pip:
      # slurm job submission helper used by the wrapper script
      - slurmpy==0.0.8
#!/usr/bin/env python3
"""
Reads user input data, constructs snakemake command to run the pipeline
along with their required modules, and submits them as slurm job.
Run the script with --help flag to see its available options.
Example usage:
IO_CONFIG=".test/configs/user_io_config.yaml"
src/run_pipeline.py --io_config $IO_CONFIG
"""
import argparse
from pathlib import Path
import os.path
import yaml
from utility_cgds.cgds.pipeline.src.submit_slurm_job import submit_slurm_job
def is_valid_file(p, arg):
    """
    argparse ``type=`` helper: return *arg* unchanged if it is an existing
    file, otherwise abort argument parsing via the parser's error handler.

    Args:
        p: the argparse.ArgumentParser used to report the error (exits).
        arg: filepath string supplied on the command line.

    Returns:
        arg, unchanged, when the file exists.
    """
    if not Path(arg).is_file():
        # parser.error prints usage + message and raises SystemExit
        p.error(f"The file '{arg}' does not exist!")
    return arg
def make_dir(d):
    """Create directory *d* (including missing parents) if it does not exist."""
    target = Path(d)
    target.mkdir(parents=True, exist_ok=True)
    return None
def process_user_io_config(f):
    """
    Reads the user's input-output config yaml file and:
    1. Creates the dir for slurmpy job scripts and their logs if needed
    2. Returns that logs dir path

    NOTE(review): this function was left commented out in this commit even
    though main() still calls it (NameError at runtime); restored here from
    the commented body. The yaml schema (key "logs_dir") is taken from that
    body -- TODO confirm against .test/configs/user_io_config.yaml.

    Args:
        f: path to the user's input-output config yaml file.

    Returns:
        path (str) to the directory for slurmpy job scripts and logs.
    """
    with open(f) as fh:
        data = yaml.safe_load(fh)

    # get dir to store slurmpy job scripts and their logs, and ensure it exists
    logs_dir = data["logs_dir"]
    make_dir(logs_dir)

    return logs_dir
def create_snakemake_command(args):
    """
    Construct the snakemake command to run the pipeline.

    Args:
        args: parsed argparse namespace. Reads select_modules, project_name,
            pedigree, outdir, rerun_failed, cluster_config, extra_args, dryrun.

    Returns:
        list of command fragments; join with whitespace (or " \\\n\t") to
        obtain the full shell command.
    """
    # slurm profile dir for snakemake to properly handle cluster job fails.
    # NOTE(review): "{{cookiecutter.profile_name}}" looks like an unrendered
    # cookiecutter placeholder -- TODO confirm this directory really exists.
    snakemake_profile_dir = (
        Path(__file__).absolute().parents[1]
        / "configs/snakemake_profile/{{cookiecutter.profile_name}}/"
    )

    # use absolute path to run it from anywhere
    snakefile_path = Path(__file__).absolute().parent / "workflow" / "Snakefile"

    # snakemake command to run.
    # fix: the parser defines --select_modules, so the namespace attribute is
    # args.select_modules; args.modules raised AttributeError.
    cmd = [
        "snakemake",
        f"--snakefile {snakefile_path}",
        f"--config modules='{args.select_modules}' project_name={args.project_name} ped={args.pedigree} out_dir={args.outdir}",
        f"--restart-times {args.rerun_failed}",
        "--use-conda",
        f"--profile '{snakemake_profile_dir}'",
        f"--cluster-config '{args.cluster_config}'",
        "--cluster 'sbatch --ntasks {cluster.ntasks} --partition {cluster.partition}"
        " --cpus-per-task {cluster.cpus-per-task} --mem {cluster.mem}"
        " --output {cluster.output} --parsable'",
    ]

    # add any user provided extra args for snakemake
    if args.extra_args:
        cmd += [args.extra_args]

    # adds option for dryrun if requested
    if args.dryrun:
        cmd += ["--dryrun"]

    return cmd
def main(args):
    """
    Entry point: builds the full pipeline shell command (module loads +
    snakemake invocation), prints it, and submits it via submit_slurm_job.

    Args:
        args: parsed argparse namespace from the __main__ block.
    """
    # process user's input-output config file
    # NOTE(review): as committed, process_user_io_config exists only as
    # commented-out code and "--io_config" is not registered with the
    # parser -- this line fails (NameError/AttributeError) until both are
    # restored. TODO confirm.
    slurmpy_logs_dir = process_user_io_config(args.io_config)

    # get snakemake command to execute for the pipeline
    snakemake_cmd = create_snakemake_command(args)

    # put together pipeline command to be run
    anaconda_module = "Anaconda3/2020.02"
    snakemake_module = "snakemake/5.9.1-foss-2018b-Python-3.6.6"
    pipeline_cmd = "\n".join(
        [
            f"module reset",
            f"module load {anaconda_module} {snakemake_module}",
            # join fragments with backslash-newline-tab for a readable script
            " \\\n\t".join(snakemake_cmd),
        ]
    )

    # echo the exact command (ANSI-colored) so the user can reproduce it
    print(
        f'{"#" * 40}\n'
        f"Input-output configs provided by user: '{args.io_config}'\n"
        f"Cluster configs : '{args.cluster_config}'\n\n"
        "Command to run the pipeline:\n"
        "\x1B[31;95m" + pipeline_cmd + "\x1B[0m\n"
        f'{"#" * 40}\n'
    )

    # submit snakemake command as a slurm job
    # Choose resources depending on if manta_execute rule will be run
    # as localrule in snakemake or not.
    # NOTE(review): neither "manta_execution_cluster" nor "cores" is added to
    # the argument parser in this commit -- confirm these options exist.
    if args.manta_execution_cluster:
        slurm_resources = {
            "partition": "short",  # express(max 2 hrs), short(max 12 hrs), medium(max 50 hrs), long(max 150 hrs)
            "ntasks": "1",
            "time": "12:00:00",
            "cpus-per-task": "1",
            "mem": "2G",
        }
    else:
        slurm_resources = {
            "partition": "express",  # express(max 2 hrs), short(max 12 hrs), medium(max 50 hrs), long(max 150 hrs)
            "ntasks": "1",
            "time": "2:00:00",
            "cpus-per-task": f"{args.cores}",
            "mem": "8G",
        }

    job_dict = {
        "basename": "svCaller-pipeline-",
        "log_dir": slurmpy_logs_dir,
        "run_locally": args.run_locally,
        "resources": slurm_resources,
    }
    submit_slurm_job(pipeline_cmd, job_dict)

    return None
if __name__ == "__main__":
PARSER = argparse.ArgumentParser(
description="A wrapper for QuaC pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
############ Args for QuaC wrapper tool ############
WRAPPER = PARSER.add_argument_group("QuaC wrapper options")
cluster_config_fname_default = (
Path(__file__).absolute().parents[1] / "configs/cluster_config.json"
)
WRAPPER.add_argument(
"--cluster_config",
help="Cluster config json file. Needed for snakemake to run jobs in cluster.",
default=cluster_config_fname_default,
type=lambda x: is_valid_file(PARSER, x),
metavar="",
)
WRAPPER.add_argument(
"-e",
"--extra_args",
help="Pass additional custom args to snakemake. Equal symbol is needed "
"for assignment as in this example: -e='--forceall'",
metavar="",
)
WRAPPER.add_argument(
"-n",
"--dryrun",
action="store_true",
help="Flag to dry-run snakemake. Does not execute anything, and "
"just display what would be done. Equivalent to '--extra_args \"-n\"'",
)
WRAPPER.add_argument(
"-l",
"--run_locally",
action="store_true",
help="Flag to run the snakemake locally and not as a Slurm job. "
"Useful for testing purposes.",
)
RERUN_FAILED_DEFAULT = 1
WRAPPER.add_argument(
"--rerun_failed",
help=f"Number of times snakemake restarts failed jobs. This may be set to >0 "
"to avoid pipeline failing due to job fails due to random SLURM issues",
default=RERUN_FAILED_DEFAULT,
metavar="",
)
# REQUIRED_ARGS = PARSER.add_argument_group("required named arguments")
# REQUIRED_ARGS.add_argument(
# "--io_config",
# help="Input-output config yaml file provided by user",
# required=True,
# type=lambda x: is_valid_file(PARSER, x),
# metavar="",
# )
############ Args for QuaC workflow ############
WORKFLOW = PARSER.add_argument_group("QuaC workflow options")
WORKFLOW.add_argument(
"--project_name",
help="Project name",
metavar="",
)
WORKFLOW.add_argument(
"--pedigree",
help="Pedigree filepath. Must be specific for the project supplied via --project_name",
metavar="",
)
WORKFLOW.add_argument(
"--outdir",
help="Out directory path",
metavar="",
)
WORKFLOW.add_argument(
"-m",
"--select_modules",
help="Runs only these user-specified modules(s). If >1, use comma as delimiter. \
Useful for development.",
default="all",
metavar="",
)
ARGS = PARSER.parse_args()
main(ARGS)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment