#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 数据库备份和清空脚本 功能: 1. 备份MySQL数据库(支持指定特定数据库) 2. 清空MySQL数据库(支持指定特定数据库) 3. 清空Redis数据库 """ import os import sys import subprocess import datetime import logging import redis import pymysql from pathlib import Path # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('script.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 数据库配置 MYSQL_CONFIG = { 'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'password': 'Xijing1!', 'charset': 'utf8mb4' } REDIS_CONFIG = { 'host': '127.0.0.1', 'port': 6379, 'password': None, # 如果有密码请设置 'db': 0 } # 备份目录 BACKUP_DIR = './backups' # 指定要操作的数据库列表(为空表示操作所有用户数据库) TARGET_DATABASES = ["merge_pet_1"] # 例如: ['database1', 'database2'] class DatabaseManager: def __init__(self, target_databases=None): self.mysql_conn = None self.redis_conn = None self.backup_dir = Path(BACKUP_DIR) self.backup_dir.mkdir(exist_ok=True) self.target_databases = target_databases or [] def connect_mysql(self): """连接MySQL数据库""" try: self.mysql_conn = pymysql.connect(**MYSQL_CONFIG) logger.info("MySQL连接成功") return True except Exception as e: logger.error(f"MySQL连接失败: {e}") return False def connect_redis(self): """连接Redis数据库""" try: self.redis_conn = redis.Redis(**REDIS_CONFIG) self.redis_conn.ping() logger.info("Redis连接成功") return True except Exception as e: logger.error(f"Redis连接失败: {e}") return False def get_mysql_databases(self): """获取所有MySQL数据库列表(排除系统数据库)""" try: cursor = self.mysql_conn.cursor() cursor.execute("SHOW DATABASES") databases = [db[0] for db in cursor.fetchall()] # 排除系统数据库 system_dbs = ['information_schema', 'performance_schema', 'mysql', 'sys'] user_dbs = [db for db in databases if db not in system_dbs] cursor.close() return user_dbs except Exception as e: logger.error(f"获取数据库列表失败: {e}") return [] def backup_mysql_database(self, database_name): """备份指定的MySQL数据库""" try: timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = self.backup_dir / f"{database_name}_{timestamp}.sql" # 确保备份目录存在 backup_file.parent.mkdir(parents=True, exist_ok=True) # 使用mysqldump命令备份 cmd = [ 'mysqldump', f'--host={MYSQL_CONFIG["host"]}', f'--port={MYSQL_CONFIG["port"]}', f'--user={MYSQL_CONFIG["user"]}', f'--password={MYSQL_CONFIG["password"]}', '--single-transaction', '--routines', '--triggers', database_name ] # 创建备份文件(如果不存在会自动创建) logger.info(f"准备备份数据库 {database_name} 到文件: {backup_file}") with open(backup_file, 'w', encoding='utf-8') as f: result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE) if result.returncode == 0: # 检查备份文件是否创建成功并且有内容 if backup_file.exists() and backup_file.stat().st_size > 0: logger.info(f"数据库 {database_name} 备份成功: {backup_file} (大小: {backup_file.stat().st_size} 字节)") return str(backup_file) else: logger.error(f"数据库 {database_name} 备份文件创建失败或为空") return None else: logger.error(f"数据库 {database_name} 备份失败: {result.stderr}") # 如果备份失败,删除可能创建的空文件 if backup_file.exists(): backup_file.unlink() return None except Exception as e: logger.error(f"备份数据库 {database_name} 时发生错误: {e}") return None def backup_all_mysql_databases(self): """备份所有用户数据库""" databases = self.get_mysql_databases() if not databases: logger.warning("没有找到用户数据库") return [] backup_files = [] for db in databases: backup_file = self.backup_mysql_database(db) if backup_file: backup_files.append(backup_file) return backup_files def get_target_databases(self): """获取要操作的数据库列表""" all_databases = self.get_mysql_databases() if not all_databases: logger.warning("没有找到用户数据库") return [] if self.target_databases: # 验证指定的数据库是否存在 valid_databases = [] for db in self.target_databases: if db in all_databases: valid_databases.append(db) else: logger.warning(f"指定的数据库 '{db}' 不存在") return valid_databases else: # 如果没有指定,让用户选择 return self.select_databases_interactively(all_databases) def select_databases_interactively(self, all_databases): """交互式选择数据库""" print("\n可用的数据库列表:") for i, db in enumerate(all_databases, 1): print(f"{i}. {db}") print(f"{len(all_databases) + 1}. 所有数据库") print("0. 退出") while True: try: choice = input("\n请选择要操作的数据库(输入数字,多个数字用逗号分隔): ").strip() if choice == '0': logger.info("用户选择退出") return [] if choice == str(len(all_databases) + 1): logger.info("用户选择操作所有数据库") return all_databases # 解析用户输入的数字 selected_indices = [int(x.strip()) for x in choice.split(',') if x.strip()] selected_databases = [] for index in selected_indices: if 1 <= index <= len(all_databases): selected_databases.append(all_databases[index - 1]) else: print(f"无效的选择: {index}") continue if selected_databases: logger.info(f"用户选择的数据库: {', '.join(selected_databases)}") return selected_databases else: print("没有有效的选择,请重新输入") except ValueError: print("输入格式错误,请输入数字") except KeyboardInterrupt: logger.info("用户中断操作") return [] def backup_selected_mysql_databases(self): """备份选定的MySQL数据库""" databases = self.get_target_databases() if not databases: logger.warning("没有选择要备份的数据库") return [] backup_files = [] for db in databases: logger.info(f"正在备份数据库: {db}") backup_file = self.backup_mysql_database(db) if backup_file: backup_files.append(backup_file) return backup_files def clear_mysql_database(self, database_name): """清空指定MySQL数据库的所有表""" try: cursor = self.mysql_conn.cursor() # 使用数据库 cursor.execute(f"USE `{database_name}`") # 禁用外键检查 cursor.execute("SET FOREIGN_KEY_CHECKS = 0") # 获取所有表 cursor.execute("SHOW TABLES") tables = [table[0] for table in cursor.fetchall()] # 清空所有表 for table in tables: cursor.execute(f"TRUNCATE TABLE `{table}`") logger.info(f"已清空表: {database_name}.{table}") # 重新启用外键检查 cursor.execute("SET FOREIGN_KEY_CHECKS = 1") self.mysql_conn.commit() cursor.close() logger.info(f"数据库 {database_name} 所有表已清空") return True except Exception as e: logger.error(f"清空数据库 {database_name} 失败: {e}") return False def clear_all_mysql_databases(self): """清空所有用户数据库""" databases = self.get_mysql_databases() if not databases: logger.warning("没有找到用户数据库") return for db in databases: self.clear_mysql_database(db) def clear_selected_mysql_databases(self): """清空选定的MySQL数据库""" databases = self.get_target_databases() if not databases: logger.warning("没有选择要清空的数据库") return # 确认清空操作 print(f"\n即将清空以下数据库: {', '.join(databases)}") confirm = input("确认要清空这些数据库吗?(y/N): ") if confirm.lower() != 'y': logger.info("用户取消清空操作") return for db in databases: logger.info(f"正在清空数据库: {db}") self.clear_mysql_database(db) def backup_redis_database(self): """备份Redis数据库""" try: timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = self.backup_dir / f"redis_{timestamp}.rdb" # 确保备份目录存在 backup_file.parent.mkdir(parents=True, exist_ok=True) # 执行BGSAVE命令 self.redis_conn.bgsave() logger.info("Redis后台保存已启动") # 等待保存完成 import time initial_lastsave = self.redis_conn.lastsave() logger.info("等待Redis数据保存完成...") # 等待BGSAVE完成,最多等待30秒 timeout = 30 start_time = time.time() while time.time() - start_time < timeout: current_lastsave = self.redis_conn.lastsave() if current_lastsave > initial_lastsave: logger.info("Redis数据保存完成") break time.sleep(1) else: logger.warning("Redis保存超时,但继续尝试备份") # 尝试获取Redis RDB文件路径 try: config_info = self.redis_conn.config_get('dir') redis_dir = config_info.get('dir', './') if config_info else './' dbfilename_info = self.redis_conn.config_get('dbfilename') dbfilename = dbfilename_info.get('dbfilename', 'dump.rdb') if dbfilename_info else 'dump.rdb' redis_rdb_path = os.path.join(redis_dir, dbfilename) logger.info(f"Redis RDB文件路径: {redis_rdb_path}") except: # 如果无法获取配置,使用默认路径 redis_rdb_path = "./dump.rdb" logger.warning(f"无法获取Redis配置,使用默认路径: {redis_rdb_path}") # 复制RDB文件 if os.path.exists(redis_rdb_path): import shutil # 确保目标文件目录存在 backup_file.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(redis_rdb_path, backup_file) if backup_file.exists() and backup_file.stat().st_size > 0: logger.info(f"Redis数据备份成功: {backup_file} (大小: {backup_file.stat().st_size} 字节)") return str(backup_file) else: logger.error("Redis备份文件创建失败或为空") return None else: logger.warning(f"找不到Redis RDB文件: {redis_rdb_path}") # 尝试创建一个简单的Redis数据导出 return self._create_redis_text_backup(backup_file) except Exception as e: logger.error(f"备份Redis数据失败: {e}") return None def _create_redis_text_backup(self, backup_file): """创建Redis文本格式备份""" try: text_backup_file = backup_file.with_suffix('.txt') # 确保目录存在 text_backup_file.parent.mkdir(parents=True, exist_ok=True) logger.info(f"创建Redis文本格式备份: {text_backup_file}") with open(text_backup_file, 'w', encoding='utf-8') as f: f.write("# Redis数据导出\n") f.write(f"# 导出时间: {datetime.datetime.now()}\n\n") # 获取所有key keys = self.redis_conn.keys('*') total_keys = len(keys) if total_keys == 0: f.write("# Redis数据库为空\n") logger.info("Redis数据库为空,创建空备份文件") else: f.write(f"# 总共 {total_keys} 个键\n\n") logger.info(f"正在导出 {total_keys} 个Redis键...") for i, key in enumerate(keys, 1): try: key_str = key.decode('utf-8') if isinstance(key, bytes) else str(key) key_type = self.redis_conn.type(key).decode('utf-8') f.write(f"# Key {i}/{total_keys}: {key_str} (类型: {key_type})\n") if key_type == 'string': value = self.redis_conn.get(key) if isinstance(value, bytes): try: value = value.decode('utf-8') except: value = str(value) f.write(f"SET \"{key_str}\" \"{value}\"\n") elif key_type == 'hash': hash_data = self.redis_conn.hgetall(key) for field, value in hash_data.items(): if isinstance(field, bytes): field = field.decode('utf-8') if isinstance(value, bytes): value = value.decode('utf-8') f.write(f"HSET \"{key_str}\" \"{field}\" \"{value}\"\n") f.write("\n") except Exception as key_error: f.write(f"# 错误处理键 {key}: {key_error}\n\n") continue if text_backup_file.exists(): logger.info(f"Redis文本备份创建成功: {text_backup_file}") return str(text_backup_file) else: logger.error("Redis文本备份文件创建失败") return None except Exception as e: logger.error(f"创建Redis文本备份失败: {e}") return None def clear_redis_database(self): """清空Redis数据库""" try: # 获取当前数据库的key数量 key_count = self.redis_conn.dbsize() logger.info(f"Redis数据库当前有 {key_count} 个key") if key_count > 0: # 清空当前数据库 self.redis_conn.flushdb() logger.info("Redis数据库已清空") else: logger.info("Redis数据库已经是空的") return True except Exception as e: logger.error(f"清空Redis数据库失败: {e}") return False def close_connections(self): """关闭数据库连接""" if self.mysql_conn: self.mysql_conn.close() logger.info("MySQL连接已关闭") if self.redis_conn: self.redis_conn.close() logger.info("Redis连接已关闭") def select_operation_mode(): """选择操作模式""" print("\n请选择操作模式:") print("1. 仅备份数据库") print("2. 仅清空数据库") print("3. 备份后清空数据库") print("4. 备份和清空Redis") print("0. 退出") while True: try: choice = input("\n请输入选择 (0-4): ").strip() if choice in ['0', '1', '2', '3', '4']: return choice else: print("无效选择,请输入 0-4") except KeyboardInterrupt: return '0' def main(): """主函数""" logger.info("开始执行数据库备份和清空脚本") # 选择操作模式 mode = select_operation_mode() if mode == '0': logger.info("用户选择退出") return True # 从配置或用户交互获取目标数据库 target_dbs = TARGET_DATABASES if TARGET_DATABASES else None db_manager = DatabaseManager(target_databases=target_dbs) try: # 连接数据库 mysql_connected = db_manager.connect_mysql() redis_connected = db_manager.connect_redis() if mode == '4' else True if mode in ['1', '2', '3'] and not mysql_connected: logger.error("无法连接到MySQL数据库,脚本退出") return False if mode == '4' and not redis_connected: logger.error("无法连接到Redis数据库,脚本退出") return False # 执行相应操作 if mode == '1': # 仅备份 logger.info("开始备份MySQL数据库...") backup_files = db_manager.backup_selected_mysql_databases() if backup_files: logger.info(f"MySQL备份完成,共备份 {len(backup_files)} 个数据库") else: logger.warning("MySQL备份失败或没有数据库需要备份") elif mode == '2': # 仅清空 logger.info("开始清空MySQL数据库...") db_manager.clear_selected_mysql_databases() logger.info("MySQL数据库清空完成") elif mode == '3': # 备份后清空 # 1. 备份MySQL数据库 logger.info("开始备份MySQL数据库...") backup_files = db_manager.backup_selected_mysql_databases() if backup_files: logger.info(f"MySQL备份完成,共备份 {len(backup_files)} 个数据库") # 确认是否继续清空数据库 confirm = input("备份完成,是否继续清空数据库?(y/N): ") if confirm.lower() == 'y': logger.info("开始清空MySQL数据库...") # 重新使用相同的数据库选择 db_manager.target_databases = [os.path.basename(f).split('_')[0] for f in backup_files] db_manager.clear_selected_mysql_databases() logger.info("MySQL数据库清空完成") else: logger.info("用户取消清空操作") else: logger.warning("MySQL备份失败,跳过清空操作") elif mode == '4': # Redis操作 # 备份Redis logger.info("开始备份Redis数据库...") redis_backup = db_manager.backup_redis_database() if redis_backup: logger.info("Redis备份完成") # 确认是否清空Redis confirm = input("Redis备份完成,是否清空Redis数据库?(y/N): ") if confirm.lower() == 'y': logger.info("开始清空Redis数据库...") if db_manager.clear_redis_database(): logger.info("Redis数据库清空完成") else: logger.info("用户取消Redis清空操作") logger.info("所有操作完成") return True except KeyboardInterrupt: logger.info("用户中断了脚本执行") return False except Exception as e: logger.error(f"脚本执行过程中发生错误: {e}") return False finally: db_manager.close_connections() if __name__ == "__main__": # 检查依赖包 required_packages = ['pymysql', 'redis'] missing_packages = [] for package in required_packages: try: __import__(package) except ImportError: missing_packages.append(package) if missing_packages: print(f"缺少依赖包: {', '.join(missing_packages)}") print(f"请运行: pip install {' '.join(missing_packages)}") sys.exit(1) success = main() sys.exit(0 if success else 1)