Skip to content

Commit

Permalink
Fix high cpu load due to recurring prune job (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
BjoernAtBosch authored Apr 4, 2024
1 parent aef778d commit 78a86f8
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 48 deletions.
4 changes: 2 additions & 2 deletions NOTICE-3RD-PARTY-CONTENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
|distlib|0.3.8|Python Software Foundation License|
|distro|1.8.0|Apache 2.0|
|fasteners|0.19|Apache 2.0|
|filelock|3.13.1|The Unlicense (Unlicense)|
|filelock|3.13.3|The Unlicense (Unlicense)|
|gcovr|5.2|BSD|
|identify|2.5.35|MIT|
|idna|3.6|BSD|
|Jinja2|3.1.3|New BSD|
|lxml|5.1.0|New BSD|
|lxml|5.2.1|New BSD|
|MarkupSafe|2.1.5|New BSD|
|node-semver|0.6.1|MIT|
|nodeenv|1.8.0|BSD|
Expand Down
4 changes: 2 additions & 2 deletions examples/seat-adjuster/AppManifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"required": [
{
"path": "Vehicle.Cabin.Seat.Row1.Pos1.Position",
"access": "read"
"access": "write"
},
{
"path": "Vehicle.Speed",
Expand All @@ -25,7 +25,7 @@
"config": {
"reads": [ "seatadjuster/setPosition/request" ],
"writes": [
"seatadjuster/setPosition/response",
"seatadjuster/setPosition/response",
"seatadjuster/currentPosition"
]
}
Expand Down
14 changes: 12 additions & 2 deletions examples/seat-adjuster/src/SeatAdjusterApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "sdk/QueryBuilder.h"
#include "sdk/vdb/IVehicleDataBrokerClient.h"

#include <csignal>
#include <fmt/core.h>
#include <nlohmann/json.hpp>

Expand Down Expand Up @@ -126,8 +127,17 @@ void SeatAdjusterApp::onErrorTopic(const velocitas::Status& status) {
}
} // namespace example

std::unique_ptr<example::SeatAdjusterApp> myApp;

void signal_handler(int sig) {
velocitas::logger().info("App terminating signal received: {}", sig);
myApp->stop();
}

int main(int argc, char** argv) {
example::SeatAdjusterApp myApp;
myApp.run();
signal(SIGINT, signal_handler);

myApp = std::make_unique<example::SeatAdjusterApp>();
myApp->run();
return 0;
}
14 changes: 12 additions & 2 deletions examples/set-data-points/src/SetDataPointsApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "sdk/vdb/IVehicleDataBrokerClient.h"
#include "vehicle_model/Vehicle.h"

#include <csignal>
#include <fmt/core.h>
#include <nlohmann/json.hpp>

Expand Down Expand Up @@ -77,8 +78,17 @@ class SetDataPointsApp : public velocitas::VehicleApp {

} // namespace example

std::unique_ptr<example::SetDataPointsApp> myApp;

void signal_handler(int sig) {
velocitas::logger().info("App terminating signal received: {}", sig);
myApp->stop();
}

int main(int argc, char** argv) {
example::SetDataPointsApp myApp;
myApp.run();
signal(SIGINT, signal_handler);

myApp = std::make_unique<example::SetDataPointsApp>();
myApp->run();
return 0;
}
4 changes: 2 additions & 2 deletions sdk/include/sdk/Model.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class Service : public Node {
public:
using Node::Node;

std::string getLocation() const;
Middleware::Metadata getMiddlewareMetadata() const;
[[nodiscard]] std::string getLocation() const;
[[nodiscard]] Middleware::Metadata getMiddlewareMetadata() const;
};

} // namespace velocitas
Expand Down
7 changes: 6 additions & 1 deletion sdk/include/sdk/VehicleApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include "sdk/AsyncResult.h"
#include "sdk/DataPointReply.h"

#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>

namespace velocitas {
Expand Down Expand Up @@ -110,7 +112,7 @@ class VehicleApp {
* the data point value of the requested data point.
*/
template <typename TDataPoint>
AsyncResultPtr_t<typename TDataPoint::value_type>
[[nodiscard]] AsyncResultPtr_t<typename TDataPoint::value_type>
getDataPoint(const TDataPoint& dataPoint) const {
return getDataPoint_internal(dataPoint)->template map<typename TDataPoint::value_type>(
[&dataPoint](const DataPointReply& dataPointValues) {
Expand Down Expand Up @@ -146,6 +148,9 @@ class VehicleApp {

std::shared_ptr<IVehicleDataBrokerClient> m_vdbClient;
std::shared_ptr<IPubSubClient> m_pubSubClient;
bool m_isRunning{false};
std::mutex m_stopWaitMutex;
std::condition_variable m_stopWaitCV;
};

} // namespace velocitas
Expand Down
11 changes: 5 additions & 6 deletions sdk/include/sdk/grpc/GrpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,24 @@

namespace velocitas {

class RecurringJob;
class GrpcCall;

class GrpcClient {
public:
GrpcClient();
virtual ~GrpcClient();
GrpcClient() = default;
virtual ~GrpcClient() = default;

GrpcClient(const GrpcClient&) = delete;
GrpcClient(GrpcClient&&) = delete;
GrpcClient& operator=(const GrpcClient&) = delete;
GrpcClient& operator=(GrpcClient&&) = delete;

void addActiveCall(std::shared_ptr<GrpcCall> call);
void addActiveCall(std::shared_ptr<GrpcCall> call);
[[nodiscard]] size_t getNumActiveCalls() const { return m_activeCalls.size(); }

private:
void pruneCompletedRequests();

private:
std::shared_ptr<RecurringJob> m_recurringJob;
std::vector<std::shared_ptr<GrpcCall>> m_activeCalls;
std::mutex m_mutex;
};
Expand Down
26 changes: 17 additions & 9 deletions sdk/src/sdk/VehicleApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
#include <fmt/core.h>
#include <nlohmann/json.hpp>

#include <chrono>
#include <string>
#include <thread>
#include <utility>

namespace velocitas {

Expand All @@ -40,26 +37,37 @@ VehicleApp::VehicleApp(std::shared_ptr<IVehicleDataBrokerClient> vdbClient,
}

void VehicleApp::run() {
logger().info("Running App...");
logger().info("Starting app ...");
Middleware::getInstance().start();
Middleware::getInstance().waitUntilReady();

m_pubSubClient->connect();
onStart();
{
std::unique_lock lk(m_stopWaitMutex);
m_isRunning = true;
}
logger().info("App is running.");

// TODO: Fix busy waiting
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::unique_lock lk(m_stopWaitMutex);
m_stopWaitCV.wait(lk, [this] { return !m_isRunning; });
}
logger().info("App stopped.");
}

void VehicleApp::stop() {
logger().info("Stopping App...");
logger().info("Stopping app ...");

onStop();
m_pubSubClient->disconnect();

Middleware::getInstance().stop();

{
std::unique_lock lk(m_stopWaitMutex);
m_isRunning = false;
m_stopWaitCV.notify_all();
}
}

AsyncSubscriptionPtr_t<std::string> VehicleApp::subscribeToTopic(const std::string& topic) {
Expand Down
9 changes: 5 additions & 4 deletions sdk/src/sdk/grpc/BrokerAsyncGrpcFacade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
namespace velocitas {

BrokerAsyncGrpcFacade::BrokerAsyncGrpcFacade(std::shared_ptr<grpc::Channel> channel)
: m_channel{std::move(channel)} {
m_stub = std::make_shared<sdv::databroker::v1::Broker::Stub>(m_channel);
}
: m_stub{sdv::databroker::v1::Broker::NewStub(channel)} {}

void BrokerAsyncGrpcFacade::GetDatapoints(
const std::vector<std::string>& datapoints,
Expand All @@ -51,6 +49,7 @@ void BrokerAsyncGrpcFacade::GetDatapoints(
} catch (std::exception& e) {
logger().error("GRPC: Exception occurred during \"GetDatapoints\": {}", e.what());
}
callData->m_isComplete = true;
};

addActiveCall(callData);
Expand Down Expand Up @@ -83,6 +82,7 @@ void BrokerAsyncGrpcFacade::SetDatapoints(
} catch (std::exception& e) {
logger().error("GRPC: Exception occurred during \"SetDatapoints\": {}", e.what());
}
callData->m_isComplete = true;
};

addActiveCall(callData);
Expand Down Expand Up @@ -110,10 +110,11 @@ void BrokerAsyncGrpcFacade::Subscribe(

callData->onData(itemHandler);

callData->onFinish([errorHandler](const auto& status) {
callData->onFinish([callData, errorHandler](const auto& status) {
if (!status.ok()) {
errorHandler(status);
}
callData->m_isComplete = true;
});

callData->startCall();
Expand Down
10 changes: 7 additions & 3 deletions sdk/src/sdk/grpc/BrokerAsyncGrpcFacade.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@

#include "sdv/databroker/v1/broker.grpc.pb.h"

#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

namespace grpc {
class Channel;
}
class Status;
} // namespace grpc

namespace velocitas {

Expand All @@ -50,8 +55,7 @@ class BrokerAsyncGrpcFacade : public AsyncGrpcFacade, GrpcClient {
std::function<void(const grpc::Status& status)> errorHandler);

private:
std::shared_ptr<sdv::databroker::v1::Broker::Stub> m_stub;
std::shared_ptr<grpc::Channel> m_channel;
std::unique_ptr<sdv::databroker::v1::Broker::StubInterface> m_stub;
};

} // namespace velocitas
Expand Down
20 changes: 6 additions & 14 deletions sdk/src/sdk/grpc/GrpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
*/

#include "sdk/grpc/GrpcClient.h"
#include "sdk/ThreadPool.h"
#include "sdk/grpc/GrpcCall.h"

namespace velocitas {

GrpcClient::GrpcClient() {
m_recurringJob = std::make_shared<RecurringJob>([this]() { pruneCompletedRequests(); });
ThreadPool::getInstance()->enqueue(m_recurringJob);
}

GrpcClient::~GrpcClient() {
m_recurringJob->cancel();
m_recurringJob->waitForTermination();
void GrpcClient::addActiveCall(std::shared_ptr<GrpcCall> call) {
pruneCompletedRequests();
{
std::scoped_lock<std::mutex> lock(m_mutex);
m_activeCalls.emplace_back(call);
}
}

void GrpcClient::pruneCompletedRequests() {
Expand All @@ -40,9 +37,4 @@ void GrpcClient::pruneCompletedRequests() {
}
}

void GrpcClient::addActiveCall(std::shared_ptr<GrpcCall> call) {
std::scoped_lock<std::mutex> lock(m_mutex);
m_activeCalls.emplace_back(call);
}

} // namespace velocitas
3 changes: 2 additions & 1 deletion sdk/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ add_executable(${TARGET_NAME}
ScopedBoolInverter_tests.cpp
ThreadPool_tests.cpp
Utils_tests.cpp
VehicleDataBrokerClient_tests.cpp
QueryBuilder_tests.cpp
#PubSub_tests.cpp
TestBaseUsingEnvVars.cpp
grpc/GrpcClient_tests.cpp
grpc/VehicleDataBrokerClient_tests.cpp
)

add_dependencies(${TARGET_NAME}
Expand Down
Loading

0 comments on commit 78a86f8

Please sign in to comment.