tasks.py 3.96 KB
Newer Older
1
import vars
root's avatar
root committed
2
import sys
3
import json
4
import time
Krish Moodbidri's avatar
Krish Moodbidri committed
5
import signal
6
7
8

from celery import Celery
from flask_socketio import SocketIO
root's avatar
root committed
9

10
sys.path.append(vars.rabbitmq_agents_loc)
11
import rc_util
12

13
broker_url = vars.broker_url
Krish Moodbidri's avatar
Krish Moodbidri committed
14
celery = Celery(vars.celery_app, broker=broker_url)
15

16
socketio = SocketIO(message_queue=vars.message_queue)
root's avatar
root committed
17
timeout = 30
18

19
20
21
22
def gen_f(room):
    def callback(channel, method, properties, body):
        msg = json.loads(body)
        username = msg['username']
Krish Moodbidri's avatar
Krish Moodbidri committed
23
        queuename = msg['queuename']
24
25
26
27
28
29
30
31
32
33
34
35

        if msg['success']:
            print(f'Account for {username} has been created.')
            send_msg('account ready', room)
        else:
            print(f"There's some issue while creating account for {username}")
            errmsg = msg.get('errmsg', [])
            for err in errmsg:
                print(err)
            socketio.emit('account error', errmsg, room= room)

        rc_util.rc_rmq.stop_consume()
36
        rc_util.rc_rmq.delete_queue(queuename)
37
    return callback
38

Krish Moodbidri's avatar
Krish Moodbidri committed
39
40
41
42
def certify_gen_f(room):
    def callback(channel, method, properties, body):
        msg = json.loads(body)
        username = msg['username']
Krish Moodbidri's avatar
Krish Moodbidri committed
43
        queuename = msg['queuename']
Krish Moodbidri's avatar
Krish Moodbidri committed
44
45
46
47
48
49
50
51
52
53
54
55

        if msg['success']:
            print(f'Account for {username} has been certified.')
            send_msg('certified', room)
        else:
            print(f"There's some issue while certifying account for {username}")
            errmsg = msg.get('errmsg', [])
            for err in errmsg:
                print(err)
            socketio.emit('certify error', errmsg, room= room)

        rc_util.rc_rmq.stop_consume()
56
        rc_util.rc_rmq.delete_queue(queuename)
Krish Moodbidri's avatar
Krish Moodbidri committed
57
58
    return callback

59
def send_msg(event, room):
Krish Moodbidri's avatar
Krish Moodbidri committed
60
61
62
    """
    This function is used to send messages via socketio
    Input:
Krish Moodbidri's avatar
Krish Moodbidri committed
63
        string event, room: event and room to use socketio emit message 
Krish Moodbidri's avatar
Krish Moodbidri committed
64
    Output:
Krish Moodbidri's avatar
Krish Moodbidri committed
65
        string: socketio emit function to emit the event to the room
Krish Moodbidri's avatar
Krish Moodbidri committed
66
    """
67
    socketio.emit(event, room=room)
68

Krish Moodbidri's avatar
Krish Moodbidri committed
69
70
def timeout_handler(signum, frame):
    print("Process timeout, there's might some issue with agents")
root's avatar
root committed
71
    socketio.emit('account error', errmsg, room= room)
Krish Moodbidri's avatar
Krish Moodbidri committed
72
    rc_util.rc_rmq.stop_consume()
root's avatar
root committed
73
    rc_util.rc_rmq.delete_queue()
74

75
@celery.task
76
def celery_create_account(json, session):
Krish Moodbidri's avatar
Krish Moodbidri committed
77
78
79
    """
    This function is used to create account for new users 
    Input:
Krish Moodbidri's avatar
Krish Moodbidri committed
80
        json, string: json object of all user attributes and session/room 
Krish Moodbidri's avatar
Krish Moodbidri committed
81
82
83
    Output:
        gen_f(room): callback to check for success or failure
    """
84
    room = session
85
86
87
88
    username= json['username'] 
    email= json['email']
    fullname= json['fullname']
    reason= json['reason']
89
    queuename= rc_util.encode_name(username)
Krish Moodbidri's avatar
Krish Moodbidri committed
90
91
    updated_by= f'{username}'
    host= vars.app_host
92

93
    print(time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue')
94
    send_msg('creating account', room)
95
    print(username)
Krish Moodbidri's avatar
Krish Moodbidri committed
96
    rc_util.add_account(username, queuename, email, fullname, reason, updated_by, host)
root's avatar
root committed
97
98
    print('sent account info')
    print('Waiting for completion...')
99
100
101
102
    rc_util.consume(queuename, routing_key=f'complete.{queuename}', callback=gen_f(room))

@celery.task
def celery_certify_account(json, session):
Krish Moodbidri's avatar
Krish Moodbidri committed
103
104
105
    """
    This function is used to certify account for existing users
    Input:
Krish Moodbidri's avatar
Krish Moodbidri committed
106
        json, string: json object of all user attributes and session/room
Krish Moodbidri's avatar
Krish Moodbidri committed
107
108
109
    Output:
        gen_f(room): callback to check for success or failure
    """
110
111
112
113
114
    room = session
    username= json['username']
    email= json['email']
    fullname= json['fullname']
    queuename= rc_util.encode_name(username)
115
    updated_by= f'{username}'
116
    host= vars.app_host 
117
118
119
120

    print("CERTIFY : "+time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue')
    send_msg('certifying account', room)
    print(username)
121
    rc_util.certify_account(username, queuename, 'ok', 'all', updated_by, host)
122
123
    print('sent account info')
    print('Waiting for certification...')
Krish Moodbidri's avatar
Krish Moodbidri committed
124
    rc_util.consume(queuename, routing_key=f'certified.{queuename}', callback=certify_gen_f(room))