-
Notifications
You must be signed in to change notification settings - Fork 0
/
handlers.cpp
66 lines (54 loc) · 2.59 KB
/
handlers.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include "handlers.h"
#include <algorithm>
#include <thread>
/// Subscribes a client socket to this handler's topic (`topic_`).
///
/// If `subscribers` already holds an entry with the same socket descriptor and
/// topic, nothing is added: a notice is logged to stdout and `ack_message_` is
/// set to an error reply. Otherwise a new entry is appended and `ack_message_`
/// is set to the success reply.
///
/// @param client_socket_fd Socket descriptor of the subscribing client.
/// @param subscribers      Subscription list that is searched and appended to.
void SubCommandHandler::Handle(int client_socket_fd, std::vector<Subscription>& subscribers) {
    // A (socket, topic) pair may appear at most once in the list.
    const bool already_subscribed = std::any_of(
        subscribers.begin(), subscribers.end(),
        [client_socket_fd, this](const Subscription& entry) {
            return entry.client_socket_fd == client_socket_fd && entry.topic == topic_;
        });

    if (already_subscribed) {
        std::cout << "Already subscribed to the topic: " << topic_ << std::endl;
        ack_message_ = "-ERR Already subscribed to the topic\r\n";
        return;
    }

    // Not present yet: record the new subscription.
    subscribers.push_back(Subscription{client_socket_fd, topic_});

    // For testing purposes, a delay can be simulated here:
    // std::this_thread::sleep_for(std::chrono::seconds(3));

    ack_message_ = "+OK\n";
}
/// Reads one message from the publishing client and forwards it to every
/// subscriber of this handler's topic (`topic_`).
///
/// At most one `recv` call is made, reading up to `sizeof(buffer)` bytes. If it
/// returns zero or a negative value, a disconnect notice is logged and the
/// function returns without forwarding anything and without touching
/// `ack_message_`. Otherwise exactly the received bytes are sent to each
/// matching subscriber and `ack_message_` is set to the success reply.
///
/// @param client_socket_fd Socket descriptor of the publishing client.
/// @param subscribers      Subscription list used to find the recipients.
void PubCommandHandler::Handle(int client_socket_fd, std::vector<Subscription>& subscribers) {
    // Read the message payload from the publisher.
    char buffer[1024];
    const auto bytes_read = recv(client_socket_fd, buffer, sizeof(buffer), 0);

    if (bytes_read <= 0) {
        // Client has closed the connection (0) or the read failed (< 0).
        std::cout << "Client disconnected\n";
        // TODO: Remove the client from the subscribers list and close the socket maybe ?
        return;
    }

    // recv() does not NUL-terminate the buffer, so build the string from the
    // exact byte count. Treating the buffer as a C string would read
    // uninitialised memory (possibly past the array) and would also truncate
    // payloads that contain embedded '\0' bytes.
    const std::string message(buffer, static_cast<std::string::size_type>(bytes_read));

    // Forward the message to every subscriber of this topic.
    for (const auto& subscriber : subscribers) {
        if (subscriber.topic == topic_) {
            send(subscriber.client_socket_fd, message.data(), message.size(), 0);
        }
    }

    ack_message_ = "+OK\n";
}
/// Removes the client's subscription to this handler's topic (`topic_`).
///
/// Every entry in `subscribers` whose socket descriptor and topic both match is
/// erased. `ack_message_` is set to the success reply whether or not any entry
/// was removed.
///
/// @param client_socket_fd Socket descriptor of the unsubscribing client.
/// @param subscribers      Subscription list to remove matching entries from.
void UnsubCommandHandler::Handle(int client_socket_fd, std::vector<Subscription>& subscribers) {
    const auto is_target = [client_socket_fd, this](const Subscription& entry) {
        return entry.client_socket_fd == client_socket_fd && entry.topic == topic_;
    };

    // Compact the entries to keep at the front, then drop the tail.
    const auto first_removed = std::remove_if(subscribers.begin(), subscribers.end(), is_target);
    subscribers.erase(first_removed, subscribers.end());

    ack_message_ = "+OK\n";
}
/// Answers a ping by setting `ack_message_` to the PONG reply.
///
/// @param client_socket_fd Unused by this handler.
/// @param subscribers      Unused by this handler.
void PingCommandHandler::Handle(int client_socket_fd, std::vector<Subscription>& subscribers) {
    // Neither argument is needed to answer a ping; mark them as intentionally unused.
    static_cast<void>(client_socket_fd);
    static_cast<void>(subscribers);

    ack_message_ = "PONG\n";
}