From 14d0716607654152c874350c369c828ad771c880 Mon Sep 17 00:00:00 2001 From: kalipso Date: Tue, 26 Apr 2022 20:30:23 +0200 Subject: [PATCH] handle messages concurrently --- include/protocol.hpp | 2 ++ include/server.hpp | 24 ++++++++++++++++++++---- src/main.cpp | 42 ++++++++++++++++++++++++++---------------- 3 files changed, 48 insertions(+), 20 deletions(-) diff --git a/include/protocol.hpp b/include/protocol.hpp index e686595..97dce18 100644 --- a/include/protocol.hpp +++ b/include/protocol.hpp @@ -96,6 +96,7 @@ class draw_rectangle : public generic_message { virtual void handle_message() override { spdlog::debug("draw_rectangle::handle_message()"); spdlog::debug("Position: X = {}, Y = {}", position.x, position.y); + spdlog::debug("handle_message sleeps for 1 sec to test concurrency"); std::this_thread::sleep_for(std::chrono::seconds(1)); } @@ -113,6 +114,7 @@ class draw_pixel : public generic_message { virtual void handle_message() override { spdlog::debug("draw_pixel::handle_message()"); spdlog::debug("Color: ({}, {}, {})", color.x, color.y, color.z); + spdlog::debug("handle_message sleeps for 1 sec to test concurrency"); std::this_thread::sleep_for(std::chrono::seconds(1)); } diff --git a/include/server.hpp b/include/server.hpp index 75e1f24..2704dff 100644 --- a/include/server.hpp +++ b/include/server.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include "protocol.hpp" @@ -12,6 +13,15 @@ namespace commons { using boost::asio::ip::udp; using namespace protocol; +class fake_thread_pool { + public: + template + static void add_task(Task&& task) { + std::thread t{std::forward(task)}; + t.detach(); + } +}; + class server { public: server(boost::asio::io_context& io_context, short port) @@ -25,14 +35,20 @@ class server { boost::asio::buffer(data_.data(), max_length), sender_endpoint_, [this](boost::system::error_code ec, std::size_t bytes_recvd) { if (!ec && bytes_recvd > 0) { - auto msg_obj = deserialize(data_.data(), bytes_recvd); - msg_obj->handle_message(); + std::shared_ptr msg_obj = + deserialize(data_.data(), bytes_recvd); - do_send(bytes_recvd); + fake_thread_pool::add_task([msg_obj = std::move(msg_obj)]() { + msg_obj->handle_message(); + }); + + // do_send(bytes_recvd); spdlog::debug("leaving do receive"); } else { - do_receive(); + spdlog::error("error during do receive"); } + + do_receive(); }); } diff --git a/src/main.cpp b/src/main.cpp index a88ff5d..0f61d98 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -11,11 +11,10 @@ void init_spdlog() { spdlog::debug("Setting Loglevel to debug..."); } -void run_client() { - boost::asio::io_context io_context; +auto run_client(boost::asio::io_context& context) { spdlog::info("Running client..."); - commons::client s(io_context, boost::asio::ip::make_address("0.0.0.0"), 9000, - 0); + auto s = std::make_unique( + context, boost::asio::ip::make_address("0.0.0.0"), 9000, 0); for (int i = 0; i < 100; ++i) { auto msg = std::make_shared(); @@ -23,34 +22,45 @@ void run_client() { // used as lifetime expansion so that msg doesnt go out of scope till it // actually was sent - s.send(msg->serialize(), - std::bind(&commons::protocol::generic_message_base::handle_sent, msg, - std::placeholders::_1)); + s->send(msg->serialize(), + std::bind(&commons::protocol::generic_message_base::handle_sent, + msg, std::placeholders::_1)); } - io_context.run(); + return s; } -void run_server() { - boost::asio::io_context io_context; +auto run_server(boost::asio::io_context& context) { spdlog::info("Running server..."); - commons::server s(io_context, 9000); - io_context.run(); + return std::make_unique(context, 9000); } -int main(int argc, char *argv[]) { +int main(int argc, char* argv[]) { init_spdlog(); try { + boost::asio::io_context io_context; + std::unique_ptr server = nullptr; + std::unique_ptr client = nullptr; if (argc == 1) { - run_client(); + client = run_client(io_context); } if (argc == 2) { - run_server(); + server = run_server(io_context); } - } catch (std::exception &e) { + std::vector asio_pool; + + for (int i = 0; i < std::thread::hardware_concurrency() - 1; ++i) { + asio_pool.emplace_back([&]() { io_context.run(); }); + } + + for (auto& context : asio_pool) { + context.join(); + } + + } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; }