sdk shipping重构,移除websocket发货方式
This commit is contained in:
parent
7c10cebc78
commit
a99af471cd
@ -5,10 +5,15 @@ import (
|
||||
shipcommon "backend/sdk/ship/common"
|
||||
"backend/sdk/ship/model/test"
|
||||
"backend/sdk/ship/model/tuyou"
|
||||
"backend/util"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@ -39,10 +44,10 @@ func init() {
|
||||
}
|
||||
|
||||
// 打开一个普通的最新日志文件(不使用 symlink),用于提供固定路径的最新日志
|
||||
currFile, err := os.OpenFile("./log/charge.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
||||
currFile, err := os.OpenFile("./log/charge.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
// 如果打开失败,仍然继续使用轮转器+控制台
|
||||
log.Printf("warning: failed to open current log file: %v", err)
|
||||
util.LogStructured("warn", "failed to open current log file", map[string]any{"error": err.Error()})
|
||||
logWriter = io.MultiWriter(rl, os.Stdout)
|
||||
errWriter = io.MultiWriter(rl, os.Stderr)
|
||||
} else {
|
||||
@ -60,11 +65,28 @@ func init() {
|
||||
common.Init()
|
||||
}
|
||||
|
||||
func requestLogger() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
startedAt := time.Now()
|
||||
c.Next()
|
||||
|
||||
util.LogStructured("info", "http request", map[string]any{
|
||||
"clientIp": c.ClientIP(),
|
||||
"latencyMs": time.Since(startedAt).Milliseconds(),
|
||||
"method": c.Request.Method,
|
||||
"path": c.Request.URL.Path,
|
||||
"query": c.Request.URL.RawQuery,
|
||||
"status": c.Writer.Status(),
|
||||
"userAgent": c.Request.UserAgent(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
// 使用 gin.New 并显式注入写入器,确保中间件把日志写到轮转器
|
||||
// gin.SetMode(gin.ReleaseMode)
|
||||
r := gin.New()
|
||||
r.Use(gin.LoggerWithWriter(logWriter))
|
||||
r.Use(requestLogger())
|
||||
r.Use(gin.RecoveryWithWriter(errWriter))
|
||||
|
||||
ChargeApi := r.Group("/api")
|
||||
@ -73,6 +95,46 @@ func main() {
|
||||
ChargeApi.POST("test/charge", test.Charge)
|
||||
ChargeApi.POST("tuyou/charge", tuyou.Charge)
|
||||
}
|
||||
log.Printf("Ship SDK started on port %d; version : 1.1.0", shipcommon.AppConf.Port)
|
||||
r.Run(fmt.Sprintf(":%d", shipcommon.AppConf.Port))
|
||||
|
||||
server := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", shipcommon.AppConf.Port),
|
||||
Handler: r,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
ReadTimeout: 15 * time.Second,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
IdleTimeout: 60 * time.Second,
|
||||
}
|
||||
|
||||
shutdownCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
serverErr := make(chan error, 1)
|
||||
go func() {
|
||||
util.LogStructured("info", "ship sdk started", map[string]any{
|
||||
"port": shipcommon.AppConf.Port,
|
||||
"version": shipcommon.AppConf.Version,
|
||||
})
|
||||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
serverErr <- err
|
||||
}
|
||||
close(serverErr)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err, ok := <-serverErr:
|
||||
if ok && err != nil {
|
||||
util.LogStructured("error", "ship sdk start failed", map[string]any{"error": err.Error()})
|
||||
os.Exit(1)
|
||||
}
|
||||
case <-shutdownCtx.Done():
|
||||
util.LogStructured("info", "ship sdk shutting down", map[string]any{"reason": shutdownCtx.Err().Error()})
|
||||
}
|
||||
|
||||
gracefulCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := server.Shutdown(gracefulCtx); err != nil {
|
||||
util.LogStructured("error", "ship sdk shutdown failed", map[string]any{"error": err.Error()})
|
||||
os.Exit(1)
|
||||
}
|
||||
util.LogStructured("info", "ship sdk stopped", nil)
|
||||
}
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package base
|
||||
|
||||
import (
|
||||
"backend/client"
|
||||
"backend/msg"
|
||||
"backend/util"
|
||||
"fmt"
|
||||
"log"
|
||||
@ -22,6 +24,11 @@ type Param struct {
|
||||
ChannelOrderId string
|
||||
}
|
||||
|
||||
const (
|
||||
orderShippingMaxAttempts = 100
|
||||
orderShippingRetryBackoff = time.Second
|
||||
)
|
||||
|
||||
func (p *Param) ChangeOrderStatus(Platform string, prodprice string) error {
|
||||
// 校验成功 修改订单状态为已支付
|
||||
AppConf, err := util.GetAppConfig(p.AppId)
|
||||
@ -49,3 +56,40 @@ func (p *Param) ChangeOrderStatus(Platform string, prodprice string) error {
|
||||
_, err = Db.Exec("UPDATE `t_player_charge` SET `PayStatus`=1, `PayTime`=?, `PayChannelOrderId` = ? , `PayPlatform` =? WHERE `OrderId`=? AND `PayStatus`=0", time.Now().Unix(), p.ChannelOrderId, Platform, p.OrderId)
|
||||
return err
|
||||
}
|
||||
|
||||
func ShipOrder(appInfo Param) error {
|
||||
request := &msg.ReqOrderShipping{
|
||||
OrderSn: appInfo.OrderId,
|
||||
Status: 1,
|
||||
ChannelOrderSn: appInfo.ChannelOrderId,
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for attempt := 1; attempt <= orderShippingMaxAttempts; attempt++ {
|
||||
err := client.OrderShipping(appInfo.AppId, appInfo.ServerId, request)
|
||||
if err == nil {
|
||||
util.LogStructured("info", "rpc order shipping success", map[string]any{
|
||||
"AppId": appInfo.AppId,
|
||||
"ServerId": appInfo.ServerId,
|
||||
"OrderId": appInfo.OrderId,
|
||||
"attempt": attempt,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
util.LogStructured("warn", "rpc order shipping retry", map[string]any{
|
||||
"AppId": appInfo.AppId,
|
||||
"ServerId": appInfo.ServerId,
|
||||
"OrderId": appInfo.OrderId,
|
||||
"attempt": attempt,
|
||||
"error": err.Error(),
|
||||
})
|
||||
|
||||
if attempt < orderShippingMaxAttempts {
|
||||
time.Sleep(orderShippingRetryBackoff)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("rpc order shipping failed after %d attempts: %w", orderShippingMaxAttempts, lastErr)
|
||||
}
|
||||
|
||||
@ -2,12 +2,8 @@ package test
|
||||
|
||||
import (
|
||||
"backend/Type"
|
||||
"backend/client"
|
||||
"backend/common"
|
||||
"backend/middleware/feishu"
|
||||
"backend/msg"
|
||||
"backend/sdk/ship/model/base"
|
||||
"backend/util"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -140,52 +136,8 @@ func Charge(c *gin.Context) {
|
||||
|
||||
// http://localhost:5240/api/tuyou/charge?apiver=2&appId=20659&appInfo=%7B%22appId%22%3A0%2C%22serverId%22%3A1%2C%22orderId%22%3A%22order_105372_20260125225337HhpqbU%22%2C%22uid%22%3A105372%7D&chargedDiamonds=20&chargedRmbs=1.99&clientId=Android_5.00_tyGuest%2Cfacebook.googleplay.0-hall20659.googleplay.Meowment&consumeCoin=20&consumeId=d50b32601260117387&orderId=-&platformOrder=e50b3260126045d972&prodCount=1&prodId=TY206590059&prodPrice=1.99&userId=3790944&code=fd7532d651bed4c3041aa3e628bef80b
|
||||
func Shipping(AppInfo base.Param) {
|
||||
if common.GetRpcSwitch() {
|
||||
err := client.OrderShipping(AppInfo.AppId, AppInfo.ServerId, &msg.ReqOrderShipping{
|
||||
OrderSn: AppInfo.OrderId,
|
||||
Status: 1,
|
||||
ChannelOrderSn: AppInfo.ChannelOrderId,
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
log.Printf("rpc order shipping success;AppId:%d;ServerId:%d;OrderId:%s", AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
Adminreq := &msg.ReqAdminShipping{
|
||||
OrderSn: AppInfo.OrderId,
|
||||
Status: 1,
|
||||
ChannelOrderSn: AppInfo.ChannelOrderId,
|
||||
}
|
||||
num := 0
|
||||
log.Print("charge shipping start;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId)
|
||||
for {
|
||||
num++
|
||||
if num > 100 {
|
||||
log.Print("charge shipping break infinite loop;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId)
|
||||
break
|
||||
}
|
||||
|
||||
ws, err := util.GetWebsocket(AppInfo.AppId, AppInfo.ServerId)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
r, err := util.SendAdminMsg(ws, Adminreq)
|
||||
// close the websocket immediately to avoid accumulating defers in the loop
|
||||
if closeErr := ws.Close(); closeErr != nil {
|
||||
log.Printf("failed to close websocket: %v", closeErr)
|
||||
}
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
if r != nil {
|
||||
log.Printf("ws charge shipping success:orderSn:%s;res:%v", AppInfo.OrderId, r)
|
||||
break
|
||||
}
|
||||
if err := base.ShipOrder(AppInfo); err != nil {
|
||||
log.Printf("rpc order shipping failed;AppId:%d;ServerId:%d;OrderId:%s;error:%v", AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -2,11 +2,8 @@ package tuyou
|
||||
|
||||
import (
|
||||
"backend/Type"
|
||||
"backend/client"
|
||||
"backend/common"
|
||||
"backend/middleware/alibaba"
|
||||
"backend/middleware/feishu"
|
||||
"backend/msg"
|
||||
"backend/sdk/ship/model/base"
|
||||
"backend/util"
|
||||
"crypto/md5"
|
||||
@ -124,7 +121,7 @@ func Charge(c *gin.Context) {
|
||||
err := AppInfo.ChangeOrderStatus("tuyou", req.ProdPrice)
|
||||
if err != nil {
|
||||
alibaba.SendStandardMsg("途游充值发货错误-测试", err.Error(), "red")
|
||||
log.Print("change order status error:", err)
|
||||
util.LogStructured("error", "change order status error", map[string]any{"error": err.Error()})
|
||||
c.JSON(500, gin.H{"error": "failed to change order status"})
|
||||
return
|
||||
}
|
||||
@ -134,54 +131,13 @@ func Charge(c *gin.Context) {
|
||||
|
||||
// http://localhost:5240/api/tuyou/charge?apiver=2&appId=20659&appInfo=%7B%22appId%22%3A0%2C%22serverId%22%3A1%2C%22orderId%22%3A%22order_105372_20260125225337HhpqbU%22%2C%22uid%22%3A105372%7D&chargedDiamonds=20&chargedRmbs=1.99&clientId=Android_5.00_tyGuest%2Cfacebook.googleplay.0-hall20659.googleplay.Meowment&consumeCoin=20&consumeId=d50b32601260117387&orderId=-&platformOrder=e50b3260126045d972&prodCount=1&prodId=TY206590059&prodPrice=1.99&userId=3790944&code=fd7532d651bed4c3041aa3e628bef80b
|
||||
func Shipping(AppInfo base.Param) {
|
||||
if common.GetRpcSwitch() {
|
||||
err := client.OrderShipping(AppInfo.AppId, AppInfo.ServerId, &msg.ReqOrderShipping{
|
||||
OrderSn: AppInfo.OrderId,
|
||||
Status: 1,
|
||||
ChannelOrderSn: AppInfo.ChannelOrderId,
|
||||
if err := base.ShipOrder(AppInfo); err != nil {
|
||||
util.LogStructured("error", "rpc order shipping failed", map[string]any{
|
||||
"error": err.Error(),
|
||||
"AppId": AppInfo.AppId,
|
||||
"ServerId": AppInfo.ServerId,
|
||||
"OrderId": AppInfo.OrderId,
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
log.Printf("rpc order shipping success;AppId:%d;ServerId:%d;OrderId:%s", AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId)
|
||||
return
|
||||
} else {
|
||||
log.Printf("rpc order shipping error: %v;AppId:%d;ServerId:%d;OrderId:%s", err, AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId)
|
||||
}
|
||||
}
|
||||
|
||||
Adminreq := &msg.ReqAdminShipping{
|
||||
OrderSn: AppInfo.OrderId,
|
||||
Status: 1,
|
||||
ChannelOrderSn: AppInfo.ChannelOrderId,
|
||||
}
|
||||
num := 0
|
||||
log.Print("ws charge shipping start;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId)
|
||||
for {
|
||||
num++
|
||||
if num > 100 {
|
||||
log.Print("ws charge shipping break infinite loop;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId)
|
||||
break
|
||||
}
|
||||
|
||||
ws, err := util.GetWebsocket(AppInfo.AppId, AppInfo.ServerId)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
r, err := util.SendAdminMsg(ws, Adminreq)
|
||||
// close the websocket immediately to avoid accumulating defers in the loop
|
||||
if closeErr := ws.Close(); closeErr != nil {
|
||||
log.Printf("failed to close websocket: %v", closeErr)
|
||||
}
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
if r != nil {
|
||||
log.Printf("ws charge shipping success:orderSn:%s;res:%v", AppInfo.OrderId, r)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
17
util/util.go
17
util/util.go
@ -966,3 +966,20 @@ func GetServerWeight(resp *msg.ResServerInfo) int {
|
||||
|
||||
return int(clampFloat(weight, 1, 100))
|
||||
}
|
||||
|
||||
func LogStructured(level string, message string, fields map[string]any) {
|
||||
payload := map[string]any{
|
||||
"level": level,
|
||||
"msg": message,
|
||||
"time": time.Now().Format(time.RFC3339),
|
||||
}
|
||||
for key, value := range fields {
|
||||
payload[key] = value
|
||||
}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
log.Printf("level=%s msg=%q marshal_error=%v", level, message, err)
|
||||
return
|
||||
}
|
||||
log.Print(string(data))
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user