thrift-related/PythonWorkSpace/IntegratedTool/integrated_pipeline.py
2026-01-16 10:37:17 +08:00

1563 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Thrift 配置文件完整流程工具
从 Excel 到 Unity 的一站式处理
"""
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext
import json
import os
import sys
import re
import subprocess
import shutil
import importlib
import traceback
from pathlib import Path
from typing import List, Dict, Any, Tuple
import openpyxl
from datetime import datetime
# 版本号
VERSION = "v2.2.0 (简化路径配置)"
class IntegratedPipeline:
def __init__(self, root):
self.root = root
self.root.title(f"Thrift 完整流程工具 - {VERSION}")
self.root.geometry("1200x950")
# 配置路径 - 只需要两个基础路径
self.docs_path = "" # Docs 项目路径
self.main_project_path = "" # Unity 主项目路径
# 获取当前工具所在的项目根目录自动检测IntegratedTool -> PythonWorkSpace -> 项目根目录)
if getattr(sys, 'frozen', False):
# exe 模式
self.tool_root = Path(sys.executable).parent.parent.parent
else:
# 开发模式 - 当前文件在 PythonWorkSpace/IntegratedTool/
self.tool_root = Path(__file__).parent.parent.parent
# 获取配置文件路径支持exe模式
if getattr(sys, 'frozen', False):
app_dir = Path(os.path.dirname(sys.executable))
else:
app_dir = Path(__file__).parent
self.config_file = app_dir / "pipeline_config.json"
self.gen_py_dir = app_dir / "gen-py" # 临时Python代码目录
# 报告数据
self.report = {
'start_time': None,
'end_time': None,
'steps': []
}
self.load_config()
self.setup_ui()
def load_config(self):
"""加载配置"""
try:
if self.config_file.exists():
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
self.docs_path = config.get('docs_path', '')
self.main_project_path = config.get('main_project_path', '')
except Exception as e:
print(f"加载配置失败: {e}")
def save_config(self):
"""保存配置"""
try:
config = {
'docs_path': self.docs_path,
'main_project_path': self.main_project_path
}
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"保存配置失败: {e}")
def get_derived_paths(self) -> Dict[str, str]:
"""根据基础路径获取所有派生路径"""
paths = {}
# Docs 相关路径
if self.docs_path:
docs_root = Path(self.docs_path)
paths['cfg_json'] = str(docs_root / "tool" / "cfg" / "cfg_txt.json")
paths['config_dir'] = str(docs_root / "config")
# 工具项目相关路径(相对于项目根目录)
paths['compiler'] = str(self.tool_root / "compiler" / "exe" / "thrift.exe")
paths['thrift_dir'] = str(self.tool_root / "thrift-files" / "meowment")
paths['csharp_output_dir'] = str(self.tool_root / "compiled_output" / "csharp")
paths['bytes_output_dir'] = str(self.tool_root / "binary_output")
# Unity 主项目相关路径
if self.main_project_path:
main_root = Path(self.main_project_path)
paths['unity_csharp_dir'] = str(main_root / "Assets" / "Scripts" / "thrift" / "gen-netstd")
paths['unity_bytes_dir'] = str(main_root / "Assets" / "Resources" / "ConfigData")
paths['dr_output_dir'] = str(main_root / "Assets" / "GameMain" / "Scripts" / "DR_Generated")
return paths
def setup_ui(self):
"""设置UI界面"""
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(1, weight=1)
main_frame.rowconfigure(9, weight=1)
row = 0
# 说明文字
info_label = ttk.Label(main_frame, text="只需配置两个基础路径,其他路径将自动推导",
font=("", 10, "bold"), foreground="darkgreen")
info_label.grid(row=row, column=0, columnspan=3, sticky=tk.W, pady=10)
row += 1
# Docs 项目路径
ttk.Label(main_frame, text="Docs 项目路径:").grid(row=row, column=0, sticky=tk.W, pady=5)
self.docs_path_var = tk.StringVar(value=self.docs_path)
ttk.Entry(main_frame, textvariable=self.docs_path_var, width=85).grid(row=row, column=1, sticky=(tk.W, tk.E), pady=5, padx=5)
ttk.Button(main_frame, text="选择", command=lambda: self.select_directory('docs_path', 'Docs项目路径')).grid(row=row, column=2, pady=5)
row += 1
# Unity 主项目路径
ttk.Label(main_frame, text="Unity 主项目路径:").grid(row=row, column=0, sticky=tk.W, pady=5)
self.main_project_path_var = tk.StringVar(value=self.main_project_path)
ttk.Entry(main_frame, textvariable=self.main_project_path_var, width=85).grid(row=row, column=1, sticky=(tk.W, tk.E), pady=5, padx=5)
ttk.Button(main_frame, text="选择", command=lambda: self.select_directory('main_project_path', 'Unity主项目路径')).grid(row=row, column=2, pady=5)
row += 1
# 分隔线
separator = ttk.Separator(main_frame, orient='horizontal')
separator.grid(row=row, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=15)
row += 1
# 派生路径预览标题
preview_label = ttk.Label(main_frame, text="派生路径预览(自动生成):",
font=("", 9, "bold"), foreground="navy")
preview_label.grid(row=row, column=0, columnspan=3, sticky=tk.W, pady=5)
row += 1
# 派生路径显示区域
self.preview_text = scrolledtext.ScrolledText(main_frame, width=140, height=10,
wrap=tk.WORD, font=("", 8),
state=tk.DISABLED, bg="#f0f0f0")
self.preview_text.grid(row=row, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=5)
row += 1
# 更新预览按钮
ttk.Button(main_frame, text="刷新路径预览", command=self.update_path_preview).grid(row=row, column=0, columnspan=3, pady=10)
row += 1
# 开始按钮
ttk.Button(main_frame, text="开始执行完整流程", command=self.run_pipeline).grid(row=row, column=0, columnspan=3, pady=20, ipadx=80, ipady=15)
row += 1
# 版本号显示
version_label = ttk.Label(main_frame, text=f"当前版本: {VERSION}", font=("", 9), foreground="blue")
version_label.grid(row=row, column=0, columnspan=3, pady=5)
row += 1
# 日志区域
ttk.Label(main_frame, text="执行日志:", font=("", 10, "bold")).grid(row=row, column=0, sticky=tk.W, pady=5)
row += 1
self.log_text = scrolledtext.ScrolledText(main_frame, width=140, height=30, wrap=tk.WORD, font=("Consolas", 9))
self.log_text.grid(row=row, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
def select_directory(self, attr_name: str, title: str):
"""选择目录"""
current_val = getattr(self, attr_name)
initialdir = current_val if current_val else None
dir_path = filedialog.askdirectory(title=f"选择{title}", initialdir=initialdir)
if dir_path:
setattr(self, attr_name, dir_path)
getattr(self, attr_name + '_var').set(dir_path)
self.save_config()
self.log(f"[OK] 已选择 {title}: {dir_path}\n")
self.update_path_preview()
def update_path_preview(self):
"""更新路径预览"""
self.preview_text.config(state=tk.NORMAL)
self.preview_text.delete(1.0, tk.END)
paths = self.get_derived_paths()
preview_lines = []
preview_lines.append("[Docs 项目相关路径]")
preview_lines.append(f" * cfg_txt.json: {paths.get('cfg_json', '未配置')}")
preview_lines.append(f" * Config 目录 (xlsx): {paths.get('config_dir', '未配置')}")
preview_lines.append("")
preview_lines.append("[工具项目相关路径 (相对路径)]")
preview_lines.append(f" * Thrift 编译器: {paths.get('compiler', '')}")
preview_lines.append(f" * Thrift 文件目录: {paths.get('thrift_dir', '')}")
preview_lines.append(f" * C# 输出目录: {paths.get('csharp_output_dir', '')}")
preview_lines.append(f" * Bytes 输出目录: {paths.get('bytes_output_dir', '')}")
preview_lines.append("")
preview_lines.append("[Unity 主项目相关路径]")
preview_lines.append(f" * Unity C# 目录: {paths.get('unity_csharp_dir', '未配置')}")
preview_lines.append(f" * Unity Bytes 目录: {paths.get('unity_bytes_dir', '未配置')}")
preview_lines.append(f" * DR 生成目录: {paths.get('dr_output_dir', '未配置')}")
self.preview_text.insert(tk.END, "\n".join(preview_lines))
self.preview_text.config(state=tk.DISABLED)
def log(self, message: str):
"""输出日志"""
try:
self.log_text.insert(tk.END, message)
self.log_text.see(tk.END)
self.root.update()
except Exception as e:
print(f"日志输出失败: {e}")
def clear_log(self):
"""清空日志"""
self.log_text.delete(1.0, tk.END)
def add_step_report(self, step_name: str, status: str, details: Dict = None):
"""添加步骤报告"""
step = {
'name': step_name,
'status': status,
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'details': details or {}
}
self.report['steps'].append(step)
def generate_report(self) -> str:
"""生成最终报告"""
report_lines = []
# =============== 简报部分 ===============
report_lines.append("="*100)
report_lines.append(" Thrift 流程执行简报")
report_lines.append("="*100)
report_lines.append(f"执行时间: {self.report['start_time']} -> {self.report['end_time']}")
report_lines.append("")
# 统计总体状态
total_errors = 0
step_summary = []
for idx, step in enumerate(self.report['steps'], 1):
status_icon = "[OK]" if step['status'] == 'success' else "[FAIL]"
status_text = "成功" if step['status'] == 'success' else "失败"
# 提取关键数字
summary_text = f"{idx}. [{status_icon}] {step['name']}: {status_text}"
if step['details']:
details = step['details']
# 根据不同步骤提取关键信息
if 'success' in details and 'total' in details:
summary_text += f" - {details['success']}/{details['total']} 个文件"
if 'failed' in details and details['failed'] > 0:
summary_text += f" (失败: {details['failed']})"
total_errors += details['failed']
elif 'count' in details:
summary_text += f" - {details['count']} 个文件"
if 'skipped' in details and details['skipped']:
summary_text += f" (跳过: {len(details['skipped'])})"
elif 'action' in details:
# step4的新格式跳过清理
summary_text += f" - {details['action']}"
elif 'deleted' in details:
summary_text += f" - 删除 {details['deleted']} 个文件"
step_summary.append(summary_text)
if step['status'] != 'success':
total_errors += 1
# 显示简报
for line in step_summary:
report_lines.append(line)
report_lines.append("")
if total_errors == 0:
report_lines.append("【总体状态】[OK] 所有步骤成功完成,无错误")
else:
report_lines.append(f"【总体状态】[WARNING] 发现 {total_errors} 个问题,详见下方详细报告")
report_lines.append("")
report_lines.append("="*100)
report_lines.append(" 详细执行报告")
report_lines.append("="*100)
report_lines.append("")
# =============== 详细报告部分 ===============
for idx, step in enumerate(self.report['steps'], 1):
status_icon = "[OK]" if step['status'] == 'success' else "[FAIL]"
report_lines.append(f"【步骤 {idx}{step['name']}")
report_lines.append(f"状态: [{status_icon}] {'成功' if step['status'] == 'success' else '失败'}")
report_lines.append(f"耗时: {step['time']}")
report_lines.append("-"*80)
if step['details']:
details = step['details']
# 显示基本统计
if 'success' in details and 'total' in details:
report_lines.append(f"处理文件: {details['success']}/{details['total']}")
if 'failed' in details:
report_lines.append(f"失败文件: {details['failed']}")
# 显示AllConfigs状态
if 'allconfigs' in details:
allconfigs_status = "[OK] 已生成" if details['allconfigs'] else "[FAIL] 未生成"
report_lines.append(f"AllConfigs.bytes: {allconfigs_status}")
elif 'count' in details:
report_lines.append(f"处理文件: {details['count']}")
elif 'deleted' in details:
report_lines.append(f"删除文件: {details['deleted']}")
report_lines.append("")
# 显示详细列表
for key, value in details.items():
if isinstance(value, list):
if len(value) > 0:
# 特殊处理 failed_files - 显示完整错误信息
if key == 'failed_files' and value and isinstance(value[0], dict):
report_lines.append(f"[FAIL] 失败详情 ({len(value)} 项):")
for item in value:
report_lines.append(f" - {item['name']}")
report_lines.append(f" 错误: {item['error']}")
if 'detail' in item:
# 显示堆栈前10行
detail_lines = item['detail'].split('\n')[:10]
for line in detail_lines:
if line.strip():
report_lines.append(f" {line}")
report_lines.append("")
# 成功的文件列表
elif key in ['success_files', 'files']:
report_lines.append(f"[OK] 成功文件 ({len(value)} 项):")
for item in value:
report_lines.append(f" - {item}")
# 跳过的文件列表
elif key == 'skipped':
if value:
report_lines.append(f"[SKIP] 跳过文件 ({len(value)} 项):")
for item in value:
report_lines.append(f" - {item}")
# 其他列表
elif key not in ['success', 'failed', 'total', 'count', 'deleted', 'expected']:
report_lines.append(f"{key} ({len(value)} 项):")
for item in value:
report_lines.append(f" - {item}")
# 非列表值(跳过已显示的统计值)
elif key not in ['success', 'failed', 'total', 'count', 'deleted', 'expected', 'error']:
report_lines.append(f"{key}: {value}")
# 显示错误信息
if 'error' in details:
report_lines.append(f"[ERROR] 错误信息: {details['error']}")
report_lines.append("")
report_lines.append("")
# =============== 路径信息 ===============
report_lines.append("="*100)
report_lines.append(" 配置路径信息")
report_lines.append("="*100)
paths = self.get_derived_paths()
report_lines.append("[基础路径]")
report_lines.append(f" - Docs 项目: {self.docs_path}")
report_lines.append(f" - Unity 主项目: {self.main_project_path}")
report_lines.append("")
report_lines.append("[Docs 派生路径]")
report_lines.append(f" - cfg_txt.json: {paths.get('cfg_json', '')}")
report_lines.append(f" - Config 目录: {paths.get('config_dir', '')}")
report_lines.append("")
report_lines.append("[工具项目派生路径 (相对路径)]")
report_lines.append(f" - Thrift 编译器: {paths.get('compiler', '')}")
report_lines.append(f" - Thrift 文件目录: {paths.get('thrift_dir', '')}")
report_lines.append(f" - C# 输出目录: {paths.get('csharp_output_dir', '')}")
report_lines.append(f" - Bytes 输出目录: {paths.get('bytes_output_dir', '')}")
report_lines.append("")
report_lines.append("[Unity 派生路径]")
report_lines.append(f" - Unity C# 目录: {paths.get('unity_csharp_dir', '')}")
report_lines.append(f" - Unity Bytes 目录: {paths.get('unity_bytes_dir', '')}")
report_lines.append(f" - DR 生成目录: {paths.get('dr_output_dir', '')}")
report_lines.append("="*100)
return "\n".join(report_lines)
def validate_paths(self) -> bool:
"""验证所有路径"""
errors = []
# 验证基础路径
if not self.docs_path:
errors.append("[X] 请选择 Docs 项目路径")
if not self.main_project_path:
errors.append("[X] 请选择 Unity 主项目路径")
# 获取派生路径并验证
paths = self.get_derived_paths()
if self.docs_path:
if 'cfg_json' in paths and not os.path.exists(paths['cfg_json']):
errors.append(f"[X] cfg_txt.json 文件不存在: {paths['cfg_json']}")
if 'config_dir' in paths and not os.path.exists(paths['config_dir']):
errors.append(f"[X] Config 目录不存在: {paths['config_dir']}")
# 验证工具项目相关路径
if not os.path.exists(paths.get('compiler', '')):
errors.append(f"[X] Thrift 编译器不存在: {paths.get('compiler', '')}")
if not os.path.exists(paths.get('thrift_dir', '')):
errors.append(f"[X] Thrift 文件目录不存在: {paths.get('thrift_dir', '')}")
if errors:
error_msg = "\n".join(errors)
messagebox.showerror("配置错误", f"请检查以下配置:\n\n{error_msg}")
return False
return True
def run_pipeline(self):
"""执行完整流程"""
if not self.validate_paths():
return
# 获取所有派生路径
paths = self.get_derived_paths()
self.cfg_json_path = paths.get('cfg_json', '')
self.compiler_path = paths.get('compiler', '')
self.thrift_dir = paths.get('thrift_dir', '')
self.csharp_output_dir = paths.get('csharp_output_dir', '')
self.bytes_output_dir = paths.get('bytes_output_dir', '')
self.config_dir = paths.get('config_dir', '')
self.unity_csharp_dir = paths.get('unity_csharp_dir', '')
self.unity_bytes_dir = paths.get('unity_bytes_dir', '')
self.dr_output_dir = paths.get('dr_output_dir', '')
self.clear_log()
self.report = {'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'end_time': None, 'steps': []}
self.log("="*100 + "\n")
self.log(f"开始执行完整流程 ({VERSION})\n")
self.log("="*100 + "\n\n")
try:
# 步骤1: 生成 Thrift 文件
if not self.step1_generate_thrift():
return
# 步骤2: 编译 Thrift 到 C#
if not self.step2_compile_to_csharp():
return
# 步骤3: 生成 Bytes 文件
if not self.step3_generate_bytes():
return
# 步骤4: 清理冲突的 Extensions 文件
if not self.step4_cleanup_extensions():
return
# 步骤5: 复制 C# 到 Unity
if not self.step5_copy_csharp():
return
# 步骤6: 复制 Bytes 到 Unity原步骤6
if not self.step6_copy_bytes():
return
# 步骤7: 清理临时文件原步骤7
self.step7_cleanup_temp()
# 步骤8: 生成 DR 文件
if not self.step8_generate_dr():
return
self.report['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 生成报告
report_text = self.generate_report()
self.log("\n" + report_text)
# 保存报告
report_path = Path(__file__).parent / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report_text)
self.log(f"\n报告已保存: {report_path}\n")
# 原始:根据成功率显示不同类型的消息
total_steps = len(self.report['steps'])
failed_steps = sum(1 for step in self.report['steps'] if step['status'] != 'success')
if failed_steps == 0:
messagebox.showinfo("完成", f"所有步骤执行成功!\n\n详细报告:\n{report_path}")
elif failed_steps < total_steps:
messagebox.showwarning("部分完成", f"部分步骤执行成功\n成功: {total_steps - failed_steps}/{total_steps}\n\n详细报告请查看日志。")
else:
messagebox.showerror("失败", f"所有步骤都失败了\n\n详细报告请查看日志。")
except Exception as e:
self.log(f"\n[FAIL] 执行失败: {str(e)}\n")
self.log(traceback.format_exc())
messagebox.showerror("错误", f"执行失败:\n{str(e)}")
def step1_generate_thrift(self) -> bool:
"""步骤1: 根据 xlsx 和 cfg_txt.json 生成 thrift 文件"""
self.log("\n【步骤1】生成 Thrift 文件\n")
self.log("-"*80 + "\n")
try:
# 读取 cfg_txt.json
with open(self.cfg_json_path, 'r', encoding='utf-8') as f:
cfg = json.load(f)
file_list = cfg.get('file_list', [])
# 创建 thrift 目录
os.makedirs(self.thrift_dir, exist_ok=True)
generated_count = 0
struct_names = [] # 收集所有结构名,用于生成 AllConfigs
for config_item in file_list:
in_file = config_item.get('in_file', '')
out_file = config_item.get('out_file', '')
sheet_name = config_item.get('sheet_name', '')
coloum_types = config_item.get('coloum_type', []) # 从配置读取类型
if not all([in_file, out_file]):
continue
struct_name = out_file.replace('.txt', '')
excel_path = os.path.join(self.config_dir, in_file)
if not os.path.exists(excel_path):
self.log(f" [WARN] Excel 不存在: {in_file}\n")
continue
# 读取 Excel 获取字段和注释
try:
wb = openpyxl.load_workbook(excel_path, data_only=True)
# 查找 sheet
target_sheet = sheet_name if sheet_name in wb.sheetnames else wb.sheetnames[0]
ws = wb[target_sheet]
# 读取表头(第一行)- 根据coloum_types数量限制
first_row = list(ws[1])
headers = []
header_indices = [] # 记录有表头的列索引
# 只读取coloum_types定义的数量
max_columns = len(coloum_types) if coloum_types else len(first_row)
for idx, cell in enumerate(first_row):
if cell.value is not None and str(cell.value).strip():
headers.append(str(cell.value).strip())
header_indices.append(idx)
# 达到coloum_types定义的数量后停止
if len(headers) >= max_columns:
break
# 读取注释(第二行),只读取有表头的列
comments = []
if ws.max_row >= 2:
second_row = list(ws[2])
for idx in header_indices:
if idx < len(second_row) and second_row[idx].value is not None:
comments.append(str(second_row[idx].value).strip())
else:
comments.append("")
else:
comments = [""] * len(headers)
# 确保类型数量与表头匹配
if len(coloum_types) < len(headers):
# 类型不够,补充为 string
coloum_types = coloum_types + ['string'] * (len(headers) - len(coloum_types))
elif len(coloum_types) > len(headers):
# 类型多了,截断
coloum_types = coloum_types[:len(headers)]
wb.close()
# 生成 thrift 文件
thrift_content = self.generate_thrift_content(struct_name, headers, coloum_types, comments)
thrift_path = os.path.join(self.thrift_dir, f"{struct_name}.thrift")
with open(thrift_path, 'w', encoding='utf-8') as f:
f.write(thrift_content)
generated_count += 1
struct_names.append(struct_name)
self.log(f" [OK] 生成: {struct_name}.thrift\n")
except Exception as e:
self.log(f" [FAIL] 失败 {struct_name}: {str(e)}\n")
# 生成 AllConfigs.thrift合并配置文件
try:
self.log(f"\n 生成 AllConfigs.thrift...\n")
allconfigs_content = self.generate_allconfigs_content(struct_names)
allconfigs_path = os.path.join(self.thrift_dir, "AllConfigs.thrift")
with open(allconfigs_path, 'w', encoding='utf-8') as f:
f.write(allconfigs_content)
generated_count += 1
self.log(f" [OK] AllConfigs.thrift 生成成功\n")
except Exception as e:
self.log(f" [FAIL] AllConfigs.thrift 生成失败: {str(e)}\n")
# 保存配置文件列表供后续步骤使用
self.config_struct_names = struct_names
self.log(f"\n生成完成: {generated_count} 个 thrift 文件\n")
self.add_step_report("生成Thrift文件", "success", {
"total": generated_count,
"files": struct_names # 完整列表
})
return True
except Exception as e:
self.log(f"[FAIL] 步骤1失败: {str(e)}\n")
self.add_step_report("生成Thrift文件", "failed", {"error": str(e)})
return False
def generate_thrift_content(self, struct_name: str, headers: List[str], types: List[str], comments: List[str]) -> str:
"""生成 thrift 文件内容,支持注释
Args:
struct_name: 结构体名称
headers: 字段名列表
types: 类型列表从cfg_txt.json的coloum_type读取
comments: 注释列表从Excel第2行读取
"""
lines = []
lines.append("namespace netstd Byway.Thrift.Data\n")
lines.append(f"\nstruct {struct_name}Item\n{{")
# 类型映射 - 支持多种格式(新版,更完整)
type_mapping = {
'int': 'i32',
'int32': 'i32',
'i32': 'i32',
'integer': 'i32',
'long': 'i64',
'int64': 'i64',
'i64': 'i64',
'float': 'double',
'double': 'double',
'string': 'string',
'str': 'string',
'text': 'string',
'bool': 'bool',
'boolean': 'bool',
'i8': 'i8',
'i16': 'i16'
}
for idx, header in enumerate(headers):
field_type = 'string' # 默认类型
if idx < len(types) and types[idx]:
type_str = str(types[idx]).strip().lower()
field_type = type_mapping.get(type_str, 'string')
# 调试信息:记录未知类型
if type_str and type_str not in type_mapping:
self.log(f" [WARN] 未知类型 '{types[idx]}' (字段: {header}),使用默认类型 string\n")
# 获取注释
comment = comments[idx] if idx < len(comments) and comments[idx] else ""
# 生成字段,带注释
if comment:
lines.append(f"\t{idx+1}:{field_type} {header}, // {comment}")
else:
lines.append(f"\t{idx+1}:{field_type} {header},")
lines.append("}\n")
lines.append(f"\nstruct {struct_name}\n{{")
lines.append(f"\t1:map<i32,{struct_name}Item> {struct_name.lower()}s,")
lines.append("}\n")
return "\n".join(lines)
def generate_allconfigs_content(self, struct_names: List[str]) -> str:
"""生成 AllConfigs.thrift 文件内容"""
lines = []
lines.append("namespace netstd Byway.Thrift.Data")
# 添加所有 include
for struct_name in struct_names:
lines.append(f'include "{struct_name}.thrift"')
lines.append("")
lines.append("struct AllConfigs")
lines.append("{")
# 添加所有字段PascalCase命名与类型名相同C#属性名也会是PascalCase
for idx, struct_name in enumerate(struct_names, 1):
# 直接使用struct_name作为字段名PascalCase
lines.append(f"\t{idx}:{struct_name}.{struct_name} {struct_name},")
lines.append("}")
return "\n".join(lines)
def step2_compile_to_csharp(self) -> bool:
"""步骤2: 用 thrift.exe 编译成 C#"""
self.log("\n【步骤2】编译 Thrift 到 C#\n")
self.log("-"*80 + "\n")
try:
os.makedirs(self.csharp_output_dir, exist_ok=True)
thrift_files = [f for f in os.listdir(self.thrift_dir) if f.endswith('.thrift')]
success_count = 0
failed_list = []
self.log(f" 找到 {len(thrift_files)} 个 thrift 文件\n")
self.log(f" 输出目录: {self.csharp_output_dir}\n\n")
for idx, thrift_file in enumerate(thrift_files, 1):
thrift_path = os.path.join(self.thrift_dir, thrift_file)
cmd = [
self.compiler_path,
'-strict', '-v', '-r',
'--gen', 'netstd:unity,serial,async_postfix',
'-out', self.csharp_output_dir,
thrift_path
]
# 原始:添加超时设置
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=30)
if result.returncode == 0:
success_count += 1
# 原始:改进进度显示
if idx % 10 == 0 or idx == 1 or idx == len(thrift_files):
self.log(f" [OK] [{idx}/{len(thrift_files)}] {thrift_file}\n")
self.root.update() # 原始刷新UI
else:
self.log(f" [FAIL] 编译失败: {thrift_file}\n")
error_msg = result.stderr if result.stderr else result.stdout
if error_msg:
self.log(f" 错误详情:\n{error_msg}\n")
# 保存详细错误信息到报告
failed_list.append({
"name": thrift_file,
"error": error_msg[:200] if error_msg else "未知错误"
})
if success_count > 5:
self.log(f" ... 共编译 {success_count} 个文件\n")
# 检查生成的C#文件
csharp_actual_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
cs_files = []
if os.path.exists(csharp_actual_dir):
cs_files = [f for f in os.listdir(csharp_actual_dir) if f.endswith('.cs')]
self.log(f"\n C# 文件生成在: {csharp_actual_dir}\n")
self.log(f" 生成了 {len(cs_files)} 个 C# 文件\n")
# 立即删除 AllConfigs.Extensions.cs避免后续被复制
allconfigs_ext = os.path.join(csharp_actual_dir, 'AllConfigs.Extensions.cs')
if os.path.exists(allconfigs_ext):
os.remove(allconfigs_ext)
self.log(f" [OK] 删除 AllConfigs.Extensions.cs避免二义性冲突\n")
# 更新文件计数
cs_files = [f for f in os.listdir(csharp_actual_dir) if f.endswith('.cs')]
self.log(f" 最终保留 {len(cs_files)} 个 C# 文件\n")
self.log(f"\n编译完成: {success_count}/{len(thrift_files)}\n")
self.add_step_report("编译到C#", "success", {
"success": success_count,
"failed": len(failed_list),
"total": len(thrift_files),
"cs_files_count": len(cs_files),
"failed_files": failed_list
})
return True
except Exception as e:
self.log(f"[FAIL] 步骤2失败: {str(e)}\n")
self.add_step_report("编译到C#", "failed", {"error": str(e)})
return False
def step3_generate_bytes(self) -> bool:
"""步骤3: 生成 bytes 文件"""
self.log("\n【步骤3】生成 Bytes 文件\n")
self.log("-"*80 + "\n")
try:
# 先生成 gen-py
os.makedirs(self.gen_py_dir, exist_ok=True)
thrift_files = [f for f in os.listdir(self.thrift_dir) if f.endswith('.thrift')]
# 编译为 Python
for thrift_file in thrift_files:
thrift_path = os.path.join(self.thrift_dir, thrift_file)
cmd = [self.compiler_path, '--gen', 'py', '-out', str(self.gen_py_dir), thrift_path]
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.log(f" [OK] 生成 gen-py 完成\n")
# 导入 thrift 库并生成 bytes
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
sys.path.insert(0, str(self.gen_py_dir))
# 读取配置
with open(self.cfg_json_path, 'r', encoding='utf-8') as f:
cfg = json.load(f)
os.makedirs(self.bytes_output_dir, exist_ok=True)
success_count = 0
failed_list = []
success_list = []
config_containers = {} # 保存每个配置的 container 对象 {struct_name: container}
file_list = cfg.get('file_list', [])
self.log(f" 找到 {len(file_list)} 个配置项\n\n")
# 注意:现在只生成 AllConfigs.bytes不生成单个配置的 bytes 文件
self.log(f" [INFO] 新策略:只生成 AllConfigs.bytes合并文件跳过单个 bytes 文件生成\n\n")
# 收集所有成功处理的配置对象,用于生成 AllConfigs
for idx, config_item in enumerate(file_list, 1):
in_file = config_item.get('in_file', '')
out_file = config_item.get('out_file', '')
sheet_name = config_item.get('sheet_name', '')
coloum_types = config_item.get('coloum_type', []) # 从配置读取类型
# 检查配置完整性(原始:检查所有必需项)
if not all([in_file, out_file, sheet_name, coloum_types]):
self.log(f" [{idx}/{len(file_list)}] [WARN] 配置项不完整,跳过: {in_file}\n\n")
continue
struct_name = out_file.replace('.txt', '')
excel_path = os.path.join(self.config_dir, in_file)
# 原始显示处理进度只验证配置不生成单个bytes
self.log(f" [{idx}/{len(file_list)}] 验证: {in_file} -> {struct_name}\n")
try:
# 动态导入模块(只验证结构体存在)
module_name = f"{struct_name}.ttypes"
ttypes_module = importlib.import_module(module_name)
item_class = getattr(ttypes_module, f"{struct_name}Item")
container_class = getattr(ttypes_module, struct_name)
# 读取 Excel 数据
wb = openpyxl.load_workbook(excel_path, data_only=True)
target_sheet = sheet_name if sheet_name in wb.sheetnames else wb.sheetnames[0]
ws = wb[target_sheet]
# 读取表头(第一行),记录有表头的列索引(原始:使用字典)- 根据coloum_types数量限制
first_row = list(ws[1])
headers = []
header_indices = []
excel_headers = {} # {header_name: column_index}
# 只读取coloum_types定义的数量
max_columns = len(coloum_types) if coloum_types else len(first_row)
for idx, cell in enumerate(first_row):
if cell.value is not None and str(cell.value).strip():
header_name = str(cell.value).strip()
headers.append(header_name)
header_indices.append(idx)
excel_headers[header_name] = idx
# 达到coloum_types定义的数量后停止
if len(headers) >= max_columns:
break
# 验证字段匹配(原始:添加缺少字段警告)
missing_fields = []
for header in headers:
# 这里简单检查如果需要可以和thrift定义对比
pass
# 读取数据从第3行开始使用字典存储原始更安全
data_rows = []
for row in ws.iter_rows(min_row=3, values_only=False):
row_data = {}
has_data = False
for header in headers:
col_idx = excel_headers[header]
if col_idx < len(row):
cell_value = row[col_idx].value
if cell_value is not None:
has_data = True
row_data[header] = cell_value
else:
row_data[header] = None
if has_data:
data_rows.append(row_data)
wb.close()
# 创建容器
container = container_class()
container_items = {}
for idx, row_data in enumerate(data_rows):
if not any(row_data.values()):
continue
item = item_class()
# 获取ID原始灵活查找多个可能的ID列名
id_value = None
for id_field in ['id', 'Id', 'ID', 'Key']:
if id_field in row_data and row_data[id_field] is not None:
id_value = row_data[id_field]
break
# 如果找不到ID使用当前方式兜底
if id_value is None:
# 尝试使用第一个字段
if headers and headers[0] in row_data:
id_value = row_data[headers[0]]
if id_value is None:
id_value = idx + 1
try:
id_value = int(id_value)
except (ValueError, TypeError):
id_value = idx + 1
# 填充每个字段
for col_idx, header in enumerate(headers):
value = row_data.get(header)
if value is not None:
# 类型转换使用cfg_txt.json的coloum_type原始详细错误处理
if col_idx < len(coloum_types) and coloum_types[col_idx]:
type_str = str(coloum_types[col_idx]).lower().strip()
try:
if type_str in ['int', 'i32', 'i16', 'i8', 'int32', 'integer']:
value = int(value)
elif type_str in ['long', 'i64', 'int64']:
value = int(value)
elif type_str in ['float', 'double']:
value = float(value)
elif type_str in ['bool', 'boolean']:
value = str(value).upper() in ["TRUE", "1", "YES"]
else:
value = str(value)
# 设置字段原始setattr在try内
setattr(item, header, value)
except (ValueError, TypeError) as e:
# 原始:详细错误信息
raise ValueError(f"字段 '{header}' (类型: {type_str}) 转换失败,值: '{value}' (行ID: {id_value})")
except AttributeError as e:
raise AttributeError(f"字段 '{header}' 不存在于结构体中 (行ID: {id_value})")
else:
# 没有类型定义,转为字符串
try:
setattr(item, header, str(value))
except AttributeError as e:
raise AttributeError(f"字段 '{header}' 不存在于结构体中 (行ID: {id_value})")
container_items[id_value] = item
setattr(container, f"{struct_name.lower()}s", container_items)
# 不再生成单个 bytes 文件,但保存 container 对象用于生成 AllConfigs
self.log(f" [OK] {struct_name} 验证通过(数据行数: {len(data_rows)}\n")
success_count += 1
success_list.append(struct_name)
config_containers[struct_name] = container # 保存 container 对象
# 原始定期刷新UI
if idx % 10 == 0 or idx == 1 or idx == len(file_list):
self.root.update()
self.log("\n")
except Exception as e:
error_detail = traceback.format_exc()
self.log(f" [FAIL] {struct_name}: {str(e)}\n")
self.log(f" 详细错误:\n{error_detail}\n")
failed_list.append({
"name": struct_name,
"error": str(e),
"detail": error_detail
})
if failed_list:
self.log(f"\n失败: {len(failed_list)} 个配置项\n")
for failed in failed_list:
self.log(f" - {failed['name']}: {failed['error'][:100]}\n")
# 生成 AllConfigs.bytes唯一的 bytes 文件)
self.log(f"\n生成 AllConfigs.bytes合并所有配置...\n")
allconfigs_generated = False
if self.generate_all_configs_bytes(config_containers):
self.log(f" [OK] AllConfigs.bytes 生成成功\n")
allconfigs_generated = True
else:
self.log(f" [FAIL] AllConfigs.bytes 生成失败\n")
# 只保存 AllConfigs 到列表供步骤6使用
self.generated_bytes_list = ['AllConfigs'] if allconfigs_generated else []
# 最终统计
if allconfigs_generated:
self.log(f"\n[OK] 生成完成: AllConfigs.bytes包含 {success_count} 个配置)\n")
else:
self.log(f"\n[FAIL] 生成失败\n")
self.add_step_report("生成Bytes文件", "success", {
"success": success_count,
"failed": len(failed_list),
"total": len(file_list) + (1 if allconfigs_generated else 0),
"success_files": success_list,
"failed_files": failed_list, # 完整错误信息
"allconfigs": allconfigs_generated
})
return True
except Exception as e:
self.log(f"[FAIL] 步骤3失败: {str(e)}\n")
self.log(traceback.format_exc())
self.add_step_report("生成Bytes文件", "failed", {"error": str(e)})
return False
def step4_cleanup_extensions(self) -> bool:
"""步骤4: 清理冲突的 Extensions 文件(双重检查)"""
self.log("\n【步骤4】清理冲突的 Extensions 文件(双重检查)\n")
self.log("-"*80 + "\n")
try:
# C# 文件实际在 Byway\Thrift\Data 子目录中
csharp_actual_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
deleted_count = 0
deleted_files = []
if os.path.exists(csharp_actual_dir):
# 再次检查是否有 AllConfigs.Extensions.cs双重保险
allconfigs_ext = os.path.join(csharp_actual_dir, 'AllConfigs.Extensions.cs')
if os.path.exists(allconfigs_ext):
os.remove(allconfigs_ext)
deleted_count += 1
deleted_files.append('AllConfigs.Extensions.cs')
self.log(f" [OK] 删除残留的 AllConfigs.Extensions.cs\n")
else:
self.log(f" [OK] 确认 AllConfigs.Extensions.cs 已不存在\n")
# 统计保留的 Extensions 文件
extension_files = [f for f in os.listdir(csharp_actual_dir) if f.endswith('.Extensions.cs')]
self.log(f" 保留 {len(extension_files)} 个配置的 Extensions 文件\n")
self.log(f"\n清理完成: {deleted_count} 个文件被删除\n")
self.add_step_report("清理Extensions", "success", {
"deleted": deleted_count,
"deleted_files": deleted_files
})
return True
except Exception as e:
self.log(f"[FAIL] 步骤4失败: {str(e)}\n")
self.add_step_report("清理Extensions", "failed", {"error": str(e)})
return False
def step5_copy_csharp(self) -> bool:
"""步骤5: 复制 C# 文件到 Unity"""
self.log("\n【步骤5】复制 C# 文件到 Unity\n")
self.log("-"*80 + "\n")
try:
# 在Unity目录下也创建 Byway\Thrift\Data 子目录结构
unity_target_dir = os.path.join(self.unity_csharp_dir, 'Byway', 'Thrift', 'Data')
os.makedirs(unity_target_dir, exist_ok=True)
# 先清理Unity目录中的AllConfigs.Extensions.cs如果存在旧文件
unity_allconfigs_ext = os.path.join(unity_target_dir, 'AllConfigs.Extensions.cs')
if os.path.exists(unity_allconfigs_ext):
os.remove(unity_allconfigs_ext)
self.log(f" [OK] 清理Unity目录中的旧 AllConfigs.Extensions.cs\n")
copied_count = 0
copied_files = []
# C# 文件在 Byway\Thrift\Data 子目录中
csharp_actual_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
if os.path.exists(csharp_actual_dir):
self.log(f" 源目录: {csharp_actual_dir}\n")
self.log(f" 目标目录: {unity_target_dir}\n\n")
# 复制所有C#文件AllConfigsExtensions.cs已在步骤4删除
for file in os.listdir(csharp_actual_dir):
if file.endswith('.cs'):
src = os.path.join(csharp_actual_dir, file)
dst = os.path.join(unity_target_dir, file)
shutil.copy2(src, dst)
copied_files.append(file)
copied_count += 1
# 标记文件类型
if file == 'AllConfigs.cs':
self.log(f" [OK] {file} [合并配置类]\n")
elif file.endswith('Item.cs'):
self.log(f" [OK] {file} [Item文件]\n")
elif file.endswith('.Extensions.cs'):
self.log(f" [OK] {file} [Extensions文件]\n")
else:
self.log(f" [OK] {file}\n")
else:
self.log(f" [FAIL] 未找到C#文件: {csharp_actual_dir}\n")
self.log(f"\n复制完成: {copied_count} 个 C# 文件\n")
self.add_step_report("复制C#到Unity", "success", {
"count": copied_count,
"files": copied_files # 完整列表
})
return True
except Exception as e:
self.log(f"[FAIL] 步骤5失败: {str(e)}\n")
self.add_step_report("复制C#到Unity", "failed", {"error": str(e)})
return False
def step6_copy_bytes(self) -> bool:
"""步骤6: 复制 Bytes 文件到 Unity"""
self.log("\n【步骤6】复制 Bytes 文件到 Unity\n")
self.log("-"*80 + "\n")
try:
os.makedirs(self.unity_bytes_dir, exist_ok=True)
copied_count = 0
copied_files = []
skipped_files = []
self.log(f" 源目录: {self.bytes_output_dir}\n")
self.log(f" 目标目录: {self.unity_bytes_dir}\n")
self.log(f" [INFO] 只复制 AllConfigs.bytes\n\n")
# 只复制 AllConfigs.bytes
allconfigs_file = 'AllConfigs.bytes'
allconfigs_src = os.path.join(self.bytes_output_dir, allconfigs_file)
if os.path.exists(allconfigs_src):
allconfigs_dst = os.path.join(self.unity_bytes_dir, allconfigs_file)
shutil.copy2(allconfigs_src, allconfigs_dst)
copied_files.append(allconfigs_file)
copied_count = 1
self.log(f" [OK] {allconfigs_file} [唯一的配置文件]\n")
else:
self.log(f" [FAIL] 未找到 {allconfigs_file}\n")
# 列出所有其他 bytes 文件(已废弃)
other_bytes = [f for f in os.listdir(self.bytes_output_dir)
if f.endswith('.bytes') and f != allconfigs_file]
if other_bytes:
self.log(f"\n [INFO] 发现 {len(other_bytes)} 个其他 bytes 文件(已废弃,不复制):\n")
for f in other_bytes:
self.log(f" - {f}\n")
self.log(f"\n复制完成: {copied_count} 个 bytes 文件\n")
self.add_step_report("复制Bytes到Unity", "success", {
"count": copied_count,
"expected": len(self.generated_bytes_list),
"files": copied_files, # 完整列表
"skipped": skipped_files
})
return True
except Exception as e:
self.log(f"[FAIL] 步骤6失败: {str(e)}\n")
self.add_step_report("复制Bytes到Unity", "failed", {"error": str(e)})
return False
def step7_cleanup_temp(self):
"""步骤7: 清理临时文件 (gen-py)"""
self.log("\n【步骤7】清理临时文件\n")
self.log("-"*80 + "\n")
try:
if self.gen_py_dir.exists():
shutil.rmtree(self.gen_py_dir)
self.log(" [OK] 清理 gen-py 目录\n")
self.add_step_report("清理临时文件", "success", {})
except Exception as e:
self.log(f" [WARN] 清理失败: {str(e)}\n")
self.add_step_report("清理临时文件", "failed", {"error": str(e)})
def step8_generate_dr(self) -> bool:
"""步骤8: 生成 DataRow 文件"""
self.log("\n【步骤8】生成 DataRow 文件\n")
self.log("-"*80 + "\n")
try:
# 确保输出目录存在
os.makedirs(self.dr_output_dir, exist_ok=True)
# C# 文件目录
csharp_actual_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
if not os.path.exists(csharp_actual_dir):
self.log(f" [FAIL] C# 文件目录不存在: {csharp_actual_dir}\n")
self.add_step_report("生成DataRow文件", "failed", {"error": "C# 文件目录不存在"})
return False
# 获取所有 C# 配置文件
cs_files = [f for f in os.listdir(csharp_actual_dir) if f.endswith('.cs')]
success_count = 0
failed_list = []
success_list = []
self.log(f" 找到 {len(cs_files)} 个 C# 文件\n\n")
for idx, cs_file in enumerate(cs_files, 1):
file_path = os.path.join(csharp_actual_dir, cs_file)
try:
# 解析配置类
config_info = self.parse_config_class(file_path)
if not config_info:
continue
self.log(f" [{idx}/{len(cs_files)}] {config_info['class_name']}\n")
# 解析 Item 类属性
item_file_path = os.path.join(csharp_actual_dir, f"{config_info['item_type']}.cs")
properties = self.parse_item_class(item_file_path)
if not properties:
self.log(f" [WARN] 没有找到属性\n")
continue
# 生成 DR 代码
dr_code = self.generate_dr_code(config_info, properties)
# 写入文件
output_file = os.path.join(self.dr_output_dir, f"DR{config_info['class_name']}.cs")
with open(output_file, 'w', encoding='utf-8') as f:
f.write(dr_code)
success_count += 1
success_list.append(f"DR{config_info['class_name']}.cs")
if idx % 10 == 0 or idx == 1 or idx == len(cs_files):
self.root.update()
except Exception as e:
self.log(f" [FAIL] 生成失败: {str(e)}\n")
failed_list.append({"name": cs_file, "error": str(e)})
self.log(f"\n生成完成: {success_count} 个 DR 文件\n")
self.log(f"输出目录: {self.dr_output_dir}\n")
self.add_step_report("生成DataRow文件", "success", {
"success": success_count,
"failed": len(failed_list),
"total": len(cs_files),
"success_files": success_list,
"failed_files": failed_list
})
return True
except Exception as e:
self.log(f"[FAIL] 步骤8失败: {str(e)}\n")
self.log(traceback.format_exc())
self.add_step_report("生成DataRow文件", "failed", {"error": str(e)})
return False
def parse_config_class(self, file_path: str) -> Dict[str, str]:
"""解析配置类文件,提取类名和 Dictionary 属性"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 提取类名public partial class XXX : TBase
class_match = re.search(r'public\s+partial\s+class\s+(\w+)\s*:\s*TBase', content)
if not class_match:
return None
class_name = class_match.group(1)
# 提取 Dictionary 属性public Dictionary<int, XXXItem> PropertyName
dict_match = re.search(
r'public\s+Dictionary<int,\s*global::[\w.]+\.(\w+)>\s+(\w+)',
content
)
if not dict_match:
return None
item_type = dict_match.group(1)
dict_property = dict_match.group(2)
return {
'class_name': class_name,
'item_type': item_type,
'dict_property': dict_property
}
except Exception as e:
return None
def parse_item_class(self, file_path: str) -> List[Dict[str, str]]:
"""解析 Item 类,提取所有公共属性"""
try:
if not os.path.exists(file_path):
return []
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 查找所有公共属性public TYPE PropertyName
properties = []
for match in re.finditer(r'public\s+([\w<>,\.\s]+)\s+(\w+)\s*\{', content):
prop_type = match.group(1).strip()
prop_name = match.group(2)
# 跳过特殊属性
if prop_name in ['__isset', 'Isset']:
continue
# 映射 C# 类型
cs_type, default_val = self.get_type_info(prop_type)
properties.append({
'name': prop_name,
'type': cs_type,
'default': default_val,
'original_type': prop_type # 保存原始类型用于判断
})
return properties
except Exception as e:
return []
def get_type_info(self, cs_type: str) -> Tuple[str, str]:
"""获取类型信息和默认值"""
type_map = {
'int': ('int', '0'),
'long': ('long', '0'),
'double': ('double', '0.0'),
'float': ('float', '0f'),
'bool': ('bool', 'false'),
'string': ('string', '""'),
}
# 处理泛型类型
if 'Dictionary' in cs_type or 'List' in cs_type:
return ('object', 'null')
# 处理基础类型
for key, (mapped_type, default_val) in type_map.items():
if key in cs_type.lower():
return (mapped_type, default_val)
# 默认为 string
return ('string', '""')
def generate_all_configs_bytes(self, config_data_map: Dict[str, Any]) -> bool:
"""
生成 AllConfigs.bytes合并所有配置
参数 config_data_map: {struct_name: container_object} 的映射
"""
try:
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
# 动态导入 AllConfigs 模块
allconfigs_module = importlib.import_module("AllConfigs.ttypes")
AllConfigsClass = getattr(allconfigs_module, "AllConfigs")
# 创建 AllConfigs 实例
all_configs = AllConfigsClass()
# 直接使用传入的配置对象
for struct_name, container in config_data_map.items():
try:
# 设置到AllConfigs字段名为PascalCase与类型名相同
setattr(all_configs, struct_name, container)
self.log(f" [OK] 添加 {struct_name}\n")
except Exception as e:
self.log(f" [FAIL] {struct_name} 添加失败: {str(e)}\n")
continue
# 序列化 AllConfigs
transport = TTransport.TMemoryBuffer()
protocol = TBinaryProtocol.TBinaryProtocol(transport)
all_configs.write(protocol)
binary_data = transport.getvalue()
# 写入 AllConfigs.bytes
allconfigs_path = os.path.join(self.bytes_output_dir, "AllConfigs.bytes")
with open(allconfigs_path, 'wb') as f:
f.write(binary_data)
self.log(f" 总大小: {len(binary_data)} bytes ({len(binary_data) / 1024:.2f} KB)\n")
return True
except Exception as e:
self.log(f" 生成失败: {str(e)}\n")
self.log(traceback.format_exc())
return False
def generate_dr_code(self, config_info: Dict, properties: List[Dict]) -> str:
"""生成 DR 类代码"""
class_name = config_info['class_name']
item_type = config_info['item_type']
dict_property = config_info['dict_property']
lines = []
lines.append("// 此文件由 DataRowGenerator 自动生成,请勿手动修改")
lines.append(f"// 配置类: {class_name}")
lines.append(f"// 数据类: {item_type}")
lines.append("")
lines.append("using UnityEngine;")
lines.append("using Byway.Config;")
lines.append("using Byway.Thrift.Data;")
lines.append("using UnityGameFramework.Runtime;")
lines.append("")
lines.append("namespace CrazyMaple")
lines.append("{")
lines.append(f" /// <summary>")
lines.append(f" /// {class_name} 数据行")
lines.append(f" /// </summary>")
lines.append(f" public class DR{class_name} : DataRowBase")
lines.append(" {")
lines.append(f" private {item_type} _configData;")
lines.append("")
# Id 属性
id_prop = next((p for p in properties if p['name'].lower() == 'id'), None)
lines.append(" /// <summary>")
lines.append(" /// 唯一标识")
lines.append(" /// </summary>")
lines.append(" public override int Id")
lines.append(" {")
lines.append(" get")
lines.append(" {")
if id_prop:
lines.append(f" return _configData?.{id_prop['name']} ?? 0;")
else:
lines.append(" return 0;")
lines.append(" }")
lines.append(" }")
lines.append("")
# 其他属性
for prop in properties:
if prop['name'].lower() == 'id':
continue
lines.append(f" /// <summary>")
lines.append(f" /// {prop['name']}")
lines.append(f" /// </summary>")
# 特殊处理double类型在DR中返回float
original_type = prop.get('original_type', '').lower()
if 'double' in original_type:
lines.append(f" public float {prop['name']}")
lines.append(" {")
lines.append(" get")
lines.append(" {")
lines.append(f" return System.Convert.ToSingle(_configData?.{prop['name']} ?? 0.0);")
lines.append(" }")
lines.append(" }")
else:
lines.append(f" public {prop['type']} {prop['name']}")
lines.append(" {")
lines.append(" get")
lines.append(" {")
lines.append(f" return _configData?.{prop['name']} ?? {prop['default']};")
lines.append(" }")
lines.append(" }")
lines.append("")
# LoadFromConfig 方法
lines.append(" /// <summary>")
lines.append(" /// 从配置加载数据")
lines.append(" /// </summary>")
lines.append(" public void LoadFromConfig(int id)")
lines.append(" {")
# UIForm 特殊处理
if class_name == "UIForm":
lines.append(f" var config = ConfigManager.Instance.GetConfig<Byway.Thrift.Data.UIForm>();")
else:
lines.append(f" var config = ConfigManager.Instance.GetConfig<{class_name}>();")
lines.append(f" if (config?.{dict_property} != null)")
lines.append(" {")
lines.append(f" config.{dict_property}.TryGetValue(id, out _configData);")
lines.append(" }")
lines.append(" }")
lines.append("")
# ParseDataRow 方法
lines.append(" /// <summary>")
lines.append(" /// 解析数据行(兼容旧系统,实际使用 LoadFromConfig")
lines.append(" /// </summary>")
lines.append(" public override bool ParseDataRow(string dataRowString, object userData)")
lines.append(" {")
lines.append(" int id = 0;")
lines.append(" if (int.TryParse(dataRowString, out id))")
lines.append(" {")
lines.append(" LoadFromConfig(id);")
lines.append(" return _configData != null;")
lines.append(" }")
lines.append(" return false;")
lines.append(" }")
lines.append(" }")
lines.append("}")
return '\n'.join(lines)
def main():
root = tk.Tk()
app = IntegratedPipeline(root)
root.mainloop()
if __name__ == "__main__":
main()