tasks.py 3.14 KB
Newer Older
1
import vars
root's avatar
root committed
2
import sys
3
import json
4
import time
Krish Moodbidri's avatar
Krish Moodbidri committed
5
import signal
6
7
8

from celery import Celery
from flask_socketio import SocketIO
root's avatar
root committed
9

10
# Extend the import path before importing rc_util; presumably rc_util is
# provided by the directory named in vars.rabbitmq_agents_loc -- TODO confirm.
sys.path.append(vars.rabbitmq_agents_loc)
import rc_util

# Celery application used by the @celery.task definitions in this module.
broker_url = vars.broker_url
celery = Celery('flask_user_reg', broker=broker_url)

# SocketIO object attached to the message queue configured in vars; every
# emit in this module goes through it.
socketio = SocketIO(message_queue=vars.message_queue)

# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably a number of seconds -- confirm before relying on it.
timeout = 30
18

19
20
21
22
def gen_f(room):
    """Build the consume callback for an account-creation reply.

    The returned function takes ``(channel, method, properties, body)``,
    decodes ``body`` as JSON, reports the outcome to ``room`` over socketio,
    then stops consuming and deletes the reply queue.  Only ``body`` is
    used; the other three arguments are ignored.
    """
    def callback(channel, method, properties, body):
        reply = json.loads(body)
        # NOTE(review): the queue to delete is named after the reply's
        # 'username', while celery_create_account consumes on
        # rc_util.encode_name(username) -- confirm the two always match.
        username = queuename = reply['username']

        if not reply['success']:
            print(f"There's some issue while creating account for {username}")
            errors = reply.get('errmsg', [])
            for line in errors:
                print(line)
            socketio.emit('account error', errors, room=room)
        else:
            print(f'Account for {username} has been created.')
            send_msg('account ready', room)

        # Runs on both outcomes: end the consume loop, then drop the queue.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    return callback
38

Krish Moodbidri's avatar
Krish Moodbidri committed
39
40
41
42
def certify_gen_f(room):
    """Build the consume callback for an account-certification reply.

    The returned function takes ``(channel, method, properties, body)``,
    decodes ``body`` as JSON, reports the outcome to ``room`` over socketio,
    then stops consuming and deletes the reply queue.  Only ``body`` is
    used; the other three arguments are ignored.
    """
    def callback(channel, method, properties, body):
        reply = json.loads(body)
        # NOTE(review): the queue to delete is named after the reply's
        # 'username', while celery_certify_account consumes on
        # rc_util.encode_name(username) -- confirm the two always match.
        username = queuename = reply['username']

        if not reply['success']:
            print(f"There's some issue while certifying account for {username}")
            errors = reply.get('errmsg', [])
            for line in errors:
                print(line)
            socketio.emit('certify error', errors, room=room)
        else:
            print(f'Account for {username} has been certified.')
            send_msg('certified', room)

        # Runs on both outcomes: end the consume loop, then drop the queue.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    return callback

59
def send_msg(event, room):
    """Emit ``event`` with no payload to ``room`` via the module's socketio."""
    socketio.emit(event, room=room)
61

Krish Moodbidri's avatar
Krish Moodbidri committed
62
63
def timeout_handler(signum, frame, room=None, queuename=None,
                    event='account error'):
    """Handle a timeout while waiting on the agents.

    Callable as a plain ``(signum, frame)`` signal handler.  The handler
    cannot see a task's local variables, so the room to notify and the
    queue to remove are optional keyword arguments; bind them when
    registering, e.g.
    ``functools.partial(timeout_handler, room=room, queuename=queuename)``.

    Args:
        signum: Signal number; unused.
        frame: Interrupted stack frame; unused.
        room: Room to send the error event to.  When ``None`` nothing is
            emitted, so the error is not sent without a target room.
        queuename: Queue to delete after consuming stops.  When ``None``
            the deletion is skipped because there is no name to pass.
        event: Name of the error event to emit; defaults to the
            account-creation error event.
    """
    print("Process timeout, there's might some issue with agents")

    # A list of strings, the same shape as the 'errmsg' payload the consume
    # callbacks in this module emit with their error events.
    errmsg = ['Timed out waiting for a response from the agents.']
    if room is not None:
        socketio.emit(event, errmsg, room=room)

    # Always end the consume loop, even when there is no room to notify.
    rc_util.rc_rmq.stop_consume()
    if queuename is not None:
        rc_util.rc_rmq.delete_queue(queuename)
67

68
@celery.task
def celery_create_account(json, session):
    """Celery task: request an account and wait for the completion reply.

    Args:
        json: Mapping with the keys 'username', 'email', 'fullname',
            'reason' and 'aup'.  (The name shadows the ``json`` module
            inside this function only.)
        session: Value used as the socketio room for progress events.
    """
    # Read the fields in a fixed order; a missing key raises KeyError.
    wanted = ('username', 'email', 'fullname', 'reason', 'aup')
    username, email, fullname, reason, aup = (json[key] for key in wanted)
    queuename = rc_util.encode_name(username)

    stamp = time.strftime("%m-%d-%Y_%H:%M:%S")
    print(''.join((stamp, '\tUser ', username, ' added to queue')))
    send_msg('creating account', session)
    print(username)

    rc_util.add_account(username, queuename, email, fullname, reason, aup)
    print('sent account info')
    print('Waiting for completion...')

    # The callback built by gen_f reports the result to the room.
    rc_util.consume(
        queuename,
        routing_key=f'complete.{queuename}',
        callback=gen_f(session),
    )

@celery.task
def celery_certify_account(json, session):
    """Celery task: request certification and wait for the reply.

    Args:
        json: Mapping with the keys 'username', 'email' and 'fullname'.
            (The name shadows the ``json`` module inside this function
            only.)
        session: Value used as the socketio room for progress events.
    """
    # NOTE(review): 'email' and 'fullname' are read but never used below;
    # the lookups are kept so a payload missing them still raises KeyError.
    wanted = ('username', 'email', 'fullname')
    username, email, fullname = (json[key] for key in wanted)
    queuename = rc_util.encode_name(username)

    stamp = time.strftime("%m-%d-%Y_%H:%M:%S")
    print(''.join(("CERTIFY : ", stamp, '\tUser ', username, ' added to queue')))
    send_msg('certifying account', session)
    print(username)

    rc_util.certify_account(username, queuename, 'ok', 'all')
    print('sent account info')
    print('Waiting for certification...')

    # The callback built by certify_gen_f reports the result to the room.
    rc_util.consume(
        queuename,
        routing_key=f'certified.{queuename}',
        callback=certify_gen_f(session),
    )