From a99af471cd429dedb10c70d51c5ba893e333c7e2 Mon Sep 17 00:00:00 2001 From: hahwu <31872165+hahwu@users.noreply.github.com> Date: Wed, 13 May 2026 14:38:32 +0800 Subject: [PATCH] =?UTF-8?q?sdk=20shipping=E9=87=8D=E6=9E=84=EF=BC=8C?= =?UTF-8?q?=E7=A7=BB=E9=99=A4websocket=E5=8F=91=E8=B4=A7=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdk/ship/main.go | 72 ++++++++++++++++++++++++++++++++--- sdk/ship/model/base/base.go | 44 +++++++++++++++++++++ sdk/ship/model/test/test.go | 52 +------------------------ sdk/ship/model/tuyou/tuyou.go | 58 ++++------------------------ util/util.go | 17 +++++++++ 5 files changed, 137 insertions(+), 106 deletions(-) diff --git a/sdk/ship/main.go b/sdk/ship/main.go index 6e12616..b344aad 100644 --- a/sdk/ship/main.go +++ b/sdk/ship/main.go @@ -5,10 +5,15 @@ import ( shipcommon "backend/sdk/ship/common" "backend/sdk/ship/model/test" "backend/sdk/ship/model/tuyou" + "backend/util" + "context" "fmt" "io" "log" + "net/http" "os" + "os/signal" + "syscall" "time" "github.com/gin-gonic/gin" @@ -39,10 +44,10 @@ func init() { } // 打开一个普通的最新日志文件(不使用 symlink),用于提供固定路径的最新日志 - currFile, err := os.OpenFile("./log/charge.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + currFile, err := os.OpenFile("./log/charge.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { // 如果打开失败,仍然继续使用轮转器+控制台 - log.Printf("warning: failed to open current log file: %v", err) + util.LogStructured("warn", "failed to open current log file", map[string]any{"error": err.Error()}) logWriter = io.MultiWriter(rl, os.Stdout) errWriter = io.MultiWriter(rl, os.Stderr) } else { @@ -60,11 +65,28 @@ func init() { common.Init() } +func requestLogger() gin.HandlerFunc { + return func(c *gin.Context) { + startedAt := time.Now() + c.Next() + + util.LogStructured("info", "http request", map[string]any{ + "clientIp": c.ClientIP(), + "latencyMs": time.Since(startedAt).Milliseconds(), + "method": c.Request.Method, + "path": c.Request.URL.Path, + "query": c.Request.URL.RawQuery, + "status": c.Writer.Status(), + "userAgent": c.Request.UserAgent(), + }) + } +} + func main() { // 使用 gin.New 并显式注入写入器,确保中间件把日志写到轮转器 // gin.SetMode(gin.ReleaseMode) r := gin.New() - r.Use(gin.LoggerWithWriter(logWriter)) + r.Use(requestLogger()) r.Use(gin.RecoveryWithWriter(errWriter)) ChargeApi := r.Group("/api") @@ -73,6 +95,46 @@ func main() { ChargeApi.POST("test/charge", test.Charge) ChargeApi.POST("tuyou/charge", tuyou.Charge) } - log.Printf("Ship SDK started on port %d; version : 1.1.0", shipcommon.AppConf.Port) - r.Run(fmt.Sprintf(":%d", shipcommon.AppConf.Port)) + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", shipcommon.AppConf.Port), + Handler: r, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 15 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 60 * time.Second, + } + + shutdownCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + serverErr := make(chan error, 1) + go func() { + util.LogStructured("info", "ship sdk started", map[string]any{ + "port": shipcommon.AppConf.Port, + "version": shipcommon.AppConf.Version, + }) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + serverErr <- err + } + close(serverErr) + }() + + select { + case err, ok := <-serverErr: + if ok && err != nil { + util.LogStructured("error", "ship sdk start failed", map[string]any{"error": err.Error()}) + os.Exit(1) + } + case <-shutdownCtx.Done(): + util.LogStructured("info", "ship sdk shutting down", map[string]any{"reason": shutdownCtx.Err().Error()}) + } + + gracefulCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := server.Shutdown(gracefulCtx); err != nil { + util.LogStructured("error", "ship sdk shutdown failed", map[string]any{"error": err.Error()}) + os.Exit(1) + } + util.LogStructured("info", "ship sdk stopped", nil) } diff --git a/sdk/ship/model/base/base.go b/sdk/ship/model/base/base.go index c815f92..943894b 100644 --- a/sdk/ship/model/base/base.go +++ b/sdk/ship/model/base/base.go @@ -1,6 +1,8 @@ package base import ( + "backend/client" + "backend/msg" "backend/util" "fmt" "log" @@ -22,6 +24,11 @@ type Param struct { ChannelOrderId string } +const ( + orderShippingMaxAttempts = 100 + orderShippingRetryBackoff = time.Second +) + func (p *Param) ChangeOrderStatus(Platform string, prodprice string) error { // 校验成功 修改订单状态为已支付 AppConf, err := util.GetAppConfig(p.AppId) @@ -49,3 +56,40 @@ func (p *Param) ChangeOrderStatus(Platform string, prodprice string) error { _, err = Db.Exec("UPDATE `t_player_charge` SET `PayStatus`=1, `PayTime`=?, `PayChannelOrderId` = ? , `PayPlatform` =? WHERE `OrderId`=? AND `PayStatus`=0", time.Now().Unix(), p.ChannelOrderId, Platform, p.OrderId) return err } + +func ShipOrder(appInfo Param) error { + request := &msg.ReqOrderShipping{ + OrderSn: appInfo.OrderId, + Status: 1, + ChannelOrderSn: appInfo.ChannelOrderId, + } + + var lastErr error + for attempt := 1; attempt <= orderShippingMaxAttempts; attempt++ { + err := client.OrderShipping(appInfo.AppId, appInfo.ServerId, request) + if err == nil { + util.LogStructured("info", "rpc order shipping success", map[string]any{ + "AppId": appInfo.AppId, + "ServerId": appInfo.ServerId, + "OrderId": appInfo.OrderId, + "attempt": attempt, + }) + return nil + } + + lastErr = err + util.LogStructured("warn", "rpc order shipping retry", map[string]any{ + "AppId": appInfo.AppId, + "ServerId": appInfo.ServerId, + "OrderId": appInfo.OrderId, + "attempt": attempt, + "error": err.Error(), + }) + + if attempt < orderShippingMaxAttempts { + time.Sleep(orderShippingRetryBackoff) + } + } + + return fmt.Errorf("rpc order shipping failed after %d attempts: %w", orderShippingMaxAttempts, lastErr) +} diff --git a/sdk/ship/model/test/test.go b/sdk/ship/model/test/test.go index a8916b0..8ba5b7c 100644 --- a/sdk/ship/model/test/test.go +++ b/sdk/ship/model/test/test.go @@ -2,12 +2,8 @@ package test import ( "backend/Type" - "backend/client" - "backend/common" "backend/middleware/feishu" - "backend/msg" "backend/sdk/ship/model/base" - "backend/util" "crypto/md5" "encoding/json" "fmt" @@ -140,52 +136,8 @@ func Charge(c *gin.Context) { // http://localhost:5240/api/tuyou/charge?apiver=2&appId=20659&appInfo=%7B%22appId%22%3A0%2C%22serverId%22%3A1%2C%22orderId%22%3A%22order_105372_20260125225337HhpqbU%22%2C%22uid%22%3A105372%7D&chargedDiamonds=20&chargedRmbs=1.99&clientId=Android_5.00_tyGuest%2Cfacebook.googleplay.0-hall20659.googleplay.Meowment&consumeCoin=20&consumeId=d50b32601260117387&orderId=-&platformOrder=e50b3260126045d972&prodCount=1&prodId=TY206590059&prodPrice=1.99&userId=3790944&code=fd7532d651bed4c3041aa3e628bef80b func Shipping(AppInfo base.Param) { - if common.GetRpcSwitch() { - err := client.OrderShipping(AppInfo.AppId, AppInfo.ServerId, &msg.ReqOrderShipping{ - OrderSn: AppInfo.OrderId, - Status: 1, - ChannelOrderSn: AppInfo.ChannelOrderId, - }) - - if err == nil { - log.Printf("rpc order shipping success;AppId:%d;ServerId:%d;OrderId:%s", AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId) - return - } - } - - Adminreq := &msg.ReqAdminShipping{ - OrderSn: AppInfo.OrderId, - Status: 1, - ChannelOrderSn: AppInfo.ChannelOrderId, - } - num := 0 - log.Print("charge shipping start;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId) - for { - num++ - if num > 100 { - log.Print("charge shipping break infinite loop;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId) - break - } - - ws, err := util.GetWebsocket(AppInfo.AppId, AppInfo.ServerId) - if err != nil { - time.Sleep(time.Second) - continue - } - - r, err := util.SendAdminMsg(ws, Adminreq) - // close the websocket immediately to avoid accumulating defers in the loop - if closeErr := ws.Close(); closeErr != nil { - log.Printf("failed to close websocket: %v", closeErr) - } - if err != nil { - time.Sleep(time.Second) - continue - } - if r != nil { - log.Printf("ws charge shipping success:orderSn:%s;res:%v", AppInfo.OrderId, r) - break - } + if err := base.ShipOrder(AppInfo); err != nil { + log.Printf("rpc order shipping failed;AppId:%d;ServerId:%d;OrderId:%s;error:%v", AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId, err) } } diff --git a/sdk/ship/model/tuyou/tuyou.go b/sdk/ship/model/tuyou/tuyou.go index 3ade8c4..42213a2 100644 --- a/sdk/ship/model/tuyou/tuyou.go +++ b/sdk/ship/model/tuyou/tuyou.go @@ -2,11 +2,8 @@ package tuyou import ( "backend/Type" - "backend/client" - "backend/common" "backend/middleware/alibaba" "backend/middleware/feishu" - "backend/msg" "backend/sdk/ship/model/base" "backend/util" "crypto/md5" @@ -124,7 +121,7 @@ func Charge(c *gin.Context) { err := AppInfo.ChangeOrderStatus("tuyou", req.ProdPrice) if err != nil { alibaba.SendStandardMsg("途游充值发货错误-测试", err.Error(), "red") - log.Print("change order status error:", err) + util.LogStructured("error", "change order status error", map[string]any{"error": err.Error()}) c.JSON(500, gin.H{"error": "failed to change order status"}) return } @@ -134,54 +131,13 @@ func Charge(c *gin.Context) { // http://localhost:5240/api/tuyou/charge?apiver=2&appId=20659&appInfo=%7B%22appId%22%3A0%2C%22serverId%22%3A1%2C%22orderId%22%3A%22order_105372_20260125225337HhpqbU%22%2C%22uid%22%3A105372%7D&chargedDiamonds=20&chargedRmbs=1.99&clientId=Android_5.00_tyGuest%2Cfacebook.googleplay.0-hall20659.googleplay.Meowment&consumeCoin=20&consumeId=d50b32601260117387&orderId=-&platformOrder=e50b3260126045d972&prodCount=1&prodId=TY206590059&prodPrice=1.99&userId=3790944&code=fd7532d651bed4c3041aa3e628bef80b func Shipping(AppInfo base.Param) { - if common.GetRpcSwitch() { - err := client.OrderShipping(AppInfo.AppId, AppInfo.ServerId, &msg.ReqOrderShipping{ - OrderSn: AppInfo.OrderId, - Status: 1, - ChannelOrderSn: AppInfo.ChannelOrderId, + if err := base.ShipOrder(AppInfo); err != nil { + util.LogStructured("error", "rpc order shipping failed", map[string]any{ + "error": err.Error(), + "AppId": AppInfo.AppId, + "ServerId": AppInfo.ServerId, + "OrderId": AppInfo.OrderId, }) - - if err == nil { - log.Printf("rpc order shipping success;AppId:%d;ServerId:%d;OrderId:%s", AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId) - return - } else { - log.Printf("rpc order shipping error: %v;AppId:%d;ServerId:%d;OrderId:%s", err, AppInfo.AppId, AppInfo.ServerId, AppInfo.OrderId) - } - } - - Adminreq := &msg.ReqAdminShipping{ - OrderSn: AppInfo.OrderId, - Status: 1, - ChannelOrderSn: AppInfo.ChannelOrderId, - } - num := 0 - log.Print("ws charge shipping start;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId) - for { - num++ - if num > 100 { - log.Print("ws charge shipping break infinite loop;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId) - break - } - - ws, err := util.GetWebsocket(AppInfo.AppId, AppInfo.ServerId) - if err != nil { - time.Sleep(time.Second) - continue - } - - r, err := util.SendAdminMsg(ws, Adminreq) - // close the websocket immediately to avoid accumulating defers in the loop - if closeErr := ws.Close(); closeErr != nil { - log.Printf("failed to close websocket: %v", closeErr) - } - if err != nil { - time.Sleep(time.Second) - continue - } - if r != nil { - log.Printf("ws charge shipping success:orderSn:%s;res:%v", AppInfo.OrderId, r) - break - } } } diff --git a/util/util.go b/util/util.go index 509632e..0ac706b 100644 --- a/util/util.go +++ b/util/util.go @@ -966,3 +966,20 @@ func GetServerWeight(resp *msg.ResServerInfo) int { return int(clampFloat(weight, 1, 100)) } + +func LogStructured(level string, message string, fields map[string]any) { + payload := map[string]any{ + "level": level, + "msg": message, + "time": time.Now().Format(time.RFC3339), + } + for key, value := range fields { + payload[key] = value + } + data, err := json.Marshal(payload) + if err != nil { + log.Printf("level=%s msg=%q marshal_error=%v", level, message, err) + return + } + log.Print(string(data)) +}