死循环漏洞修复
This commit is contained in:
parent
09749de671
commit
9e896ab874
@ -10,7 +10,10 @@ var clusterHandler map[int]func(*msg.Msg) error
|
||||
func ClusterMgrInit() {
|
||||
go func() {
|
||||
for {
|
||||
m := <-mergeCluster.MsgChan
|
||||
m, ok := <-mergeCluster.MsgChan
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
MessageHandle(m)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -64,6 +64,7 @@ type WorkerPool struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
maxQueue int
|
||||
submitTimeout time.Duration
|
||||
mu sync.Mutex
|
||||
workerCancels []context.CancelFunc
|
||||
monitorTick *time.Ticker
|
||||
@ -506,13 +507,14 @@ func NewWorkerPool(workers, maxQueue int) *WorkerPool {
|
||||
workers = 1
|
||||
}
|
||||
pool := &WorkerPool{
|
||||
workers: 0,
|
||||
minWorkers: workers,
|
||||
maxWorkers: workers * 40,
|
||||
taskQueue: make(chan *MessageTask, maxQueue),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
maxQueue: maxQueue,
|
||||
workers: 0,
|
||||
minWorkers: workers,
|
||||
maxWorkers: workers * 40,
|
||||
taskQueue: make(chan *MessageTask, maxQueue),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
maxQueue: maxQueue,
|
||||
submitTimeout: 5 * time.Second,
|
||||
}
|
||||
pool.start()
|
||||
// 启动监控协程,负责动态扩缩容
|
||||
@ -571,11 +573,16 @@ func (p *WorkerPool) worker(ctx context.Context, id int) {
|
||||
|
||||
// 提交任务
|
||||
func (p *WorkerPool) Submit(task *MessageTask) error {
|
||||
// 当队列已满时,等待并重试入队,直到成功或 pool 关闭
|
||||
deadline := time.NewTimer(p.submitTimeout)
|
||||
defer deadline.Stop()
|
||||
|
||||
// 当队列已满时,等待并重试入队,直到成功、超时或 pool 关闭
|
||||
for {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
return fmt.Errorf("worker pool is closed")
|
||||
case <-deadline.C:
|
||||
return fmt.Errorf("worker pool submit timeout after %v", p.submitTimeout)
|
||||
case p.taskQueue <- task:
|
||||
// 如果队列接近满时,尝试扩容少量 worker
|
||||
queued := len(p.taskQueue)
|
||||
@ -601,6 +608,8 @@ func (p *WorkerPool) Submit(task *MessageTask) error {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
return fmt.Errorf("worker pool is closed")
|
||||
case <-deadline.C:
|
||||
return fmt.Errorf("worker pool submit timeout after %v", p.submitTimeout)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// 重试
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user