1575 lines
70 KiB
Python
1575 lines
70 KiB
Python
"""
|
||
Thrift 配置文件完整流程工具
|
||
从 Excel 到 Unity 的一站式处理
|
||
"""
|
||
import tkinter as tk
|
||
from tkinter import ttk, filedialog, messagebox, scrolledtext
|
||
import json
|
||
import os
|
||
import sys
|
||
import re
|
||
import subprocess
|
||
import shutil
|
||
import importlib
|
||
import traceback
|
||
from pathlib import Path
|
||
from typing import List, Dict, Any, Tuple
|
||
import openpyxl
|
||
from datetime import datetime
|
||
|
||
# Version string; shown in the window title, the version label and the log banner.
VERSION = "v2.2.0 (简化路径配置)"
|
||
|
||
class IntegratedPipeline:
|
||
def __init__(self, root):
    """Initialise the tool window, resolve tool directories and build the UI.

    Args:
        root: The Tk root window that hosts the tool.
    """
    self.root = root
    root.title(f"Thrift 完整流程工具 - {VERSION}")
    root.geometry("1200x950")

    # Only two base paths are user-configurable; everything else is derived.
    self.docs_path = ""            # Docs project path
    self.main_project_path = ""    # Unity main project path

    # When frozen into an exe, paths are resolved from the executable;
    # otherwise from this source file.
    running_frozen = getattr(sys, 'frozen', False)

    # Project root is three levels above the entry file
    # (IntegratedTool -> PythonWorkSpace -> project root).
    entry_point = sys.executable if running_frozen else __file__
    self.tool_root = Path(entry_point).parent.parent.parent.resolve()

    # Directory that holds the config file and the temporary gen-py output.
    if running_frozen:
        app_dir = Path(os.path.dirname(sys.executable))
    else:
        app_dir = Path(__file__).parent

    self.config_file = app_dir / "pipeline_config.json"
    self.gen_py_dir = app_dir / "gen-py"  # temporary generated Python code

    # Execution report, reset at the start of every pipeline run.
    self.report = {'start_time': None, 'end_time': None, 'steps': []}

    self.load_config()
    self.setup_ui()
|
||
|
||
def load_config(self):
|
||
"""加载配置"""
|
||
try:
|
||
if self.config_file.exists():
|
||
with open(self.config_file, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
self.docs_path = config.get('docs_path', '')
|
||
self.main_project_path = config.get('main_project_path', '')
|
||
except Exception as e:
|
||
print(f"加载配置失败: {e}")
|
||
|
||
def save_config(self):
|
||
"""保存配置"""
|
||
try:
|
||
config = {
|
||
'docs_path': self.docs_path,
|
||
'main_project_path': self.main_project_path
|
||
}
|
||
with open(self.config_file, 'w', encoding='utf-8') as f:
|
||
json.dump(config, f, indent=2, ensure_ascii=False)
|
||
except Exception as e:
|
||
print(f"保存配置失败: {e}")
|
||
|
||
def get_derived_paths(self) -> Dict[str, str]:
|
||
"""根据基础路径获取所有派生路径"""
|
||
paths = {}
|
||
|
||
# Docs 相关路径
|
||
if self.docs_path:
|
||
docs_root = Path(self.docs_path)
|
||
paths['cfg_json'] = str(docs_root / "tool" / "cfg" / "cfg_txt.json")
|
||
paths['config_dir'] = str(docs_root / "config")
|
||
|
||
# 工具项目相关路径(相对于项目根目录)
|
||
paths['compiler'] = str(self.tool_root / "compiler" / "exe" / "thrift.exe")
|
||
paths['thrift_dir'] = str(self.tool_root / "thrift-files" / "meowment")
|
||
paths['csharp_output_dir'] = str(self.tool_root / "compiled_output" / "csharp")
|
||
paths['bytes_output_dir'] = str(self.tool_root / "binary_output")
|
||
|
||
# Unity 主项目相关路径
|
||
if self.main_project_path:
|
||
main_root = Path(self.main_project_path)
|
||
paths['unity_csharp_dir'] = str(main_root / "Assets" / "Scripts" / "thrift" / "gen-netstd")
|
||
paths['unity_bytes_dir'] = str(main_root / "Assets" / "Resources" / "ConfigData")
|
||
paths['dr_output_dir'] = str(main_root / "Assets" / "GameMain" / "Scripts" / "DR_Generated")
|
||
|
||
return paths
|
||
|
||
def setup_ui(self):
    """Build the main window: path inputs, derived-path preview, actions, log.

    Widgets are stacked top to bottom in a three-column grid. The log box
    is the last row and is the only row that receives extra vertical space
    when the window is resized.
    """
    fill_x = (tk.W, tk.E)
    fill_all = (tk.W, tk.E, tk.N, tk.S)

    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.grid(row=0, column=0, sticky=fill_all)

    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    main_frame.columnconfigure(1, weight=1)

    row = 0

    # Hint text
    info_label = ttk.Label(main_frame, text="只需配置两个基础路径,其他路径将自动推导",
                           font=("", 10, "bold"), foreground="darkgreen")
    info_label.grid(row=row, column=0, columnspan=3, sticky=tk.W, pady=10)
    row += 1

    # Docs project path
    ttk.Label(main_frame, text="Docs 项目路径:").grid(row=row, column=0, sticky=tk.W, pady=5)
    self.docs_path_var = tk.StringVar(value=self.docs_path)
    ttk.Entry(main_frame, textvariable=self.docs_path_var, width=85).grid(
        row=row, column=1, sticky=fill_x, pady=5, padx=5)
    ttk.Button(main_frame, text="选择",
               command=lambda: self.select_directory('docs_path', 'Docs项目路径')).grid(
        row=row, column=2, pady=5)
    row += 1

    # Unity main project path
    ttk.Label(main_frame, text="Unity 主项目路径:").grid(row=row, column=0, sticky=tk.W, pady=5)
    self.main_project_path_var = tk.StringVar(value=self.main_project_path)
    ttk.Entry(main_frame, textvariable=self.main_project_path_var, width=85).grid(
        row=row, column=1, sticky=fill_x, pady=5, padx=5)
    ttk.Button(main_frame, text="选择",
               command=lambda: self.select_directory('main_project_path', 'Unity主项目路径')).grid(
        row=row, column=2, pady=5)
    row += 1

    # Separator
    separator = ttk.Separator(main_frame, orient='horizontal')
    separator.grid(row=row, column=0, columnspan=3, sticky=fill_x, pady=15)
    row += 1

    # Derived-path preview title
    preview_label = ttk.Label(main_frame, text="派生路径预览(自动生成):",
                              font=("", 9, "bold"), foreground="navy")
    preview_label.grid(row=row, column=0, columnspan=3, sticky=tk.W, pady=5)
    row += 1

    # Derived-path preview area (read-only; update_path_preview enables it briefly)
    self.preview_text = scrolledtext.ScrolledText(main_frame, width=140, height=10,
                                                  wrap=tk.WORD, font=("", 8),
                                                  state=tk.DISABLED, bg="#f0f0f0")
    self.preview_text.grid(row=row, column=0, columnspan=3, sticky=fill_x, pady=5)
    row += 1

    # Refresh-preview button
    ttk.Button(main_frame, text="刷新路径预览", command=self.update_path_preview).grid(
        row=row, column=0, columnspan=3, pady=10)
    row += 1

    # Start button
    ttk.Button(main_frame, text="开始执行完整流程", command=self.run_pipeline).grid(
        row=row, column=0, columnspan=3, pady=20, ipadx=80, ipady=15)
    row += 1

    # Version label
    version_label = ttk.Label(main_frame, text=f"当前版本: {VERSION}", font=("", 9), foreground="blue")
    version_label.grid(row=row, column=0, columnspan=3, pady=5)
    row += 1

    # Log area
    ttk.Label(main_frame, text="执行日志:", font=("", 10, "bold")).grid(
        row=row, column=0, sticky=tk.W, pady=5)
    row += 1

    self.log_text = scrolledtext.ScrolledText(main_frame, width=140, height=30,
                                              wrap=tk.WORD, font=("Consolas", 9))
    self.log_text.grid(row=row, column=0, columnspan=3, sticky=fill_all, pady=5)

    # Give the stretch weight to the row that actually holds the log box.
    # The previous hard-coded row 9 was the "执行日志:" label row, so the
    # log box never grew with the window.
    main_frame.rowconfigure(row, weight=1)
|
||
|
||
def select_directory(self, attr_name: str, title: str):
    """Ask the user for a directory and store it as one of the base paths.

    Args:
        attr_name: Name of the attribute to update (its Tk variable is
            expected under ``attr_name + '_var'``).
        title: Human-readable name used in the dialog title and the log.
    """
    previous = getattr(self, attr_name)
    chosen = filedialog.askdirectory(
        title=f"选择{title}",
        initialdir=previous if previous else None,
    )
    if not chosen:
        # Dialog cancelled: leave everything as it was.
        return

    setattr(self, attr_name, chosen)
    getattr(self, attr_name + '_var').set(chosen)
    self.save_config()
    self.log(f"[OK] 已选择 {title}: {chosen}\n")
    self.update_path_preview()
|
||
|
||
def update_path_preview(self):
    """Rewrite the read-only preview box with the current derived paths."""
    box = self.preview_text
    # The widget is kept disabled; enable it only while rewriting its content.
    box.config(state=tk.NORMAL)
    box.delete(1.0, tk.END)

    paths = self.get_derived_paths()

    def existence_tag(location):
        # Suffix telling the user whether the path exists on disk.
        return " [OK]" if os.path.exists(location) else " [不存在!]"

    compiler_path = paths.get('compiler', '')
    thrift_dir = paths.get('thrift_dir', '')

    preview_lines = [
        # Tool project root
        f"[工具项目根目录] {self.tool_root}",
        "",
        "[Docs 项目相关路径]",
        f" * cfg_txt.json: {paths.get('cfg_json', '未配置')}",
        f" * Config 目录 (xlsx): {paths.get('config_dir', '未配置')}",
        "",
        "[工具项目相关路径]",
        f" * Thrift 编译器: {compiler_path}{existence_tag(compiler_path)}",
        f" * Thrift 文件目录: {thrift_dir}{existence_tag(thrift_dir)}",
        f" * C# 输出目录: {paths.get('csharp_output_dir', '')}",
        f" * Bytes 输出目录: {paths.get('bytes_output_dir', '')}",
        "",
        "[Unity 主项目相关路径]",
        f" * Unity C# 目录: {paths.get('unity_csharp_dir', '未配置')}",
        f" * Unity Bytes 目录: {paths.get('unity_bytes_dir', '未配置')}",
        f" * DR 生成目录: {paths.get('dr_output_dir', '未配置')}",
    ]

    box.insert(tk.END, "\n".join(preview_lines))
    box.config(state=tk.DISABLED)
|
||
|
||
def log(self, message: str):
    """Append ``message`` to the log box, scroll to the end and refresh the UI.

    Any failure while writing to the widget is printed to stdout instead of
    being raised.
    """
    try:
        log_box = self.log_text
        log_box.insert(tk.END, message)
        log_box.see(tk.END)
        # Process pending events so the text shows up during long-running steps.
        self.root.update()
    except Exception as exc:
        print(f"日志输出失败: {exc}")
|
||
|
||
def clear_log(self):
    """Remove all text from the log box."""
    log_box = self.log_text
    log_box.delete(1.0, tk.END)
|
||
|
||
def add_step_report(self, step_name: str, status: str, details: Dict = None):
|
||
"""添加步骤报告"""
|
||
step = {
|
||
'name': step_name,
|
||
'status': status,
|
||
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'details': details or {}
|
||
}
|
||
self.report['steps'].append(step)
|
||
|
||
def generate_report(self) -> str:
|
||
"""生成最终报告"""
|
||
report_lines = []
|
||
|
||
# =============== 简报部分 ===============
|
||
report_lines.append("="*100)
|
||
report_lines.append(" Thrift 流程执行简报")
|
||
report_lines.append("="*100)
|
||
report_lines.append(f"执行时间: {self.report['start_time']} -> {self.report['end_time']}")
|
||
report_lines.append("")
|
||
|
||
# 统计总体状态
|
||
total_errors = 0
|
||
step_summary = []
|
||
|
||
for idx, step in enumerate(self.report['steps'], 1):
|
||
status_icon = "[OK]" if step['status'] == 'success' else "[FAIL]"
|
||
status_text = "成功" if step['status'] == 'success' else "失败"
|
||
|
||
# 提取关键数字
|
||
summary_text = f"{idx}. [{status_icon}] {step['name']}: {status_text}"
|
||
|
||
if step['details']:
|
||
details = step['details']
|
||
|
||
# 根据不同步骤提取关键信息
|
||
if 'success' in details and 'total' in details:
|
||
summary_text += f" - {details['success']}/{details['total']} 个文件"
|
||
if 'failed' in details and details['failed'] > 0:
|
||
summary_text += f" (失败: {details['failed']})"
|
||
total_errors += details['failed']
|
||
elif 'count' in details:
|
||
summary_text += f" - {details['count']} 个文件"
|
||
if 'skipped' in details and details['skipped']:
|
||
summary_text += f" (跳过: {len(details['skipped'])})"
|
||
elif 'action' in details:
|
||
# step4的新格式:跳过清理
|
||
summary_text += f" - {details['action']}"
|
||
elif 'deleted' in details:
|
||
summary_text += f" - 删除 {details['deleted']} 个文件"
|
||
|
||
step_summary.append(summary_text)
|
||
if step['status'] != 'success':
|
||
total_errors += 1
|
||
|
||
# 显示简报
|
||
for line in step_summary:
|
||
report_lines.append(line)
|
||
|
||
report_lines.append("")
|
||
if total_errors == 0:
|
||
report_lines.append("【总体状态】[OK] 所有步骤成功完成,无错误")
|
||
else:
|
||
report_lines.append(f"【总体状态】[WARNING] 发现 {total_errors} 个问题,详见下方详细报告")
|
||
|
||
report_lines.append("")
|
||
report_lines.append("="*100)
|
||
report_lines.append(" 详细执行报告")
|
||
report_lines.append("="*100)
|
||
report_lines.append("")
|
||
|
||
# =============== 详细报告部分 ===============
|
||
for idx, step in enumerate(self.report['steps'], 1):
|
||
status_icon = "[OK]" if step['status'] == 'success' else "[FAIL]"
|
||
report_lines.append(f"【步骤 {idx}】{step['name']}")
|
||
report_lines.append(f"状态: [{status_icon}] {'成功' if step['status'] == 'success' else '失败'}")
|
||
report_lines.append(f"耗时: {step['time']}")
|
||
report_lines.append("-"*80)
|
||
|
||
if step['details']:
|
||
details = step['details']
|
||
|
||
# 显示基本统计
|
||
if 'success' in details and 'total' in details:
|
||
report_lines.append(f"处理文件: {details['success']}/{details['total']}")
|
||
if 'failed' in details:
|
||
report_lines.append(f"失败文件: {details['failed']}")
|
||
# 显示AllConfigs状态
|
||
if 'allconfigs' in details:
|
||
allconfigs_status = "[OK] 已生成" if details['allconfigs'] else "[FAIL] 未生成"
|
||
report_lines.append(f"AllConfigs.bytes: {allconfigs_status}")
|
||
elif 'count' in details:
|
||
report_lines.append(f"处理文件: {details['count']}")
|
||
elif 'deleted' in details:
|
||
report_lines.append(f"删除文件: {details['deleted']}")
|
||
|
||
report_lines.append("")
|
||
|
||
# 显示详细列表
|
||
for key, value in details.items():
|
||
if isinstance(value, list):
|
||
if len(value) > 0:
|
||
# 特殊处理 failed_files - 显示完整错误信息
|
||
if key == 'failed_files' and value and isinstance(value[0], dict):
|
||
report_lines.append(f"[FAIL] 失败详情 ({len(value)} 项):")
|
||
for item in value:
|
||
report_lines.append(f" - {item['name']}")
|
||
report_lines.append(f" 错误: {item['error']}")
|
||
if 'detail' in item:
|
||
# 显示堆栈前10行
|
||
detail_lines = item['detail'].split('\n')[:10]
|
||
for line in detail_lines:
|
||
if line.strip():
|
||
report_lines.append(f" {line}")
|
||
report_lines.append("")
|
||
|
||
# 成功的文件列表
|
||
elif key in ['success_files', 'files']:
|
||
report_lines.append(f"[OK] 成功文件 ({len(value)} 项):")
|
||
for item in value:
|
||
report_lines.append(f" - {item}")
|
||
|
||
# 跳过的文件列表
|
||
elif key == 'skipped':
|
||
if value:
|
||
report_lines.append(f"[SKIP] 跳过文件 ({len(value)} 项):")
|
||
for item in value:
|
||
report_lines.append(f" - {item}")
|
||
|
||
# 其他列表
|
||
elif key not in ['success', 'failed', 'total', 'count', 'deleted', 'expected']:
|
||
report_lines.append(f"{key} ({len(value)} 项):")
|
||
for item in value:
|
||
report_lines.append(f" - {item}")
|
||
|
||
# 非列表值(跳过已显示的统计值)
|
||
elif key not in ['success', 'failed', 'total', 'count', 'deleted', 'expected', 'error']:
|
||
report_lines.append(f"{key}: {value}")
|
||
|
||
# 显示错误信息
|
||
if 'error' in details:
|
||
report_lines.append(f"[ERROR] 错误信息: {details['error']}")
|
||
|
||
report_lines.append("")
|
||
report_lines.append("")
|
||
|
||
# =============== 路径信息 ===============
|
||
report_lines.append("="*100)
|
||
report_lines.append(" 配置路径信息")
|
||
report_lines.append("="*100)
|
||
|
||
paths = self.get_derived_paths()
|
||
|
||
report_lines.append("[基础路径]")
|
||
report_lines.append(f" - Docs 项目: {self.docs_path}")
|
||
report_lines.append(f" - Unity 主项目: {self.main_project_path}")
|
||
report_lines.append("")
|
||
|
||
report_lines.append("[Docs 派生路径]")
|
||
report_lines.append(f" - cfg_txt.json: {paths.get('cfg_json', '')}")
|
||
report_lines.append(f" - Config 目录: {paths.get('config_dir', '')}")
|
||
report_lines.append("")
|
||
|
||
report_lines.append("[工具项目派生路径 (相对路径)]")
|
||
report_lines.append(f" - Thrift 编译器: {paths.get('compiler', '')}")
|
||
report_lines.append(f" - Thrift 文件目录: {paths.get('thrift_dir', '')}")
|
||
report_lines.append(f" - C# 输出目录: {paths.get('csharp_output_dir', '')}")
|
||
report_lines.append(f" - Bytes 输出目录: {paths.get('bytes_output_dir', '')}")
|
||
report_lines.append("")
|
||
|
||
report_lines.append("[Unity 派生路径]")
|
||
report_lines.append(f" - Unity C# 目录: {paths.get('unity_csharp_dir', '')}")
|
||
report_lines.append(f" - Unity Bytes 目录: {paths.get('unity_bytes_dir', '')}")
|
||
report_lines.append(f" - DR 生成目录: {paths.get('dr_output_dir', '')}")
|
||
report_lines.append("="*100)
|
||
|
||
return "\n".join(report_lines)
|
||
|
||
def validate_paths(self) -> bool:
|
||
"""验证所有路径"""
|
||
errors = []
|
||
|
||
# 验证基础路径
|
||
if not self.docs_path:
|
||
errors.append("[X] 请选择 Docs 项目路径")
|
||
|
||
if not self.main_project_path:
|
||
errors.append("[X] 请选择 Unity 主项目路径")
|
||
|
||
# 获取派生路径并验证
|
||
paths = self.get_derived_paths()
|
||
|
||
if self.docs_path:
|
||
if 'cfg_json' in paths and not os.path.exists(paths['cfg_json']):
|
||
errors.append(f"[X] cfg_txt.json 文件不存在: {paths['cfg_json']}")
|
||
|
||
if 'config_dir' in paths and not os.path.exists(paths['config_dir']):
|
||
errors.append(f"[X] Config 目录不存在: {paths['config_dir']}")
|
||
|
||
# 验证工具项目相关路径
|
||
if not os.path.exists(paths.get('compiler', '')):
|
||
errors.append(f"[X] Thrift 编译器不存在: {paths.get('compiler', '')}")
|
||
|
||
if not os.path.exists(paths.get('thrift_dir', '')):
|
||
errors.append(f"[X] Thrift 文件目录不存在: {paths.get('thrift_dir', '')}")
|
||
|
||
if errors:
|
||
error_msg = "\n".join(errors)
|
||
messagebox.showerror("配置错误", f"请检查以下配置:\n\n{error_msg}")
|
||
return False
|
||
|
||
return True
|
||
|
||
def run_pipeline(self):
    """Run the whole Excel -> Thrift -> C#/bytes -> Unity pipeline.

    Validates the configuration, resolves all derived paths onto ``self``,
    then runs the steps in order. The run stops silently at the first
    required step that reports failure. When every required step passes,
    the report is logged, saved next to the config file and summarised in
    a message box. Unexpected exceptions are logged with a traceback and
    shown in an error dialog.
    """
    if not self.validate_paths():
        return

    # Publish every derived path as an attribute for the step methods.
    paths = self.get_derived_paths()
    attribute_to_key = (
        ('cfg_json_path', 'cfg_json'),
        ('compiler_path', 'compiler'),
        ('thrift_dir', 'thrift_dir'),
        ('csharp_output_dir', 'csharp_output_dir'),
        ('bytes_output_dir', 'bytes_output_dir'),
        ('config_dir', 'config_dir'),
        ('unity_csharp_dir', 'unity_csharp_dir'),
        ('unity_bytes_dir', 'unity_bytes_dir'),
        ('dr_output_dir', 'dr_output_dir'),
    )
    for attribute, key in attribute_to_key:
        setattr(self, attribute, paths.get(key, ''))

    self.clear_log()
    self.report = {'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'end_time': None, 'steps': []}

    self.log("="*100 + "\n")
    self.log(f"开始执行完整流程 ({VERSION})\n")
    self.log("="*100 + "\n\n")

    # (method name, whether a falsy result aborts the run).
    # Step 7 (temp cleanup) is best-effort: its result is ignored.
    pipeline_steps = (
        ('step1_generate_thrift', True),    # generate Thrift files
        ('step2_compile_to_csharp', True),  # compile Thrift to C#
        ('step3_generate_bytes', True),     # generate bytes files
        ('step4_cleanup_extensions', True), # clean up conflicting Extensions files
        ('step5_copy_csharp', True),        # copy C# into Unity
        ('step6_copy_bytes', True),         # copy bytes into Unity
        ('step7_cleanup_temp', False),      # clean up temporary files
        ('step8_generate_dr', True),        # generate DR files
    )

    try:
        for method_name, must_succeed in pipeline_steps:
            # Resolve lazily so each step is looked up only when it is its turn.
            step_ok = getattr(self, method_name)()
            if must_succeed and not step_ok:
                return

        self.report['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Build and show the report.
        report_text = self.generate_report()
        self.log("\n" + report_text)

        # Save the report next to the config file. In a frozen exe,
        # __file__ points into the bundle's temporary extraction directory,
        # so the report would be lost; the config directory is already
        # resolved correctly for both modes.
        report_dir = Path(self.config_file).parent
        report_path = report_dir / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(report_text)

        self.log(f"\n报告已保存: {report_path}\n")

        # Choose the dialog type according to how many steps failed.
        total_steps = len(self.report['steps'])
        failed_steps = sum(1 for step in self.report['steps'] if step['status'] != 'success')

        if failed_steps == 0:
            messagebox.showinfo("完成", f"所有步骤执行成功!\n\n详细报告:\n{report_path}")
        elif failed_steps < total_steps:
            messagebox.showwarning("部分完成", f"部分步骤执行成功\n成功: {total_steps - failed_steps}/{total_steps}\n\n详细报告请查看日志。")
        else:
            messagebox.showerror("失败", "所有步骤都失败了\n\n详细报告请查看日志。")

    except Exception as e:
        self.log(f"\n[FAIL] 执行失败: {str(e)}\n")
        self.log(traceback.format_exc())
        messagebox.showerror("错误", f"执行失败:\n{str(e)}")
|
||
|
||
def step1_generate_thrift(self) -> bool:
|
||
"""步骤1: 根据 xlsx 和 cfg_txt.json 生成 thrift 文件"""
|
||
self.log("\n【步骤1】生成 Thrift 文件\n")
|
||
self.log("-"*80 + "\n")
|
||
|
||
try:
|
||
# 读取 cfg_txt.json
|
||
with open(self.cfg_json_path, 'r', encoding='utf-8') as f:
|
||
cfg = json.load(f)
|
||
|
||
file_list = cfg.get('file_list', [])
|
||
|
||
# 创建 thrift 目录
|
||
os.makedirs(self.thrift_dir, exist_ok=True)
|
||
|
||
generated_count = 0
|
||
struct_names = [] # 收集所有结构名,用于生成 AllConfigs
|
||
|
||
for config_item in file_list:
|
||
in_file = config_item.get('in_file', '')
|
||
out_file = config_item.get('out_file', '')
|
||
sheet_name = config_item.get('sheet_name', '')
|
||
coloum_types = config_item.get('coloum_type', []) # 从配置读取类型
|
||
|
||
if not all([in_file, out_file]):
|
||
continue
|
||
|
||
struct_name = out_file.replace('.txt', '')
|
||
excel_path = os.path.join(self.config_dir, in_file)
|
||
|
||
if not os.path.exists(excel_path):
|
||
self.log(f" [WARN] Excel 不存在: {in_file}\n")
|
||
continue
|
||
|
||
# 读取 Excel 获取字段和注释
|
||
try:
|
||
wb = openpyxl.load_workbook(excel_path, data_only=True)
|
||
|
||
# 查找 sheet
|
||
target_sheet = sheet_name if sheet_name in wb.sheetnames else wb.sheetnames[0]
|
||
ws = wb[target_sheet]
|
||
|
||
# 读取表头(第一行)- 根据coloum_types数量限制
|
||
first_row = list(ws[1])
|
||
headers = []
|
||
header_indices = [] # 记录有表头的列索引
|
||
|
||
# 只读取coloum_types定义的数量
|
||
max_columns = len(coloum_types) if coloum_types else len(first_row)
|
||
|
||
for idx, cell in enumerate(first_row):
|
||
if cell.value is not None and str(cell.value).strip():
|
||
headers.append(str(cell.value).strip())
|
||
header_indices.append(idx)
|
||
# 达到coloum_types定义的数量后停止
|
||
if len(headers) >= max_columns:
|
||
break
|
||
|
||
# 读取注释(第二行),只读取有表头的列
|
||
comments = []
|
||
if ws.max_row >= 2:
|
||
second_row = list(ws[2])
|
||
for idx in header_indices:
|
||
if idx < len(second_row) and second_row[idx].value is not None:
|
||
comments.append(str(second_row[idx].value).strip())
|
||
else:
|
||
comments.append("")
|
||
else:
|
||
comments = [""] * len(headers)
|
||
|
||
# 确保类型数量与表头匹配
|
||
if len(coloum_types) < len(headers):
|
||
# 类型不够,补充为 string
|
||
coloum_types = coloum_types + ['string'] * (len(headers) - len(coloum_types))
|
||
elif len(coloum_types) > len(headers):
|
||
# 类型多了,截断
|
||
coloum_types = coloum_types[:len(headers)]
|
||
|
||
wb.close()
|
||
|
||
# 生成 thrift 文件
|
||
thrift_content = self.generate_thrift_content(struct_name, headers, coloum_types, comments)
|
||
thrift_path = os.path.join(self.thrift_dir, f"{struct_name}.thrift")
|
||
|
||
with open(thrift_path, 'w', encoding='utf-8') as f:
|
||
f.write(thrift_content)
|
||
|
||
generated_count += 1
|
||
struct_names.append(struct_name)
|
||
self.log(f" [OK] 生成: {struct_name}.thrift\n")
|
||
|
||
except Exception as e:
|
||
self.log(f" [FAIL] 失败 {struct_name}: {str(e)}\n")
|
||
|
||
# 生成 AllConfigs.thrift(合并配置文件)
|
||
try:
|
||
self.log(f"\n 生成 AllConfigs.thrift...\n")
|
||
allconfigs_content = self.generate_allconfigs_content(struct_names)
|
||
allconfigs_path = os.path.join(self.thrift_dir, "AllConfigs.thrift")
|
||
with open(allconfigs_path, 'w', encoding='utf-8') as f:
|
||
f.write(allconfigs_content)
|
||
generated_count += 1
|
||
self.log(f" [OK] AllConfigs.thrift 生成成功\n")
|
||
except Exception as e:
|
||
self.log(f" [FAIL] AllConfigs.thrift 生成失败: {str(e)}\n")
|
||
|
||
# 保存配置文件列表供后续步骤使用
|
||
self.config_struct_names = struct_names
|
||
|
||
self.log(f"\n生成完成: {generated_count} 个 thrift 文件\n")
|
||
self.add_step_report("生成Thrift文件", "success", {
|
||
"total": generated_count,
|
||
"files": struct_names # 完整列表
|
||
})
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.log(f"[FAIL] 步骤1失败: {str(e)}\n")
|
||
self.add_step_report("生成Thrift文件", "failed", {"error": str(e)})
|
||
return False
|
||
|
||
def generate_thrift_content(self, struct_name: str, headers: List[str], types: List[str], comments: List[str]) -> str:
    """Build the text of one .thrift file: an item struct and its container.

    Args:
        struct_name: Name of the container struct; the row struct is named
            ``<struct_name>Item``.
        headers: Field names, in field-id order (ids start at 1).
        types: Declared type names (from ``coloum_type`` in cfg_txt.json).
            Matching is case-insensitive; missing, empty or unknown types
            fall back to ``string`` (unknown ones are logged).
        comments: Per-field comments (Excel row 2); empty entries are
            omitted. Multi-line comments are flattened to one line.

    Returns:
        The Thrift IDL source as a single string.
    """
    # Accepted spellings -> Thrift base type.
    type_mapping = {
        'int': 'i32',
        'int32': 'i32',
        'i32': 'i32',
        'integer': 'i32',
        'long': 'i64',
        'int64': 'i64',
        'i64': 'i64',
        'float': 'double',
        'double': 'double',
        'string': 'string',
        'str': 'string',
        'text': 'string',
        'bool': 'bool',
        'boolean': 'bool',
        'i8': 'i8',
        'i16': 'i16'
    }

    lines = []
    lines.append("namespace netstd Byway.Thrift.Data\n")
    lines.append(f"\nstruct {struct_name}Item\n{{")

    for idx, header in enumerate(headers):
        field_type = 'string'  # default type

        if idx < len(types) and types[idx]:
            type_str = str(types[idx]).strip().lower()
            field_type = type_mapping.get(type_str, 'string')

            # Report unknown type names so config typos are visible.
            if type_str and type_str not in type_mapping:
                self.log(f" [WARN] 未知类型 '{types[idx]}' (字段: {header}),使用默认类型 string\n")

        comment = comments[idx] if idx < len(comments) and comments[idx] else ""
        # A "//" comment ends at the line break. Flatten multi-line Excel
        # cells so their continuation lines do not land in the struct body
        # as invalid Thrift.
        comment = " ".join(
            part.strip() for part in str(comment).splitlines() if part.strip()
        )

        if comment:
            lines.append(f"\t{idx+1}:{field_type} {header}, // {comment}")
        else:
            lines.append(f"\t{idx+1}:{field_type} {header},")

    lines.append("}\n")
    lines.append(f"\nstruct {struct_name}\n{{")
    lines.append(f"\t1:map<i32,{struct_name}Item> {struct_name.lower()}s,")
    lines.append("}\n")

    return "\n".join(lines)
|
||
|
||
def generate_allconfigs_content(self, struct_names: List[str]) -> str:
    """Build the text of AllConfigs.thrift, which aggregates every config struct.

    Each struct gets an ``include`` line and one field in ``AllConfigs``.
    The field uses the struct name itself (PascalCase) as its name, and
    field ids follow the order of ``struct_names`` starting at 1.

    Args:
        struct_names: Names of the generated config structs.

    Returns:
        The Thrift IDL source, without a trailing newline.
    """
    include_lines = [f'include "{name}.thrift"' for name in struct_names]
    field_lines = [
        f"\t{position}:{name}.{name} {name},"
        for position, name in enumerate(struct_names, 1)
    ]

    document = ["namespace netstd Byway.Thrift.Data"]
    document += include_lines
    document += ["", "struct AllConfigs", "{"]
    document += field_lines
    document.append("}")
    return "\n".join(document)
|
||
|
||
def step2_compile_to_csharp(self) -> bool:
|
||
"""步骤2: 用 thrift.exe 编译成 C#"""
|
||
self.log("\n【步骤2】编译 Thrift 到 C#\n")
|
||
self.log("-"*80 + "\n")
|
||
|
||
try:
|
||
os.makedirs(self.csharp_output_dir, exist_ok=True)
|
||
|
||
thrift_files = [f for f in os.listdir(self.thrift_dir) if f.endswith('.thrift')]
|
||
success_count = 0
|
||
failed_list = []
|
||
|
||
self.log(f" 找到 {len(thrift_files)} 个 thrift 文件\n")
|
||
self.log(f" 输出目录: {self.csharp_output_dir}\n\n")
|
||
|
||
for idx, thrift_file in enumerate(thrift_files, 1):
|
||
thrift_path = os.path.join(self.thrift_dir, thrift_file)
|
||
|
||
cmd = [
|
||
self.compiler_path,
|
||
'-strict', '-v', '-r',
|
||
'--gen', 'netstd:unity,serial,async_postfix',
|
||
'-out', self.csharp_output_dir,
|
||
thrift_path
|
||
]
|
||
|
||
# 原始:添加超时设置
|
||
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=30)
|
||
|
||
if result.returncode == 0:
|
||
success_count += 1
|
||
# 原始:改进进度显示
|
||
if idx % 10 == 0 or idx == 1 or idx == len(thrift_files):
|
||
self.log(f" [OK] [{idx}/{len(thrift_files)}] {thrift_file}\n")
|
||
self.root.update() # 原始:刷新UI
|
||
else:
|
||
self.log(f" [FAIL] 编译失败: {thrift_file}\n")
|
||
error_msg = result.stderr if result.stderr else result.stdout
|
||
if error_msg:
|
||
self.log(f" 错误详情:\n{error_msg}\n")
|
||
# 保存详细错误信息到报告
|
||
failed_list.append({
|
||
"name": thrift_file,
|
||
"error": error_msg[:200] if error_msg else "未知错误"
|
||
})
|
||
|
||
if success_count > 5:
|
||
self.log(f" ... 共编译 {success_count} 个文件\n")
|
||
|
||
# 检查生成的C#文件
|
||
csharp_actual_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
|
||
cs_files = []
|
||
if os.path.exists(csharp_actual_dir):
|
||
cs_files = [f for f in os.listdir(csharp_actual_dir) if f.endswith('.cs')]
|
||
self.log(f"\n C# 文件生成在: {csharp_actual_dir}\n")
|
||
self.log(f" 生成了 {len(cs_files)} 个 C# 文件\n")
|
||
|
||
# 立即删除 AllConfigs.Extensions.cs(避免后续被复制)
|
||
allconfigs_ext = os.path.join(csharp_actual_dir, 'AllConfigs.Extensions.cs')
|
||
if os.path.exists(allconfigs_ext):
|
||
os.remove(allconfigs_ext)
|
||
self.log(f" [OK] 删除 AllConfigs.Extensions.cs(避免二义性冲突)\n")
|
||
# 更新文件计数
|
||
cs_files = [f for f in os.listdir(csharp_actual_dir) if f.endswith('.cs')]
|
||
self.log(f" 最终保留 {len(cs_files)} 个 C# 文件\n")
|
||
|
||
self.log(f"\n编译完成: {success_count}/{len(thrift_files)}\n")
|
||
self.add_step_report("编译到C#", "success", {
|
||
"success": success_count,
|
||
"failed": len(failed_list),
|
||
"total": len(thrift_files),
|
||
"cs_files_count": len(cs_files),
|
||
"failed_files": failed_list
|
||
})
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.log(f"[FAIL] 步骤2失败: {str(e)}\n")
|
||
self.add_step_report("编译到C#", "failed", {"error": str(e)})
|
||
return False
|
||
|
||
def step3_generate_bytes(self) -> bool:
    """Step 3: build the merged AllConfigs.bytes from the Excel configuration.

    The step compiles every ``.thrift`` file in ``self.thrift_dir`` to Python
    bindings under ``self.gen_py_dir``, reads the ``file_list`` entries of
    ``self.cfg_json_path``, loads each referenced Excel sheet into the matching
    generated container class and finally passes all successfully built
    containers to ``generate_all_configs_bytes``. Per-config ``.bytes`` files
    are not written.

    Side effects: ``self.generated_bytes_list`` is set to ``['AllConfigs']``
    when the merged file was written and to ``[]`` otherwise, and a step
    report is recorded through ``add_step_report``.

    Returns:
        True when the step ran to its end (failures of individual configs are
        logged and listed in the report only); False when an unexpected
        exception aborted the step.
    """
    self.log("\n【步骤3】生成 Bytes 文件\n")
    self.log("-"*80 + "\n")

    try:
        # Generate the Python bindings (gen-py) first.
        os.makedirs(self.gen_py_dir, exist_ok=True)

        thrift_files = [f for f in os.listdir(self.thrift_dir) if f.endswith('.thrift')]

        for thrift_file in thrift_files:
            thrift_path = os.path.join(self.thrift_dir, thrift_file)
            cmd = [self.compiler_path, '--gen', 'py', '-out', str(self.gen_py_dir), thrift_path]
            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result.returncode != 0:
                # Surface compiler failures instead of discovering them later
                # as an opaque import error.
                stderr_text = result.stderr.decode('utf-8', errors='replace').strip()
                self.log(f" [WARN] gen-py 编译失败: {thrift_file}: {stderr_text}\n")

        self.log(f" [OK] 生成 gen-py 完成\n")

        # Not used directly here: importing fails fast (handled by the outer
        # except) when the thrift runtime is not installed.
        from thrift.protocol import TBinaryProtocol  # noqa: F401
        from thrift.transport import TTransport  # noqa: F401

        # Keep exactly one entry for gen-py at the front of sys.path, and make
        # the import system notice the freshly generated packages.
        gen_py_path = str(self.gen_py_dir)
        if gen_py_path in sys.path:
            sys.path.remove(gen_py_path)
        sys.path.insert(0, gen_py_path)
        importlib.invalidate_caches()

        # Read the conversion configuration.
        with open(self.cfg_json_path, 'r', encoding='utf-8') as f:
            cfg = json.load(f)

        os.makedirs(self.bytes_output_dir, exist_ok=True)

        success_count = 0
        failed_list = []
        success_list = []
        config_containers = {}  # {struct_name: container} used to build AllConfigs
        file_list = cfg.get('file_list', [])

        self.log(f" 找到 {len(file_list)} 个配置项\n\n")

        # Only the merged AllConfigs.bytes is produced; single files are skipped.
        self.log(f" [INFO] 新策略:只生成 AllConfigs.bytes(合并文件),跳过单个 bytes 文件生成\n\n")

        for idx, config_item in enumerate(file_list, 1):
            in_file = config_item.get('in_file', '')
            out_file = config_item.get('out_file', '')
            sheet_name = config_item.get('sheet_name', '')
            coloum_types = config_item.get('coloum_type', [])  # column types from the config

            # Every one of these entries is required.
            if not all([in_file, out_file, sheet_name, coloum_types]):
                self.log(f" [{idx}/{len(file_list)}] [WARN] 配置项不完整,跳过: {in_file}\n\n")
                continue

            struct_name = out_file.replace('.txt', '')
            excel_path = os.path.join(self.config_dir, in_file)

            self.log(f" [{idx}/{len(file_list)}] 验证: {in_file} -> {struct_name}\n")

            try:
                # Import the generated module; this also verifies that the
                # struct and its Item class exist.
                module_name = f"{struct_name}.ttypes"
                ttypes_module = importlib.import_module(module_name)
                item_class = getattr(ttypes_module, f"{struct_name}Item")
                container_class = getattr(ttypes_module, struct_name)

                headers, data_rows = self._step3_read_excel_rows(excel_path, sheet_name, coloum_types)

                container = container_class()
                container_items = self._step3_build_items(item_class, headers, data_rows, coloum_types)
                setattr(container, f"{struct_name.lower()}s", container_items)

                # No single bytes file is written; the container is kept for AllConfigs.
                self.log(f" [OK] {struct_name} 验证通过(数据行数: {len(data_rows)})\n")
                success_count += 1
                success_list.append(struct_name)
                config_containers[struct_name] = container

                # Refresh the UI periodically. ``idx`` is the position in
                # file_list; the helpers use their own loop variables so it is
                # no longer overwritten by the inner loops.
                if idx % 10 == 0 or idx == 1 or idx == len(file_list):
                    self.root.update()

                self.log("\n")

            except Exception as e:
                error_detail = traceback.format_exc()
                self.log(f" [FAIL] {struct_name}: {str(e)}\n")
                self.log(f" 详细错误:\n{error_detail}\n")
                failed_list.append({
                    "name": struct_name,
                    "error": str(e),
                    "detail": error_detail
                })

        if failed_list:
            self.log(f"\n失败: {len(failed_list)} 个配置项\n")
            for failed in failed_list:
                self.log(f" - {failed['name']}: {failed['error'][:100]}\n")

        # Generate AllConfigs.bytes, the only bytes file.
        self.log(f"\n生成 AllConfigs.bytes(合并所有配置)...\n")
        allconfigs_generated = False
        if self.generate_all_configs_bytes(config_containers):
            self.log(f" [OK] AllConfigs.bytes 生成成功\n")
            allconfigs_generated = True
        else:
            self.log(f" [FAIL] AllConfigs.bytes 生成失败\n")

        # Only AllConfigs is recorded; step 6 reads this list.
        self.generated_bytes_list = ['AllConfigs'] if allconfigs_generated else []

        # Final summary.
        if allconfigs_generated:
            self.log(f"\n[OK] 生成完成: AllConfigs.bytes(包含 {success_count} 个配置)\n")
        else:
            self.log(f"\n[FAIL] 生成失败\n")

        self.add_step_report("生成Bytes文件", "success", {
            "success": success_count,
            "failed": len(failed_list),
            "total": len(file_list) + (1 if allconfigs_generated else 0),
            "success_files": success_list,
            "failed_files": failed_list,  # full error information
            "allconfigs": allconfigs_generated
        })
        return True

    except Exception as e:
        self.log(f"[FAIL] 步骤3失败: {str(e)}\n")
        self.log(traceback.format_exc())
        self.add_step_report("生成Bytes文件", "failed", {"error": str(e)})
        return False


def _step3_read_excel_rows(self, excel_path: str, sheet_name: str,
                           coloum_types: List[Any]) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Read the header row and the data rows of one configuration sheet.

    The sheet named ``sheet_name`` is used when it exists, otherwise the first
    sheet of the workbook. Headers come from row 1 (empty cells are ignored)
    and at most ``len(coloum_types)`` headers are kept. Data starts at row 3;
    rows whose header columns are all empty are dropped.

    Returns:
        ``(headers, data_rows)`` where each data row maps header name to the
        raw cell value (None for empty cells).
    """
    wb = openpyxl.load_workbook(excel_path, data_only=True)
    try:
        target_sheet = sheet_name if sheet_name in wb.sheetnames else wb.sheetnames[0]
        ws = wb[target_sheet]

        first_row = list(ws[1])
        max_columns = len(coloum_types) if coloum_types else len(first_row)

        headers = []
        excel_headers = {}  # {header_name: column_index}
        for column_index, cell in enumerate(first_row):
            if cell.value is not None and str(cell.value).strip():
                header_name = str(cell.value).strip()
                headers.append(header_name)
                excel_headers[header_name] = column_index
                # Stop once as many headers as configured types were read.
                if len(headers) >= max_columns:
                    break

        data_rows = []
        for row in ws.iter_rows(min_row=3, values_only=False):
            row_data = {}
            has_data = False
            for header in headers:
                column_index = excel_headers[header]
                cell_value = row[column_index].value if column_index < len(row) else None
                if cell_value is not None:
                    has_data = True
                row_data[header] = cell_value
            if has_data:
                data_rows.append(row_data)

        return headers, data_rows
    finally:
        # Close the workbook even when reading fails.
        wb.close()


def _step3_build_items(self, item_class: Any, headers: List[str],
                       data_rows: List[Dict[str, Any]],
                       coloum_types: List[Any]) -> Dict[int, Any]:
    """Convert data rows into ``{id: item}`` using the configured column types.

    The id is taken from the first non-empty column among ``id``/``Id``/``ID``/
    ``Key``, then from the first header column, and falls back to the 1-based
    row position when it is missing or not convertible to int.

    Raises:
        ValueError: a cell cannot be converted to its configured type.
        AttributeError: ``setattr`` on the item rejected the field name.
    """
    container_items = {}

    for row_index, row_data in enumerate(data_rows):
        if not any(row_data.values()):
            continue

        item = item_class()

        id_value = None
        for id_field in ['id', 'Id', 'ID', 'Key']:
            if id_field in row_data and row_data[id_field] is not None:
                id_value = row_data[id_field]
                break

        if id_value is None and headers and headers[0] in row_data:
            id_value = row_data[headers[0]]

        if id_value is None:
            id_value = row_index + 1

        try:
            id_value = int(id_value)
        except (ValueError, TypeError):
            id_value = row_index + 1

        for col_idx, header in enumerate(headers):
            value = row_data.get(header)
            if value is None:
                continue

            if col_idx < len(coloum_types) and coloum_types[col_idx]:
                type_str = str(coloum_types[col_idx]).lower().strip()
                try:
                    if type_str in ['int', 'i32', 'i16', 'i8', 'int32', 'integer',
                                    'long', 'i64', 'int64']:
                        value = int(value)
                    elif type_str in ['float', 'double']:
                        value = float(value)
                    elif type_str in ['bool', 'boolean']:
                        value = str(value).upper() in ["TRUE", "1", "YES"]
                    else:
                        value = str(value)

                    setattr(item, header, value)
                except (ValueError, TypeError) as e:
                    raise ValueError(f"字段 '{header}' (类型: {type_str}) 转换失败,值: '{value}' (行ID: {id_value})") from e
                except AttributeError as e:
                    raise AttributeError(f"字段 '{header}' 不存在于结构体中 (行ID: {id_value})") from e
            else:
                # No type configured for this column: store the text form.
                try:
                    setattr(item, header, str(value))
                except AttributeError as e:
                    raise AttributeError(f"字段 '{header}' 不存在于结构体中 (行ID: {id_value})") from e

        container_items[id_value] = item

    return container_items
|
||
|
||
def step4_cleanup_extensions(self) -> bool:
    """Step 4: remove a leftover AllConfigs.Extensions.cs from the C# output.

    Looks in ``<csharp_output_dir>/Byway/Thrift/Data``. When that directory
    exists, ``AllConfigs.Extensions.cs`` is deleted if present and the number
    of remaining ``*.Extensions.cs`` files is logged. A missing directory is
    not an error.

    Returns:
        True after recording a "success" step report, False when an exception
        occurred (a "failed" report is recorded in that case).
    """
    self.log("\n【步骤4】清理冲突的 Extensions 文件(双重检查)\n")
    self.log("-"*80 + "\n")

    target_name = 'AllConfigs.Extensions.cs'

    try:
        # The generated C# files live in the Byway/Thrift/Data sub-directory.
        data_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
        removed = []

        if os.path.exists(data_dir):
            leftover = os.path.join(data_dir, target_name)
            if not os.path.exists(leftover):
                self.log(f" [OK] 确认 AllConfigs.Extensions.cs 已不存在\n")
            else:
                os.remove(leftover)
                removed.append(target_name)
                self.log(f" [OK] 删除残留的 AllConfigs.Extensions.cs\n")

            # Count the per-config Extensions files that stay in place.
            kept = sum(1 for entry in os.listdir(data_dir) if entry.endswith('.Extensions.cs'))
            self.log(f" 保留 {kept} 个配置的 Extensions 文件\n")

        self.log(f"\n清理完成: {len(removed)} 个文件被删除\n")
        self.add_step_report("清理Extensions", "success", {
            "deleted": len(removed),
            "deleted_files": removed
        })
        return True

    except Exception as exc:
        self.log(f"[FAIL] 步骤4失败: {exc}\n")
        self.add_step_report("清理Extensions", "failed", {"error": str(exc)})
        return False
|
||
|
||
def step5_copy_csharp(self) -> bool:
    """Step 5: copy the generated C# files into the Unity project.

    Files are copied from ``<csharp_output_dir>/Byway/Thrift/Data`` to
    ``<unity_csharp_dir>/Byway/Thrift/Data`` (created when missing). A stale
    ``AllConfigs.Extensions.cs`` is removed from the Unity directory and is
    never copied, so the step does not depend on an earlier clean-up having
    run.

    Returns:
        True when the copy finished (a "success" report is recorded); False
        when the source directory does not exist or an exception occurred
        (a "failed" report is recorded).
    """
    self.log("\n【步骤5】复制 C# 文件到 Unity\n")
    self.log("-"*80 + "\n")

    # Copying this file into Unity causes the conflict the pipeline removes
    # it for, so it is excluded here as well.
    excluded_file = 'AllConfigs.Extensions.cs'

    try:
        # Mirror the Byway/Thrift/Data structure inside the Unity directory.
        unity_target_dir = os.path.join(self.unity_csharp_dir, 'Byway', 'Thrift', 'Data')
        os.makedirs(unity_target_dir, exist_ok=True)

        # Remove an old copy left in the Unity directory by a previous run.
        unity_allconfigs_ext = os.path.join(unity_target_dir, excluded_file)
        if os.path.exists(unity_allconfigs_ext):
            os.remove(unity_allconfigs_ext)
            self.log(f" [OK] 清理Unity目录中的旧 AllConfigs.Extensions.cs\n")

        csharp_actual_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')

        if not os.path.exists(csharp_actual_dir):
            # Nothing to copy: report the failure instead of a success with
            # zero files (same handling as the DataRow generation step).
            self.log(f" [FAIL] 未找到C#文件: {csharp_actual_dir}\n")
            self.add_step_report("复制C#到Unity", "failed", {"error": "C# 文件目录不存在"})
            return False

        self.log(f" 源目录: {csharp_actual_dir}\n")
        self.log(f" 目标目录: {unity_target_dir}\n\n")

        copied_files = []
        # Sorted so that the log and the report are deterministic.
        for file in sorted(os.listdir(csharp_actual_dir)):
            if not file.endswith('.cs'):
                continue
            if file == excluded_file:
                self.log(f" [SKIP] {file} (避免二义性冲突)\n")
                continue

            src = os.path.join(csharp_actual_dir, file)
            dst = os.path.join(unity_target_dir, file)
            shutil.copy2(src, dst)
            copied_files.append(file)

            # Tag the kind of file in the log.
            if file == 'AllConfigs.cs':
                self.log(f" [OK] {file} [合并配置类]\n")
            elif file.endswith('Item.cs'):
                self.log(f" [OK] {file} [Item文件]\n")
            elif file.endswith('.Extensions.cs'):
                self.log(f" [OK] {file} [Extensions文件]\n")
            else:
                self.log(f" [OK] {file}\n")

        copied_count = len(copied_files)
        self.log(f"\n复制完成: {copied_count} 个 C# 文件\n")

        self.add_step_report("复制C#到Unity", "success", {
            "count": copied_count,
            "files": copied_files  # complete list
        })
        return True

    except Exception as e:
        self.log(f"[FAIL] 步骤5失败: {str(e)}\n")
        self.add_step_report("复制C#到Unity", "failed", {"error": str(e)})
        return False
|
||
|
||
def step6_copy_bytes(self) -> bool:
    """Step 6: copy AllConfigs.bytes into the Unity project.

    Only ``AllConfigs.bytes`` is copied from ``self.bytes_output_dir`` to
    ``self.unity_bytes_dir`` (created when missing). Any other ``.bytes``
    file found in the source directory is listed in the log and in the
    report's ``skipped`` entry but not copied.

    Returns:
        True when AllConfigs.bytes was copied (a "success" report is
        recorded); False when the file is missing or an exception occurred
        (a "failed" report is recorded).
    """
    self.log("\n【步骤6】复制 Bytes 文件到 Unity\n")
    self.log("-"*80 + "\n")

    try:
        os.makedirs(self.unity_bytes_dir, exist_ok=True)

        copied_files = []

        self.log(f" 源目录: {self.bytes_output_dir}\n")
        self.log(f" 目标目录: {self.unity_bytes_dir}\n")
        self.log(f" [INFO] 只复制 AllConfigs.bytes\n\n")

        allconfigs_file = 'AllConfigs.bytes'
        allconfigs_src = os.path.join(self.bytes_output_dir, allconfigs_file)

        if os.path.exists(allconfigs_src):
            allconfigs_dst = os.path.join(self.unity_bytes_dir, allconfigs_file)
            shutil.copy2(allconfigs_src, allconfigs_dst)
            copied_files.append(allconfigs_file)
            self.log(f" [OK] {allconfigs_file} [唯一的配置文件]\n")
        else:
            self.log(f" [FAIL] 未找到 {allconfigs_file}\n")

        # Deprecated single-config bytes files are reported, never copied.
        skipped_files = []
        if os.path.isdir(self.bytes_output_dir):
            skipped_files = sorted(
                f for f in os.listdir(self.bytes_output_dir)
                if f.endswith('.bytes') and f != allconfigs_file
            )
        if skipped_files:
            self.log(f"\n [INFO] 发现 {len(skipped_files)} 个其他 bytes 文件(已废弃,不复制):\n")
            for f in skipped_files:
                self.log(f" - {f}\n")

        copied_count = len(copied_files)
        self.log(f"\n复制完成: {copied_count} 个 bytes 文件\n")

        if not copied_files:
            # The only file this step is responsible for was not found, so
            # the step must not be reported as successful.
            self.add_step_report("复制Bytes到Unity", "failed", {"error": f"未找到 {allconfigs_file}"})
            return False

        # generated_bytes_list is assigned by step 3; tolerate its absence so
        # that a successful copy is not turned into a failure.
        expected_count = len(getattr(self, 'generated_bytes_list', []))

        self.add_step_report("复制Bytes到Unity", "success", {
            "count": copied_count,
            "expected": expected_count,
            "files": copied_files,  # complete list
            "skipped": skipped_files
        })
        return True

    except Exception as e:
        self.log(f"[FAIL] 步骤6失败: {str(e)}\n")
        self.add_step_report("复制Bytes到Unity", "failed", {"error": str(e)})
        return False
|
||
|
||
def step7_cleanup_temp(self):
    """Step 7: delete the temporary gen-py directory.

    Removes ``self.gen_py_dir`` recursively when it exists and records a
    "success" step report. Any exception is logged as a warning and recorded
    as a "failed" report; nothing is raised or returned.
    """
    self.log("\n【步骤7】清理临时文件\n")
    self.log("-"*80 + "\n")

    try:
        temp_dir = self.gen_py_dir
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
            self.log(" [OK] 清理 gen-py 目录\n")

        self.add_step_report("清理临时文件", "success", {})

    except Exception as exc:
        self.log(f" [WARN] 清理失败: {exc}\n")
        self.add_step_report("清理临时文件", "failed", {"error": str(exc)})
|
||
|
||
def step8_generate_dr(self) -> bool:
    """Step 8: generate one ``DR<Class>.cs`` DataRow file per config class.

    Every ``.cs`` file in ``<csharp_output_dir>/Byway/Thrift/Data`` is passed
    to ``parse_config_class``; files it does not recognise are skipped
    silently. For recognised classes the Item file named after the parsed
    item type is parsed with ``parse_item_class`` and the code produced by
    ``generate_dr_code`` is written to ``self.dr_output_dir``.

    Returns:
        True when the directory was processed (per-file failures are only
        logged and listed in the report); False when the C# directory is
        missing or an unexpected exception occurred.
    """
    self.log("\n【步骤8】生成 DataRow 文件\n")
    self.log("-"*80 + "\n")

    try:
        # Make sure the output directory exists.
        os.makedirs(self.dr_output_dir, exist_ok=True)

        source_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
        if not os.path.exists(source_dir):
            self.log(f" [FAIL] C# 文件目录不存在: {source_dir}\n")
            self.add_step_report("生成DataRow文件", "failed", {"error": "C# 文件目录不存在"})
            return False

        cs_files = [name for name in os.listdir(source_dir) if name.endswith('.cs')]
        total = len(cs_files)
        generated = []
        failures = []

        self.log(f" 找到 {total} 个 C# 文件\n\n")

        for position, cs_file in enumerate(cs_files, 1):
            source_path = os.path.join(source_dir, cs_file)

            try:
                config_info = self.parse_config_class(source_path)
                if not config_info:
                    # Not a config container class.
                    continue

                class_name = config_info['class_name']
                self.log(f" [{position}/{total}] {class_name}\n")

                # The Item class lives in a file named after the item type.
                item_path = os.path.join(source_dir, f"{config_info['item_type']}.cs")
                properties = self.parse_item_class(item_path)
                if not properties:
                    self.log(f" [WARN] 没有找到属性\n")
                    continue

                dr_code = self.generate_dr_code(config_info, properties)

                dr_name = f"DR{class_name}.cs"
                with open(os.path.join(self.dr_output_dir, dr_name), 'w', encoding='utf-8') as out:
                    out.write(dr_code)

                generated.append(dr_name)

                # Keep the UI responsive on the first, last and every 10th file.
                if position % 10 == 0 or position in (1, total):
                    self.root.update()

            except Exception as exc:
                self.log(f" [FAIL] 生成失败: {exc}\n")
                failures.append({"name": cs_file, "error": str(exc)})

        self.log(f"\n生成完成: {len(generated)} 个 DR 文件\n")
        self.log(f"输出目录: {self.dr_output_dir}\n")

        self.add_step_report("生成DataRow文件", "success", {
            "success": len(generated),
            "failed": len(failures),
            "total": total,
            "success_files": generated,
            "failed_files": failures
        })
        return True

    except Exception as exc:
        self.log(f"[FAIL] 步骤8失败: {exc}\n")
        self.log(traceback.format_exc())
        self.add_step_report("生成DataRow文件", "failed", {"error": str(exc)})
        return False
|
||
|
||
def parse_config_class(self, file_path: str) -> Dict[str, str]:
    """Extract the config class name and its Dictionary property from a C# file.

    The file must contain ``public partial class <Name> : TBase`` and a
    property of the form
    ``public Dictionary<int, global::<namespace>.<ItemType>> <Property>``.

    Returns:
        A dict with the keys ``class_name``, ``item_type`` and
        ``dict_property``, or None when either pattern is missing or the file
        cannot be read (despite the annotation, callers must handle None).
    """
    class_pattern = r'public\s+partial\s+class\s+(\w+)\s*:\s*TBase'
    dict_pattern = r'public\s+Dictionary<int,\s*global::[\w.]+\.(\w+)>\s+(\w+)'

    try:
        source = Path(file_path).read_text(encoding='utf-8')

        found_class = re.search(class_pattern, source)
        found_dict = re.search(dict_pattern, source) if found_class else None
        if found_class is None or found_dict is None:
            return None

        item_type, dict_property = found_dict.groups()
        return {
            'class_name': found_class.group(1),
            'item_type': item_type,
            'dict_property': dict_property
        }
    except Exception:
        # Best effort: unreadable files are treated like non-config files.
        return None
|
||
|
||
def parse_item_class(self, file_path: str) -> List[Dict[str, str]]:
    """Collect the public properties declared in a generated Item class file.

    A property is any ``public <type> <name> {`` occurrence; the names
    ``__isset`` and ``Isset`` are ignored. Each type is mapped through
    ``get_type_info``.

    Returns:
        One dict per property with the keys ``name``, ``type``, ``default``
        and ``original_type`` (the type text as written in the file). An
        empty list is returned when the file does not exist or cannot be
        parsed.
    """
    ignored_names = ('__isset', 'Isset')
    property_pattern = r'public\s+([\w<>,\.\s]+)\s+(\w+)\s*\{'

    try:
        if not os.path.exists(file_path):
            return []

        with open(file_path, 'r', encoding='utf-8') as handle:
            source = handle.read()

        found = []
        for hit in re.finditer(property_pattern, source):
            declared_type = hit.group(1).strip()
            name = hit.group(2)
            if name in ignored_names:
                continue

            # Map the declared C# type to the DR type and its default value.
            mapped_type, default_value = self.get_type_info(declared_type)
            found.append({
                'name': name,
                'type': mapped_type,
                'default': default_value,
                'original_type': declared_type  # kept for later type checks
            })
        return found
    except Exception:
        # Best effort: a file that cannot be parsed yields no properties.
        return []
|
||
|
||
def get_type_info(self, cs_type: str) -> Tuple[str, str]:
    """Map a C# type name to the DR property type and its default literal.

    Collection types (anything containing ``Dictionary`` or ``List``, any
    generic type and any array) map to ``('object', 'null')``. Otherwise the
    last token of ``cs_type`` is used, so leading modifiers, a trailing ``?``
    and namespace qualifiers are ignored; it is compared case-insensitively
    and as a whole against the basic types. Whole-name comparison keeps
    custom types such as ``PointItem`` from being mistaken for ``int``.

    Args:
        cs_type: type text as written in the C# source, e.g. ``"int"``.

    Returns:
        ``(type, default)``; unknown types fall back to ``('string', '""')``.
    """
    type_map = {
        'int': ('int', '0'),
        'long': ('long', '0'),
        'double': ('double', '0.0'),
        'float': ('float', '0f'),
        'bool': ('bool', 'false'),
        'string': ('string', '""'),
    }
    # .NET type names for the same basic types.
    aliases = {
        'int32': 'int',
        'int64': 'long',
        'single': 'float',
        'boolean': 'bool',
    }
    fallback = ('string', '""')

    # Collections and arrays cannot be given a typed default value.
    if ('Dictionary' in cs_type or 'List' in cs_type
            or '<' in cs_type or cs_type.strip().endswith('[]')):
        return ('object', 'null')

    tokens = cs_type.replace('?', ' ').split()
    if not tokens:
        return fallback

    # Drop "global::" and namespace qualifiers, keep the bare type name.
    base = tokens[-1].rsplit('::', 1)[-1].rsplit('.', 1)[-1].lower()
    base = aliases.get(base, base)

    return type_map.get(base, fallback)
|
||
|
||
def generate_all_configs_bytes(self, config_data_map: Dict[str, Any]) -> bool:
    """Serialize all config containers into a single AllConfigs.bytes file.

    Each container is assigned to the attribute of the generated
    ``AllConfigs`` struct that carries the struct's own name; containers that
    cannot be assigned are logged and left out. The struct is then written
    with the Thrift binary protocol to
    ``<bytes_output_dir>/AllConfigs.bytes``.

    Args:
        config_data_map: mapping ``{struct_name: container_object}``.

    Returns:
        True when the file was written, False when importing, serializing or
        writing failed (the error and traceback are logged).
    """
    try:
        from thrift.protocol import TBinaryProtocol
        from thrift.transport import TTransport

        # The AllConfigs module is generated, so it is imported by name.
        merged_cls = getattr(importlib.import_module("AllConfigs.ttypes"), "AllConfigs")
        merged = merged_cls()

        for struct_name in config_data_map:
            try:
                # Field names are PascalCase and equal to the type name.
                setattr(merged, struct_name, config_data_map[struct_name])
                self.log(f" [OK] 添加 {struct_name}\n")
            except Exception as exc:
                self.log(f" [FAIL] {struct_name} 添加失败: {exc}\n")

        # Serialize into memory first, then write the result in one go.
        buffer = TTransport.TMemoryBuffer()
        merged.write(TBinaryProtocol.TBinaryProtocol(buffer))
        payload = buffer.getvalue()

        target = os.path.join(self.bytes_output_dir, "AllConfigs.bytes")
        with open(target, 'wb') as out:
            out.write(payload)

        size = len(payload)
        self.log(f" 总大小: {size} bytes ({size / 1024:.2f} KB)\n")
        return True

    except Exception as exc:
        self.log(f" 生成失败: {exc}\n")
        self.log(traceback.format_exc())
        return False
|
||
|
||
def generate_dr_code(self, config_info: Dict, properties: List[Dict]) -> str:
|
||
"""生成 DR 类代码"""
|
||
class_name = config_info['class_name']
|
||
item_type = config_info['item_type']
|
||
dict_property = config_info['dict_property']
|
||
|
||
lines = []
|
||
lines.append("// 此文件由 DataRowGenerator 自动生成,请勿手动修改")
|
||
lines.append(f"// 配置类: {class_name}")
|
||
lines.append(f"// 数据类: {item_type}")
|
||
lines.append("")
|
||
lines.append("using UnityEngine;")
|
||
lines.append("using Byway.Config;")
|
||
lines.append("using Byway.Thrift.Data;")
|
||
lines.append("using UnityGameFramework.Runtime;")
|
||
lines.append("")
|
||
lines.append("namespace CrazyMaple")
|
||
lines.append("{")
|
||
lines.append(f" /// <summary>")
|
||
lines.append(f" /// {class_name} 数据行")
|
||
lines.append(f" /// </summary>")
|
||
lines.append(f" public class DR{class_name} : DataRowBase")
|
||
lines.append(" {")
|
||
lines.append(f" private {item_type} _configData;")
|
||
lines.append("")
|
||
|
||
# Id 属性
|
||
id_prop = next((p for p in properties if p['name'].lower() == 'id'), None)
|
||
lines.append(" /// <summary>")
|
||
lines.append(" /// 唯一标识")
|
||
lines.append(" /// </summary>")
|
||
lines.append(" public override int Id")
|
||
lines.append(" {")
|
||
lines.append(" get")
|
||
lines.append(" {")
|
||
if id_prop:
|
||
lines.append(f" return _configData?.{id_prop['name']} ?? 0;")
|
||
else:
|
||
lines.append(" return 0;")
|
||
lines.append(" }")
|
||
lines.append(" }")
|
||
lines.append("")
|
||
|
||
# 其他属性
|
||
for prop in properties:
|
||
if prop['name'].lower() == 'id':
|
||
continue
|
||
|
||
lines.append(f" /// <summary>")
|
||
lines.append(f" /// {prop['name']}")
|
||
lines.append(f" /// </summary>")
|
||
|
||
# 特殊处理:double类型在DR中返回float
|
||
original_type = prop.get('original_type', '').lower()
|
||
if 'double' in original_type:
|
||
lines.append(f" public float {prop['name']}")
|
||
lines.append(" {")
|
||
lines.append(" get")
|
||
lines.append(" {")
|
||
lines.append(f" return System.Convert.ToSingle(_configData?.{prop['name']} ?? 0.0);")
|
||
lines.append(" }")
|
||
lines.append(" }")
|
||
else:
|
||
lines.append(f" public {prop['type']} {prop['name']}")
|
||
lines.append(" {")
|
||
lines.append(" get")
|
||
lines.append(" {")
|
||
lines.append(f" return _configData?.{prop['name']} ?? {prop['default']};")
|
||
lines.append(" }")
|
||
lines.append(" }")
|
||
lines.append("")
|
||
|
||
# LoadFromConfig 方法
|
||
lines.append(" /// <summary>")
|
||
lines.append(" /// 从配置加载数据")
|
||
lines.append(" /// </summary>")
|
||
lines.append(" public void LoadFromConfig(int id)")
|
||
lines.append(" {")
|
||
# UIForm 特殊处理
|
||
if class_name == "UIForm":
|
||
lines.append(f" var config = ConfigManager.Instance.GetConfig<Byway.Thrift.Data.UIForm>();")
|
||
else:
|
||
lines.append(f" var config = ConfigManager.Instance.GetConfig<{class_name}>();")
|
||
lines.append(f" if (config?.{dict_property} != null)")
|
||
lines.append(" {")
|
||
lines.append(f" config.{dict_property}.TryGetValue(id, out _configData);")
|
||
lines.append(" }")
|
||
lines.append(" }")
|
||
lines.append("")
|
||
|
||
# ParseDataRow 方法
|
||
lines.append(" /// <summary>")
|
||
lines.append(" /// 解析数据行(兼容旧系统,实际使用 LoadFromConfig)")
|
||
lines.append(" /// </summary>")
|
||
lines.append(" public override bool ParseDataRow(string dataRowString, object userData)")
|
||
lines.append(" {")
|
||
lines.append(" int id = 0;")
|
||
lines.append(" if (int.TryParse(dataRowString, out id))")
|
||
lines.append(" {")
|
||
lines.append(" LoadFromConfig(id);")
|
||
lines.append(" return _configData != null;")
|
||
lines.append(" }")
|
||
lines.append(" return false;")
|
||
lines.append(" }")
|
||
lines.append(" }")
|
||
lines.append("}")
|
||
|
||
return '\n'.join(lines)
|
||
|
||
|
||
def main():
    """Create the Tk root window, build the pipeline UI on it and run the event loop."""
    window = tk.Tk()
    pipeline_ui = IntegratedPipeline(window)
    window.mainloop()


# Start the GUI only when the file is executed as a script.
if __name__ == "__main__":
    main()
|