消息服务优化

This commit is contained in:
hahwu 2025-12-25 10:00:48 +08:00
parent e21aac1c08
commit 9ec87776f0

View File

@ -37,12 +37,17 @@ type MessageList struct {
// Worker Pool 结构 // Worker Pool 结构
type WorkerPool struct { type WorkerPool struct {
workers int workers int
taskQueue chan *MessageTask minWorkers int
wg sync.WaitGroup maxWorkers int
ctx context.Context taskQueue chan *MessageTask
cancel context.CancelFunc wg sync.WaitGroup
maxQueue int ctx context.Context
cancel context.CancelFunc
maxQueue int
mu sync.Mutex
workerCancels []context.CancelFunc
monitorTick *time.Ticker
} }
// 消息任务 // 消息任务
@ -374,35 +379,58 @@ func MessageHandle(m *msg.Msg) error {
// 创建 Worker Pool // 创建 Worker Pool
func NewWorkerPool(workers, maxQueue int) *WorkerPool { func NewWorkerPool(workers, maxQueue int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if workers <= 0 {
workers = 1
}
pool := &WorkerPool{ pool := &WorkerPool{
workers: workers, workers: 0,
taskQueue: make(chan *MessageTask, maxQueue), minWorkers: workers,
ctx: ctx, maxWorkers: workers * 4,
cancel: cancel, taskQueue: make(chan *MessageTask, maxQueue),
maxQueue: maxQueue, ctx: ctx,
cancel: cancel,
maxQueue: maxQueue,
} }
pool.start() pool.start()
// 启动监控协程,负责动态扩缩容
pool.monitorTick = time.NewTicker(500 * time.Millisecond)
go pool.monitor()
return pool return pool
} }
// 启动 Worker Pool // 启动 Worker Pool
func (p *WorkerPool) start() { func (p *WorkerPool) start() {
for i := 0; i < p.workers; i++ { // 启动最小数量的 worker
p.wg.Add(1) for i := 0; i < p.minWorkers; i++ {
go p.worker(i) p.spawnWorker()
} }
} }
// Worker 工作函数 // spawnWorker 启动一个 worker使用独立的 cancelable context
func (p *WorkerPool) worker(id int) { func (p *WorkerPool) spawnWorker() {
p.mu.Lock()
defer p.mu.Unlock()
childCtx, childCancel := context.WithCancel(p.ctx)
p.workerCancels = append(p.workerCancels, childCancel)
id := len(p.workerCancels)
p.wg.Add(1)
p.workers++
go p.worker(childCtx, id)
}
// Worker 工作函数,监听其子上下文以便单独停止
func (p *WorkerPool) worker(ctx context.Context, id int) {
defer p.wg.Done() defer p.wg.Done()
log.Debug("Worker %d started", id) log.Debug("Worker %d started", id)
for { for {
select { select {
case <-p.ctx.Done(): case <-ctx.Done():
log.Debug("Worker %d stopped", id) log.Debug("Worker %d stopped", id)
return return
case <-p.ctx.Done():
log.Debug("Worker %d pool closed", id)
return
case task, ok := <-p.taskQueue: case task, ok := <-p.taskQueue:
if !ok { if !ok {
log.Debug("Worker %d: task queue closed", id) log.Debug("Worker %d: task queue closed", id)
@ -426,6 +454,24 @@ func (p *WorkerPool) Submit(task *MessageTask) error {
case <-p.ctx.Done(): case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed") return fmt.Errorf("worker pool is closed")
case p.taskQueue <- task: case p.taskQueue <- task:
// 如果队列接近满时,尝试扩容少量 worker
queued := len(p.taskQueue)
capQ := cap(p.taskQueue)
if capQ > 0 && queued > capQ*3/4 {
go func() {
p.mu.Lock()
deficit := p.maxWorkers - p.workers
p.mu.Unlock()
if deficit > 0 {
// 最多扩容 2 个以避免剧烈波动
add := 2
if deficit < add {
add = deficit
}
p.ScaleUp(add)
}
}()
}
return nil return nil
default: default:
return fmt.Errorf("task queue is full") return fmt.Errorf("task queue is full")
@ -435,12 +481,97 @@ func (p *WorkerPool) Submit(task *MessageTask) error {
// 关闭 Worker Pool // 关闭 Worker Pool
func (p *WorkerPool) Shutdown() { func (p *WorkerPool) Shutdown() {
log.Debug("Shutting down worker pool...") log.Debug("Shutting down worker pool...")
// 停止监控
if p.monitorTick != nil {
p.monitorTick.Stop()
}
// 取消根 context会让所有子 worker 退出
p.cancel() p.cancel()
// 关闭任务通道,释放阻塞的接收者
close(p.taskQueue) close(p.taskQueue)
// 等待所有 worker 退出
p.wg.Wait() p.wg.Wait()
log.Debug("Worker pool shut down complete") log.Debug("Worker pool shut down complete")
} }
// ScaleUp 按数量增加 worker
func (p *WorkerPool) ScaleUp(n int) {
if n <= 0 {
return
}
p.mu.Lock()
can := p.maxWorkers - p.workers
if can <= 0 {
p.mu.Unlock()
return
}
if n > can {
n = can
}
p.mu.Unlock()
for i := 0; i < n; i++ {
p.spawnWorker()
}
}
// ScaleDown 按数量减少 worker
func (p *WorkerPool) ScaleDown(n int) {
if n <= 0 {
return
}
p.mu.Lock()
for i := 0; i < n && len(p.workerCancels) > 0 && p.workers > p.minWorkers; i++ {
last := len(p.workerCancels) - 1
cancel := p.workerCancels[last]
// 从 slice 中移除
p.workerCancels = p.workerCancels[:last]
p.workers--
// 调用 cancel 让对应 worker 退出
cancel()
}
p.mu.Unlock()
}
// monitor 周期性检查队列长度并做扩缩容
func (p *WorkerPool) monitor() {
for {
select {
case <-p.ctx.Done():
return
case <-p.monitorTick.C:
queued := len(p.taskQueue)
capQ := cap(p.taskQueue)
if capQ == 0 {
continue
}
// 扩容条件
if queued > capQ*3/4 {
p.mu.Lock()
deficit := p.maxWorkers - p.workers
p.mu.Unlock()
if deficit > 0 {
add := 1
if deficit >= 2 {
add = 2
}
p.ScaleUp(add)
}
}
// 缩容条件
if queued < capQ/4 {
p.mu.Lock()
extra := p.workers - p.minWorkers
p.mu.Unlock()
if extra > 0 {
// 每次缩一个,避免抖动
p.ScaleDown(1)
}
}
}
}
}
// ==================== 中间件实现 ==================== // ==================== 中间件实现 ====================
// 日志中间件 // 日志中间件