Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer fixes #223

Open
wants to merge 8 commits into
base: dev
Choose a base branch
from
1 change: 1 addition & 0 deletions samples/fbuffer/BufferTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ bool BufferTask::ParseLine(rapidjson::Value &line) {
if (i == -1) {
if (not value.IsString()) {
DARWIN_LOG_ERROR("BufferTask::ParseLine:: the source given must be a string.");
return false;
}
this->_input_line["source"] = value.GetString();
i++;
Expand Down
9 changes: 7 additions & 2 deletions samples/fbuffer/BufferThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ bool BufferThread::Main() {

if (len >= 0 && len < this->_connector->GetRequiredLogLength()){
DARWIN_LOG_DEBUG("BufferThread::Main:: Not enough log in Redis, wait for more");
continue;
} else if (len<0 || !this->_connector->REDISPopLogs(len, logs, redis_list)) {
} else if (len < 0 || !this->_connector->REDISPopLogs(len, logs, redis_list)) {
DARWIN_LOG_ERROR("BufferThread::Main:: Error when querying Redis on list: " + redis_list + " for source: '" + redis_config.first + "'");
continue;
} else {
Expand All @@ -43,6 +42,12 @@ bool BufferThread::Main() {
DARWIN_LOG_DEBUG("BufferThread::Main:: Removed " + std::to_string(logs.size()) + " elements from redis");
}
}

// Set an expiration on the redis key, to purge if threads/filters are stopped or configuration is modified
// The expiration MUST be over the interval period
if (not this->_connector->REDISSetExpiry(redis_list, this->_interval + 60)) {
DARWIN_LOG_WARNING("BufferThread::Main:: Could not set an expiration on key '" + redis_list + "'");
}
}

return true;
Expand Down
35 changes: 25 additions & 10 deletions samples/fbuffer/Connectors/AConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ std::vector<std::pair<std::string, std::string>> AConnector::GetRedisLists() con
return this->_redis_lists;
}

bool AConnector::ParseData(std::string fieldname) {
bool AConnector::ParseData(std::map<std::string, std::string> &input_line, std::string fieldname, std::string &entry) {
DARWIN_LOGGER;
if (this->_input_line.find(fieldname) == this->_input_line.end()) {
if (input_line.find(fieldname) == input_line.end()) {
DARWIN_LOG_ERROR("AConnector::ParseData '" + fieldname + "' is missing in the input line. Output ignored.");
return false;
}
if (not this->_entry.empty()) {
this->_entry += ";";
if (not entry.empty()) {
entry += ";";
}
this->_entry += this->_input_line[fieldname];
entry += input_line[fieldname];
return true;
}

Expand Down Expand Up @@ -87,7 +87,7 @@ bool AConnector::PrepareKeysInRedis(){

return ret;
}

bool AConnector::REDISAddEntry(const std::string &entry, const std::string &list_name) {
DARWIN_LOGGER;
DARWIN_LOG_DEBUG("AConnector::REDISAddEntry:: Add data in Redis...");
Expand Down Expand Up @@ -143,6 +143,21 @@ long long int AConnector::REDISListLen(const std::string &list_name) noexcept {
return result;
}

bool AConnector::REDISSetExpiry(const std::string &key, unsigned int expiry) {
DARWIN_LOGGER;
DARWIN_LOG_DEBUG("AConnector::REDISSetExpiry:: reseting expiration for key " + key);

long long int result = 0;

darwin::toolkit::RedisManager& redis = darwin::toolkit::RedisManager::GetInstance();

if(redis.Query(std::vector<std::string>{"EXPIRE", key, std::to_string(expiry)}, result, true) != REDIS_REPLY_INTEGER) {
DARWIN_LOG_ERROR("AConnector::REDISSetExpiry:: not the expected Redis response");
return false;
}
return result == 1;
}

bool AConnector::REDISPopLogs(long long int len, std::vector<std::string> &logs, const std::string &list_name) noexcept {
DARWIN_LOGGER;
DARWIN_LOG_DEBUG("AConnector::REDISPopLogs:: Querying Redis for logs...");
Expand Down Expand Up @@ -247,7 +262,7 @@ bool AConnector::SendToFilter(std::vector<std::string> &logs) {
packet->certitude_size = certitude_size;
packet->filter_code = GetFilterCode();
packet->body_size = data.size();

std::vector<char> uuid = darwin::uuid::GenUuid();
memcpy(packet->evt_id, uuid.data(), 16);
DARWIN_LOG_DEBUG("AConnector::SendToFilter:: Sending header + data");
Expand All @@ -267,11 +282,11 @@ long AConnector::GetFilterCode() noexcept {
return DARWIN_FILTER_BUFFER;
}

std::string AConnector::GetSource() {
std::string AConnector::GetSource(std::map<std::string, std::string> &input_line) {
DARWIN_LOGGER;
if (this->_input_line.find("source") == this->_input_line.end()) {
if (input_line.find("source") == input_line.end()) {
DARWIN_LOG_ERROR("AConnector::GetSource:: 'source' is missing in the input line. Output ignored.");
return std::string();
}
return this->_input_line["source"];
return input_line["source"];
}
43 changes: 26 additions & 17 deletions samples/fbuffer/Connectors/AConnector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class AConnector {
///
///\class AConnector

public:
public:
///\brief Unique constructor. It contains all stuff needed to ensure REDIS and output Filter communication
///
///\param io_context The boost::asio::io_context used by the Server. Needed for communication with output Filter.
Expand All @@ -45,7 +45,6 @@ class AConnector {
///\brief Virtual default constructor
virtual ~AConnector() = default;

public:
///\brief Get the interval set in the connector.
///
///\return this->_interval
Expand Down Expand Up @@ -79,9 +78,17 @@ class AConnector {
///\return true on success, false otherwise.
virtual bool REDISAddEntry(const std::string &entry, const std::string &list_name);

///\brief (Re)set the expiration on a Redis key
///
/// \param key The name of the key to set the expiry on
/// \param expiry The value to set as expiration for the key
///
/// \return true if expiry was set successfuly, false if the key did not exist or the command failed
bool REDISSetExpiry(const std::string &key, unsigned int expiry);

///\brief Get the logs from the Redis List
///
/// \param len THe number of elements to pick up in the list
/// \param len The number of elements to pick up in the list
/// \param logs the vector used to store our logs
///
/// \return true on success, false otherwise.
Expand All @@ -99,12 +106,14 @@ class AConnector {
///\return true on success, false otherwise.
virtual bool REDISReinsertLogs(std::vector<std::string> &logs, const std::string &list_name);

///\brief This function extracts from _input_line some data, format it and add it to _entry.
///\brief This function extracts from input_line some data, format it and add it to entry.
///
///\param fieldname The name of the data to retrieve
///\param input_line The map of each entry name with its value
///\param fieldname The name of the entry to retrieve
///\param entry The variable to store the resulting value to
///
///\return true on success, false otherwise.
bool ParseData(std::string fieldname);
bool ParseData(std::map<std::string, std::string> &input_line, std::string fieldname, std::string &entry);

///\brief Sets the connection with the output Filter and sends logs to it.
///
Expand All @@ -113,7 +122,11 @@ class AConnector {
///\return true on success, false otherwise.
bool SendToFilter(std::vector<std::string> &logs);

public: // Functions that needs to be implemented by children

// ##########################################################
// ###Β Functions that needs to be implemented by children ###
// ##########################################################

///\brief This function sends data to the REDIS storage. It must be overrode as each filter doesn't need the same data.
///
/// It should fill _entry with the datas to send as REDISAddEntry is picking from it.
Expand All @@ -123,13 +136,13 @@ class AConnector {
///\return true on success, false otherwise.
virtual bool ParseInputForRedis(std::map<std::string, std::string> &input_line) = 0;

private:
private:
///\brief Get the Buffer filter code
///
///\return the Buffer filter code
long GetFilterCode() noexcept;

protected:
protected:
///\brief this virtual function "jsonifies" the vector of strings into a single string.
/// By default it performs no other actions on the data but CAN be overrode if needed. (e.g fAnomalyConnector)
///
Expand All @@ -139,10 +152,12 @@ class AConnector {
///\return True on success (formatting successful), False otherwise.
virtual bool FormatDataToSendToFilter(std::vector<std::string> &logs, std::string &formatted);

///\brief Extracts the source of this->_input_line input
///\brief Extracts the "source" entry from input_line
///
///\param input_line The map containing the entries' name and value
///
///\return The source of the current input line
std::string GetSource();
std::string GetSource(std::map<std::string, std::string> &input_line);

// Used to link with the correct task
darwin::outputType _filter_type;
Expand All @@ -162,12 +177,6 @@ class AConnector {
// The different REDIS lists used to store data depending to source before sending to filter
std::vector<std::pair<std::string, std::string>> _redis_lists;

// Temporarily used to between formating and adding data to REDIS
std::string _entry;

// Temporarily used to store an input in a form that the connectors can pick what they need
std::map<std::string, std::string> _input_line;

// The number of log lines in REDIS needed to send to the output Filter
unsigned int _required_log_lines;
};
32 changes: 16 additions & 16 deletions samples/fbuffer/Connectors/SumConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@ SumConnector::SumConnector(boost::asio::io_context &context, std::string &filter


bool SumConnector::ParseInputForRedis(std::map<std::string, std::string> &input_line) {
this->_input_line = input_line;
this->_entry.clear();
std::string entry;

std::string source = this->GetSource();
std::string source = this->GetSource(input_line);

if (not this->ParseData("decimal"))
if (not this->ParseData(input_line, "decimal", entry))
return false;

for (const auto &redis_config : this->_redis_lists) {
// If the source in the input is equal to the source in the redis list, or the redis list's source is empty
if (not redis_config.first.compare(source) or redis_config.first.empty())
this->REDISAddEntry(this->_entry, redis_config.second);
this->REDISAddEntry(entry, redis_config.second);
}
return true;
}
Expand Down Expand Up @@ -115,20 +114,21 @@ bool SumConnector::REDISPopLogs(long long int len __attribute__((unused)), std::

redis_reply = redis.Query(std::vector<std::string>{"GETSET", sum_name, "0"}, result, true);
if (redis_reply == REDIS_REPLY_NIL) {
DARWIN_LOG_INFO("SumConnector:: REDISPopLogs:: key '" + sum_name + "' does not exist (yet?)");
return false;
DARWIN_LOG_DEBUG("SumConnector:: REDISPopLogs:: key '" + sum_name + "' did not exist");
result_string = "0";
}
else if(redis_reply != REDIS_REPLY_STRING) {
DARWIN_LOG_ERROR("SumConnector::REDISPopLogs:: Not the expected Redis response");
return false;
}

try {
result_string = std::any_cast<std::string>(result);
}
catch (const std::bad_any_cast&) {
DARWIN_LOG_ERROR("SumConnector:REDISPopLogs:: Impossible to cast redis response into a string.");
return false;
else {
try {
result_string = std::any_cast<std::string>(result);
}
catch (const std::bad_any_cast&) {
DARWIN_LOG_ERROR("SumConnector:REDISPopLogs:: Impossible to cast redis response into a string.");
return false;
}
}

DARWIN_LOG_DEBUG("SumConnector::REDISPopLogs:: Got '" + result_string + "' from Redis");
Expand All @@ -151,7 +151,7 @@ long long int SumConnector::REDISListLen(const std::string &sum_name) noexcept {

redis_reply = redis.Query(std::vector<std::string>{"GET", sum_name}, result_string, true);
if (redis_reply == REDIS_REPLY_NIL) {
DARWIN_LOG_INFO("SumConnector::REDISListLen:: key '" + sum_name + "' does not exist (yet?)");
DARWIN_LOG_DEBUG("SumConnector::REDISListLen:: key '" + sum_name + "' does not exist (yet?)");
}
else if (redis_reply == REDIS_REPLY_STRING) {
result = strtold(result_string.c_str(), NULL);
Expand All @@ -163,7 +163,7 @@ long long int SumConnector::REDISListLen(const std::string &sum_name) noexcept {
}
else {
DARWIN_LOG_ERROR("SumConnector::REDISListLen:: Error while querying key '" + sum_name + "'");
result = 0.0L;
return -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This modification seems to be done to resolve an issue that I don't know nor can identify the issue, I can't review it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the previous return didn't specify an error in the process, now it does (and is checked in BufferThread.cpp:34)

}

// Return an absolute rounded value for the double, cap with the maximum value of a long long int
Expand Down
15 changes: 7 additions & 8 deletions samples/fbuffer/Connectors/fAnomalyConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,23 @@ bool fAnomalyConnector::FormatDataToSendToFilter(std::vector<std::string> &logs,
}

bool fAnomalyConnector::ParseInputForRedis(std::map<std::string, std::string> &input_line) {
this->_input_line = input_line;
this->_entry.clear();
std::string entry;

std::string source = this->GetSource();
std::string source = this->GetSource(input_line);

if (not this->ParseData("net_src_ip"))
if (not this->ParseData(input_line, "net_src_ip", entry))
return false;
if (not this->ParseData("net_dst_ip"))
if (not this->ParseData(input_line, "net_dst_ip", entry))
return false;
if (not this->ParseData("net_dst_port"))
if (not this->ParseData(input_line, "net_dst_port", entry))
return false;
if (not this->ParseData("ip_proto"))
if (not this->ParseData(input_line, "ip_proto", entry))
return false;

for (const auto &redis_config : this->_redis_lists) {
// If the source in the input is equal to the source in the redis list, or the redis list's source is ""
if (not redis_config.first.compare(source) or redis_config.first.empty())
this->REDISAddEntry(this->_entry, redis_config.second);
this->REDISAddEntry(entry, redis_config.second);
}
return true;
}
Expand Down
17 changes: 8 additions & 9 deletions samples/fbuffer/Connectors/fSofaConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,25 @@ fSofaConnector::fSofaConnector(boost::asio::io_context &context, std::string &fi
AConnector(context, darwin::SOFA, filter_socket_path, interval, redis_lists, minLogLen) {}

bool fSofaConnector::ParseInputForRedis(std::map<std::string, std::string> &input_line) {
this->_input_line = input_line;
this->_entry.clear();
std::string entry;

std::string source = this->GetSource();
std::string source = this->GetSource(input_line);

if (not this->ParseData("ip"))
if (not this->ParseData(input_line, "ip", entry))
return false;
if (not this->ParseData("hostname"))
if (not this->ParseData(input_line, "hostname", entry))
return false;
if (not this->ParseData("os"))
if (not this->ParseData(input_line, "os", entry))
return false;
if (not this->ParseData("proto"))
if (not this->ParseData(input_line, "proto", entry))
return false;
if (not this->ParseData("port"))
if (not this->ParseData(input_line, "port", entry))
return false;

for (const auto &redis_config : this->_redis_lists) {
// If the source in the input is equal to the source in the redis list, or the redis list's source is ""
if (not redis_config.first.compare(source) or redis_config.first.empty())
this->REDISAddEntry(this->_entry, redis_config.second);
this->REDISAddEntry(entry, redis_config.second);
}
return true;
}
16 changes: 9 additions & 7 deletions toolkit/AThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "AThread.hpp"
#include "Logger.hpp"

AThread::AThread(int interval) :
AThread::AThread(int interval) :
_interval(interval),
_thread(),
_is_stop(false) {}
Expand All @@ -21,14 +21,16 @@ void AThread::ThreadMain() {
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);

while (!(this->_is_stop)) {
if (!this->Main()) {
DARWIN_LOG_DEBUG("AThread::ThreadMain:: Error in main function, stopping the thread");
_is_stop = true;
break;
}
while (not this->_is_stop) {
// Wait for notification or until timeout
this->_cv.wait_for(lck, std::chrono::seconds(_interval));
if (not _is_stop) {
if (not this->Main()) {
DARWIN_LOG_DEBUG("AThread::ThreadMain:: Error in main function, stopping the thread");
_is_stop = true;
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the logic is mostly hte same, I don't understand the need for a change in this method

Copy link
Member Author

@frikilax frikilax Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the change is in the position of the this->_cv.wait_for()
While we immediately started the task before, now it waits one iteration of _interval before launching it

}
}

Expand Down
Loading