tasks.py 1.81 KB
Newer Older
1
import vars
root's avatar
root committed
2
import sys
3
import json
4
import time
Krish Moodbidri's avatar
Krish Moodbidri committed
5
import signal
6
7
8

from celery import Celery
from flask_socketio import SocketIO
root's avatar
root committed
9
10

# Extend the import path before importing rc_util; the import directly below
# relies on this entry (presumably rc_util lives in that directory -- confirm).
sys.path.append('/cm/shared/rabbitmq_agents/')
import rc_util

# Celery application, using the broker URL taken from the imported `vars` module.
broker_url = vars.broker_url
celery = Celery('flask_user_reg', broker=broker_url)

# Socket.IO handle bound to the message queue configured in `vars`.
socketio = SocketIO(message_queue=vars.message_queue)

# NOTE(review): not referenced by any code visible in this file; presumably a
# wait limit in seconds for the completion message -- confirm before relying on it.
timeout = 60
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def gen_f(room):
    """Build the consumer callback that reports an account-creation result.

    Args:
        room: Socket.IO room that receives the outcome event.

    Returns:
        A ``callback(channel, method, properties, body)`` function. It decodes
        ``body`` as JSON, then either emits ``'account ready'`` to ``room``
        (when ``msg['success']`` is truthy) or emits ``'account error'`` with
        the message's ``errmsg`` list. Afterwards it stops consuming and
        deletes the queue via ``rc_util.rc_rmq``.

    Raises (from the returned callback):
        json.JSONDecodeError: if ``body`` is not valid JSON.
        KeyError: if the decoded message lacks ``'username'`` or ``'success'``.
    """
    def callback(channel, method, properties, body):
        # channel/method/properties are unused; the signature is dictated by
        # whatever rc_util.consume invokes the callback with.
        try:
            msg = json.loads(body)
            username = msg['username']

            if msg['success']:
                print(f'Account for {username} has been created.')
                send_msg('account ready', room)
            else:
                print(f"There's some issue while creating account for {username}")
                errmsg = msg.get('errmsg', [])
                for err in errmsg:
                    print(err)
                socketio.emit('account error', errmsg, room=room)
        finally:
            # Run the cleanup even when the message is malformed or an emit
            # fails; previously an exception above skipped both calls. The
            # exception itself still propagates to the caller.
            rc_util.rc_rmq.stop_consume()
            rc_util.rc_rmq.delete_queue()
    return callback
37

38
def send_msg(event, room):
    """Emit a payload-less Socket.IO event to one room.

    Args:
        event: Name of the event to emit.
        room: Room the event is addressed to (forwarded as ``room=``).
    """
    socketio.emit(event, room=room)
40

Krish Moodbidri's avatar
Krish Moodbidri committed
41
42
43
def timeout_handler(signum, frame):
    """Print a timeout notice and call ``rc_util.rc_rmq.stop_consume()``.

    The ``(signum, frame)`` parameters match the signature Python's ``signal``
    module uses for handlers; neither argument is read here.

    NOTE(review): no code that registers this handler is visible in this
    file -- confirm where (or whether) it is armed.
    """
    print("Process timeout, there's might some issue with agents")
    rc_util.rc_rmq.stop_consume()
44

45
@celery.task
def celery_create_account(json, session):
    """Celery task: submit an account request, then wait for its result.

    Args:
        json: Mapping that must provide the ``'username'``, ``'email'``,
            ``'fullname'`` and ``'reason'`` keys. Inside this function the
            name shadows the ``json`` module.
        session: Value used as the Socket.IO room for progress events.

    Raises:
        KeyError: if one of the four required keys is missing from ``json``.

    NOTE(review): the module-level ``timeout`` / ``timeout_handler`` are not
    used here, so nothing in this function bounds the wait in
    ``rc_util.consume`` -- confirm whether that is intended.
    """
    room = session

    # Keys are looked up in this order, so the first missing one is reported.
    username, email, fullname, reason = (
        json[key] for key in ('username', 'email', 'fullname', 'reason')
    )

    queued_at = time.strftime("%m-%d-%Y_%H:%M:%S")
    print(queued_at + '\tUser ' + username + ' added to queue')
    send_msg('creating account', room)
    print(username)

    rc_util.add_account(username, email, fullname, reason)
    print('sent account info')

    print('Waiting for completion...')
    # The callback built by gen_f reports the outcome to `room`.
    rc_util.consume(
        username,
        routing_key=f'complete.{username}',
        callback=gen_f(room),
    )