#!/usr/bin/env python3
"""
All-in-one script for compressed PostgreSQL backups to S3.
Supports config template generation, environment-variable overrides,
retries on upload errors, and progress display.
"""
import os
import subprocess
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
import logging
import gzip
import shutil
import yaml
from dataclasses import dataclass
from typing import List
import argparse
from tqdm import tqdm
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# -------------------------
# Configuration dataclass and defaults
# -------------------------
@dataclass
class BackupConfig:
    db_name: str = "your_database_name"
    db_user: str = "your_db_user"
    db_password: str = "your_db_password"
    s3_endpoint: str = "https://s3.your-provider.com"
    s3_access_key: str = "your_access_key"
    s3_secret_key: str = "your_secret_key"
    s3_bucket: str = "your-bucket-name"
    backup_dir: str = "/var/lib/pg_backup"
    compress_level: int = 6
    keep_days: int = 7
    log_file: str = "/var/log/pg_backup.log"
    pg_host: str = "localhost"
    pg_port: int = 5432
    use_ssl: bool = False

# -------------------------
# Configuration file management
# -------------------------
def create_config_template(config_path: str) -> None:
    """Write a commented configuration template to config_path."""
    default_config = BackupConfig()
    template = f"""# PostgreSQL-to-S3 backup configuration
# Sensitive values are best supplied via environment variables,
# which take precedence over this file:
# PG_DB_PASSWORD, S3_ACCESS_KEY, S3_SECRET_KEY
db_name: {default_config.db_name}  # database name
db_user: {default_config.db_user}  # database user
# db_password: {default_config.db_password}  # database password (prefer the env var)
s3_endpoint: {default_config.s3_endpoint}  # S3 endpoint (e.g. https://s3.example.com)
s3_access_key: {default_config.s3_access_key}  # S3 access key
# s3_secret_key: {default_config.s3_secret_key}  # S3 secret key (prefer the env var)
s3_bucket: {default_config.s3_bucket}  # S3 bucket name
backup_dir: {default_config.backup_dir}  # local backup directory (must be writable)
keep_days: {default_config.keep_days}  # delete local backups older than this many days
pg_host: {default_config.pg_host}  # database host (default: localhost)
pg_port: {default_config.pg_port}  # database port (default: 5432)
use_ssl: {default_config.use_ssl}  # require SSL for the connection (default: false)
log_file: {default_config.log_file}  # log file path
compress_level: {default_config.compress_level}  # gzip level (0-9, default: 6)
"""
    with open(config_path, "w") as f:
        f.write(template)
    print(f"Wrote config template to {config_path}; edit it and run again")

def load_or_create_config(config_path: str) -> BackupConfig:
    """Load the config file, generating a template if it does not exist."""
    if not os.path.exists(config_path):
        create_config_template(config_path)
        raise SystemExit(0)
    with open(config_path, "r") as f:
        cfg = yaml.safe_load(f) or {}
    # Environment variables override sensitive values from the file
    env_override = {
        "db_password": os.getenv("PG_DB_PASSWORD"),
        "s3_access_key": os.getenv("S3_ACCESS_KEY"),
        "s3_secret_key": os.getenv("S3_SECRET_KEY"),
    }
    for key, value in env_override.items():
        if value:
            cfg[key] = value
    return BackupConfig(**cfg)
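# Illustration of the precedence implemented above: env vars beat file values.
# The values and script name below are hypothetical, shown only as an example.
#
#   export PG_DB_PASSWORD='s3cret'   # overrides db_password from the file
#   export S3_SECRET_KEY='...'       # overrides s3_secret_key from the file
#   python3 pg_backup.py -c pg_backup.yaml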
# -------------------------
# Logging setup
# -------------------------
def setup_logger(log_file: str) -> logging.Logger:
    logger = logging.getLogger("PGBackup")
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    # Console gets INFO and above; the log file records everything
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger

# -------------------------
# Core backup manager
# -------------------------
class BackupManager:
    def __init__(self, config: BackupConfig):
        self.config = config
        self.logger = setup_logger(config.log_file)
        self.s3_client = self._init_s3_client()

    def _init_s3_client(self):
        return boto3.client(
            "s3",
            endpoint_url=self.config.s3_endpoint,
            aws_access_key_id=self.config.s3_access_key,
            aws_secret_access_key=self.config.s3_secret_key,
            region_name="cn-sy1",  # region expected by the provider; adjust for your endpoint
            config=Config(signature_version="s3v4"),
        )

    def check_prerequisites(self) -> bool:
        """Ensure the backup directory exists and pg_dump is on PATH."""
        try:
            os.makedirs(self.config.backup_dir, exist_ok=True)
            subprocess.run(["pg_dump", "--version"], check=True,
                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            return True
        except Exception as e:
            self.logger.error(f"Prerequisite check failed: {e}")
            return False

    def create_compressed_backup(self) -> str:
        """Dump the database with pg_dump and gzip the result; return the .gz path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = f"{self.config.db_name}_backup_{timestamp}"
        sql_path = os.path.join(self.config.backup_dir, base_name + ".sql")
        gz_path = sql_path + ".gz"
        try:
            env = os.environ.copy()
            env["PGPASSWORD"] = self.config.db_password
            if self.config.use_ssl:
                # pg_dump has no --ssl-mode flag; SSL is requested via libpq's env var
                env["PGSSLMODE"] = "require"
            pg_cmd = [
                "pg_dump",
                f"--host={self.config.pg_host}",
                f"--port={self.config.pg_port}",
                f"--username={self.config.db_user}",
                f"--dbname={self.config.db_name}",
                f"--file={sql_path}",
            ]
            result = subprocess.run(pg_cmd, env=env, capture_output=True, text=True)
            if result.returncode != 0:
                raise RuntimeError(f"pg_dump failed: {result.stderr.strip()}")
            with open(sql_path, "rb") as f_in, \
                    gzip.open(gz_path, "wb", self.config.compress_level) as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.remove(sql_path)
            return gz_path
        except Exception as e:
            self._cleanup_temp_files([sql_path, gz_path])
            self.logger.error(f"Backup creation failed: {e}")
            raise

    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(min=2, max=10),
           retry=retry_if_exception_type(ClientError))
    def upload_to_s3(self, file_path: str) -> None:
        """Upload the backup with multipart transfers and a tqdm progress bar."""
        file_name = os.path.basename(file_path)
        transfer_config = TransferConfig(multipart_threshold=10 * 1024 ** 2,
                                         max_concurrency=10)
        with open(file_path, "rb") as f, tqdm(
            total=os.path.getsize(file_path), unit="B", unit_scale=True,
            desc=f"Uploading {file_name}", leave=False
        ) as pbar:
            def progress(bytes_transferred):
                pbar.update(bytes_transferred)
            self.s3_client.upload_fileobj(f, self.config.s3_bucket, file_name,
                                          Config=transfer_config, Callback=progress)

    def clean_old_backups(self) -> None:
        """Delete local .gz backups older than keep_days."""
        cutoff = datetime.now() - timedelta(days=self.config.keep_days)
        for file in os.listdir(self.config.backup_dir):
            if file.endswith(".gz"):
                path = os.path.join(self.config.backup_dir, file)
                mtime = datetime.fromtimestamp(os.path.getmtime(path))
                if mtime < cutoff:
                    try:
                        os.remove(path)
                        self.logger.info(f"Deleted expired backup: {file}")
                    except Exception as e:
                        self.logger.warning(f"Failed to delete {file}: {e}")

    @staticmethod
    def _cleanup_temp_files(files: List[str]) -> None:
        for f in files:
            if os.path.exists(f):
                try:
                    os.remove(f)
                except OSError:
                    pass

# -------------------------
# Command-line interface
# -------------------------
def parse_arguments() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="PostgreSQL backup-to-S3 tool")
    parser.add_argument("-c", "--config", default="pg_backup.yaml",
                        help="config file path (default: pg_backup.yaml)")
    return parser.parse_args()
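# Restoring a backup produced by this script: the dump is plain SQL compressed
# with gzip, so a typical restore (illustrative filename and connection values,
# not part of this script) looks like:
#
#   gunzip -c your_database_name_backup_20240101_020000.sql.gz \
#       | psql -h localhost -p 5432 -U your_db_user -d your_database_name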
# -------------------------
# Main flow
# -------------------------
def main():
    args = parse_arguments()
    config_path = args.config
    manager = None
    try:
        config = load_or_create_config(config_path)
        manager = BackupManager(config)
        print("[*] Starting backup run")
        if not manager.check_prerequisites():
            raise SystemExit("[!] Prerequisite check failed, aborting")
        backup_path = manager.create_compressed_backup()
        manager.upload_to_s3(backup_path)
        manager.clean_old_backups()
        os.remove(backup_path)  # remove the local copy once the upload succeeded
        print("\n[✓] Backup completed successfully")
    except SystemExit:
        raise
    except Exception as e:
        # manager may still be None if config loading failed before it was created
        if manager is not None:
            manager.logger.error(f"Backup failed: {e}", exc_info=True)
        print(f"\n[!] Backup failed: {e}")
        raise SystemExit(1)

if __name__ == "__main__":
    main()
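# Usage (assuming the script is saved as pg_backup.py; the paths below are
# examples, not requirements):
#
#   python3 pg_backup.py                       # uses ./pg_backup.yaml; writes a template on first run
#   python3 pg_backup.py -c /etc/pg_backup.yaml
#
# A nightly run at 02:00 via cron might look like (illustrative):
#
#   0 2 * * * PG_DB_PASSWORD=... /usr/bin/python3 /opt/pg_backup.py -c /etc/pg_backup.yaml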