From fc5e88144bdbd81a2eddee52912d6afda9541533 Mon Sep 17 00:00:00 2001
From: Rayne <52399183+rayne-qvq@users.noreply.github.com>
Date: Fri, 27 Jun 2025 11:34:32 +0800
Subject: [PATCH] fix(OOM): memory leak in the in-memory queue

I discovered a memory leak during real-world use. Profiling with pprof
traced it to the log service, and further investigation showed that the
queue's in-memory implementation can leak goroutines.

Goroutine leak in the Append method: whenever a new log message enters
the queue, Append spawns a goroutine to push the message into the
channel. When the queue is full, that goroutine blocks forever and is
never released. If logs are produced faster than they are consumed,
blocked goroutines accumulate and eventually exhaust memory.

Fix for the Append method: remove the unnecessary goroutine and send
the message to the channel in a non-blocking way. If the queue is full,
drop the message, log the event, and return an error. This eliminates
the goroutine leak at its root.
---
 storage/queue/memory.go | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/storage/queue/memory.go b/storage/queue/memory.go
index 0ae6fe87..5b1f39c3 100644
--- a/storage/queue/memory.go
+++ b/storage/queue/memory.go
@@ -60,10 +60,13 @@ func (m *Memory) Append(message storage.Messager) error {
 		q = m.makeQueue()
 		m.queue.Store(message.GetStream(), q)
 	}
-	go func(gm storage.Messager, gq queue) {
-		gm.SetID(uuid.New().String())
-		gq <- gm
-	}(memoryMessage, q)
+	memoryMessage.SetID(uuid.New().String())
+	select {
+	case q <- memoryMessage:
+	default:
+		log.Printf("memory queue for stream %s is full, dropping message", message.GetStream())
+		return fmt.Errorf("memory queue for stream %s is full", message.GetStream())
+	}
 
 	return nil
 }
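
The non-blocking send pattern used by the fix can be seen in isolation in the following minimal sketch. This is not the project's actual code: the channel element type, buffer size, and the `appendNonBlocking` helper are assumptions for illustration only. It shows how `select` with a `default` case returns immediately when the channel is full, instead of leaving a goroutine blocked on the send.

```go
// Minimal sketch of the non-blocking send used in the fix.
// The string channel and appendNonBlocking helper are illustrative,
// not part of the patched codebase.
package main

import (
	"errors"
	"fmt"
)

// appendNonBlocking tries to send msg to q without blocking. If the
// buffered channel is full, the message is dropped and an error is
// returned, so no goroutine is ever left stuck on a full channel.
func appendNonBlocking(q chan string, msg string) error {
	select {
	case q <- msg:
		return nil
	default:
		return errors.New("queue is full, dropping message")
	}
}

func main() {
	// A deliberately tiny buffer so the "full" path is easy to hit.
	q := make(chan string, 2)

	fmt.Println(appendNonBlocking(q, "a")) // succeeds: prints <nil>
	fmt.Println(appendNonBlocking(q, "b")) // succeeds: prints <nil>
	fmt.Println(appendNonBlocking(q, "c")) // buffer full: prints the error
}
```

The trade-off, as the commit message notes, is that messages are dropped under backpressure; the caller learns about it through the returned error rather than by an unbounded pile-up of blocked goroutines.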