devops/script.py
2025-07-09 10:50:47 +08:00

433 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库备份和清空脚本
功能:
1. 备份MySQL数据库
2. 清空MySQL数据库
3. 清空Redis数据库
"""
import os
import sys
import subprocess
import datetime
import logging
import redis
import pymysql
from pathlib import Path
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('script.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 数据库配置
MYSQL_CONFIG = {
'host': '172.20.0.5',
'port': 3306,
'user': 'root',
'password': 'root',
'charset': 'utf8mb4'
}
REDIS_CONFIG = {
'host': '172.20.0.6',
'port': 6379,
'password': None, # 如果有密码请设置
'db': 0
}
# 备份目录
BACKUP_DIR = './backups'
class DatabaseManager:
def __init__(self):
self.mysql_conn = None
self.redis_conn = None
self.backup_dir = Path(BACKUP_DIR)
self.backup_dir.mkdir(exist_ok=True)
def connect_mysql(self):
"""连接MySQL数据库"""
try:
self.mysql_conn = pymysql.connect(**MYSQL_CONFIG)
logger.info("MySQL连接成功")
return True
except Exception as e:
logger.error(f"MySQL连接失败: {e}")
return False
def connect_redis(self):
"""连接Redis数据库"""
try:
self.redis_conn = redis.Redis(**REDIS_CONFIG)
self.redis_conn.ping()
logger.info("Redis连接成功")
return True
except Exception as e:
logger.error(f"Redis连接失败: {e}")
return False
def get_mysql_databases(self):
"""获取所有MySQL数据库列表排除系统数据库"""
try:
cursor = self.mysql_conn.cursor()
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
# 排除系统数据库
system_dbs = ['information_schema', 'performance_schema', 'mysql', 'sys']
user_dbs = [db for db in databases if db not in system_dbs]
cursor.close()
return user_dbs
except Exception as e:
logger.error(f"获取数据库列表失败: {e}")
return []
def backup_mysql_database(self, database_name):
"""备份指定的MySQL数据库"""
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = self.backup_dir / f"{database_name}_{timestamp}.sql"
# 确保备份目录存在
backup_file.parent.mkdir(parents=True, exist_ok=True)
# 使用mysqldump命令备份
cmd = [
'mysqldump',
f'--host={MYSQL_CONFIG["host"]}',
f'--port={MYSQL_CONFIG["port"]}',
f'--user={MYSQL_CONFIG["user"]}',
f'--password={MYSQL_CONFIG["password"]}',
'--single-transaction',
'--routines',
'--triggers',
database_name
]
# 创建备份文件(如果不存在会自动创建)
logger.info(f"准备备份数据库 {database_name} 到文件: {backup_file}")
with open(backup_file, 'w', encoding='utf-8') as f:
result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
# 检查备份文件是否创建成功并且有内容
if backup_file.exists() and backup_file.stat().st_size > 0:
logger.info(f"数据库 {database_name} 备份成功: {backup_file} (大小: {backup_file.stat().st_size} 字节)")
return str(backup_file)
else:
logger.error(f"数据库 {database_name} 备份文件创建失败或为空")
return None
else:
logger.error(f"数据库 {database_name} 备份失败: {result.stderr}")
# 如果备份失败,删除可能创建的空文件
if backup_file.exists():
backup_file.unlink()
return None
except Exception as e:
logger.error(f"备份数据库 {database_name} 时发生错误: {e}")
return None
def backup_all_mysql_databases(self):
"""备份所有用户数据库"""
databases = self.get_mysql_databases()
if not databases:
logger.warning("没有找到用户数据库")
return []
backup_files = []
for db in databases:
backup_file = self.backup_mysql_database(db)
if backup_file:
backup_files.append(backup_file)
return backup_files
def clear_mysql_database(self, database_name):
"""清空指定MySQL数据库的所有表"""
try:
cursor = self.mysql_conn.cursor()
# 使用数据库
cursor.execute(f"USE `{database_name}`")
# 禁用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
# 获取所有表
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
# 清空所有表
for table in tables:
cursor.execute(f"TRUNCATE TABLE `{table}`")
logger.info(f"已清空表: {database_name}.{table}")
# 重新启用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
self.mysql_conn.commit()
cursor.close()
logger.info(f"数据库 {database_name} 所有表已清空")
return True
except Exception as e:
logger.error(f"清空数据库 {database_name} 失败: {e}")
return False
def clear_all_mysql_databases(self):
"""清空所有用户数据库"""
databases = self.get_mysql_databases()
if not databases:
logger.warning("没有找到用户数据库")
return
for db in databases:
self.clear_mysql_database(db)
def backup_redis_database(self):
"""备份Redis数据库"""
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = self.backup_dir / f"redis_{timestamp}.rdb"
# 确保备份目录存在
backup_file.parent.mkdir(parents=True, exist_ok=True)
# 执行BGSAVE命令
self.redis_conn.bgsave()
logger.info("Redis后台保存已启动")
# 等待保存完成
import time
initial_lastsave = self.redis_conn.lastsave()
logger.info("等待Redis数据保存完成...")
# 等待BGSAVE完成最多等待30秒
timeout = 30
start_time = time.time()
while time.time() - start_time < timeout:
current_lastsave = self.redis_conn.lastsave()
if current_lastsave > initial_lastsave:
logger.info("Redis数据保存完成")
break
time.sleep(1)
else:
logger.warning("Redis保存超时但继续尝试备份")
# 尝试获取Redis RDB文件路径
try:
config_info = self.redis_conn.config_get('dir')
redis_dir = config_info.get('dir', './') if config_info else './'
dbfilename_info = self.redis_conn.config_get('dbfilename')
dbfilename = dbfilename_info.get('dbfilename', 'dump.rdb') if dbfilename_info else 'dump.rdb'
redis_rdb_path = os.path.join(redis_dir, dbfilename)
logger.info(f"Redis RDB文件路径: {redis_rdb_path}")
except:
# 如果无法获取配置,使用默认路径
redis_rdb_path = "./dump.rdb"
logger.warning(f"无法获取Redis配置使用默认路径: {redis_rdb_path}")
# 复制RDB文件
if os.path.exists(redis_rdb_path):
import shutil
# 确保目标文件目录存在
backup_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(redis_rdb_path, backup_file)
if backup_file.exists() and backup_file.stat().st_size > 0:
logger.info(f"Redis数据备份成功: {backup_file} (大小: {backup_file.stat().st_size} 字节)")
return str(backup_file)
else:
logger.error("Redis备份文件创建失败或为空")
return None
else:
logger.warning(f"找不到Redis RDB文件: {redis_rdb_path}")
# 尝试创建一个简单的Redis数据导出
return self._create_redis_text_backup(backup_file)
except Exception as e:
logger.error(f"备份Redis数据失败: {e}")
return None
def _create_redis_text_backup(self, backup_file):
"""创建Redis文本格式备份"""
try:
text_backup_file = backup_file.with_suffix('.txt')
# 确保目录存在
text_backup_file.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"创建Redis文本格式备份: {text_backup_file}")
with open(text_backup_file, 'w', encoding='utf-8') as f:
f.write("# Redis数据导出\n")
f.write(f"# 导出时间: {datetime.datetime.now()}\n\n")
# 获取所有key
keys = self.redis_conn.keys('*')
total_keys = len(keys)
if total_keys == 0:
f.write("# Redis数据库为空\n")
logger.info("Redis数据库为空创建空备份文件")
else:
f.write(f"# 总共 {total_keys} 个键\n\n")
logger.info(f"正在导出 {total_keys} 个Redis键...")
for i, key in enumerate(keys, 1):
try:
key_str = key.decode('utf-8') if isinstance(key, bytes) else str(key)
key_type = self.redis_conn.type(key).decode('utf-8')
f.write(f"# Key {i}/{total_keys}: {key_str} (类型: {key_type})\n")
if key_type == 'string':
value = self.redis_conn.get(key)
if isinstance(value, bytes):
try:
value = value.decode('utf-8')
except:
value = str(value)
f.write(f"SET \"{key_str}\" \"{value}\"\n")
elif key_type == 'hash':
hash_data = self.redis_conn.hgetall(key)
for field, value in hash_data.items():
if isinstance(field, bytes):
field = field.decode('utf-8')
if isinstance(value, bytes):
value = value.decode('utf-8')
f.write(f"HSET \"{key_str}\" \"{field}\" \"{value}\"\n")
f.write("\n")
except Exception as key_error:
f.write(f"# 错误处理键 {key}: {key_error}\n\n")
continue
if text_backup_file.exists():
logger.info(f"Redis文本备份创建成功: {text_backup_file}")
return str(text_backup_file)
else:
logger.error("Redis文本备份文件创建失败")
return None
except Exception as e:
logger.error(f"创建Redis文本备份失败: {e}")
return None
def clear_redis_database(self):
"""清空Redis数据库"""
try:
# 获取当前数据库的key数量
key_count = self.redis_conn.dbsize()
logger.info(f"Redis数据库当前有 {key_count} 个key")
if key_count > 0:
# 清空当前数据库
self.redis_conn.flushdb()
logger.info("Redis数据库已清空")
else:
logger.info("Redis数据库已经是空的")
return True
except Exception as e:
logger.error(f"清空Redis数据库失败: {e}")
return False
def close_connections(self):
"""关闭数据库连接"""
if self.mysql_conn:
self.mysql_conn.close()
logger.info("MySQL连接已关闭")
if self.redis_conn:
self.redis_conn.close()
logger.info("Redis连接已关闭")
def main():
"""主函数"""
logger.info("开始执行数据库备份和清空脚本")
db_manager = DatabaseManager()
try:
# 连接数据库
mysql_connected = db_manager.connect_mysql()
redis_connected = db_manager.connect_redis()
if not mysql_connected and not redis_connected:
logger.error("无法连接到任何数据库,脚本退出")
return False
# 1. 备份MySQL数据库
if mysql_connected:
logger.info("开始备份MySQL数据库...")
backup_files = db_manager.backup_all_mysql_databases()
if backup_files:
logger.info(f"MySQL备份完成共备份 {len(backup_files)} 个数据库")
else:
logger.warning("MySQL备份失败或没有数据库需要备份")
# 2. 备份Redis数据库
if redis_connected:
logger.info("开始备份Redis数据库...")
redis_backup = db_manager.backup_redis_database()
if redis_backup:
logger.info("Redis备份完成")
# 确认是否继续清空数据库
confirm = input("备份完成,是否继续清空数据库?(y/N): ")
if confirm.lower() != 'y':
logger.info("用户取消清空操作")
return True
# 3. 清空MySQL数据库
if mysql_connected:
logger.info("开始清空MySQL数据库...")
db_manager.clear_all_mysql_databases()
logger.info("MySQL数据库清空完成")
# 4. 清空Redis数据库
if redis_connected:
logger.info("开始清空Redis数据库...")
if db_manager.clear_redis_database():
logger.info("Redis数据库清空完成")
logger.info("所有操作完成")
return True
except KeyboardInterrupt:
logger.info("用户中断了脚本执行")
return False
except Exception as e:
logger.error(f"脚本执行过程中发生错误: {e}")
return False
finally:
db_manager.close_connections()
if __name__ == "__main__":
# 检查依赖包
required_packages = ['pymysql', 'redis']
missing_packages = []
for package in required_packages:
try:
__import__(package)
except ImportError:
missing_packages.append(package)
if missing_packages:
print(f"缺少依赖包: {', '.join(missing_packages)}")
print(f"请运行: pip install {' '.join(missing_packages)}")
sys.exit(1)
success = main()
sys.exit(0 if success else 1)