261 lines
5.4 KiB
Go
261 lines
5.4 KiB
Go
package game
|
||
|
||
import (
|
||
"encoding/json"
|
||
"runtime"
|
||
kafkaMiddleware "server/middleware/kafka"
|
||
"server/pkg/github.com/name5566/leaf/log"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
const (
|
||
Login_log = 1
|
||
LoginOut_log = 2
|
||
Event_log = 3
|
||
)
|
||
const (
|
||
PLAYROOM_LOST = "playroom_lost"
|
||
// 增大缓冲:针对 ~1000 条/s,保留足够秒数的缓冲(可按需调整)
|
||
LOG_LENGTH = 100000
|
||
WORKER_COUNT = 500
|
||
)
|
||
|
||
type LogMgr struct {
|
||
// 将切片改为带缓冲通道
|
||
L chan *Log
|
||
Lock sync.Mutex
|
||
wg sync.WaitGroup
|
||
stopOnce sync.Once
|
||
closing bool
|
||
}
|
||
|
||
type Log struct {
|
||
Uid int64
|
||
AppId int
|
||
ServerId int
|
||
EventName string
|
||
Param map[string]interface{}
|
||
TimeStamp int64
|
||
}
|
||
|
||
func (L *LogMgr) InitManager() {
|
||
// 初始化通道
|
||
L.L = make(chan *Log, LOG_LENGTH)
|
||
|
||
// 动态协程池参数
|
||
maxWorkers := WORKER_COUNT
|
||
if cpuWorkers := runtime.NumCPU() * 2; cpuWorkers > maxWorkers {
|
||
maxWorkers = cpuWorkers
|
||
}
|
||
minWorkers := 100
|
||
idleTimeout := 500 * time.Millisecond
|
||
|
||
var activeWorkers int32
|
||
|
||
// worker 启动函数
|
||
spawnWorker := func() {
|
||
L.wg.Add(1)
|
||
atomic.AddInt32(&activeWorkers, 1)
|
||
go func() {
|
||
defer L.wg.Done()
|
||
defer atomic.AddInt32(&activeWorkers, -1)
|
||
|
||
for {
|
||
// 首先阻塞等待一个任务(若 channel 关闭则退出)
|
||
v, ok := <-L.L
|
||
if !ok {
|
||
return
|
||
}
|
||
// 处理消息(与原来逻辑一致)
|
||
value, _ := json.Marshal(v)
|
||
if kafkaMiddleware.KafkaMod == nil {
|
||
// 尝试非阻塞重入,否则丢弃最旧再试一次
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
select {
|
||
case <-L.L:
|
||
default:
|
||
}
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
}
|
||
}
|
||
time.Sleep(10 * time.Millisecond)
|
||
} else {
|
||
if err := kafkaMiddleware.SendMsg([]byte(v.EventName), value); err != nil {
|
||
log.Debug("kafka log send err:%s", err.Error())
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
select {
|
||
case <-L.L:
|
||
default:
|
||
}
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
}
|
||
}
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
// 快速循环处理后续可用任务,若超时则退出该 worker(回收)
|
||
timer := time.NewTimer(idleTimeout)
|
||
for {
|
||
select {
|
||
case v, ok := <-L.L:
|
||
if !ok {
|
||
if !timer.Stop() {
|
||
<-timer.C
|
||
}
|
||
return
|
||
}
|
||
// 处理消息
|
||
value, _ := json.Marshal(v)
|
||
if kafkaMiddleware.KafkaMod == nil {
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
select {
|
||
case <-L.L:
|
||
default:
|
||
}
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
}
|
||
}
|
||
time.Sleep(10 * time.Millisecond)
|
||
} else {
|
||
if err := kafkaMiddleware.SendMsg([]byte(v.EventName), value); err != nil {
|
||
log.Debug("kafka log send err:%s", err.Error())
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
select {
|
||
case <-L.L:
|
||
default:
|
||
}
|
||
select {
|
||
case L.L <- v:
|
||
default:
|
||
}
|
||
}
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
}
|
||
// 重置定时器以继续快速处理
|
||
if !timer.Stop() {
|
||
<-timer.C
|
||
}
|
||
timer.Reset(idleTimeout)
|
||
case <-timer.C:
|
||
timer.Stop()
|
||
// 空闲超时,退出该 worker
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// 启动初始最小 worker 数量
|
||
for i := 0; i < minWorkers; i++ {
|
||
spawnWorker()
|
||
}
|
||
|
||
// 监督器:动态根据队列长度扩展 worker,但不超过 maxWorkers
|
||
go func() {
|
||
ticker := time.NewTicker(200 * time.Millisecond)
|
||
defer ticker.Stop()
|
||
for range ticker.C {
|
||
L.Lock.Lock()
|
||
closing := L.closing
|
||
L.Lock.Unlock()
|
||
if closing {
|
||
return
|
||
}
|
||
backlog := len(L.L)
|
||
active := int(atomic.LoadInt32(&activeWorkers))
|
||
if backlog > active && active < maxWorkers {
|
||
toSpawn := backlog - active
|
||
remain := maxWorkers - active
|
||
if toSpawn > remain {
|
||
toSpawn = remain
|
||
}
|
||
for i := 0; i < toSpawn; i++ {
|
||
spawnWorker()
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
func (L *LogMgr) AddLog(logs *Log) {
|
||
return
|
||
// 复制结构体和 Param map,避免并发修改导致 json.Marshal 时 panic
|
||
copyLog := *logs
|
||
|
||
// 安全地复制 map,使用 defer recover 防止并发迭代时的 panic
|
||
if logs.Param != nil {
|
||
func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// 发生 panic 时使用空 map
|
||
log.Debug("AddLog: concurrent map read/write detected, using empty map")
|
||
copyLog.Param = make(map[string]interface{})
|
||
}
|
||
}()
|
||
newParam := make(map[string]interface{}, len(logs.Param))
|
||
for k, v := range logs.Param {
|
||
newParam[k] = v
|
||
}
|
||
copyLog.Param = newParam
|
||
}()
|
||
}
|
||
|
||
// 如果已经开始关闭,直接丢弃
|
||
L.Lock.Lock()
|
||
if L.closing {
|
||
L.Lock.Unlock()
|
||
return
|
||
}
|
||
// 非阻塞入队:若通道满则先丢弃最旧一条再入队,避免阻塞调用者
|
||
select {
|
||
case L.L <- ©Log:
|
||
L.Lock.Unlock()
|
||
return
|
||
default:
|
||
// 丢弃最旧一条以腾出空间(若有)
|
||
select {
|
||
case <-L.L:
|
||
default:
|
||
}
|
||
// 再尝试入队一次
|
||
select {
|
||
case L.L <- ©Log:
|
||
default:
|
||
// 放不下就直接丢弃
|
||
}
|
||
L.Lock.Unlock()
|
||
}
|
||
}
|
||
|
||
func (L *LogMgr) Close() {
|
||
L.stopOnce.Do(func() {
|
||
L.Lock.Lock()
|
||
// 标记为正在关闭,阻止后续入队
|
||
L.closing = true
|
||
// 关闭通道,通知所有 worker 退出(workers 会消费完所有已入队的消息)
|
||
close(L.L)
|
||
L.Lock.Unlock()
|
||
// 等待所有 worker 处理完
|
||
L.wg.Wait()
|
||
})
|
||
}
|