Skip to content

Commit d7c9306

Browse files
committed
Add sequential execution support for serial interface functions
- Introduced a new dispatch mechanism for executing serial interface functions sequentially across threads, ensuring that calls are processed in the order they are made. - Added corresponding sequential variants for multiple serial functions, including `serialAbortRead`, `serialRead`, `serialWrite`, and others, to enhance thread safety and execution consistency. - Updated the `serial.h` header to include the new sequential interface headers. This update improves the robustness of the serial API by providing a clear and reliable way to handle function calls in a sequential manner.
1 parent f01cbfb commit d7c9306

24 files changed

+721
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#pragma once
2+
3+
#include <condition_variable>
4+
#include <functional>
5+
#include <future>
6+
#include <memory>
7+
#include <mutex>
8+
#include <queue>
9+
#include <thread>
10+
#include <type_traits>
11+
12+
namespace cpp_core::detail::seq
13+
{
14+
15+
struct DispatchState
16+
{
17+
std::mutex mtx;
18+
std::condition_variable cv;
19+
std::queue<std::function<void()>> queue;
20+
std::once_flag worker_started;
21+
};
22+
23+
/**
24+
* @brief Access the global dispatcher state (singleton).
25+
*
26+
* Implements the C++11 *Meyers-Singleton* idiom: the static local variable is
27+
* initialised on first use in a thread-safe manner and subsequently reused.
28+
* This ensures exactly one queue / mutex / condition_variable exists per
29+
* process while avoiding the static-initialisation-order problem.
30+
*
31+
* @return Reference to the sole ::DispatchState instance.
32+
*/
33+
inline auto state() -> DispatchState &
34+
{
35+
static DispatchState instance;
36+
return instance;
37+
}
38+
39+
/**
40+
* @brief Worker thread main loop.
41+
*
42+
* Waits for new jobs in `state().queue` and executes them sequentially.
43+
* The loop never terminates, the thread lives for the entire lifetime of
44+
* the process.
45+
*/
46+
inline void worker()
47+
{
48+
for (;;)
49+
{
50+
std::function<void()> job;
51+
{
52+
std::unique_lock<std::mutex> lock(state().mtx);
53+
state().cv.wait(lock, [] { return !state().queue.empty(); });
54+
job = std::move(state().queue.front());
55+
state().queue.pop();
56+
}
57+
job();
58+
}
59+
}
60+
61+
/**
62+
* @brief Starts the background worker thread (called once).
63+
*
64+
* Invoked internally by ::call via `std::call_once`. On the first invocation a
65+
* detached thread is spawned; subsequent calls are no-ops.
66+
*/
67+
inline void startWorker()
68+
{
69+
std::thread(worker).detach();
70+
}
71+
72+
/**
73+
* @brief Execute a callable sequentially on the background thread.
74+
*
75+
* The function/lambda is enqueued in FIFO order. The calling thread then
76+
* blocks until completion and returns the result (or propagates an exception).
77+
*
78+
* Steps:
79+
* 1. Wrap the callable in `std::packaged_task` to obtain an associated
80+
* `std::future`.
81+
* 2. Ensure the worker thread is running (`std::call_once`).
82+
* 3. Push a copyable thunk into the queue and wake the worker via
83+
* `notify_one()`.
84+
* 4. Wait on `future.get()` for completion and forward the return value.
85+
*
86+
* @tparam FunctionT Callable type with no parameters.
87+
* @param function Function object to be executed sequentially.
88+
* @return The callable's return value (or `void`).
89+
*/
90+
template <typename FunctionT> auto call(FunctionT &&function) -> decltype(function())
91+
{
92+
using FunctionReturnT = decltype(function());
93+
94+
auto task_ptr = std::make_shared<std::packaged_task<FunctionReturnT()>>(std::forward<FunctionT>(function));
95+
auto future = task_ptr->get_future();
96+
97+
std::call_once(state().worker_started, startWorker);
98+
99+
{
100+
std::lock_guard<std::mutex> lock(state().mtx);
101+
state().queue.emplace([task_ptr]() { (*task_ptr)(); });
102+
}
103+
state().cv.notify_one();
104+
105+
if constexpr (std::is_void_v<FunctionReturnT>)
106+
{
107+
future.get();
108+
}
109+
else
110+
{
111+
return future.get();
112+
}
113+
}
114+
115+
} // namespace cpp_core::detail::seq
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
// Aggregates all *Sequential headers
4+
#include "get_version_sequential.h"
5+
#include "serial_abort_read_sequential.h"
6+
#include "serial_abort_write_sequential.h"
7+
#include "serial_clear_buffer_in_sequential.h"
8+
#include "serial_clear_buffer_out_sequential.h"
9+
#include "serial_close_sequential.h"
10+
#include "serial_drain_sequential.h"
11+
#include "serial_get_ports_info_sequential.h"
12+
#include "serial_in_bytes_total_sequential.h"
13+
#include "serial_in_bytes_waiting_sequential.h"
14+
#include "serial_open_sequential.h"
15+
#include "serial_out_bytes_total_sequential.h"
16+
#include "serial_out_bytes_waiting_sequential.h"
17+
#include "serial_read_line_sequential.h"
18+
#include "serial_read_sequential.h"
19+
#include "serial_read_until_sequence_sequential.h"
20+
#include "serial_read_until_sequential.h"
21+
#include "serial_set_error_callback_sequential.h"
22+
#include "serial_set_read_callback_sequential.h"
23+
#include "serial_set_write_callback_sequential.h"
24+
#include "serial_write_sequential.h"
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../version.h"
5+
#include "../get_version.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc getVersion
14+
* @note Sequential variant: call executes strictly in the order it was enqueued across threads.
15+
*/
16+
inline MODULE_API void getVersionSequential(cpp_core::Version *out)
17+
{
18+
cpp_core::detail::seq::call([=] { getVersion(out); });
19+
}
20+
21+
#ifdef __cplusplus
22+
}
23+
#endif
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../error_callback.h"
5+
#include "../serial_abort_read.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc serialAbortRead
14+
* @note Sequential variant: guarantees execution in the exact order the calls were made across threads.
15+
*/
16+
inline MODULE_API auto serialAbortReadSequential(
17+
int64_t handle,
18+
ErrorCallbackT error_callback = nullptr
19+
) -> int
20+
{
21+
return cpp_core::detail::seq::call([=] { return serialAbortRead(handle, error_callback); });
22+
}
23+
24+
#ifdef __cplusplus
25+
}
26+
#endif
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../error_callback.h"
5+
#include "../serial_abort_write.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc serialAbortWrite
14+
* @note Sequential variant: guarantees execution in the exact order the calls were made across threads.
15+
*/
16+
inline MODULE_API auto serialAbortWriteSequential(
17+
int64_t handle,
18+
ErrorCallbackT error_callback = nullptr
19+
) -> int
20+
{
21+
return cpp_core::detail::seq::call([=] { return serialAbortWrite(handle, error_callback); });
22+
}
23+
24+
#ifdef __cplusplus
25+
}
26+
#endif
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../error_callback.h"
5+
#include "../serial_clear_buffer_in.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc serialClearBufferIn
14+
* @note Sequential variant: guarantees execution in the exact order the calls were made across threads.
15+
*/
16+
inline MODULE_API auto serialClearBufferInSequential(
17+
int64_t handle,
18+
ErrorCallbackT error_callback = nullptr
19+
) -> int
20+
{
21+
return cpp_core::detail::seq::call([=] { return serialClearBufferIn(handle, error_callback); });
22+
}
23+
24+
#ifdef __cplusplus
25+
}
26+
#endif
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../error_callback.h"
5+
#include "../serial_clear_buffer_out.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc serialClearBufferOut
14+
* @note Sequential variant: guarantees execution in the exact order the calls were made across threads.
15+
*/
16+
inline MODULE_API auto serialClearBufferOutSequential(
17+
int64_t handle,
18+
ErrorCallbackT error_callback = nullptr
19+
) -> int
20+
{
21+
return cpp_core::detail::seq::call([=] { return serialClearBufferOut(handle, error_callback); });
22+
}
23+
24+
#ifdef __cplusplus
25+
}
26+
#endif
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../error_callback.h"
5+
#include "../serial_close.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc serialClose
14+
* @note Sequential variant: guarantees execution in the exact order the calls were made across threads.
15+
*/
16+
inline MODULE_API auto serialCloseSequential(
17+
int64_t handle,
18+
ErrorCallbackT error_callback = nullptr
19+
) -> int
20+
{
21+
return cpp_core::detail::seq::call([=] { return serialClose(handle, error_callback); });
22+
}
23+
24+
#ifdef __cplusplus
25+
}
26+
#endif
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../error_callback.h"
5+
#include "../serial_drain.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc serialDrain
14+
* @note Sequential variant: guarantees execution in the exact order the calls were made across threads.
15+
*/
16+
inline MODULE_API auto serialDrainSequential(
17+
int64_t handle,
18+
ErrorCallbackT error_callback = nullptr
19+
) -> int
20+
{
21+
return cpp_core::detail::seq::call([=] { return serialDrain(handle, error_callback); });
22+
}
23+
24+
#ifdef __cplusplus
25+
}
26+
#endif
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include "../../detail/sequential_dispatch.h"
4+
#include "../../error_callback.h"
5+
#include "../serial_get_ports_info.h"
6+
7+
#ifdef __cplusplus
8+
extern "C"
9+
{
10+
#endif
11+
12+
/**
13+
* @copydoc serialGetPortsInfo
14+
* @note Sequential variant: guarantees execution in the exact order the calls were made across threads.
15+
*/
16+
inline MODULE_API auto serialGetPortsInfoSequential(
17+
void (*callback_fn)(
18+
const char *port,
19+
const char *path,
20+
const char *manufacturer,
21+
const char *serial_number,
22+
const char *pnp_id,
23+
const char *location_id,
24+
const char *product_id,
25+
const char *vendor_id
26+
),
27+
ErrorCallbackT error_callback = nullptr
28+
) -> int
29+
{
30+
return cpp_core::detail::seq::call([=] { return serialGetPortsInfo(callback_fn, error_callback); });
31+
}
32+
33+
#ifdef __cplusplus
34+
}
35+
#endif

0 commit comments

Comments
 (0)