116 lines
4.6 KiB
Python
116 lines
4.6 KiB
Python
import os
|
||
import json
|
||
import openpyxl
|
||
import csv
|
||
import shutil
|
||
|
||
# All relative paths in this script are resolved against the working
# directory the script was launched from.
current_dir = os.getcwd()
print(current_dir)

# Load the JSON configuration file.
cfg_path = os.path.join(current_dir, 'tool/cfg/cfg_txt.json')
with open(cfg_path, encoding='utf-8') as cfg_file:
    cfg = json.loads(cfg_file.read())
|
||
|
||
def read_table(file_path, sheet_name=None):
    """Load a table from an ``.xlsx`` or ``.csv`` file.

    The first row supplies the field names and the second row is skipped, so
    the returned rows start at the third row of the table.

    Args:
        file_path: Path of the file to read. The extension (compared
            case-insensitively) selects the reader.
        sheet_name: Worksheet to read from an ``.xlsx`` file. When falsy, the
            workbook's active sheet is used. Ignored for ``.csv`` files.

    Returns:
        A ``(fieldnames, rows)`` tuple. For ``.xlsx`` input, ``fieldnames`` is
        a list of cell values and ``rows`` a list of value tuples. For
        ``.csv`` input, both are lists of strings; an empty file yields
        ``([], [])``.

    Raises:
        ValueError: If the file is neither ``.xlsx`` nor ``.csv``.
    """
    lowered_path = file_path.lower()
    if lowered_path.endswith('.xlsx'):
        workbook = openpyxl.load_workbook(file_path)
        sheet = workbook[sheet_name] if sheet_name else workbook.active
        fieldnames = [cell.value for cell in sheet[1]]
        rows = list(sheet.iter_rows(values_only=True))[2:]
    elif lowered_path.endswith('.csv'):
        # utf-8-sig strips a leading BOM (otherwise it ends up glued to the
        # first field name) and still reads BOM-less UTF-8 files unchanged.
        # newline='' lets the csv module handle line endings itself, so
        # newlines embedded in quoted fields are preserved exactly.
        with open(file_path, 'r', encoding='utf-8-sig', newline='') as csvfile:
            data = list(csv.reader(csvfile))
        # An empty file has no header row; report it as an empty table.
        fieldnames = data[0] if data else []
        rows = data[2:]
    else:
        raise ValueError("Unsupported file type")
    return fieldnames, rows
|
||
|
||
# Required settings: a missing key raises KeyError.
file_list, target_dir, source_dir = (
    cfg[required_key]
    for required_key in ('file_list', 'target_dir', 'source_dir')
)

# Optional settings fall back to "nothing to remove" / "no post-move step".
post_move_cfg = cfg.get('post_move', {})
fields_to_remove = cfg.get('fields_to_remove', [])

# Make sure the output directory exists.
os.makedirs(target_dir, exist_ok=True)
|
||
|
||
# 遍历文件列表并转换文件
|
||
# Walk the configured file list and convert each workbook sheet to a
# tab-separated TXT file.
for file_cfg in file_list:
    source_file_path = os.path.join(current_dir, source_dir, file_cfg["in_file"])
    target_file_path = os.path.join(current_dir, target_dir, file_cfg["out_file"])
    sheet_name = file_cfg["sheet_name"]

    # Read the XLSX file; a falsy sheet name selects the active sheet.
    workbook = openpyxl.load_workbook(source_file_path)
    sheet = workbook[sheet_name] if sheet_name else workbook.active

    # Write the data to the TXT file.
    with open(target_file_path, 'w', encoding='utf-8') as txt_file:
        txt_file.write('#\t 界面配置表\n')

        # Column positions that survive `fields_to_remove`; filled in from
        # the first row, which holds the field names.
        kept_columns = []
        for row_number, row in enumerate(sheet.iter_rows(values_only=True), start=1):
            if row_number == 1:
                # Drop whole columns by position. Filtering each cell by its
                # value would only remove the header cell (shifting the data
                # columns) and would also drop any data cell that happens to
                # equal a field name.
                kept_columns = [
                    column_index
                    for column_index, field_name in enumerate(row)
                    if field_name not in fields_to_remove
                ]

            # The first two sheet rows are emitted as '#'-prefixed lines;
            # every later row starts with an empty leading column.
            txt_file.write('#\t' if row_number < 3 else '\t')

            if row_number == 2:
                # Insert the configured column-type line between sheet rows
                # 1 and 2, then reopen the '#' prefix for row 2 itself.
                txt_file.write('\t'.join(file_cfg["coloum_type"]) + '\n')
                txt_file.write('#\t')

            row_data = [
                "" if row[column_index] is None else str(row[column_index])
                for column_index in kept_columns
            ]
            txt_file.write('\t'.join(row_data) + '\n')

    print(f"Converted: {source_file_path} to {target_file_path}")
|
||
|
||
# Post-move/copy selected files if configured
|
||
# Optional post-processing: move or copy selected generated files from the
# target directory into another directory, when enabled in the config.
if post_move_cfg and post_move_cfg.get('enabled', False):
    # Any mode other than 'copy' is treated as 'move'.
    mode = (post_move_cfg.get('mode') or 'move').lower()
    files = post_move_cfg.get('files') or []
    dest_dir_cfg = post_move_cfg.get('dest_dir') or ''

    if not dest_dir_cfg:
        print('[post_move] dest_dir 未配置,跳过后续移动/拷贝步骤')
    else:
        # A relative dest_dir is resolved against the working directory.
        if os.path.isabs(dest_dir_cfg):
            dest_dir_abs = dest_dir_cfg
        else:
            dest_dir_abs = os.path.join(current_dir, dest_dir_cfg)
        os.makedirs(dest_dir_abs, exist_ok=True)

        moved_count = 0
        for fname in files:
            src_path = os.path.join(current_dir, target_dir, fname)
            dst_path = os.path.join(dest_dir_abs, fname)

            if not os.path.isfile(src_path):
                print(f"[post_move] 源文件不存在,跳过: {src_path}")
                continue

            # Overwrite an existing destination.
            if os.path.exists(dst_path):
                try:
                    if os.path.isdir(dst_path):
                        # Defensive: never delete a directory by mistake.
                        print(f"[post_move] 目标路径是目录,跳过: {dst_path}")
                        continue
                    if os.path.samefile(src_path, dst_path):
                        # dest_dir resolves to the target directory itself:
                        # removing the "destination" would delete the source
                        # and lose the file, so leave it where it is.
                        print(f"[post_move] 源文件与目标文件相同,跳过: {src_path}")
                        continue
                    os.remove(dst_path)
                except OSError as e:
                    print(f"[post_move] 覆盖目标文件失败: {dst_path}, 错误: {e}")
                    continue

            try:
                # Both modes copy first; 'move' is copy2 + delete-source so
                # it behaves the same across different file systems.
                shutil.copy2(src_path, dst_path)
                if mode != 'copy':
                    try:
                        os.remove(src_path)
                    except OSError as e:
                        # The copy already succeeded, so still count the file.
                        print(f"[post_move] 删除源文件失败(已完成拷贝): {src_path}, 错误: {e}")
                moved_count += 1
                print(f"[post_move] {mode} 完成: {src_path} -> {dst_path}")
            except OSError as e:
                print(f"[post_move] {mode} 失败: {src_path} -> {dst_path}, 错误: {e}")

        print(f"[post_move] 共处理 {moved_count}/{len(files)} 个文件,目标目录: {dest_dir_abs}")