Skip to content

Commit a16cd75

Browse files
committed
rate-limit: Process requests on original thread
Synchronous requests can be long-running, which can cause issues if they are all processed on the same worker thread. This commit updates the code to process synchronous requests on the original caller thread - the worker thread is now only responsible for signalling on a provided channel to wake up the caller. Signed-off-by: Christian Pardillo Laursen <christian.pardillolaursen@citrix.com>
1 parent ff6be8f commit a16cd75

File tree

1 file changed

+30
-17
lines changed

1 file changed

+30
-17
lines changed

ocaml/libs/rate-limit/bucket_table.ml

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -138,21 +138,34 @@ let submit t ~user_agent ~callback amount =
138138
if need_signal then Condition.signal worker_thread_cond
139139
)
140140

141+
(* Block and execute on the same thread *)
141142
let submit_sync t ~user_agent ~callback amount =
142-
let result = ref None in
143-
let mutex = Mutex.create () in
144-
let condition = Condition.create () in
145-
let wrapped_callback () =
146-
let r = callback () in
147-
Mutex.lock mutex ;
148-
result := Some r ;
149-
Condition.signal condition ;
150-
Mutex.unlock mutex
151-
in
152-
submit t ~user_agent ~callback:wrapped_callback amount ;
153-
Mutex.lock mutex ;
154-
while Option.is_none !result do
155-
Condition.wait condition mutex
156-
done ;
157-
Mutex.unlock mutex ;
158-
Option.get !result
143+
let map = Atomic.get t in
144+
match StringMap.find_opt user_agent map with
145+
| None ->
146+
callback ()
147+
| Some bucket_data -> (
148+
let channel_opt =
149+
with_lock bucket_data.process_queue_lock (fun () ->
150+
if
151+
Queue.is_empty bucket_data.process_queue
152+
&& Token_bucket.consume bucket_data.bucket amount
153+
then
154+
None (* Can run callback immediately after releasing lock *)
155+
else
156+
(* Rate limited, need to retrieve function result via channel *)
157+
let channel = Event.new_channel () in
158+
Queue.add
159+
(amount, fun () -> Event.sync (Event.send channel ()))
160+
bucket_data.process_queue ;
161+
Condition.signal bucket_data.worker_thread_cond ;
162+
Some channel
163+
)
164+
in
165+
match channel_opt with
166+
| None ->
167+
callback ()
168+
| Some channel ->
169+
Event.sync (Event.receive channel) ;
170+
callback ()
171+
)

0 commit comments

Comments
 (0)