handle messages concurrently

This commit is contained in:
2022-04-26 20:30:23 +02:00
parent 06b00843d2
commit 14d0716607
3 changed files with 48 additions and 20 deletions

View File

@@ -96,6 +96,7 @@ class draw_rectangle : public generic_message<Type::DRAW_RECTANGLE> {
virtual void handle_message() override { virtual void handle_message() override {
spdlog::debug("draw_rectangle::handle_message()"); spdlog::debug("draw_rectangle::handle_message()");
spdlog::debug("Position: X = {}, Y = {}", position.x, position.y); spdlog::debug("Position: X = {}, Y = {}", position.x, position.y);
spdlog::debug("handle_message sleeps for 1 sec to test concurrency");
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
@@ -113,6 +114,7 @@ class draw_pixel : public generic_message<Type::DRAW_PIXEL> {
virtual void handle_message() override { virtual void handle_message() override {
spdlog::debug("draw_pixel::handle_message()"); spdlog::debug("draw_pixel::handle_message()");
spdlog::debug("Color: ({}, {}, {})", color.x, color.y, color.z); spdlog::debug("Color: ({}, {}, {})", color.x, color.y, color.z);
spdlog::debug("handle_message sleeps for 1 sec to test concurrency");
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }

View File

@@ -2,6 +2,7 @@
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <cstdlib> #include <cstdlib>
#include <future>
#include <iostream> #include <iostream>
#include "protocol.hpp" #include "protocol.hpp"
@@ -12,6 +13,15 @@ namespace commons {
using boost::asio::ip::udp; using boost::asio::ip::udp;
using namespace protocol; using namespace protocol;
class fake_thread_pool {
public:
template <typename Task>
static void add_task(Task&& task) {
std::thread t{std::forward<Task&&>(task)};
t.detach();
}
};
class server { class server {
public: public:
server(boost::asio::io_context& io_context, short port) server(boost::asio::io_context& io_context, short port)
@@ -25,14 +35,20 @@ class server {
boost::asio::buffer(data_.data(), max_length), sender_endpoint_, boost::asio::buffer(data_.data(), max_length), sender_endpoint_,
[this](boost::system::error_code ec, std::size_t bytes_recvd) { [this](boost::system::error_code ec, std::size_t bytes_recvd) {
if (!ec && bytes_recvd > 0) { if (!ec && bytes_recvd > 0) {
auto msg_obj = deserialize(data_.data(), bytes_recvd); std::shared_ptr<protocol::generic_message_base> msg_obj =
msg_obj->handle_message(); deserialize(data_.data(), bytes_recvd);
do_send(bytes_recvd); fake_thread_pool::add_task([msg_obj = std::move(msg_obj)]() {
msg_obj->handle_message();
});
// do_send(bytes_recvd);
spdlog::debug("leaving do receive"); spdlog::debug("leaving do receive");
} else { } else {
do_receive(); spdlog::error("error during do receive");
} }
do_receive();
}); });
} }

View File

@@ -11,11 +11,10 @@ void init_spdlog() {
spdlog::debug("Setting Loglevel to debug..."); spdlog::debug("Setting Loglevel to debug...");
} }
void run_client() { auto run_client(boost::asio::io_context& context) {
boost::asio::io_context io_context;
spdlog::info("Running client..."); spdlog::info("Running client...");
commons::client s(io_context, boost::asio::ip::make_address("0.0.0.0"), 9000, auto s = std::make_unique<commons::client>(
0); context, boost::asio::ip::make_address("0.0.0.0"), 9000, 0);
for (int i = 0; i < 100; ++i) { for (int i = 0; i < 100; ++i) {
auto msg = std::make_shared<commons::protocol::draw_pixel>(); auto msg = std::make_shared<commons::protocol::draw_pixel>();
@@ -23,34 +22,45 @@ void run_client() {
// used as lifetime expansion so that msg doesnt go out of scope till it // used as lifetime expansion so that msg doesnt go out of scope till it
// actually was sent // actually was sent
s.send(msg->serialize(), s->send(msg->serialize(),
std::bind(&commons::protocol::generic_message_base::handle_sent, msg, std::bind(&commons::protocol::generic_message_base::handle_sent,
std::placeholders::_1)); msg, std::placeholders::_1));
} }
io_context.run(); return s;
} }
void run_server() { auto run_server(boost::asio::io_context& context) {
boost::asio::io_context io_context;
spdlog::info("Running server..."); spdlog::info("Running server...");
commons::server s(io_context, 9000); return std::make_unique<commons::server>(context, 9000);
io_context.run();
} }
int main(int argc, char *argv[]) { int main(int argc, char* argv[]) {
init_spdlog(); init_spdlog();
try { try {
boost::asio::io_context io_context;
std::unique_ptr<commons::server> server = nullptr;
std::unique_ptr<commons::client> client = nullptr;
if (argc == 1) { if (argc == 1) {
run_client(); client = run_client(io_context);
} }
if (argc == 2) { if (argc == 2) {
run_server(); server = run_server(io_context);
} }
} catch (std::exception &e) { std::vector<std::thread> asio_pool;
for (int i = 0; i < std::thread::hardware_concurrency() - 1; ++i) {
asio_pool.emplace_back([&]() { io_context.run(); });
}
for (auto& context : asio_pool) {
context.join();
}
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n"; std::cerr << "Exception: " << e.what() << "\n";
} }