Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update threading to allow multiple vehicles. #8

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/link/gz_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ void GzClient::udp_send(const synapse_pb::Frame& frame) const
return;
}
if (udp_link_ != nullptr) {
udp_link_.get()->write((const uint8_t*)stream.str().c_str(), stream.str().length());
udp_link_->write((const uint8_t*)stream.str().c_str(), stream.str().length());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/link/gz_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class GzClient : public gz::transport::Node {
std::string topic_sub_logical_camera_;

public:
std::shared_ptr<UDPLink> udp_link_ { NULL };
UDPLink* udp_link_;
gz::transport::Node::Publisher pub_actuators_;
gz::transport::Node::Publisher pub_lighting_config_;
gz::transport::Node::Publisher pub_material_color_;
Expand Down
6 changes: 4 additions & 2 deletions src/link/udp_link.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ using boost::asio::ip::udp;
using std::placeholders::_1;
using std::placeholders::_2;

UDPLink::UDPLink(std::string host, int port)
UDPLink::UDPLink(std::string host, int port, GzClient* gz)
{
gz_ = gz;
gz_->udp_link_ = this;
remote_endpoint_ = *udp::resolver(io_context_).resolve(udp::resolver::query(host, std::to_string(port)));
my_endpoint_ = udp::endpoint(udp::v4(), GZ_PORT);

Expand Down Expand Up @@ -70,7 +72,7 @@ void UDPLink::rx_handler(const boost::system::error_code& ec, std::size_t bytes_
} else if (frame.msg_case() == synapse_pb::Frame::kLedArray) {
gz_->publish_led_array(frame.led_array());
} else {
std::cerr << "unhandled message: " << frame.msg_case() << std::endl;
std::cerr << "unhandled message: " << frame.msg_case() << std::endl;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/link/udp_link.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ class UDPLink {
};
boost::asio::ip::udp::endpoint remote_endpoint_;
boost::asio::ip::udp::endpoint my_endpoint_;
GzClient* gz_;

public:
std::shared_ptr<GzClient> gz_ {};
UDPLink(std::string host, int port);
UDPLink(std::string host, int port, GzClient* gz);
void run_for(std::chrono::seconds sec);
void write(const uint8_t* buf, uint32_t len);

Expand Down
84 changes: 34 additions & 50 deletions src/synapse_gz.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,67 +6,51 @@
#include <memory>
#include <rclcpp/parameter_value.hpp>

std::atomic<bool> g_stop { false };
std::shared_ptr<UDPLink> g_udp_link = NULL;
std::shared_ptr<GzClient> g_gz_client = NULL;
using namespace std::chrono_literals;

void signal_handler(int signum)
{
(void)signum;
g_stop = true;
}
class SynapseGz : public rclcpp::Node {
public:
SynapseGz();
virtual ~SynapseGz();

void udp_link_entry_point()
{
while (not g_stop) {
g_udp_link->run_for(std::chrono::seconds(1));
}
}
private:
void udp_run();
std::shared_ptr<std::thread> udp_link_thread_;
std::shared_ptr<UDPLink> udp_link_ = NULL;
std::shared_ptr<GzClient> gz_client_ = NULL;
};

void gz_entry_point()
void SynapseGz::udp_run()
{
while (not g_stop) {
std::this_thread::sleep_for(std::chrono::seconds(1));
while (rclcpp::ok()) {
udp_link_->run_for(1s);
}
}

class SynapseGz : public rclcpp::Node {
public:
SynapseGz()
: Node("synapze_gz")
{
this->declare_parameter("host", "127.0.0.1");
this->declare_parameter("port", 4243);
this->declare_parameter("vehicle", "b3rb");

std::string host = this->get_parameter("host").as_string();
int port = this->get_parameter("port").as_int();
std::string vehicle = this->get_parameter("vehicle").as_string();
SynapseGz::SynapseGz()
: Node("synapze_gz")
{
this->declare_parameter("host", "127.0.0.1");
this->declare_parameter("port", 4243);
this->declare_parameter("vehicle", "b3rb");

// create udp link
g_udp_link = std::make_shared<UDPLink>(host, port);
std::string host = this->get_parameter("host").as_string();
int port = this->get_parameter("port").as_int();
std::string vehicle = this->get_parameter("vehicle").as_string();

// create gz client
g_gz_client = std::make_shared<GzClient>(vehicle);
g_udp_link.get()->gz_ = g_gz_client;
g_gz_client->udp_link_ = g_udp_link;
gz_client_ = std::make_shared<GzClient>(vehicle);
udp_link_ = std::make_shared<UDPLink>(host, port, gz_client_.get());

// start threads
udp_link_thread_ = std::make_shared<std::thread>(udp_link_entry_point);
gz_thread_ = std::make_shared<std::thread>(gz_entry_point);
}
virtual ~SynapseGz()
{
// join threads
g_stop = true;
udp_link_thread_->join();
gz_thread_->join();
}
// start threads
udp_link_thread_ = std::make_shared<std::thread>(std::bind(
&SynapseGz::udp_run, this));
}

private:
std::shared_ptr<std::thread> udp_link_thread_;
std::shared_ptr<std::thread> gz_thread_;
};
SynapseGz::~SynapseGz()
{
// join threads
udp_link_thread_->join();
}

int main(int argc, char** argv)
{
Expand Down