236 lines
5.6 KiB
Go
236 lines
5.6 KiB
Go
package db
|
||
|
||
import (
|
||
"context"
|
||
"server/conf"
|
||
"server/pkg/github.com/name5566/leaf/log"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/redis/go-redis/v9"
|
||
)
|
||
|
||
var ctx = context.Background()
|
||
|
||
var RdbWrite *redis.Client
|
||
var RdbRead *redis.Client
|
||
|
||
// helper: 创建单个客户端(addr 可以为 host:port 或 host)
|
||
func connectClient(addr string) (*redis.Client, error) {
|
||
if addr == "" {
|
||
return nil, nil
|
||
}
|
||
// 如果没有端口且配置了旧的 RedisPort,则尝试补端口
|
||
if !strings.Contains(addr, ":") && conf.Server.RedisPort != "" {
|
||
addr = addr + ":" + conf.Server.RedisPort
|
||
}
|
||
rdb := redis.NewClient(&redis.Options{
|
||
Addr: addr,
|
||
Password: conf.Server.RedisPwd,
|
||
DB: conf.Server.RedisDb,
|
||
})
|
||
if _, err := rdb.Ping(ctx).Result(); err != nil {
|
||
return nil, err
|
||
}
|
||
return rdb, nil
|
||
}
|
||
|
||
// InitRedis: 初始化读写分离客户端(向后兼容旧配置)
|
||
func InitRedis() {
|
||
// 决定写地址:优先使用 RedisWriteAddr,其次使用旧的 RedisAddr:RedisPort
|
||
writeAddr := conf.Server.RedisWriteAddr
|
||
if writeAddr == "" {
|
||
if conf.Server.RedisAddr != "" {
|
||
writeAddr = conf.Server.RedisAddr
|
||
if conf.Server.RedisPort != "" && !strings.Contains(writeAddr, ":") {
|
||
writeAddr = writeAddr + ":" + conf.Server.RedisPort
|
||
}
|
||
}
|
||
}
|
||
|
||
// 决定读地址:优先使用 RedisReadAddrs(逗号分隔),若为空则回退到写地址
|
||
readAddrs := conf.Server.RedisReadAddrs
|
||
if strings.TrimSpace(readAddrs) == "" {
|
||
readAddrs = writeAddr
|
||
}
|
||
|
||
// 取第一个可用的只读地址(简单实现)
|
||
var readClient *redis.Client
|
||
for _, a := range strings.Split(readAddrs, ",") {
|
||
a = strings.TrimSpace(a)
|
||
if a == "" {
|
||
continue
|
||
}
|
||
c, err := connectClient(a)
|
||
if err == nil {
|
||
readClient = c
|
||
break
|
||
}
|
||
log.Debug("connect read addr %s failed: %v", a, err)
|
||
}
|
||
|
||
// 如果所有只读都不可用,尝试连接写地址作为回退
|
||
writeClient, err := connectClient(writeAddr)
|
||
if err != nil {
|
||
log.Debug("连接redis写节点出错,错误信息:%v", err)
|
||
// 若读已连上则也作为写回退,否则返回
|
||
if readClient != nil {
|
||
RdbWrite = readClient
|
||
RdbRead = readClient
|
||
log.Debug("只有只读节点可用,读写共用该节点")
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
// 如果读未连接成功,读回退到写
|
||
if readClient == nil {
|
||
readClient = writeClient
|
||
}
|
||
|
||
RdbWrite = writeClient
|
||
RdbRead = readClient
|
||
log.Debug("成功初始化 redis(读写分离),写: %v, 读: %v", writeAddr, readAddrs)
|
||
}
|
||
|
||
// 写操作使用 RdbWrite
|
||
func RedisSetKey(key string, value string, expiration time.Duration) {
|
||
if RdbWrite == nil {
|
||
log.Debug("redis write client is nil")
|
||
return
|
||
}
|
||
err := RdbWrite.Set(ctx, key, value, expiration).Err()
|
||
if err != nil {
|
||
log.Debug("redis set failed, err:%v\n", err)
|
||
}
|
||
}
|
||
|
||
// 新增:写入字节数据,避免 string 转换拷贝
|
||
func RedisSetKeyBytes(key string, value []byte, expiration time.Duration) {
|
||
if RdbWrite == nil {
|
||
log.Debug("redis write client is nil")
|
||
return
|
||
}
|
||
err := RdbWrite.Set(ctx, key, value, expiration).Err()
|
||
if err != nil {
|
||
log.Debug("redis set failed, err:%v\n", err)
|
||
}
|
||
}
|
||
|
||
// 获取锁(写)
|
||
func RedisLock(key string, value string, expiration time.Duration) bool {
|
||
if RdbWrite == nil {
|
||
log.Debug("redis write client is nil")
|
||
return false
|
||
}
|
||
ok, err := RdbWrite.SetNX(ctx, key, value, expiration).Result()
|
||
if err != nil {
|
||
log.Debug("redis lock failed, err:%v\n", err)
|
||
return false
|
||
}
|
||
return ok
|
||
}
|
||
|
||
// 释放锁(写)
|
||
func RedisUnlock(key string, value string) bool {
|
||
if RdbWrite == nil {
|
||
log.Debug("redis write client is nil")
|
||
return false
|
||
}
|
||
script := `
|
||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||
return redis.call("DEL", KEYS[1])
|
||
else
|
||
return 0
|
||
end
|
||
`
|
||
result, err := RdbWrite.Eval(ctx, script, []string{key}, value).Result()
|
||
if err != nil {
|
||
log.Debug("redis unlock failed, err:%v\n", err)
|
||
return false
|
||
}
|
||
return result.(int64) == 1
|
||
}
|
||
|
||
// 读操作使用 RdbRead
|
||
func RedisGetKey(key string) (string, error) {
|
||
if RdbRead == nil {
|
||
return "", nil
|
||
}
|
||
val, err := RdbRead.Get(ctx, key).Result()
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return val, nil
|
||
}
|
||
|
||
func RedisDelKey(key string) {
|
||
if RdbWrite == nil {
|
||
log.Debug("redis write client is nil")
|
||
return
|
||
}
|
||
err := RdbWrite.Del(ctx, key).Err()
|
||
if err != nil {
|
||
log.Debug("redis del failed, err:%v\n", err)
|
||
}
|
||
}
|
||
|
||
func RedisZAdd(key string, member string, score float64) {
|
||
if RdbWrite == nil {
|
||
log.Debug("redis write client is nil")
|
||
return
|
||
}
|
||
err := RdbWrite.ZAdd(ctx, key, redis.Z{Score: score, Member: member}).Err()
|
||
if err != nil {
|
||
log.Debug("redis zadd failed, err:%v\n", err)
|
||
}
|
||
}
|
||
|
||
func RedisZRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
|
||
if RdbRead == nil {
|
||
return nil, nil
|
||
}
|
||
val, err := RdbRead.ZRangeWithScores(ctx, key, start, stop).Result()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return val, nil
|
||
}
|
||
|
||
func RedisZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
|
||
if RdbRead == nil {
|
||
return nil, nil
|
||
}
|
||
val, err := RdbRead.ZRevRangeWithScores(ctx, key, start, stop).Result()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return val, nil
|
||
}
|
||
|
||
func RedisZRankWithScores(key, member string) (int64, float64, error) {
|
||
if RdbRead == nil {
|
||
return 0, 0, nil
|
||
}
|
||
val, err := RdbRead.ZRank(ctx, key, member).Result()
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
score, err := RdbRead.ZScore(ctx, key, member).Result()
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
return val, score, nil
|
||
}
|
||
|
||
func RedisDel(key string) {
|
||
if RdbWrite == nil {
|
||
log.Debug("redis write client is nil")
|
||
return
|
||
}
|
||
err := RdbWrite.Del(ctx, key).Err()
|
||
if err != nil {
|
||
log.Debug("redis del failed, err:%v\n", err)
|
||
}
|
||
}
|