-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathamqp_setup.py
114 lines (93 loc) · 4.97 KB
/
amqp_setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import pika
from os import environ
# These module-level variables are initialized whenever a new instance of python interpreter imports the module;
# In each instance of python interpreter (i.e., a program run), the same module is only imported once (guaranteed by the interpreter).
hostname = environ.get("rabbit_host") or "localhost" # default hostname 172.27.0.2
port = environ.get("rabbit_port") or 5672 # default port
# connect to the broker and set up a communication channel in the connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=hostname, port=port,
heartbeat=3600, blocked_connection_timeout=3600, # these parameters to prolong the expiration time (in seconds) of the connection
))
# Note about AMQP connection: various network firewalls, filters, gateways (e.g., SMU VPN on wifi), may hinder the connections;
# If "pika.exceptions.AMQPConnectionError" happens, may try again after disconnecting the wifi and/or disabling firewalls.
# If see: Stream connection lost: ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)
# - Try: simply re-run the program or refresh the page.
# For rare cases, it's incompatibility between RabbitMQ and the machine running it,
# - Use the Docker version of RabbitMQ instead: https://www.rabbitmq.com/download.html
channel = connection.channel()
# Set up the exchange if the exchange doesn't exist
# - use a 'topic' exchange to enable interaction
exchangename="notification"
exchangetype="topic"
channel.exchange_declare(exchange=exchangename, exchange_type=exchangetype, durable=True)
# 'durable' makes the exchange survive broker restarts
# Here can be a place to set up all queues needed by the microservices,
# - instead of setting up the queues using RabbitMQ UI.
############ sms_food queue #############
# declare sms_food queue
queue_name = 'sms_food'
channel.queue_declare(queue=queue_name, durable=True)
# 'durable' makes the queue survive broker restarts
#bind Activity_Log queue
channel.queue_bind(exchange=exchangename, queue=queue_name, routing_key='#.sms.food.#')
# bind the queue to the exchange via the key
# 'routing_key=#' => any routing_key would be matched
print("sms_food queue set up")
############ sms_forum queue #############
# declare sms_forum queue
queue_name = 'sms_forum'
channel.queue_declare(queue=queue_name, durable=True)
# 'durable' makes the queue survive broker restarts
#bind Activity_Log queue
channel.queue_bind(exchange=exchangename, queue=queue_name, routing_key='#.sms.forum.#')
# bind the queue to the exchange via the key
# 'routing_key=#' => any routing_key would be matched
print("sms_forum queue set up")
############ email_food queue #############
# declare email_food queue
queue_name = 'email_food'
channel.queue_declare(queue=queue_name, durable=True)
# 'durable' makes the queue survive broker restarts
#bind Activity_Log queue
channel.queue_bind(exchange=exchangename, queue=queue_name, routing_key='#.email.food.#')
# bind the queue to the exchange via the key
# 'routing_key=#' => any routing_key would be matched
print("email_food queue set up")
############ email_forum queue #############
# declare email_forum queue
queue_name = 'email_forum'
channel.queue_declare(queue=queue_name, durable=True)
# 'durable' makes the queue survive broker restarts
#bind Activity_Log queue
channel.queue_bind(exchange=exchangename, queue=queue_name, routing_key='#.email.forum.#')
# bind the queue to the exchange via the key
# 'routing_key=#' => any routing_key would be matched
print("email_forum queue set up")
"""
This function in this module sets up a connection and a channel to a local AMQP broker,
and declares a 'topic' exchange to be used by the microservices in the solution.
"""
def check_setup():
# The shared connection and channel created when the module is imported may be expired,
# timed out, disconnected by the broker or a client;
# - re-establish the connection/channel is they have been closed
global connection, channel, hostname, port, exchangename, exchangetype
if not is_connection_open(connection):
connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=port, heartbeat=3600, blocked_connection_timeout=3600))
if channel.is_closed:
channel = connection.channel()
channel.exchange_declare(exchange=exchangename, exchange_type=exchangetype, durable=True)
def is_connection_open(connection):
# For a BlockingConnection in AMQP clients,
# when an exception happens when an action is performed,
# it likely indicates a broken connection.
# So, the code below actively calls a method in the 'connection' to check if an exception happens
try:
connection.process_data_events()
return True
except pika.exceptions.AMQPError as e:
print("AMQP Error:", e)
print("...creating a new connection.")
return False