Commit d63d8a80 authored by Krish Moodbidri's avatar Krish Moodbidri
Browse files

replaced subprocess call with direct call to script, added scripts from...

replaced subprocess call with direct call to script, added scripts from rabbitmq agents repo (https://github.com/uabrc/rabbitmq_agents)
parent c3e552ec
import json
import pika
import socket
import rabbit_config as rcfg
class RCRMQ(object):
USER = 'guest'
PASSWORD = 'guest'
HOST = 'localhost'
PORT = 5672
VHOST = '/'
EXCHANGE = ''
EXCHANGE_TYPE = 'direct'
QUEUE = None
DURABLE = True
ROUTING_KEY = None
DEBUG = False
def __init__(self, config=None, debug=False):
if config:
if 'exchange' in config:
self.EXCHANGE = config['exchange']
if 'exchange_type' in config:
self.EXCHANGE_TYPE = config['exchange_type']
hostname = socket.gethostname().split(".", 1)[0]
self.HOST = rcfg.Server if hostname != rcfg.Server else "localhost"
self.USER = rcfg.User
self.PASSWORD = rcfg.Password
self.VHOST = rcfg.VHost
self.PORT = rcfg.Port
self.DEBUG = debug
if self.DEBUG:
print("""
Created RabbitMQ instance with:
Exchange name: {},
Exchange type: {},
Host: {},
User: {},
VHost: {},
Port: {}
""".format(self.EXCHANGE, self.EXCHANGE_TYPE, self.HOST, self.USER, self.VHOST, self.PORT))
self._consumer_tag = None
self._connection = None
self._consuming = False
self._channel = None
self._parameters = pika.ConnectionParameters(
self.HOST,
self.PORT,
self.VHOST,
pika.PlainCredentials(self.USER, self.PASSWORD))
def connect(self):
if self.DEBUG:
print("Connecting...\n" + "Exchange: " + self.EXCHANGE + " Exchange type: " + self.EXCHANGE_TYPE)
self._connection = pika.BlockingConnection(self._parameters)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self.EXCHANGE,
exchange_type=self.EXCHANGE_TYPE,
durable=True)
def bind_queue(self):
self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE)
self._channel.queue_bind(exchange=self.EXCHANGE,
queue=self.QUEUE,
routing_key=self.ROUTING_KEY)
def disconnect(self):
self._channel.close()
self._connection.close()
self._connection = None
def delete_queue(self):
self._channel.queue_delete(self.QUEUE)
def publish_msg(self, obj):
if 'routing_key' in obj:
self.ROUTING_KEY = obj['routing_key']
if self._connection is None:
self.connect()
self._channel.basic_publish(exchange=self.EXCHANGE,
routing_key=self.ROUTING_KEY,
body=json.dumps(obj['msg']))
def start_consume(self, obj):
if 'queue' in obj:
self.QUEUE = obj['queue']
self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE
if 'durable' in obj:
self.DURABLE = obj['durable']
if self.DEBUG:
print("Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY)
if self._connection is None:
self.connect()
self.bind_queue()
self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb'])
self._consuming = True
try:
self._channel.start_consuming()
except KeyboardInterrupt:
self._channel.stop_consuming()
def stop_consume(self):
self._channel.basic_cancel(self._consumer_tag)
import logging
import argparse
from rc_rmq import RCRMQ
import json
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
tasks = {'ohpc_account': None, 'ood_account': None, 'slurm_account': None}
logger_fmt = '%(asctime)s [%(module)s] - %(message)s'
def add_account(username, email, full='', reason=''):
rc_rmq.publish_msg({
'routing_key': 'request.' + username,
'msg': {
"username": username,
"email": email,
"fullname": full,
"reason": reason
}
})
rc_rmq.disconnect()
def worker(ch, method, properties, body):
msg = json.loads(body)
task = msg['task']
tasks[task] = msg['success']
print("Got msg: {}({})".format(msg['task'], msg['success']))
# Check if all tasks are done
done = True
for key, status in tasks.items():
if not status:
print("{} is not done yet.".format(key))
done = False
if done:
rc_rmq.stop_consume()
rc_rmq.delete_queue()
def consume(username, callback=worker, debug=False):
if debug:
sleep(5)
else:
rc_rmq.start_consume({
'queue': username,
'routing_key': 'confirm.' + username,
'cb': callback
})
rc_rmq.disconnect()
return { 'success' : True }
def get_args():
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true', help='verbose output')
parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode')
return parser.parse_args()
def get_logger(args=None):
if args is None:
args = get_args()
logger_lvl = logging.WARNING
if args.verbose:
logger_lvl = logging.DEBUG
if args.dry_run:
logger_lvl = logging.INFO
logging.basicConfig(format=logger_fmt, level=logger_lvl)
return logging.getLogger(__name__)
......@@ -3,6 +3,7 @@ import time
from flask_socketio import SocketIO
import subprocess
import vars
import rc_util
from gevent import monkey
monkey.patch_all(subprocess=True)
......@@ -29,6 +30,7 @@ def celery_create_account(json, session):
print(time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue')
send_msg('creating account', room)
print(username)
subprocess.call(["/opt/rabbitmq_agents/create_account.py", username, email, fullname, reason])
rc_util.add_account(username, email, fullname, reason)
rc_util.consume(username)
print(time.strftime("%m-%d-%Y_%H:%M:%S") + '\tAccount successfully created for ' + username)
send_msg('account ready', room)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment