admin_backend/util/Mysql.go
2026-04-22 16:44:37 +08:00

249 lines
6.0 KiB
Go

package util
import (
"backend/Type"
"backend/common"
"context"
"fmt"
"log"
"net"
"strings"
"sync"
"github.com/go-sql-driver/mysql"
// _ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"golang.org/x/crypto/ssh"
)
type MysqlPool struct {
lock sync.Mutex
poolList map[string]*poolInfo
}
type poolInfo struct {
DbList []*Db
Addr string
}
type Db struct {
*sqlx.DB
key string
ssh *ssh.Client
}
func (db *Db) Close() error {
if db == nil {
return nil
}
MPool.putDb(db.key, db)
return nil
}
func (m *MysqlPool) getDb(key string) *Db {
m.lock.Lock()
pool, ok := m.poolList[key]
if !ok || len(pool.DbList) == 0 {
m.lock.Unlock()
return nil
}
db := pool.DbList[0]
pool.DbList = pool.DbList[1:]
m.lock.Unlock()
// 在锁释放后再做 Ping 与关闭操作,避免死锁
if err := db.Ping(); err != nil {
// 底层连接失效,关闭 SSH 隧道和数据库连接,不放回池中
if db.ssh != nil {
db.ssh.Close()
}
db.DB.Close()
return nil
}
return db
}
func (m *MysqlPool) putDb(key string, db *Db) {
m.lock.Lock()
pool := m.poolList[key]
if pool == nil {
pool = &poolInfo{Addr: key}
m.poolList[key] = pool
}
if len(pool.DbList) >= 5 {
// 池已满,释放锁后再关闭连接,避免递归调用 putDb 导致死锁
m.lock.Unlock()
if db.ssh != nil {
db.ssh.Close()
}
db.DB.Close()
return
}
pool.DbList = append(pool.DbList, db)
m.lock.Unlock()
}
func (m *MysqlPool) GetMysqlDB(AppCnf *Type.App, ServerId int) *Db {
key := fmt.Sprintf("%s_%s_%d", AppCnf.NodeName, AppCnf.MysqlName, ServerId)
ADb := m.getDb(key)
if ADb != nil {
return ADb
}
var SQLDb *sqlx.DB
var err error
SQLDb, sshConn, err := connectToMySQLViaSSH(AppCnf, ServerId)
if err != nil {
log.Printf("failed to connect to mysql: %v", err)
return nil
}
ADb = &Db{SQLDb, key, sshConn}
// m.putDb(key, Db)
return ADb
}
func (m *MysqlPool) GetGameDB() *Db {
Server, Mysql := "log", "log"
key := fmt.Sprintf("%s_%s_game", Server, Mysql)
ADb := m.getDb(key)
if ADb != nil {
return ADb
}
SQLDb, err := ConnectMysql(Mysql, "game")
if err != nil {
return nil
}
ADb = &Db{SQLDb, key, nil}
// m.putDb(key, Db)
return ADb
}
func (m *MysqlPool) GetTopicDB(Topic string) *Db {
Server, Mysql := "log", "log"
key := fmt.Sprintf("%s_%s_%s", Server, Mysql, Topic)
ADb := m.getDb(key)
if ADb != nil {
return ADb
}
SQLDb, err := ConnectMysql(Mysql, Topic)
if err != nil {
return nil
}
ADb = &Db{SQLDb, key, nil}
// m.putDb(key, Db)
return ADb
}
func init() {
MPool = &MysqlPool{poolList: make(map[string]*poolInfo)}
}
var MPool *MysqlPool
func connectToMySQLViaSSH(AppCnf *Type.App, ServerId int) (*sqlx.DB, *ssh.Client, error) {
MysqlConfig, err := common.GetMysqlConfig(AppCnf.MysqlName)
if err != nil {
return nil, nil, fmt.Errorf("failed to get MySQL config: %v", err)
}
if common.GetSsh() {
SshConfig, err := common.GetServerConfig(AppCnf.NodeName)
if err != nil {
return nil, nil, fmt.Errorf("failed to get SSH config: %v", err)
}
SP, _ := Decrypt(SshConfig.Password)
// 创建 SSH 客户端配置
sshConfig := &ssh.ClientConfig{
User: SshConfig.Username,
Auth: []ssh.AuthMethod{
ssh.Password(SP),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
// 连接到 SSH 服务器
sshConn, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", SshConfig.Host, SshConfig.Port), sshConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial SSH: %v", err)
}
// defer sshConn.Close()
// 创建到 MySQL 服务器的隧道
mysqlConn, err := sshConn.Dial("tcp", fmt.Sprintf("%s:%d", MysqlConfig.Host, MysqlConfig.Port))
if err != nil {
return nil, nil, fmt.Errorf("failed to dial MySQL: %v", err)
}
// 注册 MySQL 驱动
mysql.RegisterDialContext("mysql+tcp", func(ctx context.Context, addr string) (net.Conn, error) {
return mysqlConn, nil
})
var Database string
if strings.Contains(AppCnf.Database, "%") {
Database = fmt.Sprintf(AppCnf.Database, ServerId)
} else {
Database = AppCnf.Database
}
MP, _ := Decrypt(MysqlConfig.Password)
// 连接到 MySQL 数据库
dsn := fmt.Sprintf("%s:%s@mysql+tcp(%s:%d)/%s", MysqlConfig.Username, MP, MysqlConfig.Host, MysqlConfig.Port, Database)
db, err := sqlx.Open("mysql", dsn)
if err != nil {
return nil, nil, fmt.Errorf("failed to open MySQL: %v", err)
}
return db, sshConn, nil
} else {
var Database string
if strings.Contains(AppCnf.Database, "%") {
Database = fmt.Sprintf(AppCnf.Database, ServerId)
} else {
Database = AppCnf.Database
}
MP, _ := Decrypt(MysqlConfig.Password)
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", MysqlConfig.Username, MP, MysqlConfig.Host, MysqlConfig.Port, Database)
db, err := sqlx.Open("mysql", dsn)
if err != nil {
return nil, nil, fmt.Errorf("failed to open MySQL: %v", err)
}
return db, nil, nil
}
}
func ConnectMysql(MysqlName, DataBase string) (*sqlx.DB, error) {
MysqlConfig, err := common.GetMysqlConfig(MysqlName)
if err != nil {
return nil, fmt.Errorf("failed to get MySQL config: %v", err)
}
MP, _ := Decrypt(MysqlConfig.Password)
// 连接到 MySQL 数据库
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", MysqlConfig.Username, MP, MysqlConfig.Host, MysqlConfig.Port, DataBase)
db, err := sqlx.Open("mysql", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open MySQL: %v", err)
}
return db, nil
}
func GetAppConfig(AppId int) (*Type.App, error) {
Db := MPool.GetTopicDB("game")
defer Db.Close()
var app Type.App
err := Db.Get(&app, "SELECT * FROM app WHERE `AppId` = ?", AppId)
if err != nil {
return nil, fmt.Errorf("failed to scan rows: %v", err)
}
return &app, nil
}
func GetServerConfig(AppId, ServerId int) (*Type.ServerInfo, error) {
Db := MPool.GetTopicDB("game")
defer Db.Close()
var server Type.ServerInfo
err := Db.Get(&server, "SELECT `AppId`,`ServerId`,`Host`,`Status`,`ws_port`,`ecs`, `grpc_port` FROM server WHERE `AppId` = ? AND `ServerId` = ?", AppId, ServerId)
if err != nil {
return nil, fmt.Errorf("failed to scan rows: %v", err)
}
return &server, nil
}