Skip to content
Snippets Groups Projects
Commit 08141d63 authored by Eesaan Atluri's avatar Eesaan Atluri
Browse files

move acctmgr agent workflow out of driver script

parent e63517fb
No related branches found
No related tags found
2 merge requests!147Merge previous default branch feat-cod-rmq into main,!119Feat account management
......@@ -26,78 +26,54 @@ args = parser.parse_args()
timeout = 60
queuename = rc_util.encode_name(args.username)
username = args.username
state = args.state
service = args.service
corr_id = str(uuid.uuid4())
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
callback_queue = rc_rmq.bind_queue(exclusive=True)
msg = {}
msg["username"] = username
msg["state"] = state
msg["service"] = service
msg["queuename"] = queuename
# publish msg with acctmgr.{uname} routing key.
rc_rmq.publish_msg(
{
"routing_key": f'acctmgr.request.{queuename}',
"msg": msg,
}
)
if state == 'blocked' or state == 'certification':
action = "lock"
elif state == 'ok':
action = "unlock"
else:
print("Invalid state provided. Check the help menu.")
if args.service == 'all':
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{action}.{username}",
"props": pika.BasicProperties(
correlation_id=corr_id, reply_to=callback_queue
),
"msg": {"username": username, "action": action, "service": service},
}
)
else:
for each_service in service:
rc_rmq.publish_msg(
{
"routing_key": f"{each_service}.{username}",
"props": pika.BasicProperties(
correlation_id=corr_id, reply_to=callback_queue
),
"msg": {"username": username, "action": action, "service": service},
}
)
def timeout_handler(signum, frame):
print("Process timeout, there's some issue with agents")
rc_rmq.stop_consume()
def callback(channel, method, properties, body):
def callback(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
# Check if each task returned success
for each_task in msg["success"].values():
if each_task == True:
success=True
else:
success=False
break
if success:
print(f"Account for {username} has been {action}ed.\n Updating the user state in DB")
rc_util.update_state(username, state)
print(msg)
if msg["success"]:
print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB")
else:
print(f"There's some issue in account management agents for {username}")
errmsg = msg.get("errmsg", [])
for err in errmsg:
print(err)
ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.stop_consume()
rc_rmq.disconnect()
rc_rmq.delete_queue(queuename)
print(f"{action} action for {args.username} requested.")
print(f"Request {username} account state set to {state}.")
# Set initial timeout timer
signal.signal(signal.SIGALRM, timeout_handler)
......@@ -106,9 +82,8 @@ signal.setitimer(signal.ITIMER_REAL, timeout)
print("Waiting for completion...")
rc_rmq.start_consume(
{
"queue": callback_queue,
"exclusive": True,
"bind": False,
"queue": queuename,
"routing_key": f'certified.{queuename}',
"cb": callback,
}
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment