devops/script/backup.py
2025-12-12 11:40:38 +08:00

583 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库备份和清空脚本
功能:
1. 备份MySQL数据库支持指定特定数据库
2. 清空MySQL数据库支持指定特定数据库
3. 清空Redis数据库
"""
import os
import sys
import subprocess
import datetime
import logging
import redis
import pymysql
from pathlib import Path
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('script.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 数据库配置
MYSQL_CONFIG = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'Xijing1!',
'charset': 'utf8mb4'
}
REDIS_CONFIG = {
'host': '127.0.0.1',
'port': 6379,
'password': None, # 如果有密码请设置
'db': 0
}
# 备份目录
BACKUP_DIR = './backups'
# 指定要操作的数据库列表(为空表示操作所有用户数据库)
TARGET_DATABASES = ["merge_pet_1"] # 例如: ['database1', 'database2']
class DatabaseManager:
def __init__(self, target_databases=None):
self.mysql_conn = None
self.redis_conn = None
self.backup_dir = Path(BACKUP_DIR)
self.backup_dir.mkdir(exist_ok=True)
self.target_databases = target_databases or []
def connect_mysql(self):
"""连接MySQL数据库"""
try:
self.mysql_conn = pymysql.connect(**MYSQL_CONFIG)
logger.info("MySQL连接成功")
return True
except Exception as e:
logger.error(f"MySQL连接失败: {e}")
return False
def connect_redis(self):
"""连接Redis数据库"""
try:
self.redis_conn = redis.Redis(**REDIS_CONFIG)
self.redis_conn.ping()
logger.info("Redis连接成功")
return True
except Exception as e:
logger.error(f"Redis连接失败: {e}")
return False
def get_mysql_databases(self):
"""获取所有MySQL数据库列表排除系统数据库"""
try:
cursor = self.mysql_conn.cursor()
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
# 排除系统数据库
system_dbs = ['information_schema', 'performance_schema', 'mysql', 'sys']
user_dbs = [db for db in databases if db not in system_dbs]
cursor.close()
return user_dbs
except Exception as e:
logger.error(f"获取数据库列表失败: {e}")
return []
def backup_mysql_database(self, database_name):
"""备份指定的MySQL数据库"""
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = self.backup_dir / f"{database_name}_{timestamp}.sql"
# 确保备份目录存在
backup_file.parent.mkdir(parents=True, exist_ok=True)
# 使用mysqldump命令备份
cmd = [
'mysqldump',
f'--host={MYSQL_CONFIG["host"]}',
f'--port={MYSQL_CONFIG["port"]}',
f'--user={MYSQL_CONFIG["user"]}',
f'--password={MYSQL_CONFIG["password"]}',
'--single-transaction',
'--routines',
'--triggers',
database_name
]
# 创建备份文件(如果不存在会自动创建)
logger.info(f"准备备份数据库 {database_name} 到文件: {backup_file}")
with open(backup_file, 'w', encoding='utf-8') as f:
result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE)
if result.returncode == 0:
# 检查备份文件是否创建成功并且有内容
if backup_file.exists() and backup_file.stat().st_size > 0:
logger.info(f"数据库 {database_name} 备份成功: {backup_file} (大小: {backup_file.stat().st_size} 字节)")
return str(backup_file)
else:
logger.error(f"数据库 {database_name} 备份文件创建失败或为空")
return None
else:
logger.error(f"数据库 {database_name} 备份失败: {result.stderr}")
# 如果备份失败,删除可能创建的空文件
if backup_file.exists():
backup_file.unlink()
return None
except Exception as e:
logger.error(f"备份数据库 {database_name} 时发生错误: {e}")
return None
def backup_all_mysql_databases(self):
"""备份所有用户数据库"""
databases = self.get_mysql_databases()
if not databases:
logger.warning("没有找到用户数据库")
return []
backup_files = []
for db in databases:
backup_file = self.backup_mysql_database(db)
if backup_file:
backup_files.append(backup_file)
return backup_files
def get_target_databases(self):
"""获取要操作的数据库列表"""
all_databases = self.get_mysql_databases()
if not all_databases:
logger.warning("没有找到用户数据库")
return []
if self.target_databases:
# 验证指定的数据库是否存在
valid_databases = []
for db in self.target_databases:
if db in all_databases:
valid_databases.append(db)
else:
logger.warning(f"指定的数据库 '{db}' 不存在")
return valid_databases
else:
# 如果没有指定,让用户选择
return self.select_databases_interactively(all_databases)
def select_databases_interactively(self, all_databases):
"""交互式选择数据库"""
print("\n可用的数据库列表:")
for i, db in enumerate(all_databases, 1):
print(f"{i}. {db}")
print(f"{len(all_databases) + 1}. 所有数据库")
print("0. 退出")
while True:
try:
choice = input("\n请选择要操作的数据库(输入数字,多个数字用逗号分隔): ").strip()
if choice == '0':
logger.info("用户选择退出")
return []
if choice == str(len(all_databases) + 1):
logger.info("用户选择操作所有数据库")
return all_databases
# 解析用户输入的数字
selected_indices = [int(x.strip()) for x in choice.split(',') if x.strip()]
selected_databases = []
for index in selected_indices:
if 1 <= index <= len(all_databases):
selected_databases.append(all_databases[index - 1])
else:
print(f"无效的选择: {index}")
continue
if selected_databases:
logger.info(f"用户选择的数据库: {', '.join(selected_databases)}")
return selected_databases
else:
print("没有有效的选择,请重新输入")
except ValueError:
print("输入格式错误,请输入数字")
except KeyboardInterrupt:
logger.info("用户中断操作")
return []
def backup_selected_mysql_databases(self):
"""备份选定的MySQL数据库"""
databases = self.get_target_databases()
if not databases:
logger.warning("没有选择要备份的数据库")
return []
backup_files = []
for db in databases:
logger.info(f"正在备份数据库: {db}")
backup_file = self.backup_mysql_database(db)
if backup_file:
backup_files.append(backup_file)
return backup_files
def clear_mysql_database(self, database_name):
"""清空指定MySQL数据库的所有表"""
try:
cursor = self.mysql_conn.cursor()
# 使用数据库
cursor.execute(f"USE `{database_name}`")
# 禁用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
# 获取所有表
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
# 清空所有表
for table in tables:
cursor.execute(f"TRUNCATE TABLE `{table}`")
logger.info(f"已清空表: {database_name}.{table}")
# 重新启用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
self.mysql_conn.commit()
cursor.close()
logger.info(f"数据库 {database_name} 所有表已清空")
return True
except Exception as e:
logger.error(f"清空数据库 {database_name} 失败: {e}")
return False
def clear_all_mysql_databases(self):
"""清空所有用户数据库"""
databases = self.get_mysql_databases()
if not databases:
logger.warning("没有找到用户数据库")
return
for db in databases:
self.clear_mysql_database(db)
def clear_selected_mysql_databases(self):
"""清空选定的MySQL数据库"""
databases = self.get_target_databases()
if not databases:
logger.warning("没有选择要清空的数据库")
return
# 确认清空操作
print(f"\n即将清空以下数据库: {', '.join(databases)}")
confirm = input("确认要清空这些数据库吗?(y/N): ")
if confirm.lower() != 'y':
logger.info("用户取消清空操作")
return
for db in databases:
logger.info(f"正在清空数据库: {db}")
self.clear_mysql_database(db)
def backup_redis_database(self):
"""备份Redis数据库"""
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = self.backup_dir / f"redis_{timestamp}.rdb"
# 确保备份目录存在
backup_file.parent.mkdir(parents=True, exist_ok=True)
# 执行BGSAVE命令
self.redis_conn.bgsave()
logger.info("Redis后台保存已启动")
# 等待保存完成
import time
initial_lastsave = self.redis_conn.lastsave()
logger.info("等待Redis数据保存完成...")
# 等待BGSAVE完成最多等待30秒
timeout = 30
start_time = time.time()
while time.time() - start_time < timeout:
current_lastsave = self.redis_conn.lastsave()
if current_lastsave > initial_lastsave:
logger.info("Redis数据保存完成")
break
time.sleep(1)
else:
logger.warning("Redis保存超时但继续尝试备份")
# 尝试获取Redis RDB文件路径
try:
config_info = self.redis_conn.config_get('dir')
redis_dir = config_info.get('dir', './') if config_info else './'
dbfilename_info = self.redis_conn.config_get('dbfilename')
dbfilename = dbfilename_info.get('dbfilename', 'dump.rdb') if dbfilename_info else 'dump.rdb'
redis_rdb_path = os.path.join(redis_dir, dbfilename)
logger.info(f"Redis RDB文件路径: {redis_rdb_path}")
except:
# 如果无法获取配置,使用默认路径
redis_rdb_path = "./dump.rdb"
logger.warning(f"无法获取Redis配置使用默认路径: {redis_rdb_path}")
# 复制RDB文件
if os.path.exists(redis_rdb_path):
import shutil
# 确保目标文件目录存在
backup_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(redis_rdb_path, backup_file)
if backup_file.exists() and backup_file.stat().st_size > 0:
logger.info(f"Redis数据备份成功: {backup_file} (大小: {backup_file.stat().st_size} 字节)")
return str(backup_file)
else:
logger.error("Redis备份文件创建失败或为空")
return None
else:
logger.warning(f"找不到Redis RDB文件: {redis_rdb_path}")
# 尝试创建一个简单的Redis数据导出
return self._create_redis_text_backup(backup_file)
except Exception as e:
logger.error(f"备份Redis数据失败: {e}")
return None
def _create_redis_text_backup(self, backup_file):
"""创建Redis文本格式备份"""
try:
text_backup_file = backup_file.with_suffix('.txt')
# 确保目录存在
text_backup_file.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"创建Redis文本格式备份: {text_backup_file}")
with open(text_backup_file, 'w', encoding='utf-8') as f:
f.write("# Redis数据导出\n")
f.write(f"# 导出时间: {datetime.datetime.now()}\n\n")
# 获取所有key
keys = self.redis_conn.keys('*')
total_keys = len(keys)
if total_keys == 0:
f.write("# Redis数据库为空\n")
logger.info("Redis数据库为空创建空备份文件")
else:
f.write(f"# 总共 {total_keys} 个键\n\n")
logger.info(f"正在导出 {total_keys} 个Redis键...")
for i, key in enumerate(keys, 1):
try:
key_str = key.decode('utf-8') if isinstance(key, bytes) else str(key)
key_type = self.redis_conn.type(key).decode('utf-8')
f.write(f"# Key {i}/{total_keys}: {key_str} (类型: {key_type})\n")
if key_type == 'string':
value = self.redis_conn.get(key)
if isinstance(value, bytes):
try:
value = value.decode('utf-8')
except:
value = str(value)
f.write(f"SET \"{key_str}\" \"{value}\"\n")
elif key_type == 'hash':
hash_data = self.redis_conn.hgetall(key)
for field, value in hash_data.items():
if isinstance(field, bytes):
field = field.decode('utf-8')
if isinstance(value, bytes):
value = value.decode('utf-8')
f.write(f"HSET \"{key_str}\" \"{field}\" \"{value}\"\n")
f.write("\n")
except Exception as key_error:
f.write(f"# 错误处理键 {key}: {key_error}\n\n")
continue
if text_backup_file.exists():
logger.info(f"Redis文本备份创建成功: {text_backup_file}")
return str(text_backup_file)
else:
logger.error("Redis文本备份文件创建失败")
return None
except Exception as e:
logger.error(f"创建Redis文本备份失败: {e}")
return None
def clear_redis_database(self):
"""清空Redis数据库"""
try:
# 获取当前数据库的key数量
key_count = self.redis_conn.dbsize()
logger.info(f"Redis数据库当前有 {key_count} 个key")
if key_count > 0:
# 清空当前数据库
self.redis_conn.flushdb()
logger.info("Redis数据库已清空")
else:
logger.info("Redis数据库已经是空的")
return True
except Exception as e:
logger.error(f"清空Redis数据库失败: {e}")
return False
def close_connections(self):
"""关闭数据库连接"""
if self.mysql_conn:
self.mysql_conn.close()
logger.info("MySQL连接已关闭")
if self.redis_conn:
self.redis_conn.close()
logger.info("Redis连接已关闭")
def select_operation_mode():
"""选择操作模式"""
print("\n请选择操作模式:")
print("1. 仅备份数据库")
print("2. 仅清空数据库")
print("3. 备份后清空数据库")
print("4. 备份和清空Redis")
print("0. 退出")
while True:
try:
choice = input("\n请输入选择 (0-4): ").strip()
if choice in ['0', '1', '2', '3', '4']:
return choice
else:
print("无效选择,请输入 0-4")
except KeyboardInterrupt:
return '0'
def main():
"""主函数"""
logger.info("开始执行数据库备份和清空脚本")
# 选择操作模式
mode = select_operation_mode()
if mode == '0':
logger.info("用户选择退出")
return True
# 从配置或用户交互获取目标数据库
target_dbs = TARGET_DATABASES if TARGET_DATABASES else None
db_manager = DatabaseManager(target_databases=target_dbs)
try:
# 连接数据库
mysql_connected = db_manager.connect_mysql()
redis_connected = db_manager.connect_redis() if mode == '4' else True
if mode in ['1', '2', '3'] and not mysql_connected:
logger.error("无法连接到MySQL数据库脚本退出")
return False
if mode == '4' and not redis_connected:
logger.error("无法连接到Redis数据库脚本退出")
return False
# 执行相应操作
if mode == '1': # 仅备份
logger.info("开始备份MySQL数据库...")
backup_files = db_manager.backup_selected_mysql_databases()
if backup_files:
logger.info(f"MySQL备份完成共备份 {len(backup_files)} 个数据库")
else:
logger.warning("MySQL备份失败或没有数据库需要备份")
elif mode == '2': # 仅清空
logger.info("开始清空MySQL数据库...")
db_manager.clear_selected_mysql_databases()
logger.info("MySQL数据库清空完成")
elif mode == '3': # 备份后清空
# 1. 备份MySQL数据库
logger.info("开始备份MySQL数据库...")
backup_files = db_manager.backup_selected_mysql_databases()
if backup_files:
logger.info(f"MySQL备份完成共备份 {len(backup_files)} 个数据库")
# 确认是否继续清空数据库
confirm = input("备份完成,是否继续清空数据库?(y/N): ")
if confirm.lower() == 'y':
logger.info("开始清空MySQL数据库...")
# 重新使用相同的数据库选择
db_manager.target_databases = [os.path.basename(f).split('_')[0] for f in backup_files]
db_manager.clear_selected_mysql_databases()
logger.info("MySQL数据库清空完成")
else:
logger.info("用户取消清空操作")
else:
logger.warning("MySQL备份失败跳过清空操作")
elif mode == '4': # Redis操作
# 备份Redis
logger.info("开始备份Redis数据库...")
redis_backup = db_manager.backup_redis_database()
if redis_backup:
logger.info("Redis备份完成")
# 确认是否清空Redis
confirm = input("Redis备份完成是否清空Redis数据库(y/N): ")
if confirm.lower() == 'y':
logger.info("开始清空Redis数据库...")
if db_manager.clear_redis_database():
logger.info("Redis数据库清空完成")
else:
logger.info("用户取消Redis清空操作")
logger.info("所有操作完成")
return True
except KeyboardInterrupt:
logger.info("用户中断了脚本执行")
return False
except Exception as e:
logger.error(f"脚本执行过程中发生错误: {e}")
return False
finally:
db_manager.close_connections()
if __name__ == "__main__":
# 检查依赖包
required_packages = ['pymysql', 'redis']
missing_packages = []
for package in required_packages:
try:
__import__(package)
except ImportError:
missing_packages.append(package)
if missing_packages:
print(f"缺少依赖包: {', '.join(missing_packages)}")
print(f"请运行: pip install {' '.join(missing_packages)}")
sys.exit(1)
success = main()
sys.exit(0 if success else 1)