tasks.py 3.19 KB
Newer Older
1
import vars
root's avatar
root committed
2
import sys
3
import json
4
import time
Krish Moodbidri's avatar
Krish Moodbidri committed
5
import signal
6
7
8

from celery import Celery
from flask_socketio import SocketIO
root's avatar
root committed
9

10
sys.path.append(vars.rabbitmq_agents_loc)
11
import rc_util
12

13
broker_url = vars.broker_url
Krish Moodbidri's avatar
Krish Moodbidri committed
14
celery = Celery(vars.celery_app, broker=broker_url)
15

16
socketio = SocketIO(message_queue=vars.message_queue)
root's avatar
root committed
17
timeout = 30
18

19
20
21
22
def gen_f(room):
    def callback(channel, method, properties, body):
        msg = json.loads(body)
        username = msg['username']
Krish Moodbidri's avatar
Krish Moodbidri committed
23
        queuename = msg['queuename']
24
25
26
27
28
29
30
31
32
33
34
35

        if msg['success']:
            print(f'Account for {username} has been created.')
            send_msg('account ready', room)
        else:
            print(f"There's some issue while creating account for {username}")
            errmsg = msg.get('errmsg', [])
            for err in errmsg:
                print(err)
            socketio.emit('account error', errmsg, room= room)

        rc_util.rc_rmq.stop_consume()
36
        rc_util.rc_rmq.delete_queue(queuename)
37
    return callback
38

Krish Moodbidri's avatar
Krish Moodbidri committed
39
40
41
42
def certify_gen_f(room):
    def callback(channel, method, properties, body):
        msg = json.loads(body)
        username = msg['username']
Krish Moodbidri's avatar
Krish Moodbidri committed
43
        queuename = msg['queuename']
Krish Moodbidri's avatar
Krish Moodbidri committed
44
45
46
47
48
49
50
51
52
53
54
55

        if msg['success']:
            print(f'Account for {username} has been certified.')
            send_msg('certified', room)
        else:
            print(f"There's some issue while certifying account for {username}")
            errmsg = msg.get('errmsg', [])
            for err in errmsg:
                print(err)
            socketio.emit('certify error', errmsg, room= room)

        rc_util.rc_rmq.stop_consume()
56
        rc_util.rc_rmq.delete_queue(queuename)
Krish Moodbidri's avatar
Krish Moodbidri committed
57
58
    return callback

59
def send_msg(event, room):
60
    socketio.emit(event, room=room)
61

Krish Moodbidri's avatar
Krish Moodbidri committed
62
63
def timeout_handler(signum, frame):
    print("Process timeout, there's might some issue with agents")
root's avatar
root committed
64
    socketio.emit('account error', errmsg, room= room)
Krish Moodbidri's avatar
Krish Moodbidri committed
65
    rc_util.rc_rmq.stop_consume()
root's avatar
root committed
66
    rc_util.rc_rmq.delete_queue()
67

68
@celery.task
69
def celery_create_account(json, session):
70
    room = session
71
72
73
74
    username= json['username'] 
    email= json['email']
    fullname= json['fullname']
    reason= json['reason']
Krish Moodbidri's avatar
Krish Moodbidri committed
75
#    aup= json['aup']
76
    queuename= rc_util.encode_name(username)
77

78
    print(time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue')
79
    send_msg('creating account', room)
80
    print(username)
Krish Moodbidri's avatar
Krish Moodbidri committed
81
    rc_util.add_account(username, queuename, email, fullname, reason)
root's avatar
root committed
82
83
    print('sent account info')
    print('Waiting for completion...')
84
85
86
87
88
89
90
91
92
    rc_util.consume(queuename, routing_key=f'complete.{queuename}', callback=gen_f(room))

@celery.task
def celery_certify_account(json, session):
    room = session
    username= json['username']
    email= json['email']
    fullname= json['fullname']
    queuename= rc_util.encode_name(username)
Krish Moodbidri's avatar
Krish Moodbidri committed
93
    updated_by= f'{username}@login005'
94
95
96
97

    print("CERTIFY : "+time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue')
    send_msg('certifying account', room)
    print(username)
Krish Moodbidri's avatar
Krish Moodbidri committed
98
    rc_util.certify_account(username, queuename, 'ok', 'all', updated_by)
99
100
    print('sent account info')
    print('Waiting for certification...')
Krish Moodbidri's avatar
Krish Moodbidri committed
101
    rc_util.consume(queuename, routing_key=f'certified.{queuename}', callback=certify_gen_f(room))