Skip to content
Snippets Groups Projects
Commit 4ff2ffe1 authored by Eesaan Atluri's avatar Eesaan Atluri
Browse files

Move away from using RPC queues and use regular queues

parent 08141d63
No related branches found
No related tags found
2 merge requests!147Merge previous default branch feat-cod-rmq into main,!119Feat account management
......@@ -22,13 +22,10 @@ def expire_account(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["success"] = {}
msg["success"][task] = False
msg["task"] = task
queuename = msg["queuename"]
yesterday = date.today() - timedelta(days=1)
corr_id = properties.correlation_id
reply_to = properties.reply_to
try:
expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"'
unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"'
......@@ -38,23 +35,19 @@ def expire_account(ch, method, properties, body):
elif action == 'unlock':
unblock_ssh = popen(unexpire_account_cmd).read().rstrip()
msg["success"][task] = True
msg["success"] = True
logger.info(f"ssh expiration set to yesterday for user {username}")
except Exception:
msg["success"][task] = False
msg["success"] = False
msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace"
logger.error("", exc_info=True)
# send response to callback queue with it's correlation ID
if reply_to:
rc_rmq.publish_msg(
{"routing_key": reply_to,
"props": pika.BasicProperties(
correlation_id=corr_id,
),
"msg": msg}
)
rc_rmq.publish_msg(
{"routing_key": f'acctmgr.done.{queuename}',
"msg": msg}
)
logger.debug(f"User {username} confirmation sent for {action}ing {task}")
......
......@@ -21,11 +21,8 @@ def new_jobs(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["success"] = {}
msg["success"][task] = False
corr_id = properties.correlation_id
reply_to = properties.reply_to
msg["task"] = task
queuename = msg["queuename"]
try:
block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0"
......@@ -36,23 +33,19 @@ def new_jobs(ch, method, properties, body):
elif action == 'unlock':
unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip()
msg["success"][task] = True
msg["success"] = True
logger.info(f"Succeeded in blocking {username}'s jobs getting to run state")
except Exception:
msg["success"][task] = False
msg["success"] = False
msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace"
logger.error("", exc_info=True)
# send response to callback queue with it's correlation ID
if reply_to:
rc_rmq.publish_msg(
{"routing_key": reply_to,
"props": pika.BasicProperties(
correlation_id=corr_id,
),
"msg": msg}
)
rc_rmq.publish_msg(
{"routing_key": f'acctmgr.done.{queuename}',
"msg": msg}
)
logger.debug(f"User {username} confirmation sent for {action}ing {task}")
......
......@@ -21,9 +21,7 @@ def ssh_access(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["success"] = {}
msg["success"][task] = False
msg["task"] = task
corr_id = properties.correlation_id
reply_to = properties.reply_to
......@@ -36,23 +34,21 @@ def ssh_access(ch, method, properties, body):
elif action == 'unlock':
unblock_ssh = popen(unblock_ssh_cmd).read().rstrip()
msg["success"][task] = True
msg["success"] = True
logger.info(f"User {username} is added to nossh group")
except Exception:
msg["success"][task] = False
msg["success"] = False
msg["errmsg"] = "Exception raised, while blocking user's ssh access, check the logs for stack trace"
logger.error("", exc_info=True)
# send response to callback queue with it's correlation ID
if reply_to:
rc_rmq.publish_msg(
{"routing_key": reply_to,
"props": pika.BasicProperties(
correlation_id=corr_id,
),
"msg": msg}
)
rc_rmq.publish_msg(
{
"routing_key": f'acctmgr.done.{queuename}',
"msg": msg
}
)
logger.debug(f"User {username} confirmation sent for {action}ing {task}")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment