#!/usr/bin/env python3
# Recommended to run with a scheduler (e.g. cron or a systemd timer).
# This script backs up a PostgreSQL database to S3-compatible object storage.
# Edit the configuration below before use (install the Python environment and dependencies yourself).
# How it works:
# 1. Read configuration
# 2. Check prerequisites
# 3. Create a compressed backup
# 4. Upload the backup file to S3
# 5. Clean up the local backup file after a successful upload
# 6. Log every step
# 7. Handle errors

import gzip
import logging
import os
import shutil
import subprocess
import threading
from datetime import datetime

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.client import Config

# Configuration -- edit these values for your environment
DB_NAME = 'database_name'
DB_USER = 'database_user'
DB_PASSWORD = 'database_password'
S3_ENDPOINT = 'your_s3_endpoint_url'
S3_ACCESS_KEY = 'your_s3_access_key'
S3_SECRET_KEY = 'your_s3_secret_key'
S3_BUCKET = 'your_s3_bucket_name'
BACKUP_DIR = '/tmp/pg_backups'  # Directory for local backup files
COMPRESS_LEVEL = 6  # gzip level (0-9): 0 = no compression, 9 = maximum; leave as-is if unsure

# Logging setup: write to stdout and to a log file (the file path must be writable)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('/var/log/pg_backup_compressed.log')
    ]
)
logger = logging.getLogger('PG_Backup_Compressed')


def print_step(message):
    print(f"→ {message}")


def check_prerequisites():
    """Check prerequisites: backup directory is writable and pg_dump is available."""
    try:
        os.makedirs(BACKUP_DIR, exist_ok=True)
        # Verify the backup directory is writable
        test_file = os.path.join(BACKUP_DIR, '.test')
        with open(test_file, 'w') as f:
            f.write('test')
        os.remove(test_file)
        # Verify pg_dump is on PATH
        subprocess.run(['pg_dump', '--version'], check=True, capture_output=True)
        return True
    except Exception as e:
        logger.error(f"Prerequisite check failed: {e}")
        return False


def create_compressed_backup():
    """Dump the database with pg_dump and gzip-compress the result."""
    # Note: this timestamp omits the year, so file names repeat annually
    timestamp = datetime.now().strftime("%m%d%H%M")
    sql_file = os.path.join(BACKUP_DIR, f"{DB_NAME}_backup_{timestamp}.sql")
    gz_file = f"{sql_file}.gz"
    try:
        print_step("Running pg_dump...")
        # Pass the password via PGPASSWORD so pg_dump does not prompt for it
        env = os.environ.copy()
        env['PGPASSWORD'] = DB_PASSWORD
        cmd = [
            'pg_dump',
            '-U', DB_USER,
            '-h', 'localhost',
            '-d', DB_NAME,
            '-f', sql_file
        ]
        result = subprocess.run(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        if result.returncode != 0:
            raise Exception(f"pg_dump failed: {result.stderr.strip()}")
        if not os.path.exists(sql_file):
            raise Exception("SQL dump file was not created")

        print_step("Compressing backup file...")
        with open(sql_file, 'rb') as f_in:
            with gzip.open(gz_file, 'wb', compresslevel=COMPRESS_LEVEL) as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(sql_file)  # Keep only the compressed copy
        return gz_file
    except Exception:
        # Remove any partial files before re-raising
        for f in [sql_file, gz_file]:
            if os.path.exists(f):
                try:
                    os.remove(f)
                except OSError:
                    pass
        raise


class ProgressPercentage:
    """Upload progress callback for boto3; may be called from multiple threads."""

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            print(f"\r  Upload progress: {percentage:.2f}% "
                  f"({self._seen_so_far/1024/1024:.2f}MB)", end='')


def upload_to_s3(file_path):
    """Upload the backup file to the S3 bucket."""
    try:
        s3 = boto3.client(
            's3',
            endpoint_url=S3_ENDPOINT,
            aws_access_key_id=S3_ACCESS_KEY,
            aws_secret_access_key=S3_SECRET_KEY,
            region_name='cn-sy1',
            config=Config(signature_version='s3v4')
        )
        # Multipart threshold and chunk size are in bytes
        transfer_config = TransferConfig(
            multipart_threshold=1024 * 25,
            max_concurrency=10,
            multipart_chunksize=1024 * 25,
            use_threads=True
        )
        file_name = os.path.basename(file_path)
        print_step(f"Uploading {file_name}...")
        s3.upload_file(
            file_path,
            S3_BUCKET,
            file_name,
            Config=transfer_config,
            Callback=ProgressPercentage(file_path)
        )
        print()  # End the progress line
        return True
    except Exception as e:
        logger.error(f"Upload failed: {e}")
        raise


def main():
    print("\n" + "=" * 50)
    print("PostgreSQL compressed backup script")
    print("=" * 50 + "\n")
    try:
        if not check_prerequisites():
            raise Exception("Prerequisite check failed")
        backup_file = create_compressed_backup()
        if upload_to_s3(backup_file):
            os.remove(backup_file)
            print_step("Upload succeeded; local backup file removed")
    except Exception as e:
        logger.error(f"Backup failed: {e}")
        print_step(f"[ERROR] {e}")
    finally:
        print("\n[Done]")


if __name__ == "__main__":
    main()
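
# The header recommends pairing this script with a scheduler. A minimal sketch of a
# cron entry follows; the script path /opt/scripts/pg_backup_s3.py, the interpreter
# path, and the cron log file are assumptions -- adjust them to your environment.
#
#   30 2 * * * /usr/bin/python3 /opt/scripts/pg_backup_s3.py >> /var/log/pg_backup_cron.log 2>&1
#
# This would run the backup daily at 02:30 and append the script's console output to the cron log.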