216 lines
5.0 KiB
Go
216 lines
5.0 KiB
Go
package mergeCluster
|
|
|
|
import (
|
|
"fmt"
|
|
"server/conf"
|
|
"server/game/mod/msg"
|
|
GoUtil "server/game_util"
|
|
"time"
|
|
|
|
"gitea.bywaystudios.com/pet_home/leaf/network"
|
|
|
|
"gitea.bywaystudios.com/pet_home/leaf/log"
|
|
)
|
|
|
|
func RevcServerMsg(m *msg.Msg) error {
|
|
log.Debug("RevcServerMsg m %v", m)
|
|
return nil
|
|
}
|
|
|
|
// 连接成功 之后同步区服信息
|
|
func HandShake(a *Agent) {
|
|
m := &msg.Msg{
|
|
Type: msg.CLUSTER_HANDSHAKE_1,
|
|
From: conf.Server.ServerID,
|
|
Extra: conf.Server.RemoteAddr,
|
|
}
|
|
data, err := GoUtil.GobMarshal(m)
|
|
if err != nil {
|
|
log.Error("HandShake GobMarshal err %v", err)
|
|
return
|
|
}
|
|
log.Debug("握手 server id :%d", conf.Server.ServerID)
|
|
a.WriteMsg(data)
|
|
}
|
|
|
|
// 握手回调
|
|
func HandShakeRecv(a *Agent, m *msg.Msg) error {
|
|
ServerId := m.From
|
|
log.Debug("收到握手回复 ServerId %v", ServerId)
|
|
a.ServerId = ServerId
|
|
serverAgent.Store(ServerId, a)
|
|
|
|
if conf.Server.ServerID == ClusterCenterId {
|
|
log.Debug("通知区服:%d加入网路", m.From)
|
|
SendMsgAll(&msg.Msg{
|
|
Type: msg.CLUSTER_JOIN,
|
|
From: conf.Server.ServerID,
|
|
Extra: &ClusterJoinData{
|
|
ServerId: m.From,
|
|
RemoteAddr: m.Extra.(string),
|
|
},
|
|
})
|
|
} else {
|
|
syncMsg := &msg.Msg{
|
|
Type: msg.CLUSTER_FRIEND_SYNC,
|
|
To: ServerId,
|
|
HandleType: msg.HANDLE_MOD_CLUSTER_SYNC,
|
|
}
|
|
sendGameMsg(syncMsg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func HandShakeRecv2(a *Agent, m *msg.Msg) error {
|
|
ServerId := m.From
|
|
a.ServerId = ServerId
|
|
log.Debug("HandShakeRecv2 ServerId %v", ServerId)
|
|
serverAgent.Store(ServerId, a)
|
|
return nil
|
|
}
|
|
|
|
func ClusterJoin(a *Agent, m *msg.Msg) error {
|
|
clusterJoin := m.Extra.(*ClusterJoinData)
|
|
if conf.Server.ServerID == clusterJoin.ServerId {
|
|
log.Debug("区服加入通知为本服, 不处理")
|
|
return nil
|
|
}
|
|
log.Debug("ClusterJoin ServerId %v", clusterJoin.ServerId)
|
|
//connectRemote(clusterJoin.RemoteAddr, clusterJoin.ServerId, "server")
|
|
return nil
|
|
}
|
|
|
|
func ClusterExit(a *Agent, m *msg.Msg) error {
|
|
clusterExit := m.Extra.(*ClusterExitData)
|
|
if conf.Server.ServerID == clusterExit.ServerId {
|
|
return nil
|
|
}
|
|
serverAgent.Delete(clusterExit.ServerId)
|
|
return nil
|
|
}
|
|
|
|
func connectRemote(RemoteAddr string, ConnType int, ConnLabel string) error {
|
|
client := new(network.TCPClient)
|
|
client.Addr = RemoteAddr
|
|
client.ConnNum = 1
|
|
client.PendingWriteNum = 1 << 14
|
|
client.LenMsgLen = 4
|
|
client.MaxMsgLen = 1 << 16
|
|
client.NewAgent = newAgent
|
|
client.ConnType = ConnType
|
|
client.ConnLabel = ConnLabel
|
|
client.ConnectInterval = time.Duration(time.Minute * 1)
|
|
if ConnType == ClusterCenterId { // 中心服断开重连
|
|
client.AutoReconnect = true
|
|
}
|
|
client.Start()
|
|
Center = client
|
|
clients = append(clients, client)
|
|
log.Debug("connet remote to addr:%s", RemoteAddr)
|
|
return nil
|
|
}
|
|
|
|
func SendServerMsg(m *msg.Msg, serverId int) error {
|
|
if m.UniKey == "" {
|
|
m.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Cluster Msg", m))
|
|
}
|
|
if m.SendT == 0 {
|
|
m.SendT = GoUtil.Now()
|
|
}
|
|
if v, ok := serverAgent.Load(serverId); ok {
|
|
data, err := GoUtil.GobMarshal(m)
|
|
if err != nil {
|
|
log.Error("SendServerMsg GobMarshal err %v", err)
|
|
return err
|
|
}
|
|
v.(network.Agent).WriteMsg(data)
|
|
return nil
|
|
}
|
|
return fmt.Errorf("server %d not online", serverId)
|
|
}
|
|
|
|
func CallServerMsg(m *msg.Msg, serverId int) (*msg.Msg, error) {
|
|
if m.UniKey == "" {
|
|
m.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Cluster Msg", m))
|
|
}
|
|
if m.SendT == 0 {
|
|
m.SendT = GoUtil.Now()
|
|
}
|
|
v, ok := serverAgent.Load(serverId)
|
|
// 之后再发送消息
|
|
if !ok {
|
|
return nil, fmt.Errorf("server %d not online", serverId)
|
|
}
|
|
// 先注册回调通道,避免发送出去后对方快速返回导致丢失
|
|
newChan := make(chan *msg.Msg, 1)
|
|
registerChanel(m.UniKey, newChan)
|
|
defer unregisterChanel(m.UniKey)
|
|
data, err := GoUtil.GobMarshal(m)
|
|
if err != nil {
|
|
log.Error("CallServerMsg GobMarshal err %v", err)
|
|
return nil, err
|
|
}
|
|
v.(network.Agent).WriteMsg(data)
|
|
// 等待返回(直接接收一次)
|
|
timeout := time.After(5 * time.Second)
|
|
select {
|
|
case backm := <-newChan:
|
|
if backm == nil {
|
|
return nil, fmt.Errorf("server %d not response", serverId)
|
|
}
|
|
// log.Debug("CallServerMsg reply %v", backm)
|
|
return backm, nil
|
|
case <-timeout:
|
|
return nil, fmt.Errorf("timeout waiting for server %d response", serverId)
|
|
}
|
|
}
|
|
|
|
func SendMsgAll(m *msg.Msg) {
|
|
data, err := GoUtil.GobMarshal(m)
|
|
if err != nil {
|
|
log.Error("SendMsgAll GobMarshal err %v", err)
|
|
return
|
|
}
|
|
serverAgent.Range(func(key, value interface{}) bool {
|
|
value.(network.Agent).WriteMsg(data)
|
|
return true
|
|
})
|
|
}
|
|
|
|
func processMsg(a *Agent, m *msg.Msg) error {
|
|
var err error
|
|
|
|
funcMapMu.RLock()
|
|
fun, ok := FuncMap[m.Type]
|
|
funcMapMu.RUnlock()
|
|
|
|
if ok {
|
|
err = fun(a, m)
|
|
} else {
|
|
MsgChan <- m //交由game Module消息处理
|
|
}
|
|
return err
|
|
}
|
|
|
|
func registerFunc(key int, fun func(*Agent, *msg.Msg) error) {
|
|
funcMapMu.Lock()
|
|
FuncMap[key] = fun
|
|
funcMapMu.Unlock()
|
|
}
|
|
|
|
func registerChanel(key string, chanel chan *msg.Msg) {
|
|
callbackChanMu.Lock()
|
|
CallbackChan[key] = chanel
|
|
callbackChanMu.Unlock()
|
|
}
|
|
|
|
func unregisterChanel(key string) {
|
|
callbackChanMu.Lock()
|
|
delete(CallbackChan, key)
|
|
callbackChanMu.Unlock()
|
|
}
|
|
|
|
func sendGameMsg(m *msg.Msg) {
|
|
MsgChan <- m
|
|
}
|