Última atividade 1744609663

pg数据库自动备份恢复

buka5587 revisou este gist 1744609653. Ir para a revisão

1 file changed, 37 insertions, 5 deletions

pg_backup_s3.py

@@ -199,17 +199,42 @@ class BackupManager:
199 199 # 添加详细的日志记录
200 200 self.logger.debug(f"执行pg_dump命令: {' '.join(pg_cmd)}")
201 201
202 - result = subprocess.run(pg_cmd, env=env, capture_output=True, text=True)
202 + # 添加备份开始提示
203 + print(f"[*] 开始数据库备份: {self.config.db_name}")
204 +
205 + # 执行pg_dump并显示进度
206 + with tqdm(desc="数据库导出", unit="B", unit_scale=True) as pbar:
207 + def pg_dump_progress(line):
208 + if "Dumping" in line:
209 + pbar.set_postfix_str(line.strip())
210 + pbar.update(0) # 更新进度条但不增加计数
211 +
212 + result = subprocess.run(
213 + pg_cmd,
214 + env=env,
215 + capture_output=True,
216 + text=True,
217 + bufsize=1,
218 + universal_newlines=True
219 + )
220 +
203 221 if result.returncode != 0:
204 222 error_msg = result.stderr.strip()
205 223 self.logger.error(f"pg_dump详细错误: {error_msg}")
206 224 raise RuntimeError(f"pg_dump失败: {error_msg}")
207 225
208 - with open(sql_path, "rb") as f_in, gzip.open(gz_path, "wb", self.config.compress_level) as f_out:
209 - shutil.copyfileobj(f_in, f_out)
226 + # 添加压缩进度显示
227 + print(f"[*] 正在压缩备份文件: {os.path.basename(gz_path)}")
228 + file_size = os.path.getsize(sql_path)
229 + with tqdm(total=file_size, desc="压缩进度", unit="B", unit_scale=True) as pbar:
230 + with open(sql_path, "rb") as f_in, gzip.open(gz_path, "wb", self.config.compress_level) as f_out:
231 + shutil.copyfileobj(f_in, f_out, length=1024*1024) # 1MB chunks
232 + pbar.update(f_in.tell())
233 +
210 234 os.remove(sql_path)
235 + print(f"[✓] 备份文件已创建: {os.path.basename(gz_path)}")
211 236 return gz_path
212 -
237 +
213 238 except Exception as e:
214 239 self._cleanup_temp_files([sql_path, gz_path])
215 240 self.logger.error(f"备份创建失败: {str(e)}")
@@ -218,17 +243,24 @@ class BackupManager:
218 243 @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=10), retry=retry_if_exception_type(ClientError))
219 244 def upload_to_s3(self, file_path: str) -> None:
220 245 file_name = os.path.basename(file_path)
246 + print(f"[*] 开始上传到S3: {file_name}")
247 +
221 248 transfer_config = TransferConfig(multipart_threshold=10*1024**2, max_concurrency=10)
222 249
223 250 with open(file_path, "rb") as f, tqdm(
224 251 total=os.path.getsize(file_path),
225 - unit="B", unit_scale=True, desc=f"上传 {file_name}", leave=False
252 + unit="B", unit_scale=True,
253 + desc="上传进度",
254 + leave=True # 修改为True以保留进度条
226 255 ) as pbar:
227 256
228 257 def progress(bytes_transferred):
229 258 pbar.update(bytes_transferred)
230 259
231 260 self.s3_client.upload_fileobj(f, self.config.s3_bucket, file_name, Config=transfer_config, Callback=progress)
261 +
262 + print(f"[✓] 上传完成: {file_name}")
263 + print(f" S3位置: {self.config.s3_bucket}/{file_name}")
232 264
233 265 def clean_old_backups(self) -> None:
234 266 cutoff = datetime.now() - timedelta(days=self.config.keep_days)

buka5587 revisou este gist 1744609238. Ir para a revisão

1 file changed, 16 insertions, 6 deletions

pg_backup_s3.py

@@ -178,22 +178,32 @@ class BackupManager:
178 178 gz_path = sql_path + ".gz"
179 179
180 180 try:
181 + # 检查关键参数是否为空
182 + if not all([self.config.db_name, self.config.db_user, self.config.db_password,
183 + self.config.pg_host, str(self.config.pg_port)]):
184 + raise ValueError("数据库连接参数不完整")
185 +
181 186 env = os.environ.copy()
182 187 env["PGPASSWORD"] = self.config.db_password
183 188 pg_cmd = [
184 189 "pg_dump",
185 - f"-h={self.config.pg_host}",
186 - f"-p={self.config.pg_port}",
187 - f"-U={self.config.db_user}",
188 - f"-d={self.config.db_name}",
189 - f"-f={sql_path}"
190 + f"--dbname={self.config.db_name}",
191 + f"--host={self.config.pg_host}",
192 + f"--port={self.config.pg_port}",
193 + f"--username={self.config.db_user}",
194 + f"--file={sql_path}"
190 195 ]
191 196 if self.config.use_ssl:
192 197 pg_cmd.append("--ssl-mode=require")
193 198
199 + # 添加详细的日志记录
200 + self.logger.debug(f"执行pg_dump命令: {' '.join(pg_cmd)}")
201 +
194 202 result = subprocess.run(pg_cmd, env=env, capture_output=True, text=True)
195 203 if result.returncode != 0:
196 - raise RuntimeError(f"pg_dump失败: {result.stderr.strip()}")
204 + error_msg = result.stderr.strip()
205 + self.logger.error(f"pg_dump详细错误: {error_msg}")
206 + raise RuntimeError(f"pg_dump失败: {error_msg}")
197 207
198 208 with open(sql_path, "rb") as f_in, gzip.open(gz_path, "wb", self.config.compress_level) as f_out:
199 209 shutil.copyfileobj(f_in, f_out)

buka5587 revisou este gist 1744608757. Ir para a revisão

2 files changed, 54 insertions, 5 deletions

pg_backup_s3.py

@@ -18,7 +18,7 @@ from typing import List, NoReturn
18 18 import argparse
19 19 from tqdm import tqdm
20 20 from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
21 -
21 + from boto3.s3.transfer import TransferConfig
22 22 # -------------------------
23 23 # 配置数据类与默认值
24 24 # -------------------------
@@ -42,6 +42,22 @@ class BackupConfig:
42 42 # -------------------------
43 43 # 配置文件管理
44 44 # -------------------------
45 + def create_env_template(env_path: str) -> None:
46 + """生成.env环境变量模板文件"""
47 + template = """# PostgreSQL 备份到 S3 环境变量配置
48 + # 将敏感信息放在这里,不要提交到版本控制
49 +
50 + PG_DB_PASSWORD=your_db_password # 数据库密码
51 + S3_ACCESS_KEY=your_access_key # S3 Access Key
52 + S3_SECRET_KEY=your_secret_key # S3 Secret Key
53 + """
54 + with open(env_path, "w") as f:
55 + f.write(template)
56 + print(f"已生成环境变量模板到 {env_path},请修改敏感信息")
57 + # 添加文件权限设置
58 + if os.name != 'nt': # 非Windows系统
59 + os.chmod(env_path, 0o600) # 仅用户可读写
60 +
45 61 def create_config_template(config_path: str) -> None:
46 62 """生成配置模板文件"""
47 63 default_config = BackupConfig()
@@ -67,6 +83,11 @@ compress_level: {default_config.compress_level} # 压缩级别(0-9,默认6
67 83 with open(config_path, "w") as f:
68 84 f.write(template)
69 85 print(f"已生成配置模板到 {config_path},请修改后重新运行")
86 +
87 + # 同时生成.env文件模板
88 + env_path = os.path.join(os.path.dirname(config_path), ".env")
89 + if not os.path.exists(env_path):
90 + create_env_template(env_path)
70 91
71 92 def load_or_create_config(config_path: str) -> BackupConfig:
72 93 """加载配置文件,不存在则生成模板"""
@@ -74,6 +95,12 @@ def load_or_create_config(config_path: str) -> BackupConfig:
74 95 create_config_template(config_path)
75 96 raise SystemExit(0)
76 97
98 + # 加载.env文件(如果存在)
99 + env_path = os.path.join(os.path.dirname(config_path), ".env")
100 + if os.path.exists(env_path):
101 + from dotenv import load_dotenv
102 + load_dotenv(env_path)
103 +
77 104 with open(config_path, "r") as f:
78 105 cfg = yaml.safe_load(f)
79 106
@@ -132,6 +159,12 @@ class BackupManager:
132 159 def check_prerequisites(self) -> bool:
133 160 try:
134 161 os.makedirs(self.config.backup_dir, exist_ok=True)
162 + # 添加对备份目录可写性的检查
163 + test_file = os.path.join(self.config.backup_dir, ".test")
164 + with open(test_file, "w") as f:
165 + f.write("test")
166 + os.remove(test_file)
167 +
135 168 subprocess.run(["pg_dump", "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
136 169 return True
137 170 except Exception as e:
@@ -220,7 +253,22 @@ def parse_arguments() -> argparse.Namespace:
220 253 # -------------------------
221 254 # 主流程
222 255 # -------------------------
256 + def check_dependencies():
257 + """检查所有依赖是否已安装"""
258 + try:
259 + import boto3
260 + import yaml
261 + import tqdm
262 + import tenacity
263 + import dotenv
264 + except ImportError as e:
265 + print(f"[!] 缺少依赖: {str(e)}")
266 + print("请运行: pip install -r requirements.txt")
267 + raise SystemExit(1)
268 +
269 + # 在main()函数开头调用
223 270 def main():
271 + check_dependencies()
224 272 args = parse_arguments()
225 273 config_path = args.config
226 274

requirements.txt

@@ -1,4 +1,5 @@
1 - boto3
2 - pyyaml
3 - tenacity
4 - tqdm
1 + boto3>=1.26.0,<2.0.0
2 + pyyaml>=6.0,<7.0
3 + tenacity>=8.2.2,<9.0
4 + tqdm>=4.65.0,<5.0
5 + python-dotenv>=0.19.0,<1.0.0

buka5587 revisou este gist 1744607724. Ir para a revisão

1 file changed, 4 insertions, 8 deletions

requirements.txt

@@ -1,8 +1,4 @@
1 - ‘’‘
2 - 推荐依赖版本
3 - ’‘’
4 -
5 - boto3>=1.26.0
6 - pyyaml>=6.0
7 - tenacity>=8.2.2
8 - tqdm>=4.65.0
1 + boto3
2 + pyyaml
3 + tenacity
4 + tqdm

buka5587 revisou este gist 1744607017. Ir para a revisão

3 files changed, 336 insertions, 237 deletions

pg_backup_s3.py

@@ -1,183 +1,250 @@
1 1 #!/usr/bin/env python3
2 - #推荐自行搭配定时执行工具使用
3 - #此脚本功能为pg数据库备份至s3
4 - #使用前请自行修改配置信息(请自行安装py环境及依赖)
5 - #使用方法:
6 - #1. 配置信息
7 - #2. 检查前置条件
8 - #3. 创建压缩备份
9 - #4. 上传备份文件到S3
10 - #5. 清理旧备份
11 - #6. 日志记录
12 - #7. 异常处理
2 + """
3 + PostgreSQL 压缩备份到 S3 一体化脚本
4 + 支持自动生成配置模板、环境变量覆盖、错误重试、进度显示
5 + """
13 6
14 - # 配置信息
15 7 import os
16 8 import subprocess
17 9 import boto3
18 10 from botocore.exceptions import ClientError
19 - from datetime import datetime
11 + from datetime import datetime, timedelta
20 12 import logging
21 13 import gzip
22 14 import shutil
23 - from boto3.s3.transfer import TransferConfig
15 + import yaml
16 + from dataclasses import dataclass
17 + from typing import List, NoReturn
18 + import argparse
19 + from tqdm import tqdm
20 + from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
24 21
25 - # 配置信息 请自行修改
26 - DB_NAME = 'database_name'
27 - DB_USER = 'database_user'
28 - DB_PASSWORD = 'database_password'
29 - S3_ENDPOINT = '你的存储桶域端点'
30 - S3_ACCESS_KEY = '你的存储桶访问ACCESS_KEY'
31 - S3_SECRET_KEY = '你的存储桶访问SECRET_KEY'
32 - S3_BUCKET = '你的存储桶名称'
33 - BACKUP_DIR = '/tmp/pg_backups' # 备份文件存储目录
34 - COMPRESS_LEVEL = 6 # 压缩级别 (0-9), 0为不压缩, 9为最大压缩,不懂不要修改
22 + # -------------------------
23 + # 配置数据类与默认值
24 + # -------------------------
25 + @dataclass
26 + class BackupConfig:
27 + db_name: str = "your_database_name"
28 + db_user: str = "your_db_user"
29 + db_password: str = "your_db_password"
30 + s3_endpoint: str = "https://s3.your-provider.com"
31 + s3_access_key: str = "your_access_key"
32 + s3_secret_key: str = "your_secret_key"
33 + s3_bucket: str = "your-bucket-name"
34 + backup_dir: str = "/var/lib/pg_backup"
35 + compress_level: int = 6
36 + keep_days: int = 7
37 + log_file: str = "/var/log/pg_backup.log"
38 + pg_host: str = "localhost"
39 + pg_port: int = 5432
40 + use_ssl: bool = False
35 41
36 - # 日志设置
37 - logging.basicConfig(
38 - level=logging.INFO,
39 - format='%(asctime)s - %(levelname)s - %(message)s',
40 - datefmt='%Y-%m-%d %H:%M:%S',
41 - handlers=[
42 - logging.StreamHandler(),
43 - logging.FileHandler('/var/log/pg_backup_compressed.log')
44 - ]
45 - )
46 - logger = logging.getLogger('PG_Backup_Compressed')
42 + # -------------------------
43 + # 配置文件管理
44 + # -------------------------
45 + def create_config_template(config_path: str) -> None:
46 + """生成配置模板文件"""
47 + default_config = BackupConfig()
48 + template = f"""# PostgreSQL 备份到 S3 配置文件
49 + # 敏感信息建议通过环境变量设置(优先级高于文件配置)
50 + # 环境变量:PG_DB_PASSWORD, S3_ACCESS_KEY, S3_SECRET_KEY
47 51
48 - def print_step(message):
49 - print(f"→ {message}")
52 + db_name: {default_config.db_name} # 数据库名称
53 + db_user: {default_config.db_user} # 数据库用户
54 + # db_password: {default_config.db_password} # 数据库密码(推荐通过环境变量设置)
55 + s3_endpoint: {default_config.s3_endpoint} # S3 端点(例如:https://s3.example.com)
56 + s3_access_key: {default_config.s3_access_key} # S3 Access Key
57 + # s3_secret_key: {default_config.s3_secret_key} # S3 Secret Key(推荐通过环境变量设置)
58 + s3_bucket: {default_config.s3_bucket} # S3 存储桶名称
59 + backup_dir: {default_config.backup_dir} # 本地备份存储目录(需可写)
60 + keep_days: {default_config.keep_days} # 保留天数(删除超过天数的备份)
61 + pg_host: {default_config.pg_host} # 数据库主机(默认localhost)
62 + pg_port: {default_config.pg_port} # 数据库端口(默认5432)
63 + use_ssl: {default_config.use_ssl} # 是否启用SSL连接(默认false)
64 + log_file: {default_config.log_file} # 日志文件路径
65 + compress_level: {default_config.compress_level} # 压缩级别(0-9,默认6)
66 + """
67 + with open(config_path, "w") as f:
68 + f.write(template)
69 + print(f"已生成配置模板到 {config_path},请修改后重新运行")
50 70
51 - def check_prerequisites():
52 - """检查前置条件"""
53 - try:
54 - os.makedirs(BACKUP_DIR, exist_ok=True)
55 - test_file = os.path.join(BACKUP_DIR, '.test')
56 - with open(test_file, 'w') as f:
57 - f.write('test')
58 - os.remove(test_file)
59 - subprocess.run(['pg_dump', '--version'], check=True, capture_output=True)
60 - return True
61 - except Exception as e:
62 - logger.error(f"前置条件检查失败: {str(e)}")
63 - return False
71 + def load_or_create_config(config_path: str) -> BackupConfig:
72 + """加载配置文件,不存在则生成模板"""
73 + if not os.path.exists(config_path):
74 + create_config_template(config_path)
75 + raise SystemExit(0)
76 +
77 + with open(config_path, "r") as f:
78 + cfg = yaml.safe_load(f)
79 +
80 + # 环境变量覆盖敏感信息
81 + env_override = {
82 + "db_password": os.getenv("PG_DB_PASSWORD"),
83 + "s3_access_key": os.getenv("S3_ACCESS_KEY"),
84 + "s3_secret_key": os.getenv("S3_SECRET_KEY")
85 + }
86 + for key, value in env_override.items():
87 + if value:
88 + cfg[key] = value
89 +
90 + return BackupConfig(**cfg)
64 91
65 - def create_compressed_backup():
66 - """创建压缩备份"""
67 - timestamp = datetime.now().strftime("%m%d%H%M")
68 - sql_file = os.path.join(BACKUP_DIR, f"{DB_NAME}_backup_{timestamp}.sql")
69 - gz_file = f"{sql_file}.gz"
92 + # -------------------------
93 + # 日志初始化
94 + # -------------------------
95 + def setup_logger(log_file: str) -> logging.Logger:
96 + logger = logging.getLogger("PGBackup")
97 + logger.setLevel(logging.DEBUG)
98 + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
70 99
71 - try:
72 - print_step("正在执行pg_dump...")
73 - env = os.environ.copy()
74 - env['PGPASSWORD'] = DB_PASSWORD
75 - cmd = [
76 - 'pg_dump',
77 - '-U', DB_USER,
78 - '-h', 'localhost',
79 - '-d', DB_NAME,
80 - '-f', sql_file
81 - ]
82 -
83 - result = subprocess.run(
84 - cmd,
85 - env=env,
86 - stdout=subprocess.PIPE,
87 - stderr=subprocess.PIPE,
88 - text=True
100 + # 控制台和文件日志
101 + console_handler = logging.StreamHandler()
102 + console_handler.setLevel(logging.INFO)
103 + console_handler.setFormatter(formatter)
104 +
105 + file_handler = logging.FileHandler(log_file)
106 + file_handler.setLevel(logging.DEBUG)
107 + file_handler.setFormatter(formatter)
108 +
109 + logger.addHandler(console_handler)
110 + logger.addHandler(file_handler)
111 + return logger
112 +
113 + # -------------------------
114 + # 核心功能类
115 + # -------------------------
116 + class BackupManager:
117 + def __init__(self, config: BackupConfig):
118 + self.config = config
119 + self.logger = setup_logger(config.log_file)
120 + self.s3_client = self._init_s3_client()
121 +
122 + def _init_s3_client(self):
123 + return boto3.client(
124 + 's3',
125 + endpoint_url=self.config.s3_endpoint,
126 + aws_access_key_id=self.config.s3_access_key,
127 + aws_secret_access_key=self.config.s3_secret_key,
128 + region_name='cn-sy1',
129 + config=boto3.session.Config(signature_version='s3v4')
89 130 )
131 +
132 + def check_prerequisites(self) -> bool:
133 + try:
134 + os.makedirs(self.config.backup_dir, exist_ok=True)
135 + subprocess.run(["pg_dump", "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
136 + return True
137 + except Exception as e:
138 + self.logger.error(f"前置检查失败: {str(e)}")
139 + return False
140 +
141 + def create_compressed_backup(self) -> str:
142 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
143 + base_name = f"{self.config.db_name}_backup_{timestamp}"
144 + sql_path = os.path.join(self.config.backup_dir, base_name + ".sql")
145 + gz_path = sql_path + ".gz"
90 146
91 - if result.returncode != 0:
92 - raise Exception(f"pg_dump失败: {result.stderr.strip()}")
93 -
94 - if not os.path.exists(sql_file):
95 - raise Exception("SQL文件未生成")
96 -
97 - print_step("正在压缩备份文件...")
98 - with open(sql_file, 'rb') as f_in:
99 - with gzip.open(gz_file, 'wb', compresslevel=COMPRESS_LEVEL) as f_out:
147 + try:
148 + env = os.environ.copy()
149 + env["PGPASSWORD"] = self.config.db_password
150 + pg_cmd = [
151 + "pg_dump",
152 + f"-h={self.config.pg_host}",
153 + f"-p={self.config.pg_port}",
154 + f"-U={self.config.db_user}",
155 + f"-d={self.config.db_name}",
156 + f"-f={sql_path}"
157 + ]
158 + if self.config.use_ssl:
159 + pg_cmd.append("--ssl-mode=require")
160 +
161 + result = subprocess.run(pg_cmd, env=env, capture_output=True, text=True)
162 + if result.returncode != 0:
163 + raise RuntimeError(f"pg_dump失败: {result.stderr.strip()}")
164 +
165 + with open(sql_path, "rb") as f_in, gzip.open(gz_path, "wb", self.config.compress_level) as f_out:
100 166 shutil.copyfileobj(f_in, f_out)
167 + os.remove(sql_path)
168 + return gz_path
101 169
102 - os.remove(sql_file)
103 - return gz_file
170 + except Exception as e:
171 + self._cleanup_temp_files([sql_path, gz_path])
172 + self.logger.error(f"备份创建失败: {str(e)}")
173 + raise
174 +
175 + @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=10), retry=retry_if_exception_type(ClientError))
176 + def upload_to_s3(self, file_path: str) -> None:
177 + file_name = os.path.basename(file_path)
178 + transfer_config = TransferConfig(multipart_threshold=10*1024**2, max_concurrency=10)
104 179
105 - except Exception as e:
106 - for f in [sql_file, gz_file]:
180 + with open(file_path, "rb") as f, tqdm(
181 + total=os.path.getsize(file_path),
182 + unit="B", unit_scale=True, desc=f"上传 {file_name}", leave=False
183 + ) as pbar:
184 +
185 + def progress(bytes_transferred):
186 + pbar.update(bytes_transferred)
187 +
188 + self.s3_client.upload_fileobj(f, self.config.s3_bucket, file_name, Config=transfer_config, Callback=progress)
189 +
190 + def clean_old_backups(self) -> None:
191 + cutoff = datetime.now() - timedelta(days=self.config.keep_days)
192 + for file in os.listdir(self.config.backup_dir):
193 + if file.endswith(".gz"):
194 + path = os.path.join(self.config.backup_dir, file)
195 + mtime = datetime.fromtimestamp(os.path.getmtime(path))
196 + if mtime < cutoff:
197 + try:
198 + os.remove(path)
199 + self.logger.info(f"删除过期备份: {file}")
200 + except Exception as e:
201 + self.logger.warning(f"删除失败: {file} - {str(e)}")
202 +
203 + @staticmethod
204 + def _cleanup_temp_files(files: List[str]) -> None:
205 + for f in files:
107 206 if os.path.exists(f):
108 207 try:
109 208 os.remove(f)
110 209 except:
111 210 pass
112 - raise
113 211
114 - class ProgressPercentage:
115 - """上传进度显示"""
116 - def __init__(self, filename):
117 - self._filename = filename
118 - self._size = float(os.path.getsize(filename))
119 - self._seen_so_far = 0
120 -
121 - def __call__(self, bytes_amount):
122 - self._seen_so_far += bytes_amount
123 - percentage = (self._seen_so_far / self._size) * 100
124 - print(f"\r 上传进度: {percentage:.2f}% ({self._seen_so_far/1024/1024:.2f}MB)", end='')
212 + # -------------------------
213 + # 命令行接口
214 + # -------------------------
215 + def parse_arguments() -> argparse.Namespace:
216 + parser = argparse.ArgumentParser(description="PostgreSQL 备份到 S3 工具")
217 + parser.add_argument("-c", "--config", default="pg_backup.yaml", help="配置文件路径(默认: pg_backup.yaml)")
218 + return parser.parse_args()
125 219
126 - def upload_to_s3(file_path):
127 - """上传到S3"""
220 + # -------------------------
221 + # 主流程
222 + # -------------------------
223 + def main():
224 + args = parse_arguments()
225 + config_path = args.config
226 +
128 227 try:
129 - s3 = boto3.client(
130 - 's3',
131 - endpoint_url=S3_ENDPOINT,
132 - aws_access_key_id=S3_ACCESS_KEY,
133 - aws_secret_access_key=S3_SECRET_KEY,
134 - region_name='cn-sy1',
135 - config=boto3.session.Config(
136 - signature_version='s3v4'
137 - )
138 - )
139 -
140 - transfer_config = TransferConfig(
141 - multipart_threshold=1024*25,
142 - max_concurrency=10,
143 - multipart_chunksize=1024*25,
144 - use_threads=True
145 - )
228 + config = load_or_create_config(config_path)
229 + manager = BackupManager(config)
146 230
147 - file_name = os.path.basename(file_path)
148 - print_step(f"正在上传 {file_name}...")
231 + print("[*] 开始备份流程")
232 + if not manager.check_prerequisites():
233 + raise SystemExit("[!] 前置检查失败,流程终止")
149 234
150 - s3.upload_file(
151 - file_path,
152 - S3_BUCKET,
153 - file_name,
154 - Config=transfer_config,
155 - Callback=ProgressPercentage(file_path)
156 - )
235 + backup_path = manager.create_compressed_backup()
236 + manager.upload_to_s3(backup_path)
237 + manager.clean_old_backups()
238 + os.remove(backup_path) # 清理本地文件
157 239
158 - return True
159 - except Exception as e:
160 - raise
161 -
162 - def main():
163 - print("\n" + "="*50)
164 - print("PostgreSQL 压缩备份脚本")
165 - print("="*50 + "\n")
240 + print("\n[✓] 备份流程成功完成")
166 241
167 - try:
168 - if not check_prerequisites():
169 - raise Exception("前置条件检查未通过")
170 -
171 - backup_file = create_compressed_backup()
172 - if upload_to_s3(backup_file):
173 - os.remove(backup_file)
174 - print_step("上传成功,已清理本地文件")
175 -
242 + except SystemExit:
243 + raise
176 244 except Exception as e:
177 - logger.error(f"备份失败: {str(e)}")
178 - print_step(f"[错误] {str(e)}")
179 - finally:
180 - print("\n[操作完成]")
245 + manager.logger.error(f"备份失败: {str(e)}", exc_info=True)
246 + print(f"\n[!] 备份失败: {str(e)}")
247 + raise SystemExit(1)
181 248
182 249 if __name__ == "__main__":
183 - main()
250 + main()

pg_restore_s3.py

@@ -1,34 +1,26 @@
1 1 #!/usr/bin/env python3
2 - #此脚本请搭配pg_backup_s3.py使用,用于恢复数据库
3 - #使用方法:
4 - #1. 配置信息
5 - #2. 检查前置条件
6 - #3. 列出S3中的备份文件
7 - #4. 下载备份文件
8 - #5. 解压备份文件
9 - #6. 恢复数据库
10 - #7. 清理临时文件
11 - #8. 日志记录
12 - #9. 异常处理
2 + """
3 + PostgreSQL 恢复脚本(优化版)
4 + 此脚本用于恢复由 pg_backup_s3.py 备份到 S3 的数据库
5 + 支持从配置文件读取信息、错误重试、进度显示和详细日志记录
6 + """
13 7
14 8 import os
15 9 import boto3
16 10 import gzip
17 11 import subprocess
18 - import shutil
12 + import shutil
19 13 from datetime import datetime
20 14 import logging
15 + import yaml
21 16 from botocore.exceptions import ClientError
17 + from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
18 + from tqdm import tqdm
19 + import argparse
22 20
23 - # 使用与备份脚本相同的配置
24 - DB_NAME = 'database_name'
25 - DB_USER = 'database_user'
26 - DB_PASSWORD = 'database_password'
27 - S3_ENDPOINT = '你的s3端点'
28 - S3_ACCESS_KEY = '你的s3_access_key'
29 - S3_SECRET_KEY = '你的s3_secret_key'
30 - S3_BUCKET = '你的s3桶名'
31 - RESTORE_DIR = '/tmp/pg_restores' # 恢复文件存储目录
21 +
22 + # 配置文件默认路径
23 + DEFAULT_CONFIG_PATH = "pg_backup.yaml"
32 24
33 25 # 日志设置
34 26 logging.basicConfig(
@@ -42,99 +34,121 @@ logging.basicConfig(
42 34 )
43 35 logger = logging.getLogger('PG_Restore')
44 36
37 +
45 38 def print_step(message):
46 39 print(f"→ {message}")
47 40
48 - def get_s3_client():
49 - """创建S3客户端"""
41 +
42 + def load_config(config_path):
43 + """
44 + 从 YAML 配置文件中加载配置信息
45 + """
46 + if not os.path.exists(config_path):
47 + logger.error(f"配置文件 {config_path} 不存在,请检查路径。")
48 + raise FileNotFoundError(f"配置文件 {config_path} 不存在。")
49 + with open(config_path, 'r') as f:
50 + config = yaml.safe_load(f)
51 + return config
52 +
53 +
54 + def get_s3_client(config):
55 + """
56 + 创建 S3 客户端
57 + """
50 58 return boto3.client(
51 59 's3',
52 - endpoint_url=S3_ENDPOINT,
53 - aws_access_key_id=S3_ACCESS_KEY,
54 - aws_secret_access_key=S3_SECRET_KEY,
60 + endpoint_url=config['s3_endpoint'],
61 + aws_access_key_id=config['s3_access_key'],
62 + aws_secret_access_key=config['s3_secret_key'],
55 63 region_name='cn-sy1',
56 64 config=boto3.session.Config(signature_version='s3v4')
57 65 )
58 66
59 - def list_backup_files():
60 - """列出S3中的备份文件"""
67 +
68 + @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=10), retry=retry_if_exception_type(ClientError))
69 + def list_backup_files(config):
70 + """
71 + 列出 S3 中的备份文件,并按时间倒序排列
72 + """
61 73 try:
62 - s3 = get_s3_client()
63 - response = s3.list_objects_v2(Bucket=S3_BUCKET)
64 -
74 + s3 = get_s3_client(config)
75 + response = s3.list_objects_v2(Bucket=config['s3_bucket'])
65 76 if 'Contents' not in response:
66 - print_step("S3桶中没有找到备份文件")
77 + print_step("S3 桶中没有找到备份文件")
67 78 return []
68 -
69 79 files = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.gz')]
70 80 files.sort(reverse=True) # 按时间倒序排列
71 -
72 81 if not files:
73 - print_step("没有找到.gz格式的备份文件")
82 + print_step("没有找到 .gz 格式的备份文件")
74 83 return []
75 -
76 84 return files
77 -
78 85 except Exception as e:
79 86 logger.error(f"获取备份列表失败: {str(e)}")
80 87 raise
81 88
89 +
82 90 class DownloadProgressPercentage:
83 - """下载进度显示"""
91 + """
92 + 下载进度显示
93 + """
94 +
84 95 def __init__(self, filename, total_size):
85 96 self._filename = filename
86 97 self._size = total_size
87 98 self._seen_so_far = 0
88 -
99 + self._pbar = tqdm(total=total_size, unit='B', unit_scale=True, desc=f"下载 {filename}", leave=False)
100 +
89 101 def __call__(self, bytes_amount):
90 102 self._seen_so_far += bytes_amount
91 - percentage = (self._seen_so_far / self._size) * 100
92 - print(f"\r 下载进度: {percentage:.2f}% ({self._seen_so_far/1024/1024:.2f}MB)", end='')
103 + self._pbar.update(bytes_amount)
93 104
94 - def download_from_s3(file_name):
95 - """从S3下载备份文件"""
105 +
106 + @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=10), retry=retry_if_exception_type(ClientError))
107 + def download_from_s3(config, file_name):
108 + """
109 + 从 S3 下载备份文件
110 + """
96 111 try:
97 - os.makedirs(RESTORE_DIR, exist_ok=True)
98 - local_path = os.path.join(RESTORE_DIR, file_name)
99 -
100 - s3 = get_s3_client()
112 + restore_dir = config.get('restore_dir', '/tmp/pg_restores')
113 + os.makedirs(restore_dir, exist_ok=True)
114 + local_path = os.path.join(restore_dir, file_name)
115 + s3 = get_s3_client(config)
101 116 print_step(f"正在下载 {file_name}...")
102 -
103 117 # 获取文件大小用于进度显示
104 - file_size = s3.head_object(Bucket=S3_BUCKET, Key=file_name)['ContentLength']
105 -
118 + file_size = s3.head_object(Bucket=config['s3_bucket'], Key=file_name)['ContentLength']
106 119 s3.download_file(
107 - Bucket=S3_BUCKET,
120 + Bucket=config['s3_bucket'],
108 121 Key=file_name,
109 122 Filename=local_path,
110 123 Callback=DownloadProgressPercentage(file_name, file_size)
111 124 )
112 -
113 125 print() # 换行
114 126 return local_path
115 -
116 127 except Exception as e:
117 128 logger.error(f"下载备份文件失败: {str(e)}")
118 129 raise
119 130
131 +
120 132 def decompress_file(compressed_path):
121 - """解压备份文件"""
133 + """
134 + 解压备份文件
135 + """
122 136 try:
123 137 print_step("正在解压备份文件...")
124 - decompressed_path = compressed_path[:-3] # 去掉.gz后缀
125 -
138 + decompressed_path = compressed_path[:-3] # 去掉 .gz 后缀
126 139 with gzip.open(compressed_path, 'rb') as f_in:
127 140 with open(decompressed_path, 'wb') as f_out:
128 141 shutil.copyfileobj(f_in, f_out)
129 -
130 142 return decompressed_path
131 -
132 143 except Exception as e:
133 144 logger.error(f"解压备份文件失败: {str(e)}")
134 145 raise
135 146
136 - def restore_database(sql_file):
137 - """执行数据库恢复"""
147 +
148 + def restore_database(config, sql_file):
149 + """
150 + 执行数据库恢复
151 + """
138 152 try:
139 153 # 让用户选择恢复模式
140 154 print("\n请选择恢复模式:")
@@ -145,18 +159,18 @@ def restore_database(sql_file):
145 159 mode = int(input("请输入选择(1或2): "))
146 160 if mode in [1, 2]:
147 161 break
148 - print("输入无效,请输入1或2")
162 + print("输入无效,请输入 1 或 2")
149 163 except ValueError:
150 164 print("请输入有效的数字")
151 165
152 166 env = os.environ.copy()
153 - env['PGPASSWORD'] = DB_PASSWORD
167 + env['PGPASSWORD'] = config['db_password']
154 168
155 169 # 完全恢复模式
156 170 if mode == 1:
157 171 print_step("正在准备完全恢复...")
158 - temp_db = f"{DB_NAME}_temp"
159 -
172 + temp_db = f"{config['db_name']}_temp"
173 +
160 174 # 0. 先检查并删除已存在的临时数据库
161 175 print_step("正在清理可能存在的临时数据库...")
162 176 drop_temp_cmd = [
@@ -165,51 +179,51 @@ def restore_database(sql_file):
165 179 '-c', f"DROP DATABASE IF EXISTS {temp_db};"
166 180 ]
167 181 subprocess.run(drop_temp_cmd, check=True)
168 -
182 +
169 183 # 1. 创建临时数据库
170 184 print_step("正在创建临时数据库...")
171 185 create_temp_cmd = [
172 186 'sudo', '-u', 'postgres',
173 187 'psql',
174 - '-c', f"CREATE DATABASE {temp_db} WITH OWNER {DB_USER} ENCODING 'UTF8';"
188 + '-c', f"CREATE DATABASE {temp_db} WITH OWNER {config['db_user']} ENCODING 'UTF8';"
175 189 ]
176 190 subprocess.run(create_temp_cmd, check=True)
177 -
191 +
178 192 # 2. 将备份恢复到临时数据库
179 193 print_step("正在恢复数据到临时数据库...")
180 194 restore_temp_cmd = [
181 195 'psql',
182 - '-U', DB_USER,
196 + '-U', config['db_user'],
183 197 '-h', 'localhost',
184 198 '-d', temp_db,
185 199 '-f', sql_file
186 200 ]
187 201 subprocess.run(restore_temp_cmd, env=env, check=True)
188 -
202 +
189 203 # 3. 终止所有连接到原数据库的会话
190 204 print_step("正在终止原数据库连接...")
191 205 terminate_cmd = [
192 206 'sudo', '-u', 'postgres',
193 207 'psql',
194 - '-c', f"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{DB_NAME}';"
208 + '-c', f"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{config['db_name']}';"
195 209 ]
196 210 subprocess.run(terminate_cmd, check=True)
197 -
211 +
198 212 # 4. 删除原数据库
199 213 print_step("正在清理原数据库...")
200 214 drop_orig_cmd = [
201 215 'sudo', '-u', 'postgres',
202 216 'psql',
203 - '-c', f"DROP DATABASE IF EXISTS {DB_NAME};"
217 + '-c', f"DROP DATABASE IF EXISTS {config['db_name']};"
204 218 ]
205 219 subprocess.run(drop_orig_cmd, check=True)
206 -
220 +
207 221 # 5. 重命名临时数据库
208 222 print_step("正在完成恢复...")
209 223 rename_cmd = [
210 224 'sudo', '-u', 'postgres',
211 225 'psql',
212 - '-c', f"ALTER DATABASE {temp_db} RENAME TO {DB_NAME};"
226 + '-c', f"ALTER DATABASE {temp_db} RENAME TO {config['db_name']};"
213 227 ]
214 228 subprocess.run(rename_cmd, check=True)
215 229
@@ -217,9 +231,9 @@ def restore_database(sql_file):
217 231 print_step("正在恢复数据库...")
218 232 restore_cmd = [
219 233 'psql',
220 - '-U', DB_USER,
234 + '-U', config['db_user'],
221 235 '-h', 'localhost',
222 - '-d', DB_NAME,
236 + '-d', config['db_name'],
223 237 '-f', sql_file
224 238 ]
225 239 result = subprocess.run(
@@ -239,46 +253,55 @@ def restore_database(sql_file):
239 253 logger.error(f"数据库恢复失败: {str(e)}")
240 254 raise
241 255
256 +
242 257 def cleanup(file_path):
243 - """清理临时文件"""
258 + """
259 + 清理临时文件
260 + """
244 261 try:
245 262 if os.path.exists(file_path):
246 263 os.remove(file_path)
247 264 except Exception as e:
248 265 logger.warning(f"清理文件失败: {str(e)}")
249 266
267 +
250 268 def main():
251 - print("\n" + "="*50)
269 + parser = argparse.ArgumentParser(description="PostgreSQL 恢复脚本")
270 + parser.add_argument("-c", "--config", default=DEFAULT_CONFIG_PATH, help="配置文件路径")
271 + args = parser.parse_args()
272 +
273 + print("\n" + "=" * 50)
252 274 print("PostgreSQL 恢复脚本")
253 - print("="*50 + "\n")
254 -
275 + print("=" * 50 + "\n")
276 +
255 277 try:
278 + config = load_config(args.config)
256 279 # 列出备份文件
257 - backup_files = list_backup_files()
280 + backup_files = list_backup_files(config)
258 281 if not backup_files:
259 282 return
260 -
283 +
261 284 # 显示备份文件列表
262 285 print("\n可用的备份文件:")
263 286 for i, file in enumerate(backup_files, 1):
264 287 print(f"{i}. {file}")
265 -
288 +
266 289 # 选择要恢复的备份
267 290 while True:
268 291 try:
269 292 choice = int(input("\n请输入要恢复的备份编号: "))
270 293 if 1 <= choice <= len(backup_files):
271 - selected_file = backup_files[choice-1]
294 + selected_file = backup_files[choice - 1]
272 295 break
273 296 print("输入无效,请重新输入")
274 297 except ValueError:
275 298 print("请输入有效的数字")
276 -
299 +
277 300 # 下载并恢复
278 - compressed_path = download_from_s3(selected_file)
301 + compressed_path = download_from_s3(config, selected_file)
279 302 sql_path = decompress_file(compressed_path)
280 - restore_database(sql_path)
281 -
303 + restore_database(config, sql_path)
304 +
282 305 except Exception as e:
283 306 print_step(f"[错误] {str(e)}")
284 307 finally:
@@ -287,8 +310,9 @@ def main():
287 310 cleanup(compressed_path)
288 311 if 'sql_path' in locals():
289 312 cleanup(sql_path)
290 -
313 +
291 314 print("\n[操作完成]")
292 315
316 +
293 317 if __name__ == "__main__":
294 318 main()

requirements.txt(arquivo criado)

@@ -0,0 +1,8 @@
1 + #
2 + # 推荐依赖版本
3 + #
4 +
5 + boto3>=1.26.0
6 + pyyaml>=6.0
7 + tenacity>=8.2.2
8 + tqdm>=4.65.0

buka5587 revisou este gist 1744529289. Ir para a revisão

2 files changed, 1 insertion, 1 deletion

(arquivo excluído)

Arquivo vazio

pg_backup_s3.py

@@ -1,5 +1,5 @@
1 1 #!/usr/bin/env python3
2 - #推荐自行搭配定时执行工具使用--测试测试
2 + #推荐自行搭配定时执行工具使用
3 3 #此脚本功能为pg数据库备份至s3
4 4 #使用前请自行修改配置信息(请自行安装py环境及依赖)
5 5 #使用方法:

buka5587 revisou este gist 1744529175. Ir para a revisão

2 files changed, 1 insertion, 1 deletion

(arquivo criado)

Arquivo vazio

pg_backup_s3.py

@@ -1,5 +1,5 @@
1 1 #!/usr/bin/env python3
2 - #推荐自行搭配定时执行工具使用
2 + #推荐自行搭配定时执行工具使用--测试测试
3 3 #此脚本功能为pg数据库备份至s3
4 4 #使用前请自行修改配置信息(请自行安装py环境及依赖)
5 5 #使用方法:

2195299055 revisou este gist 1744528816. Ir para a revisão

Sem alterações

2195299055 revisou este gist 1744528312. Ir para a revisão

1 file changed, 294 insertions

pg_restore_s3.py(arquivo criado)

@@ -0,0 +1,294 @@
1 + #!/usr/bin/env python3
2 + #此脚本请搭配pg_backup_s3.py使用,用于恢复数据库
3 + #使用方法:
4 + #1. 配置信息
5 + #2. 检查前置条件
6 + #3. 列出S3中的备份文件
7 + #4. 下载备份文件
8 + #5. 解压备份文件
9 + #6. 恢复数据库
10 + #7. 清理临时文件
11 + #8. 日志记录
12 + #9. 异常处理
13 +
14 + import os
15 + import boto3
16 + import gzip
17 + import subprocess
18 + import shutil
19 + from datetime import datetime
20 + import logging
21 + from botocore.exceptions import ClientError
22 +
23 + # 使用与备份脚本相同的配置
24 + DB_NAME = 'database_name'
25 + DB_USER = 'database_user'
26 + DB_PASSWORD = 'database_password'
27 + S3_ENDPOINT = '你的s3端点'
28 + S3_ACCESS_KEY = '你的s3_access_key'
29 + S3_SECRET_KEY = '你的s3_secret_key'
30 + S3_BUCKET = '你的s3桶名'
31 + RESTORE_DIR = '/tmp/pg_restores' # 恢复文件存储目录
32 +
33 + # 日志设置
34 + logging.basicConfig(
35 + level=logging.INFO,
36 + format='%(asctime)s - %(levelname)s - %(message)s',
37 + datefmt='%Y-%m-%d %H:%M:%S',
38 + handlers=[
39 + logging.StreamHandler(),
40 + logging.FileHandler('/var/log/pg_restore.log')
41 + ]
42 + )
43 + logger = logging.getLogger('PG_Restore')
44 +
def print_step(message):
    """Print a user-facing progress line prefixed with an arrow."""
    print("→ {}".format(message))
47 +
def get_s3_client():
    """Build a boto3 S3 client from the module-level connection settings."""
    # SigV4 is required by many S3-compatible providers.
    client_config = boto3.session.Config(signature_version='s3v4')
    return boto3.client(
        's3',
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS_KEY,
        aws_secret_access_key=S3_SECRET_KEY,
        region_name='cn-sy1',
        config=client_config,
    )
58 +
def list_backup_files():
    """Return the ``.gz`` backup object keys in the S3 bucket, newest first.

    Returns an empty list (after printing a notice) when the bucket holds
    no objects or no gzip backups; other failures are logged and re-raised.
    """
    try:
        client = get_s3_client()
        listing = client.list_objects_v2(Bucket=S3_BUCKET)

        if 'Contents' not in listing:
            print_step("S3桶中没有找到备份文件")
            return []

        # Timestamped names sort chronologically, so reverse order = newest first.
        gz_keys = sorted(
            (entry['Key'] for entry in listing['Contents']
             if entry['Key'].endswith('.gz')),
            reverse=True,
        )

        if not gz_keys:
            print_step("没有找到.gz格式的备份文件")
            return []

        return gz_keys

    except Exception as e:
        logger.error(f"获取备份列表失败: {str(e)}")
        raise
81 +
class DownloadProgressPercentage:
    """boto3 download callback that prints cumulative transfer progress.

    Fix vs. the original: a zero-byte S3 object made the percentage math
    raise ZeroDivisionError; an empty object is now reported as 100%.
    """

    def __init__(self, filename, total_size):
        self._filename = filename       # object key, kept for context
        self._size = total_size         # total bytes expected
        self._seen_so_far = 0           # bytes received so far

    def __call__(self, bytes_amount):
        self._seen_so_far += bytes_amount
        # Guard the division: an empty object is trivially "complete".
        percentage = (self._seen_so_far / self._size) * 100 if self._size else 100.0
        print(f"\r 下载进度: {percentage:.2f}% ({self._seen_so_far/1024/1024:.2f}MB)", end='')
93 +
def download_from_s3(file_name):
    """Download one backup object into RESTORE_DIR and return its local path."""
    try:
        os.makedirs(RESTORE_DIR, exist_ok=True)
        local_path = os.path.join(RESTORE_DIR, file_name)

        client = get_s3_client()
        print_step(f"正在下载 {file_name}...")

        # The progress callback needs the object size to compute percentages.
        total_bytes = client.head_object(Bucket=S3_BUCKET, Key=file_name)['ContentLength']
        progress = DownloadProgressPercentage(file_name, total_bytes)

        client.download_file(
            Bucket=S3_BUCKET,
            Key=file_name,
            Filename=local_path,
            Callback=progress,
        )

        print()  # terminate the carriage-return progress line
        return local_path

    except Exception as e:
        logger.error(f"下载备份文件失败: {str(e)}")
        raise
119 +
def decompress_file(compressed_path):
    """Decompress a gzip backup and return the decompressed file's path.

    Fix vs. the original: the output name was produced by blindly chopping
    the last three characters off the path, which corrupts the name for any
    input not ending in ".gz". Now only a real ".gz" suffix is stripped;
    other names get a ".sql" suffix appended instead.
    """
    try:
        print_step("正在解压备份文件...")
        root, ext = os.path.splitext(compressed_path)
        # Strip ".gz" when present; never truncate arbitrary characters.
        decompressed_path = root if ext == ".gz" else compressed_path + ".sql"

        with gzip.open(compressed_path, 'rb') as f_in:
            with open(decompressed_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        return decompressed_path

    except Exception as e:
        logger.error(f"解压备份文件失败: {str(e)}")
        raise
135 +
def restore_database(sql_file):
    """Restore *sql_file* into the configured PostgreSQL database.

    Interactively offers two modes:
      1. full restore   -- load the dump into a temp DB, then swap it in
         place of the original (original data survives a failed load)
      2. append restore -- apply the dump on top of the existing data

    Fixes a fall-through bug in the original: after a successful full
    restore (mode 1) the code also executed the plain restore path,
    applying the dump a second time onto the freshly restored database.
    """
    try:
        # Ask the operator which restore mode to use.
        print("\n请选择恢复模式:")
        print("1. 完全恢复 (先清空数据库,再恢复)")
        print("2. 追加恢复 (保留现有数据,只添加备份数据)")
        while True:
            try:
                mode = int(input("请输入选择(1或2): "))
                if mode in [1, 2]:
                    break
                print("输入无效,请输入1或2")
            except ValueError:
                print("请输入有效的数字")

        env = os.environ.copy()
        env['PGPASSWORD'] = DB_PASSWORD  # psql reads the password from the env

        if mode == 1:
            print_step("正在准备完全恢复...")
            temp_db = f"{DB_NAME}_temp"

            # 0. Drop any temp database left over from a previous failed run.
            print_step("正在清理可能存在的临时数据库...")
            drop_temp_cmd = [
                'sudo', '-u', 'postgres',
                'psql',
                '-c', f"DROP DATABASE IF EXISTS {temp_db};"
            ]
            subprocess.run(drop_temp_cmd, check=True)

            # 1. Create the temporary database.
            print_step("正在创建临时数据库...")
            create_temp_cmd = [
                'sudo', '-u', 'postgres',
                'psql',
                '-c', f"CREATE DATABASE {temp_db} WITH OWNER {DB_USER} ENCODING 'UTF8';"
            ]
            subprocess.run(create_temp_cmd, check=True)

            # 2. Load the dump into the temporary database.
            print_step("正在恢复数据到临时数据库...")
            restore_temp_cmd = [
                'psql',
                '-U', DB_USER,
                '-h', 'localhost',
                '-d', temp_db,
                '-f', sql_file
            ]
            subprocess.run(restore_temp_cmd, env=env, check=True)

            # 3. Kick every session off the original DB so it can be dropped.
            print_step("正在终止原数据库连接...")
            terminate_cmd = [
                'sudo', '-u', 'postgres',
                'psql',
                '-c', f"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{DB_NAME}';"
            ]
            subprocess.run(terminate_cmd, check=True)

            # 4. Drop the original database.
            print_step("正在清理原数据库...")
            drop_orig_cmd = [
                'sudo', '-u', 'postgres',
                'psql',
                '-c', f"DROP DATABASE IF EXISTS {DB_NAME};"
            ]
            subprocess.run(drop_orig_cmd, check=True)

            # 5. Rename the temp database into place -- restore complete.
            print_step("正在完成恢复...")
            rename_cmd = [
                'sudo', '-u', 'postgres',
                'psql',
                '-c', f"ALTER DATABASE {temp_db} RENAME TO {DB_NAME};"
            ]
            subprocess.run(rename_cmd, check=True)
        else:
            # Append restore: apply the dump directly onto the live database.
            print_step("正在恢复数据库...")
            restore_cmd = [
                'psql',
                '-U', DB_USER,
                '-h', 'localhost',
                '-d', DB_NAME,
                '-f', sql_file
            ]
            result = subprocess.run(
                restore_cmd,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )

            if result.returncode != 0:
                raise Exception(f"恢复失败: {result.stderr.strip()}")

        print_step("数据库恢复成功")

    except Exception as e:
        logger.error(f"数据库恢复失败: {str(e)}")
        raise
241 +
def cleanup(file_path):
    """Best-effort removal of a temporary file; failures are logged, never raised."""
    if not os.path.exists(file_path):
        return
    try:
        os.remove(file_path)
    except Exception as exc:
        logger.warning(f"清理文件失败: {str(exc)}")
249 +
def main():
    """Interactive entry point: pick a backup from S3, download, restore, clean up."""
    banner = "=" * 50
    print("\n" + banner)
    print("PostgreSQL 恢复脚本")
    print(banner + "\n")

    compressed_path = None
    sql_path = None
    try:
        # Fetch the candidate backups; nothing to do if the bucket is empty.
        backup_files = list_backup_files()
        if not backup_files:
            return

        print("\n可用的备份文件:")
        for idx, name in enumerate(backup_files, 1):
            print(f"{idx}. {name}")

        # Loop until the operator supplies a valid 1-based index.
        while True:
            try:
                choice = int(input("\n请输入要恢复的备份编号: "))
                if 1 <= choice <= len(backup_files):
                    selected_file = backup_files[choice - 1]
                    break
                print("输入无效,请重新输入")
            except ValueError:
                print("请输入有效的数字")

        compressed_path = download_from_s3(selected_file)
        sql_path = decompress_file(compressed_path)
        restore_database(sql_path)

    except Exception as e:
        print_step(f"[错误] {str(e)}")
    finally:
        # Remove whichever temp files were actually created.
        if compressed_path is not None:
            cleanup(compressed_path)
        if sql_path is not None:
            cleanup(sql_path)

    print("\n[操作完成]")
292 +
293 + if __name__ == "__main__":
294 + main()

2195299055 revisou este gist 1744526322. Ir para a revisão

1 file changed, 183 insertions

pg_backup_s3.py(arquivo criado)

@@ -0,0 +1,183 @@
1 + #!/usr/bin/env python3
2 + #推荐自行搭配定时执行工具使用
3 + #此脚本功能为pg数据库备份至s3
4 + #使用前请自行修改配置信息(请自行安装py环境及依赖)
5 + #使用方法:
6 + #1. 配置信息
7 + #2. 检查前置条件
8 + #3. 创建压缩备份
9 + #4. 上传备份文件到S3
10 + #5. 清理旧备份
11 + #6. 日志记录
12 + #7. 异常处理
13 +
14 + # 配置信息
15 + import os
16 + import subprocess
17 + import boto3
18 + from botocore.exceptions import ClientError
19 + from datetime import datetime
20 + import logging
21 + import gzip
22 + import shutil
23 + from boto3.s3.transfer import TransferConfig
24 +
25 + # 配置信息 请自行修改
26 + DB_NAME = 'database_name'
27 + DB_USER = 'database_user'
28 + DB_PASSWORD = 'database_password'
29 + S3_ENDPOINT = '你的存储桶域端点'
30 + S3_ACCESS_KEY = '你的存储桶访问ACCESS_KEY'
31 + S3_SECRET_KEY = '你的存储桶访问SECRET_KEY'
32 + S3_BUCKET = '你的存储桶名称'
33 + BACKUP_DIR = '/tmp/pg_backups' # 备份文件存储目录
34 + COMPRESS_LEVEL = 6 # 压缩级别 (0-9), 0为不压缩, 9为最大压缩,不懂不要修改
35 +
36 + # 日志设置
37 + logging.basicConfig(
38 + level=logging.INFO,
39 + format='%(asctime)s - %(levelname)s - %(message)s',
40 + datefmt='%Y-%m-%d %H:%M:%S',
41 + handlers=[
42 + logging.StreamHandler(),
43 + logging.FileHandler('/var/log/pg_backup_compressed.log')
44 + ]
45 + )
46 + logger = logging.getLogger('PG_Backup_Compressed')
47 +
def print_step(message):
    """Print a user-facing progress line prefixed with an arrow."""
    print("→ {}".format(message))
50 +
def check_prerequisites():
    """Verify the backup directory is writable and pg_dump is on PATH.

    Returns True when both checks pass; logs the failure and returns
    False otherwise.
    """
    try:
        os.makedirs(BACKUP_DIR, exist_ok=True)
        # Probe writability with a throwaway file.
        probe = os.path.join(BACKUP_DIR, '.test')
        with open(probe, 'w') as fh:
            fh.write('test')
        os.remove(probe)
        # Confirm the pg_dump binary is available.
        subprocess.run(['pg_dump', '--version'], check=True, capture_output=True)
    except Exception as e:
        logger.error(f"前置条件检查失败: {str(e)}")
        return False
    return True
64 +
def create_compressed_backup():
    """Run pg_dump and gzip the output; return the path of the ``.gz`` file.

    On any failure the partial SQL/gz artifacts are removed before the
    exception propagates. Fix vs. the original: the cleanup loop used a
    bare ``except:`` (which would even swallow KeyboardInterrupt); it now
    tolerates only OSError from the file removal.
    """
    # NOTE(review): timestamp has no year component, so names recycle annually.
    timestamp = datetime.now().strftime("%m%d%H%M")
    sql_file = os.path.join(BACKUP_DIR, f"{DB_NAME}_backup_{timestamp}.sql")
    gz_file = f"{sql_file}.gz"

    try:
        print_step("正在执行pg_dump...")
        env = os.environ.copy()
        env['PGPASSWORD'] = DB_PASSWORD  # keep the password off the command line
        cmd = [
            'pg_dump',
            '-U', DB_USER,
            '-h', 'localhost',
            '-d', DB_NAME,
            '-f', sql_file
        ]

        result = subprocess.run(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        if result.returncode != 0:
            raise Exception(f"pg_dump失败: {result.stderr.strip()}")

        if not os.path.exists(sql_file):
            raise Exception("SQL文件未生成")

        print_step("正在压缩备份文件...")
        with open(sql_file, 'rb') as f_in:
            with gzip.open(gz_file, 'wb', compresslevel=COMPRESS_LEVEL) as f_out:
                shutil.copyfileobj(f_in, f_out)

        os.remove(sql_file)  # keep only the compressed copy
        return gz_file

    except Exception:
        # Remove partial artifacts; only OSError from os.remove is tolerated.
        for leftover in (sql_file, gz_file):
            if os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError:
                    pass
        raise
113 +
class ProgressPercentage:
    """boto3 upload callback that prints cumulative transfer progress.

    Fix vs. the original: uploading a zero-byte file made the percentage
    math raise ZeroDivisionError; an empty file is now reported as 100%.
    """

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))  # total bytes to send
        self._seen_so_far = 0                           # bytes sent so far

    def __call__(self, bytes_amount):
        self._seen_so_far += bytes_amount
        # Guard the division: an empty file is trivially "complete".
        percentage = (self._seen_so_far / self._size) * 100 if self._size else 100.0
        print(f"\r 上传进度: {percentage:.2f}% ({self._seen_so_far/1024/1024:.2f}MB)", end='')
125 +
def upload_to_s3(file_path):
    """Upload *file_path* to the configured S3 bucket with progress output.

    Returns True on success. Fix vs. the original: the except handler was
    ``except Exception as e: raise`` — it bound the exception without ever
    using it and re-raised silently; failures are now logged first, for
    parity with the companion restore script's error handling.
    """
    try:
        s3 = boto3.client(
            's3',
            endpoint_url=S3_ENDPOINT,
            aws_access_key_id=S3_ACCESS_KEY,
            aws_secret_access_key=S3_SECRET_KEY,
            region_name='cn-sy1',
            config=boto3.session.Config(
                signature_version='s3v4'  # required by many S3-compatible providers
            )
        )

        # Multipart settings: 25 KB threshold/chunks, up to 10 worker threads.
        transfer_config = TransferConfig(
            multipart_threshold=1024*25,
            max_concurrency=10,
            multipart_chunksize=1024*25,
            use_threads=True
        )

        file_name = os.path.basename(file_path)
        print_step(f"正在上传 {file_name}...")

        s3.upload_file(
            file_path,
            S3_BUCKET,
            file_name,
            Config=transfer_config,
            Callback=ProgressPercentage(file_path)
        )

        return True
    except Exception as e:
        logger.error(f"上传备份失败: {str(e)}")
        raise
161 +
def main():
    """Entry point: check prerequisites, create a compressed backup, upload it."""
    banner = "=" * 50
    print("\n" + banner)
    print("PostgreSQL 压缩备份脚本")
    print(banner + "\n")

    try:
        if not check_prerequisites():
            raise Exception("前置条件检查未通过")

        backup_file = create_compressed_backup()
        # The local copy is only removed once the upload has succeeded.
        if upload_to_s3(backup_file):
            os.remove(backup_file)
            print_step("上传成功,已清理本地文件")

    except Exception as e:
        logger.error(f"备份失败: {str(e)}")
        print_step(f"[错误] {str(e)}")
    finally:
        print("\n[操作完成]")
181 +
182 + if __name__ == "__main__":
183 + main()
Próximo Anterior