153 lines
6.2 KiB
Python
153 lines
6.2 KiB
Python
import os
|
||
import json
|
||
import openpyxl
|
||
import csv
|
||
import shutil
|
||
|
||
# 支持本地覆盖配置:读取 cfg_txt.local.json 深度合并到主配置,
|
||
# 允许在本地设置多个 dest_dirs,不提交到 Git。
|
||
|
||
def deep_merge(base, override):
|
||
if not isinstance(base, dict) or not isinstance(override, dict):
|
||
return override
|
||
result = dict(base)
|
||
for k, v in override.items():
|
||
if k in result and isinstance(result[k], dict) and isinstance(v, dict):
|
||
result[k] = deep_merge(result[k], v)
|
||
else:
|
||
result[k] = v
|
||
return result
|
||
|
||
current_dir = os.getcwd()
|
||
print(current_dir)
|
||
# 读取配置文件
|
||
cfg_main_path = os.path.join(current_dir, 'tool/cfg/cfg_txt.json')
|
||
with open(cfg_main_path, 'r', encoding='utf-8') as f:
|
||
cfg_main = json.load(f)
|
||
|
||
# 尝试读取本地覆盖文件(忽略不存在)
|
||
cfg_local_path = os.path.join(current_dir, 'tool/cfg/cfg_txt.local.json')
|
||
if os.path.isfile(cfg_local_path):
|
||
try:
|
||
with open(cfg_local_path, 'r', encoding='utf-8') as lf:
|
||
cfg_local = json.load(lf)
|
||
cfg = deep_merge(cfg_main, cfg_local)
|
||
print(f"Loaded local override: {cfg_local_path}")
|
||
except Exception as e:
|
||
print(f"加载本地覆盖文件失败,使用主配置: {e}")
|
||
cfg = cfg_main
|
||
else:
|
||
cfg = cfg_main
|
||
|
||
# 尝试读取 files 配置文件(可纳入Git管理)
|
||
cfg_files_path = os.path.join(current_dir, 'tool/cfg/cfg_txt_files.json')
|
||
if os.path.isfile(cfg_files_path):
|
||
try:
|
||
with open(cfg_files_path, 'r', encoding='utf-8') as ff:
|
||
cfg_files = json.load(ff)
|
||
cfg = deep_merge(cfg, cfg_files)
|
||
print(f"Loaded files config: {cfg_files_path}")
|
||
except Exception as e:
|
||
print(f"加载files配置文件失败: {e}")
|
||
|
||
def read_table(file_path, sheet_name=None):
|
||
if file_path.lower().endswith('.xlsx'):
|
||
workbook = openpyxl.load_workbook(file_path, data_only=True)
|
||
sheet = workbook[sheet_name] if sheet_name else workbook.active
|
||
fieldnames = [cell.value for cell in sheet[1]]
|
||
rows = list(sheet.iter_rows(values_only=True))[2:]
|
||
elif file_path.lower().endswith('.csv'):
|
||
with open(file_path, 'r', encoding='utf-8') as csvfile:
|
||
reader = csv.reader(csvfile)
|
||
data = list(reader)
|
||
fieldnames = data[0]
|
||
rows = data[2:]
|
||
else:
|
||
raise ValueError("Unsupported file type")
|
||
return fieldnames, rows
|
||
|
||
file_list = cfg['file_list']
|
||
target_dir = cfg['target_dir']
|
||
source_dir = cfg['source_dir']
|
||
fields_to_remove = cfg.get('fields_to_remove', [])
|
||
post_move_cfg = cfg.get('post_move', {})
|
||
|
||
# 确保目标目录存在
|
||
os.makedirs(target_dir, exist_ok=True)
|
||
|
||
# 遍历文件列表并转换文件
|
||
for file_cfg in file_list:
|
||
source_file_path = os.path.join(current_dir, source_dir, file_cfg["in_file"])
|
||
target_file_path = os.path.join(current_dir, target_dir, file_cfg["out_file"])
|
||
sheet_name = file_cfg["sheet_name"]
|
||
|
||
# 读取XLSX文件,data_only=True 确保读取公式的计算值而非公式本身
|
||
workbook = openpyxl.load_workbook(source_file_path, data_only=True)
|
||
sheet = workbook[sheet_name] if sheet_name else workbook.active
|
||
|
||
# 将数据写入TXT文件
|
||
with open(target_file_path, 'w', encoding='utf-8') as txt_file:
|
||
header = [str(cell.value) for cell in sheet[1] if cell.value not in fields_to_remove]
|
||
txt_file.write('#\t 界面配置表\n')
|
||
i = 0
|
||
for row in sheet.iter_rows(values_only=True):
|
||
i += 1
|
||
if i < 3 :
|
||
txt_file.write('#\t')
|
||
else:
|
||
txt_file.write('\t')
|
||
if i == 2:
|
||
txt_file.write('\t'.join(file_cfg["coloum_type"]) + '\n')
|
||
txt_file.write('#\t')
|
||
row_data = [str(cell) if cell is not None else "" for cell in row if cell not in fields_to_remove]
|
||
txt_file.write('\t'.join(row_data) + '\n')
|
||
|
||
print(f"Converted: {source_file_path} to {target_file_path}")
|
||
|
||
# Post-copy to multiple destination directories if configured
|
||
if post_move_cfg and post_move_cfg.get('enabled', False):
|
||
files = post_move_cfg.get('files') or []
|
||
# 支持新字段 dest_dirs(List),兼容旧字段 dest_dir(String)
|
||
dest_dirs = []
|
||
if 'dest_dirs' in post_move_cfg and isinstance(post_move_cfg['dest_dirs'], list):
|
||
dest_dirs = [d for d in post_move_cfg['dest_dirs'] if isinstance(d, str) and d.strip()]
|
||
elif 'dest_dir' in post_move_cfg and isinstance(post_move_cfg['dest_dir'], str):
|
||
# 兼容旧写法
|
||
dest_dirs = [post_move_cfg['dest_dir']]
|
||
|
||
if not dest_dirs:
|
||
print('[post_copy] 未配置任何 dest_dirs,跳过复制步骤')
|
||
else:
|
||
total_targets = len(dest_dirs)
|
||
for dest_dir_cfg in dest_dirs:
|
||
dest_dir_abs = dest_dir_cfg if os.path.isabs(dest_dir_cfg) else os.path.join(current_dir, dest_dir_cfg)
|
||
os.makedirs(dest_dir_abs, exist_ok=True)
|
||
copied_count = 0
|
||
for fname in files:
|
||
src_path = os.path.join(current_dir, target_dir, fname)
|
||
dst_path = os.path.join(dest_dir_abs, fname)
|
||
|
||
if not os.path.isfile(src_path):
|
||
print(f"[post_copy] 源文件不存在,跳过: {src_path}")
|
||
continue
|
||
|
||
# 覆盖目标
|
||
if os.path.exists(dst_path):
|
||
if os.path.isdir(dst_path):
|
||
print(f"[post_copy] 目标路径是目录,跳过: {dst_path}")
|
||
continue
|
||
try:
|
||
os.remove(dst_path)
|
||
except Exception as e:
|
||
print(f"[post_copy] 覆盖目标文件失败: {dst_path}, 错误: {e}")
|
||
continue
|
||
|
||
try:
|
||
shutil.copy2(src_path, dst_path)
|
||
copied_count += 1
|
||
print(f"[post_copy] copy 完成: {src_path} -> {dst_path}")
|
||
except Exception as e:
|
||
print(f"[post_copy] copy 失败: {src_path} -> {dst_path}, 错误: {e}")
|
||
|
||
print(f"[post_copy] 目录完成: {dest_dir_abs} 已复制 {copied_count}/{len(files)} 个文件")
|
||
print(f"[post_copy] 共处理 {total_targets} 个目标目录") |