thrift-related/PythonWorkSpace/IntegratedTool/integrated_pipeline.py
2026-01-12 19:24:48 +08:00

1160 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Thrift 配置文件完整流程工具
从 Excel 到 Unity 的一站式处理
"""
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext
import json
import os
import sys
import re
import subprocess
import shutil
import importlib
from pathlib import Path
from typing import List, Dict, Any, Tuple
import openpyxl
from datetime import datetime
class IntegratedPipeline:
def __init__(self, root):
    """Set up window properties, default state and build the UI.

    Args:
        root: The Tk root window that hosts the tool.
    """
    self.root = root
    self.root.title("Thrift 完整流程工具")
    self.root.geometry("1200x850")

    # Every configurable path starts out empty; load_config() may fill them.
    # config_dir is the folder holding the xlsx files.
    self.cfg_json_path = self.compiler_path = ""
    self.thrift_dir = self.csharp_output_dir = self.bytes_output_dir = ""
    self.config_dir = ""
    self.unity_csharp_dir = self.unity_bytes_dir = ""

    # Resolve the application directory; a frozen exe uses the
    # directory of the executable instead of this source file.
    is_frozen = getattr(sys, 'frozen', False)
    if is_frozen:
        base_dir = Path(os.path.dirname(sys.executable))
    else:
        base_dir = Path(__file__).parent
    self.config_file = base_dir / "pipeline_config.json"
    # Temporary directory for generated Python code.
    self.gen_py_dir = base_dir / "gen-py"

    # Report data collected while the pipeline runs.
    self.report = dict(start_time=None, end_time=None, steps=[])

    self.load_config()
    self.setup_ui()
def load_config(self):
    """Load the persisted path settings from ``self.config_file``.

    Does nothing when the file does not exist. Keys missing from the file
    are set to an empty string. Any error is printed and otherwise ignored.
    """
    setting_keys = (
        'cfg_json_path',
        'compiler_path',
        'thrift_dir',
        'csharp_output_dir',
        'bytes_output_dir',
        'config_dir',
        'unity_csharp_dir',
        'unity_bytes_dir',
    )
    try:
        if not self.config_file.exists():
            return
        with open(self.config_file, 'r', encoding='utf-8') as handle:
            stored = json.load(handle)
            for key in setting_keys:
                setattr(self, key, stored.get(key, ''))
    except Exception as exc:
        print(f"加载配置失败: {exc}")
def save_config(self):
    """Persist the current path settings to ``self.config_file`` as JSON.

    Any error is printed and otherwise ignored.
    """
    setting_keys = (
        'cfg_json_path',
        'compiler_path',
        'thrift_dir',
        'csharp_output_dir',
        'bytes_output_dir',
        'config_dir',
        'unity_csharp_dir',
        'unity_bytes_dir',
    )
    try:
        snapshot = {key: getattr(self, key) for key in setting_keys}
        with open(self.config_file, 'w', encoding='utf-8') as handle:
            json.dump(snapshot, handle, indent=2, ensure_ascii=False)
    except Exception as exc:
        print(f"保存配置失败: {exc}")
def setup_ui(self):
    """Build the main window: eight path rows, the start button and the log pane.

    Each path row consists of a label, an entry bound to a ``tk.StringVar``
    stored on ``self`` as ``<name>_var``, and a "选择" button that opens the
    matching file or directory dialog.

    The vertical stretch weight is applied to the row that actually holds the
    log pane, so the log grows with the window. The previous hard-coded row
    index pointed at the "执行日志" label row instead.
    """
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    main_frame.columnconfigure(1, weight=1)

    # (label text, attribute prefix, dialog title, file types).
    # ``file types`` is None for directory rows. File rows keep their value
    # in ``<prefix>_path``; directory rows keep it in ``<prefix>``.
    path_rows = [
        ("cfg_txt.json:", 'cfg_json', 'cfg_txt.json', [("JSON", "*.json")]),
        ("Thrift 编译器:", 'compiler', 'Thrift编译器', [("EXE", "*.exe")]),
        ("Thrift 文件路径:", 'thrift_dir', 'Thrift文件路径', None),
        ("C# 输出路径:", 'csharp_output_dir', 'C#输出路径', None),
        ("Bytes 输出路径:", 'bytes_output_dir', 'Bytes输出路径', None),
        ("Config 路径 (xlsx):", 'config_dir', 'Config路径', None),
        ("Unity C# 路径:", 'unity_csharp_dir', 'Unity C#路径', None),
        ("Unity Bytes 路径:", 'unity_bytes_dir', 'Unity Bytes路径', None),
    ]

    for row, (label, attr, title, filetypes) in enumerate(path_rows):
        # Default arguments bind the current loop values; a plain closure
        # would make every button use the values of the last row.
        if filetypes is None:
            current_value = getattr(self, attr)
            command = lambda a=attr, t=title: self.select_directory(a, t)
        else:
            current_value = getattr(self, attr + '_path')
            command = lambda a=attr, t=title, ft=filetypes: self.select_file(a, t, ft)

        variable = tk.StringVar(value=current_value)
        setattr(self, attr + '_var', variable)

        ttk.Label(main_frame, text=label).grid(row=row, column=0, sticky=tk.W, pady=5)
        ttk.Entry(main_frame, textvariable=variable, width=85).grid(
            row=row, column=1, sticky=(tk.W, tk.E), pady=5, padx=5)
        ttk.Button(main_frame, text="选择", command=command).grid(row=row, column=2, pady=5)

    row = len(path_rows)

    # Start button
    start_btn = ttk.Button(main_frame, text="🚀 开始执行完整流程", command=self.run_pipeline)
    start_btn.grid(row=row, column=0, columnspan=3, pady=20, ipadx=80, ipady=15)
    row += 1

    # Log area
    ttk.Label(main_frame, text="执行日志:", font=("", 10, "bold")).grid(
        row=row, column=0, sticky=tk.W, pady=5)
    row += 1
    self.log_text = scrolledtext.ScrolledText(
        main_frame, width=140, height=30, wrap=tk.WORD, font=("Consolas", 9))
    self.log_text.grid(row=row, column=0, columnspan=3,
                       sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
    # Let the log pane absorb extra vertical space when the window is resized.
    main_frame.rowconfigure(row, weight=1)
def select_file(self, attr_name: str, title: str, filetypes: List):
    """Ask the user for a file and store the choice.

    Args:
        attr_name: Attribute prefix; the path lives in ``<attr_name>_path``
            and the entry variable in ``<attr_name>_var``.
        title: Human-readable name used in the dialog title and the log.
        filetypes: File type filter passed to the open-file dialog.
    """
    path_attr = attr_name + '_path'
    previous = getattr(self, path_attr)
    # Start browsing in the folder of the previously selected file, if any.
    start_dir = None
    if previous:
        start_dir = os.path.dirname(previous)

    chosen = filedialog.askopenfilename(title=f"选择{title}", initialdir=start_dir, filetypes=filetypes)
    if not chosen:
        return

    setattr(self, path_attr, chosen)
    getattr(self, attr_name + '_var').set(chosen)
    self.save_config()
    self.log(f"✓ 已选择 {title}: {chosen}\n")
def select_directory(self, attr_name: str, title: str):
    """Ask the user for a directory and store the choice.

    Args:
        attr_name: Name of the attribute holding the directory; the entry
            variable is ``<attr_name>_var``.
        title: Human-readable name used in the dialog title and the log.
    """
    previous = getattr(self, attr_name)
    start_dir = previous or None

    chosen = filedialog.askdirectory(title=f"选择{title}", initialdir=start_dir)
    if not chosen:
        return

    setattr(self, attr_name, chosen)
    getattr(self, attr_name + '_var').set(chosen)
    self.save_config()
    self.log(f"✓ 已选择 {title}: {chosen}\n")
def log(self, message: str):
    """Append *message* to the log pane, scroll to the end and refresh the UI.

    Failures while writing to the widget are printed instead of raised.
    """
    try:
        pane = self.log_text
        pane.insert(tk.END, message)
        pane.see(tk.END)
        # Process pending events so the text shows up during long-running steps.
        self.root.update()
    except Exception as exc:
        print(f"日志输出失败: {exc}")
def clear_log(self):
    """Remove all text from the log pane."""
    # Text indices are "line.column" strings. The float 1.0 only worked
    # because it happens to stringify to "1.0".
    self.log_text.delete("1.0", tk.END)
def add_step_report(self, step_name: str, status: str, details: Dict = None):
    """Record the outcome of one pipeline step in ``self.report['steps']``.

    Args:
        step_name: Display name of the step.
        status: Outcome string; ``'success'`` is treated as success elsewhere.
        details: Optional statistics and lists for the step; an empty dict
            is stored when omitted or empty.
    """
    recorded_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = dict(
        name=step_name,
        status=status,
        time=recorded_at,
        details=details if details else {},
    )
    self.report['steps'].append(entry)
def generate_report(self) -> str:
    """Render the recorded step results as a plain-text report.

    The report has three parts: a one-line-per-step summary with an overall
    problem count, a detailed section per step built from each step's
    ``details`` dict, and a listing of the configured input/output paths.

    Fixes over the previous version:
      * success and failure now use different status icons;
      * start and end time are separated instead of run together;
      * the step header bracket is closed;
      * the per-step timestamp is labelled as a completion time, not a duration;
      * ``failed_files`` given as plain names (step 2) is shown under the
        failure heading instead of under the raw key name.

    Returns:
        The whole report as a single newline-joined string.
    """
    rule = "=" * 100
    # Statistic keys already shown in a step's header lines; they are not
    # repeated in the generic key/value listing.
    stat_keys = ('success', 'failed', 'total', 'count', 'deleted', 'expected')
    steps = self.report['steps']

    def status_parts(step):
        """Return the (icon, label) pair for a step's status."""
        if step['status'] == 'success':
            return "✓", "成功"
        return "✗", "失败"

    # =============== Summary ===============
    report_lines = [
        rule,
        " Thrift 流程执行简报",
        rule,
        f"执行时间: {self.report['start_time']} ~ {self.report['end_time']}",
        "",
    ]

    total_errors = 0
    for idx, step in enumerate(steps, 1):
        icon, status_text = status_parts(step)
        summary_text = f"{idx}. [{icon}] {step['name']}: {status_text}"
        details = step['details']
        if details:
            # Pull the headline number for this kind of step.
            if 'success' in details and 'total' in details:
                summary_text += f" - {details['success']}/{details['total']} 个文件"
                if 'failed' in details and details['failed'] > 0:
                    summary_text += f" (失败: {details['failed']})"
                    total_errors += details['failed']
            elif 'count' in details:
                summary_text += f" - {details['count']} 个文件"
                if 'skipped' in details and details['skipped']:
                    summary_text += f" (跳过: {len(details['skipped'])})"
            elif 'deleted' in details:
                summary_text += f" - 删除 {details['deleted']} 个文件"
        report_lines.append(summary_text)
        if step['status'] != 'success':
            total_errors += 1

    report_lines.append("")
    if total_errors == 0:
        report_lines.append("【总体状态】✓ 所有步骤成功完成,无错误")
    else:
        report_lines.append(f"【总体状态】⚠ 发现 {total_errors} 个问题,详见下方详细报告")
    report_lines.append("")
    report_lines.append(rule)
    report_lines.append(" 详细执行报告")
    report_lines.append(rule)
    report_lines.append("")

    # =============== Details ===============
    for idx, step in enumerate(steps, 1):
        icon, status_text = status_parts(step)
        report_lines.append(f"【步骤 {idx}】{step['name']}")
        report_lines.append(f"状态: [{icon}] {status_text}")
        # step['time'] is the timestamp recorded when the step was reported.
        report_lines.append(f"完成时间: {step['time']}")
        report_lines.append("-" * 80)

        details = step['details']
        if details:
            # Basic statistics
            if 'success' in details and 'total' in details:
                report_lines.append(f"处理文件: {details['success']}/{details['total']}")
                if 'failed' in details:
                    report_lines.append(f"失败文件: {details['failed']}")
            elif 'count' in details:
                report_lines.append(f"处理文件: {details['count']}")
            elif 'deleted' in details:
                report_lines.append(f"删除文件: {details['deleted']}")
            report_lines.append("")

            # Lists and remaining values
            for key, value in details.items():
                if isinstance(value, list):
                    if not value:
                        continue
                    if key == 'failed_files':
                        report_lines.append(f"❌ 失败详情 ({len(value)} 项):")
                        for item in value:
                            if not isinstance(item, dict):
                                # Plain file names carry no error text.
                                report_lines.append(f"{item}")
                                continue
                            report_lines.append(f"{item['name']}")
                            report_lines.append(f" 错误: {item['error']}")
                            if 'detail' in item:
                                # Only the first 10 lines of the traceback.
                                for line in item['detail'].split('\n')[:10]:
                                    if line.strip():
                                        report_lines.append(f" {line}")
                            report_lines.append("")
                    elif key in ('success_files', 'files'):
                        report_lines.append(f"✓ 成功文件 ({len(value)} 项):")
                        for item in value:
                            report_lines.append(f"{item}")
                    elif key == 'skipped':
                        report_lines.append(f"→ 跳过文件 ({len(value)} 项):")
                        for item in value:
                            report_lines.append(f"{item}")
                    elif key not in stat_keys:
                        report_lines.append(f"{key} ({len(value)} 项):")
                        for item in value:
                            report_lines.append(f"{item}")
                elif key not in stat_keys and key != 'error':
                    report_lines.append(f"{key}: {value}")

            if 'error' in details:
                report_lines.append(f"❌ 错误信息: {details['error']}")
            report_lines.append("")
        report_lines.append("")

    # =============== Paths ===============
    report_lines.append(rule)
    report_lines.append(" 配置路径信息")
    report_lines.append(rule)
    report_lines.append("📁 输入路径:")
    report_lines.append(f" • cfg_txt.json: {self.cfg_json_path}")
    report_lines.append(f" • Thrift 编译器: {self.compiler_path}")
    report_lines.append(f" • Thrift 文件目录: {self.thrift_dir}")
    report_lines.append("")
    report_lines.append("📁 生成路径:")
    report_lines.append(f" • C# 输出根目录: {self.csharp_output_dir}")
    report_lines.append(f" • C# 实际目录: {os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')}")
    report_lines.append(f" • Bytes 输出目录: {self.bytes_output_dir}")
    report_lines.append("")
    report_lines.append("📁 Unity目标路径:")
    report_lines.append(f" • Unity C# 根目录: {self.unity_csharp_dir}")
    report_lines.append(f" • Unity C# 实际目录: {os.path.join(self.unity_csharp_dir, 'Byway', 'Thrift', 'Data')}")
    report_lines.append(f" • Unity Bytes 目录: {self.unity_bytes_dir}")
    report_lines.append(rule)
    return "\n".join(report_lines)
def validate_paths(self) -> bool:
    """Check that every required path is configured.

    The cfg_txt.json file, the compiler and the Excel folder must also
    exist on disk; the remaining paths only need to be non-empty.
    All problems are shown together in one error dialog.

    Returns:
        True when no problem was found, otherwise False.
    """
    def unusable(path):
        # Unset, or set but not present on disk.
        return not path or not os.path.exists(path)

    checks = [
        (unusable(self.cfg_json_path), "❌ cfg_txt.json 文件不存在或未选择"),
        (unusable(self.compiler_path), "❌ Thrift 编译器不存在或未选择"),
        (not self.thrift_dir, "❌ 请选择 Thrift 文件路径"),
        (not self.csharp_output_dir, "❌ 请选择 C# 输出路径"),
        (not self.bytes_output_dir, "❌ 请选择 Bytes 输出路径"),
        (unusable(self.config_dir), "❌ Config 路径Excel文件夹不存在或未选择"),
        (not self.unity_csharp_dir, "❌ 请选择 Unity C# 路径"),
        (not self.unity_bytes_dir, "❌ 请选择 Unity Bytes 路径"),
    ]
    problems = [text for is_bad, text in checks if is_bad]
    if not problems:
        return True

    joined = "\n".join(problems)
    messagebox.showerror("配置错误", f"请检查以下配置:\n\n{joined}")
    return False
def run_pipeline(self):
    """Run all pipeline steps in order and present the final report.

    Steps 1-6 run in sequence and stop at the first one that returns False.
    Step 7 (temp cleanup) only runs after steps 1-6 all succeeded.

    Fixes over the previous version:
      * A failing step no longer returns silently. The report is still
        generated, saved and summarised in a message box, which makes the
        existing "partial" and "failed" dialogs reachable.
      * The report is written next to the config file, so the frozen-exe
        directory handling done in ``__init__`` also applies here
        (previously it used ``__file__``).
      * Paths typed directly into the entry fields are read back before
        validation; previously only paths picked through the dialogs were used.
    """
    # Entry variable name -> attribute that stores the path.
    entry_bindings = (
        ('cfg_json_var', 'cfg_json_path'),
        ('compiler_var', 'compiler_path'),
        ('thrift_dir_var', 'thrift_dir'),
        ('csharp_output_dir_var', 'csharp_output_dir'),
        ('bytes_output_dir_var', 'bytes_output_dir'),
        ('config_dir_var', 'config_dir'),
        ('unity_csharp_dir_var', 'unity_csharp_dir'),
        ('unity_bytes_dir_var', 'unity_bytes_dir'),
    )
    for var_name, attr_name in entry_bindings:
        variable = getattr(self, var_name, None)
        if variable is not None:
            setattr(self, attr_name, variable.get().strip())

    if not self.validate_paths():
        return
    # Remember manually typed paths for the next session.
    self.save_config()

    self.clear_log()
    self.report = {'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'end_time': None, 'steps': []}
    self.log("="*100 + "\n")
    self.log("开始执行完整流程...\n")
    self.log("="*100 + "\n\n")
    try:
        pipeline_steps = (
            self.step1_generate_thrift,     # xlsx + cfg_txt.json -> .thrift
            self.step2_compile_to_csharp,   # .thrift -> C#
            self.step3_generate_bytes,      # xlsx -> .bytes
            self.step4_cleanup_extensions,  # drop per-config Extensions files
            self.step5_copy_csharp,         # C# -> Unity
            self.step6_copy_bytes,          # .bytes -> Unity
        )
        completed = True
        for step in pipeline_steps:
            if not step():
                completed = False
                break
        if completed:
            self.step7_cleanup_temp()

        self.report['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Build and show the report
        report_text = self.generate_report()
        self.log("\n" + report_text)

        # Save the report next to the config file (valid in exe mode too)
        report_dir = Path(self.config_file).parent
        report_path = report_dir / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(report_text)
        self.log(f"\n报告已保存: {report_path}\n")

        # Pick the dialog type from the share of failed steps
        total_steps = len(self.report['steps'])
        failed_steps = sum(1 for step in self.report['steps'] if step['status'] != 'success')
        if failed_steps == 0:
            messagebox.showinfo("完成", f"所有步骤执行成功!\n\n详细报告:\n{report_path}")
        elif failed_steps < total_steps:
            messagebox.showwarning("部分完成", f"部分步骤执行成功\n成功: {total_steps - failed_steps}/{total_steps}\n\n详细报告请查看日志。")
        else:
            messagebox.showerror("失败", f"所有步骤都失败了\n\n详细报告请查看日志。")
    except Exception as e:
        self.log(f"\n✗ 执行失败: {str(e)}\n")
        import traceback
        self.log(traceback.format_exc())
        messagebox.showerror("错误", f"执行失败:\n{str(e)}")
def step1_generate_thrift(self) -> bool:
    """Step 1: generate one ``.thrift`` file per entry of cfg_txt.json.

    For every ``file_list`` entry with both ``in_file`` and ``out_file``,
    the Excel workbook is opened, row 1 is read as field names and row 2 as
    field comments. Only columns with a non-empty header are used. The
    ``coloum_type`` list from the entry is padded with ``'string'`` or
    truncated to the header count. When at least one struct was generated,
    ``AllConfigs.thrift`` is written as well.

    Fixes over the previous version:
      * the workbook is closed even when reading it fails;
      * the AllConfigs count in the log and report is 0 when no
        AllConfigs file was written (it used to be a constant 1);
      * per-file failures are recorded in the step report.

    Side effects:
        Sets ``self.config_struct_names`` to the generated struct names
        (AllConfigs excluded) for later steps.

    Returns:
        True on completion (individual file failures included), False when
        the step itself raised.
    """
    self.log("\n【步骤1】生成 Thrift 文件\n")
    self.log("-"*80 + "\n")
    try:
        # Read cfg_txt.json
        with open(self.cfg_json_path, 'r', encoding='utf-8') as f:
            cfg = json.load(f)
        file_list = cfg.get('file_list', [])

        os.makedirs(self.thrift_dir, exist_ok=True)

        struct_names = []   # generated structs, used for AllConfigs
        failed_files = []   # {'name', 'error'} per failed workbook

        for config_item in file_list:
            in_file = config_item.get('in_file', '')
            out_file = config_item.get('out_file', '')
            sheet_name = config_item.get('sheet_name', '')
            coloum_types = config_item.get('coloum_type', [])
            if not (in_file and out_file):
                continue

            struct_name = out_file.replace('.txt', '')
            excel_path = os.path.join(self.config_dir, in_file)
            if not os.path.exists(excel_path):
                self.log(f" ⚠ Excel 不存在: {in_file}\n")
                continue

            try:
                wb = openpyxl.load_workbook(excel_path, data_only=True)
                try:
                    # Fall back to the first sheet when the named one is missing.
                    target_sheet = sheet_name if sheet_name in wb.sheetnames else wb.sheetnames[0]
                    ws = wb[target_sheet]

                    # Row 1: headers; remember which columns carry one.
                    headers = []
                    header_indices = []
                    for col_no, cell in enumerate(list(ws[1])):
                        if cell.value is not None and str(cell.value).strip():
                            headers.append(str(cell.value).strip())
                            header_indices.append(col_no)

                    # Row 2: comments, only for columns that have a header.
                    if ws.max_row >= 2:
                        second_row = list(ws[2])
                        comments = []
                        for col_no in header_indices:
                            if col_no < len(second_row) and second_row[col_no].value is not None:
                                comments.append(str(second_row[col_no].value).strip())
                            else:
                                comments.append("")
                    else:
                        comments = [""] * len(headers)
                finally:
                    wb.close()

                # Make the type list exactly as long as the header list.
                if len(coloum_types) < len(headers):
                    coloum_types = coloum_types + ['string'] * (len(headers) - len(coloum_types))
                elif len(coloum_types) > len(headers):
                    coloum_types = coloum_types[:len(headers)]

                thrift_content = self.generate_thrift_content(struct_name, headers, coloum_types, comments)
                thrift_path = os.path.join(self.thrift_dir, f"{struct_name}.thrift")
                with open(thrift_path, 'w', encoding='utf-8') as f:
                    f.write(thrift_content)
                struct_names.append(struct_name)
                self.log(f" ✓ 生成: {struct_name}.thrift\n")
            except Exception as e:
                self.log(f" ✗ 失败 {struct_name}: {str(e)}\n")
                failed_files.append({"name": struct_name, "error": str(e)})

        # AllConfigs.thrift is only written when there is something to include.
        allconfigs_count = 0
        if struct_names:
            self.log(f"\n生成 AllConfigs.thrift...\n")
            allconfigs_content = self.generate_allconfigs_content(struct_names)
            allconfigs_path = os.path.join(self.thrift_dir, "AllConfigs.thrift")
            with open(allconfigs_path, 'w', encoding='utf-8') as f:
                f.write(allconfigs_content)
            self.log(f" ✓ 生成: AllConfigs.thrift (包含 {len(struct_names)} 个配置)\n")
            allconfigs_count = 1

        # Kept for the later steps; does not contain AllConfigs.
        self.config_struct_names = struct_names

        generated_count = len(struct_names) + allconfigs_count
        self.log(f"\n生成完成: {generated_count} 个 thrift 文件\n")
        self.log(f" 配置文件: {len(struct_names)}\n")
        self.log(f" + AllConfigs: {allconfigs_count} 个\n")
        self.add_step_report("生成Thrift文件", "success", {
            "total": generated_count,
            "config_files": len(struct_names),
            "allconfigs": allconfigs_count,
            "files": struct_names,
            "failed_files": failed_files
        })
        return True
    except Exception as e:
        self.log(f"✗ 步骤1失败: {str(e)}\n")
        self.add_step_report("生成Thrift文件", "failed", {"error": str(e)})
        return False
def generate_thrift_content(self, struct_name: str, headers: List[str], types: List[str], comments: List[str]) -> str:
    """Build the text of one ``.thrift`` file.

    The file declares ``<struct_name>Item`` with one field per header and a
    container ``<struct_name>`` holding ``map<i32, <struct_name>Item>``.

    Comments are collapsed onto a single line before being written after
    ``//``. A comment cell containing a line break previously spilled its
    continuation lines into the struct body as invalid IDL.

    Args:
        struct_name: Name of the container struct.
        headers: Field names, in field-id order.
        types: Type names matching ``headers`` by position (from
            ``coloum_type`` in cfg_txt.json). Unknown or missing types
            become ``string``; unknown ones are also logged.
        comments: Field comments matching ``headers`` by position (from
            Excel row 2); may be shorter than ``headers``.

    Returns:
        The Thrift IDL text.
    """
    # Accepted spellings -> Thrift base type.
    type_mapping = {
        'int': 'i32',
        'int32': 'i32',
        'i32': 'i32',
        'integer': 'i32',
        'long': 'i64',
        'int64': 'i64',
        'i64': 'i64',
        'float': 'double',
        'double': 'double',
        'string': 'string',
        'str': 'string',
        'text': 'string',
        'bool': 'bool',
        'boolean': 'bool',
        'i8': 'i8',
        'i16': 'i16'
    }

    lines = [
        "namespace netstd Byway.Thrift.Data\n",
        f"\nstruct {struct_name}Item\n{{",
    ]
    for idx, header in enumerate(headers):
        field_type = 'string'  # default type
        if idx < len(types) and types[idx]:
            type_str = str(types[idx]).strip().lower()
            if type_str in type_mapping:
                field_type = type_mapping[type_str]
            elif type_str:
                self.log(f" ⚠ 未知类型 '{types[idx]}' (字段: {header}),使用默认类型 string\n")

        raw_comment = comments[idx] if idx < len(comments) and comments[idx] else ""
        # A `//` comment ends at the line break, so fold all whitespace runs
        # (including newlines) into single spaces.
        comment = " ".join(str(raw_comment).split())

        if comment:
            lines.append(f"\t{idx+1}:{field_type} {header}, // {comment}")
        else:
            lines.append(f"\t{idx+1}:{field_type} {header},")
    lines.append("}\n")
    lines.append(f"\nstruct {struct_name}\n{{")
    lines.append(f"\t1:map<i32,{struct_name}Item> {struct_name.lower()}s,")
    lines.append("}\n")
    return "\n".join(lines)
def generate_allconfigs_content(self, struct_names: List[str]) -> str:
    """Build the text of ``AllConfigs.thrift``.

    The file includes every per-config ``.thrift`` file and declares an
    ``AllConfigs`` struct with one field per config. Each field is named
    after the struct with its first letter lower-cased.

    Args:
        struct_names: Names of the generated config structs.

    Returns:
        The Thrift IDL text.
    """
    include_lines = [f'include "{name}.thrift"' for name in struct_names]
    field_lines = [
        f"\t{field_id}:{name}.{name} {name[0].lower() + name[1:]},"
        for field_id, name in enumerate(struct_names, 1)
    ]
    parts = [
        "namespace netstd Byway.Thrift.Data",
        *include_lines,
        "",
        "struct AllConfigs",
        "{",
        *field_lines,
        "}",
    ]
    return "\n".join(parts)
def step2_compile_to_csharp(self) -> bool:
    """Step 2: compile every ``.thrift`` file in ``self.thrift_dir`` to C#.

    Each file is passed to the configured compiler with the ``netstd``
    generator, writing into ``self.csharp_output_dir``. Afterwards the
    ``Byway/Thrift/Data`` sub-directory is scanned to count the ``.cs`` files.

    Fixes over the previous version:
      * a compiler run exceeding the 30 s timeout is recorded as a failed
        file instead of aborting the whole step;
      * compiler output is decoded with ``errors='replace'`` so undecodable
        bytes cannot raise;
      * files are compiled in sorted order, so progress output is stable.

    Returns:
        True on completion (individual compile failures included), False
        when the step itself raised.
    """
    self.log("\n【步骤2】编译 Thrift 到 C#\n")
    self.log("-"*80 + "\n")
    try:
        os.makedirs(self.csharp_output_dir, exist_ok=True)
        thrift_files = sorted(f for f in os.listdir(self.thrift_dir) if f.endswith('.thrift'))
        file_count = len(thrift_files)
        success_count = 0
        failed_list = []
        self.log(f" 找到 {file_count} 个 thrift 文件\n")
        self.log(f" 输出目录: {self.csharp_output_dir}\n\n")

        for idx, thrift_file in enumerate(thrift_files, 1):
            thrift_path = os.path.join(self.thrift_dir, thrift_file)
            cmd = [
                self.compiler_path,
                '-strict', '-v', '-r',
                '--gen', 'netstd:unity,serial,async_postfix',
                '-out', self.csharp_output_dir,
                thrift_path
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True,
                                        errors='replace', timeout=30)
            except subprocess.TimeoutExpired:
                self.log(f" ✗ 编译超时: {thrift_file}\n")
                failed_list.append(thrift_file)
                continue

            if result.returncode == 0:
                success_count += 1
                # Only log the first, every tenth and the last file.
                if idx % 10 == 0 or idx == 1 or idx == file_count:
                    self.log(f" ✓ [{idx}/{file_count}] {thrift_file}\n")
                    self.root.update()
            else:
                self.log(f" ✗ 编译失败: {thrift_file}\n")
                if result.stderr:
                    self.log(f" 错误: {result.stderr[:100]}\n")
                failed_list.append(thrift_file)

        if success_count > 5:
            self.log(f" ... 共编译 {success_count} 个文件\n")

        # The netstd generator places files under the namespace path.
        csharp_actual_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
        cs_files = []
        if os.path.exists(csharp_actual_dir):
            cs_files = [f for f in os.listdir(csharp_actual_dir) if f.endswith('.cs')]
            self.log(f"\n C# 文件生成在: {csharp_actual_dir}\n")
            self.log(f" 生成了 {len(cs_files)} 个 C# 文件\n")

        self.log(f"\n编译完成: {success_count}/{file_count}\n")
        self.add_step_report("编译到C#", "success", {
            "success": success_count,
            "failed": len(failed_list),
            "total": file_count,
            "cs_files_count": len(cs_files),
            "failed_files": failed_list
        })
        return True
    except Exception as e:
        self.log(f"✗ 步骤2失败: {str(e)}\n")
        self.add_step_report("编译到C#", "failed", {"error": str(e)})
        return False
def step3_generate_bytes(self) -> bool:
    """Step 3: serialize every configured Excel sheet into a ``.bytes`` file.

    The ``.thrift`` files are first compiled to Python into
    ``self.gen_py_dir``. For each complete ``file_list`` entry the generated
    ``<Struct>.ttypes`` module is imported, the sheet is read (row 1 =
    headers, data from row 3), one ``<Struct>Item`` is built per data row,
    and the container is written with ``TBinaryProtocol`` to
    ``<bytes_output_dir>/<Struct>.bytes``.

    Fixes over the previous version:
      * inner loops no longer overwrite the outer item index;
      * generated modules cached in ``sys.modules`` from an earlier run are
        dropped and import caches invalidated, so a changed schema is
        picked up without restarting the tool;
      * a row is skipped only when every cell is empty or blank. Rows whose
        values are all 0/False were previously dropped as well;
      * textual booleans such as ``"false"`` or ``"0"`` convert to False;
      * the workbook is closed even when reading it fails;
      * a failing gen-py compile is logged instead of ignored.

    Side effects:
        Sets ``self.generated_bytes_list`` to the struct names whose bytes
        file was written.

    Returns:
        True on completion (individual file failures included), False when
        the step itself raised.
    """
    self.log("\n【步骤3】生成 Bytes 文件\n")
    self.log("-"*80 + "\n")

    int_types = ('int', 'i32', 'i16', 'i8', 'int32', 'integer', 'long', 'i64', 'int64')
    float_types = ('float', 'double')
    bool_types = ('bool', 'boolean')
    false_words = ('', '0', 'false', 'no', 'n', 'off')

    def convert(raw, type_name):
        """Convert a cell value according to its configured column type."""
        if type_name in int_types:
            return int(raw)
        if type_name in float_types:
            return float(raw)
        if type_name in bool_types:
            if isinstance(raw, str):
                return raw.strip().lower() not in false_words
            return bool(raw)
        # Unknown or missing type: store as text.
        return str(raw)

    def is_blank(cell_value):
        """True for None and for strings holding only whitespace."""
        return cell_value is None or (isinstance(cell_value, str) and not cell_value.strip())

    try:
        # Generate gen-py first
        os.makedirs(self.gen_py_dir, exist_ok=True)
        gen_py_path = str(self.gen_py_dir)
        thrift_files = [f for f in os.listdir(self.thrift_dir) if f.endswith('.thrift')]
        for thrift_file in thrift_files:
            thrift_path = os.path.join(self.thrift_dir, thrift_file)
            cmd = [self.compiler_path, '--gen', 'py', '-out', gen_py_path, thrift_path]
            result = subprocess.run(cmd, capture_output=True)
            if result.returncode != 0:
                self.log(f" ⚠ gen-py 生成失败: {thrift_file}\n")
        self.log(f" ✓ 生成 gen-py 完成\n")

        # Import the thrift runtime and make the generated code importable
        from thrift.protocol import TBinaryProtocol
        from thrift.transport import TTransport
        if gen_py_path in sys.path:
            sys.path.remove(gen_py_path)
        sys.path.insert(0, gen_py_path)
        # The modules were created after the interpreter started.
        importlib.invalidate_caches()

        with open(self.cfg_json_path, 'r', encoding='utf-8') as f:
            cfg = json.load(f)
        os.makedirs(self.bytes_output_dir, exist_ok=True)

        success_count = 0
        failed_list = []
        success_list = []
        file_list = cfg.get('file_list', [])
        item_total = len(file_list)
        self.log(f" 找到 {item_total} 个配置项\n\n")

        for item_no, config_item in enumerate(file_list, 1):
            in_file = config_item.get('in_file', '')
            out_file = config_item.get('out_file', '')
            sheet_name = config_item.get('sheet_name', '')
            coloum_types = config_item.get('coloum_type', [])

            # All four settings are required for this step.
            if not all([in_file, out_file, sheet_name, coloum_types]):
                self.log(f" [{item_no}/{item_total}] ⚠ 配置项不完整,跳过: {in_file}\n\n")
                continue

            struct_name = out_file.replace('.txt', '')
            excel_path = os.path.join(self.config_dir, in_file)
            self.log(f" [{item_no}/{item_total}] 处理: {in_file} -> {struct_name}\n")

            try:
                # Forget modules generated by an earlier run of this session.
                package_prefix = struct_name + '.'
                stale = [name for name in sys.modules
                         if name == struct_name or name.startswith(package_prefix)]
                for name in stale:
                    origin = getattr(sys.modules[name], '__file__', None) or ''
                    if origin.startswith(gen_py_path):
                        del sys.modules[name]

                ttypes_module = importlib.import_module(f"{struct_name}.ttypes")
                item_class = getattr(ttypes_module, f"{struct_name}Item")
                container_class = getattr(ttypes_module, struct_name)

                # Read the Excel data
                wb = openpyxl.load_workbook(excel_path, data_only=True)
                try:
                    target_sheet = sheet_name if sheet_name in wb.sheetnames else wb.sheetnames[0]
                    ws = wb[target_sheet]

                    # Row 1: headers and the column each one sits in
                    headers = []
                    excel_headers = {}  # {header_name: column_index}
                    for col_pos, cell in enumerate(list(ws[1])):
                        if cell.value is not None and str(cell.value).strip():
                            header_name = str(cell.value).strip()
                            headers.append(header_name)
                            excel_headers[header_name] = col_pos

                    # Data starts at row 3 (row 2 holds comments)
                    data_rows = []
                    for row in ws.iter_rows(min_row=3, values_only=False):
                        row_data = {}
                        for header in headers:
                            col_pos = excel_headers[header]
                            row_data[header] = row[col_pos].value if col_pos < len(row) else None
                        if not all(is_blank(v) for v in row_data.values()):
                            data_rows.append(row_data)
                finally:
                    wb.close()

                container = container_class()
                container_items = {}
                for row_no, row_data in enumerate(data_rows):
                    item = item_class()

                    # Find the map key: a well-known id column first, then
                    # the first column, then the 1-based row position.
                    id_value = None
                    for id_field in ['id', 'Id', 'ID', 'Key']:
                        if id_field in row_data and row_data[id_field] is not None:
                            id_value = row_data[id_field]
                            break
                    if id_value is None:
                        if headers and headers[0] in row_data:
                            id_value = row_data[headers[0]]
                        if id_value is None:
                            id_value = row_no + 1
                    try:
                        id_value = int(id_value)
                    except (ValueError, TypeError):
                        id_value = row_no + 1

                    # Fill every field that has a value
                    for col_no, header in enumerate(headers):
                        value = row_data.get(header)
                        if value is None:
                            continue
                        type_str = ''
                        if col_no < len(coloum_types) and coloum_types[col_no]:
                            type_str = str(coloum_types[col_no]).lower().strip()
                        try:
                            setattr(item, header, convert(value, type_str))
                        except (ValueError, TypeError) as e:
                            raise ValueError(f"字段 '{header}' (类型: {type_str}) 转换失败,值: '{value}' (行ID: {id_value})") from e
                        except AttributeError as e:
                            raise AttributeError(f"字段 '{header}' 不存在于结构体中 (行ID: {id_value})") from e

                    container_items[id_value] = item

                setattr(container, f"{struct_name.lower()}s", container_items)

                # Serialize
                transport = TTransport.TMemoryBuffer()
                protocol = TBinaryProtocol.TBinaryProtocol(transport)
                container.write(protocol)
                binary_data = transport.getvalue()

                bytes_path = os.path.join(self.bytes_output_dir, f"{struct_name}.bytes")
                with open(bytes_path, 'wb') as f:
                    f.write(binary_data)
                self.log(f" ✓ {struct_name}.bytes ({len(binary_data)} bytes)\n")
                success_count += 1
                success_list.append(struct_name)

                # Periodic UI refresh, keyed on the config item index
                if item_no % 10 == 0 or item_no == 1 or item_no == item_total:
                    self.root.update()
                self.log("\n")
            except Exception as e:
                import traceback
                error_detail = traceback.format_exc()
                self.log(f" ✗ {struct_name}: {str(e)}\n")
                self.log(f" 详细错误:\n{error_detail}\n")
                failed_list.append({
                    "name": struct_name,
                    "error": str(e),
                    "detail": error_detail
                })

        # Step 6 only copies what was generated here.
        self.generated_bytes_list = success_list

        self.log(f"\n生成完成: {success_count} 个 bytes 文件\n")
        if failed_list:
            self.log(f" 失败: {len(failed_list)}\n")
            for failed in failed_list:
                self.log(f" - {failed['name']}: {failed['error'][:100]}\n")
        self.add_step_report("生成Bytes文件", "success", {
            "success": success_count,
            "failed": len(failed_list),
            "total": item_total,
            "success_files": success_list,
            "failed_files": failed_list
        })
        return True
    except Exception as e:
        self.log(f"✗ 步骤3失败: {str(e)}\n")
        import traceback
        self.log(traceback.format_exc())
        self.add_step_report("生成Bytes文件", "failed", {"error": str(e)})
        return False
def step4_cleanup_extensions(self) -> bool:
    """Step 4: delete the generated ``<Config>.Extensions.cs`` files.

    Only files named after an entry of ``self.config_struct_names`` are
    removed from ``<csharp_output_dir>/Byway/Thrift/Data``.
    ``AllConfigs.Extensions.cs`` and unrelated Extensions files are kept
    and logged. A missing directory is logged and is not an error.

    Returns:
        True on completion, False when the step raised.
    """
    self.log("\n【步骤4】清理 Extensions 文件\n")
    self.log("-"*80 + "\n")
    try:
        removed = []
        # The C# files live in the Byway\Thrift\Data sub-directory.
        data_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
        if not os.path.exists(data_dir):
            self.log(f" ⚠ 未找到C#输出目录: {data_dir}\n")
        else:
            self.log(f" 扫描目录: {data_dir}\n")
            # Extensions files of the configs (AllConfigs is not among them).
            removable = {f"{name}.Extensions.cs" for name in self.config_struct_names}
            for entry in os.listdir(data_dir):
                if not entry.endswith('.Extensions.cs'):
                    continue
                if entry in removable:
                    os.remove(os.path.join(data_dir, entry))
                    self.log(f" ✓ 删除: {entry}\n")
                    removed.append(entry)
                elif entry == 'AllConfigs.Extensions.cs':
                    self.log(f" → 保留: {entry}\n")
                else:
                    self.log(f" → 跳过: {entry} (不在配置中)\n")

        self.log(f"\n清理完成: 删除 {len(removed)} 个文件\n")
        self.add_step_report("清理Extensions", "success", {
            "deleted": len(removed),
            "files": removed
        })
        return True
    except Exception as exc:
        self.log(f"✗ 步骤4失败: {str(exc)}\n")
        self.add_step_report("清理Extensions", "failed", {"error": str(exc)})
        return False
def step5_copy_csharp(self) -> bool:
    """Step 5: copy the generated ``.cs`` files into the Unity project.

    Files are copied from ``<csharp_output_dir>/Byway/Thrift/Data`` to the
    same sub-path under ``self.unity_csharp_dir``. Extensions files that
    belong to a config in ``self.config_struct_names`` are skipped. A
    missing source directory is logged and is not an error.

    Returns:
        True on completion, False when the step raised.
    """
    self.log("\n【步骤5】复制 C# 文件到 Unity\n")
    self.log("-"*80 + "\n")
    try:
        # Mirror the Byway\Thrift\Data layout inside the Unity directory.
        dest_dir = os.path.join(self.unity_csharp_dir, 'Byway', 'Thrift', 'Data')
        os.makedirs(dest_dir, exist_ok=True)
        copied = []
        skipped = []

        source_dir = os.path.join(self.csharp_output_dir, 'Byway', 'Thrift', 'Data')
        if not os.path.exists(source_dir):
            self.log(f" ✗ 未找到C#文件: {source_dir}\n")
        else:
            self.log(f" 源目录: {source_dir}\n")
            self.log(f" 目标目录: {dest_dir}\n\n")
            # Step 4 should already have deleted these; checked again as a safeguard.
            excluded = {f"{name}.Extensions.cs" for name in self.config_struct_names}
            for entry in os.listdir(source_dir):
                if not entry.endswith('.cs'):
                    continue
                if entry in excluded:
                    skipped.append(f"{entry} (配置Extensions)")
                    self.log(f" → 跳过: {entry} (配置Extensions不应存在)\n")
                    continue

                # Everything else is copied: main files, Item files,
                # AllConfigs.Extensions.cs, ...
                shutil.copy2(os.path.join(source_dir, entry), os.path.join(dest_dir, entry))
                copied.append(entry)

                # Tag the log line with the kind of file.
                if entry.endswith('Item.cs'):
                    tag = " 📋 (Item文件)"
                elif entry == 'AllConfigs.Extensions.cs':
                    tag = " 🔧 (保留的Extensions)"
                elif entry.endswith('.Extensions.cs'):
                    tag = " 🔧 (非配置Extensions)"
                else:
                    tag = ""
                self.log(f"{entry}{tag}\n")

        self.log(f"\n复制完成: {len(copied)} 个 C# 文件\n")
        if skipped:
            self.log(f" 跳过: {len(skipped)} 个文件\n")
        self.add_step_report("复制C#到Unity", "success", {
            "count": len(copied),
            "files": copied,
            "skipped": skipped
        })
        return True
    except Exception as exc:
        self.log(f"✗ 步骤5失败: {str(exc)}\n")
        self.add_step_report("复制C#到Unity", "failed", {"error": str(exc)})
        return False
def step6_copy_bytes(self) -> bool:
    """Step 6: copy the ``.bytes`` files produced by step 3 into Unity.

    Only files named after an entry of ``self.generated_bytes_list`` are
    copied from ``self.bytes_output_dir`` to ``self.unity_bytes_dir``;
    other ``.bytes`` files are listed as skipped. A warning is logged when
    the copied count differs from the number of generated files.

    Returns:
        True on completion, False when the step raised.
    """
    self.log("\n【步骤6】复制 Bytes 文件到 Unity\n")
    self.log("-"*80 + "\n")
    try:
        os.makedirs(self.unity_bytes_dir, exist_ok=True)
        copied = []
        skipped = []
        expected_total = len(self.generated_bytes_list)
        self.log(f" 源目录: {self.bytes_output_dir}\n")
        self.log(f" 目标目录: {self.unity_bytes_dir}\n")
        self.log(f" 预期文件: {expected_total} 个 (步骤3成功生成的)\n\n")

        # Names of the files step 3 reported as successfully written.
        wanted = {f"{name}.bytes" for name in self.generated_bytes_list}
        for entry in os.listdir(self.bytes_output_dir):
            if not entry.endswith('.bytes'):
                continue
            if entry not in wanted:
                skipped.append(entry)
                self.log(f" → 跳过: {entry} (不在成功生成列表中)\n")
                continue
            shutil.copy2(os.path.join(self.bytes_output_dir, entry),
                         os.path.join(self.unity_bytes_dir, entry))
            copied.append(entry)
            self.log(f"{entry}\n")

        copied_total = len(copied)
        self.log(f"\n复制完成: {copied_total} 个 bytes 文件\n")
        # Consistency check between generated and copied files
        if copied_total != len(self.generated_bytes_list):
            self.log(f" ⚠ 警告: 复制数量({copied_total})与生成数量({len(self.generated_bytes_list)})不一致!\n")
        self.add_step_report("复制Bytes到Unity", "success", {
            "count": copied_total,
            "expected": len(self.generated_bytes_list),
            "files": copied,
            "skipped": skipped
        })
        return True
    except Exception as exc:
        self.log(f"✗ 步骤6失败: {str(exc)}\n")
        self.add_step_report("复制Bytes到Unity", "failed", {"error": str(exc)})
        return False
def step7_cleanup_temp(self):
    """Step 7: remove the temporary gen-py directory, if it exists.

    The outcome is recorded in the step report; a failure is logged as a
    warning and does not raise.
    """
    self.log("\n【步骤7】清理临时文件\n")
    self.log("-"*80 + "\n")
    try:
        temp_dir = self.gen_py_dir
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
            self.log(" ✓ 清理 gen-py 目录\n")
        self.add_step_report("清理临时文件", "success", {})
    except Exception as exc:
        self.log(f" ⚠ 清理失败: {str(exc)}\n")
        self.add_step_report("清理临时文件", "failed", {"error": str(exc)})
def main():
    """Create the Tk root window, attach the pipeline UI and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the application object while the loop runs.
    pipeline = IntegratedPipeline(window)
    window.mainloop()


if __name__ == "__main__":
    main()