Skip to content

Commit

Permalink
Added MessageBus to Player and Recorder
Browse files Browse the repository at this point in the history
  • Loading branch information
gvinals authored and gitsgh committed Sep 25, 2024
1 parent 682fc83 commit a5ae1db
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 0 deletions.
2 changes: 2 additions & 0 deletions common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ list(APPEND common_SOURCES
include/malloc_allocator.h
include/MemorySniffer.h
include/message_pump.h
include/messageBus.h
include/performance.h
include/platform.h
include/pragmas.h
Expand Down Expand Up @@ -73,6 +74,7 @@ list(APPEND common_SOURCES
log.cpp
MemorySniffer.cpp
message_pump.cpp
messageBus.cpp
performance.cpp
resource_manager.cpp
runner.cpp
Expand Down
8 changes: 8 additions & 0 deletions common/include/gits.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "tools_lite.h"
#include "pragmas.h"
#include "apis_iface.h"
#include "messageBus.h"

#include <atomic>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -182,6 +184,8 @@ class CGits : private gits::noncopyable {

std::unordered_map<void*, uint64_t> _ptrToOrderedId;

MessageBus _messageBus;

CGits();
CGits(uint16_t v0, uint16_t v1, uint16_t v2, uint16_t v3);
~CGits();
Expand Down Expand Up @@ -426,6 +430,10 @@ class CGits : private gits::noncopyable {
bool traceGLAPIBypass;
ApisIface apis;

MessageBus& GetMessageBus() {
return _messageBus;
}

friend std::ostream& operator<<(std::ostream& stream, const CGits& g);
friend CBinOStream& operator<<(CBinOStream& stream, const CGits& g);
friend CBinIStream& operator>>(CBinIStream& stream, CGits& g);
Expand Down
99 changes: 99 additions & 0 deletions common/include/messageBus.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// ===================== begin_copyright_notice ============================
//
// Copyright (C) 2023-2024 Intel Corporation
//
// SPDX-License-Identifier: MIT
//
// ===================== end_copyright_notice ==============================

#pragma once

#include "tools_lite.h"
#include "log.h"

#include <string>
#include <vector>
#include <unordered_map>
#include <functional>
#include <memory>

namespace gits {
enum PublisherId { PUBLISHER_PLAYER = 0, PUBLISHER_RECORDER, PUBLISHER_PLUGIN };
enum TopicId { TOPIC_NONE = 0, TOPIC_LOG };
struct Topic {
PublisherId publisherId{};
TopicId topicId{};

bool operator==(const Topic& other) const {
return publisherId == other.publisherId && topicId == other.topicId;
}
};
} // namespace gits

namespace std {
template <>
struct hash<gits::Topic> {
std::size_t operator()(const gits::Topic& topic) const {
std::size_t h1 = std::hash<gits::PublisherId>{}(topic.publisherId);
std::size_t h2 = std::hash<unsigned>{}(topic.topicId);
return h1 ^ (h2 << 1);
}
};
} // namespace std

namespace gits {
class Message;
using MessagePtr = std::shared_ptr<Message>;
using SubscriberCb = std::function<void(Topic, const MessagePtr&)>;
using Subscription = std::pair<unsigned, SubscriberCb>;

class MessageBus : gits::noncopyable {
public:
MessageBus() = default;
~MessageBus() = default;

unsigned subscribe(Topic topic, SubscriberCb callback);
void unsubscribe(Topic topic, unsigned id);
void publish(Topic topic, const MessagePtr& message);

private:
unsigned currentSubscriptionId_ = 0;
std::unordered_map<Topic, std::vector<Subscription>> subscribers_;
};

class Message : gits::noncopyable {
public:
Message() = default;
virtual ~Message() = default;
};

class LogMessage : public Message {
public:
LogMessage(LogLevel level, const std::string& text) : level_(level), text_(text) {}
LogMessage(LogLevel level, const std::ostringstream& os) : level_(level), text_(os.str()) {}

// Variadic template constructor for formatted messages
template <typename... Args>
LogMessage(LogLevel level, Args&&... args)
: level_(level), text_(fold(std::forward<Args>(args)...)) {}

LogLevel getLevel() const {
return level_;
}
const std::string& getText() const {
return text_;
}

private:
LogLevel level_{};
std::string text_{};

template <typename... Args>
static std::string fold(Args&&... args) {
std::ostringstream stream;
(stream << ... << args); // Fold expression (C++17)
return stream.str();
}
};

} // namespace gits
34 changes: 34 additions & 0 deletions common/messageBus.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// ===================== begin_copyright_notice ============================
//
// Copyright (C) 2023-2024 Intel Corporation
//
// SPDX-License-Identifier: MIT
//
// ===================== end_copyright_notice ==============================

#include "include/messageBus.h"

namespace gits {
unsigned MessageBus::subscribe(Topic topic, SubscriberCb callback) {
unsigned id = currentSubscriptionId_++;
subscribers_[topic].emplace_back(id, callback);
return id;
}

void MessageBus::unsubscribe(Topic topic, unsigned id) {
auto isEqual = [id](const auto& subscription) { return subscription.first == id; };
auto& subscriptions = subscribers_[topic];
subscriptions.erase(std::remove_if(subscriptions.begin(), subscriptions.end(), isEqual),
subscriptions.end());
}

void MessageBus::publish(Topic topic, const MessagePtr& message) {
auto it = subscribers_.find(topic);
if (it != subscribers_.end()) {
for (const auto& subscription : it->second) {
subscription.second(topic, message);
}
}
}

} // namespace gits
7 changes: 7 additions & 0 deletions common/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ gits::CRecorder::CRecorder()
Log(INFO) << " GITS Recorder (" << inst.Version() << ")";
Log(INFO) << "-----------------------------------------------------";

inst.GetMessageBus().subscribe({PUBLISHER_PLUGIN, TOPIC_LOG}, [](Topic t, const MessagePtr& m) {
auto msg = std::dynamic_pointer_cast<LogMessage>(m);
if (msg) {
CLog(msg->getLevel(), NORMAL) << msg->getText();
}
});

#ifdef GITS_PLATFORM_WINDOWS
// handling signals
SignalsHandler();
Expand Down
7 changes: 7 additions & 0 deletions player/playerMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ int MainBody(int argc, char* argv[]) {
CGits& inst = CGits::Instance();
Log(INFO, NO_PREFIX) << inst << "\n";

inst.GetMessageBus().subscribe({PUBLISHER_PLUGIN, TOPIC_LOG}, [](Topic t, const MessagePtr& m) {
auto msg = std::dynamic_pointer_cast<LogMessage>(m);
if (msg) {
CLog(msg->getLevel(), NORMAL) << msg->getText();
}
});

if (!cfg.common.player.outputTracePath.empty()) {
CLog::LogFilePlayer(cfg.common.player.outputTracePath);
}
Expand Down

0 comments on commit a5ae1db

Please sign in to comment.