用 blot.new 写了一个读取 markdown 文件中的图片链接并上传 cloudflare 的 R2 的 Python 脚本,代码中的 Cloudflare R2 配置需要替换成自己的配置
import os
import re
import requests
import boto3
import argparse
from datetime import datetime
from urllib.parse import urlparse, unquote
from botocore.config import Config
# Cloudflare R2 配置
R2_BUCKET_NAME = "xxx"
R2_ACCESS_KEY_ID = "xxx"
R2_SECRET_ACCESS_KEY = "xxx"
ACCOUNT_ID = "xxx"
R2_ENDPOINT_URL = f"https://{ACCOUNT_ID}.r2.cloudflarestorage.com"
PUBLIC_URL_PREFIX = "https://xxx" # 修改为你的自定义域名
def get_current_path_prefix():
"""获取当前年月的路径前缀"""
now = datetime.now()
return f"{now.year}/{now.month:02d}"
def get_r2_client():
"""创建并返回 R2 客户端"""
return boto3.client(
's3',
endpoint_url=R2_ENDPOINT_URL,
aws_access_key_id=R2_ACCESS_KEY_ID,
aws_secret_access_key=R2_SECRET_ACCESS_KEY,
config=Config(signature_version='s3v4'),
)
def upload_to_r2(file_path, key):
"""上传文件到 R2 并返回公开访问 URL"""
try:
client = get_r2_client()
# 获取文件的 MIME 类型
content_type = 'image/jpeg' # 默认类型
if file_path.lower().endswith('.png'):
content_type = 'image/png'
elif file_path.lower().endswith('.gif'):
content_type = 'image/gif'
elif file_path.lower().endswith('.webp'):
content_type = 'image/webp'
# 添加年月路径前缀
path_prefix = get_current_path_prefix()
full_key = f"{path_prefix}/{key}"
# 上传文件
client.upload_file(
file_path,
R2_BUCKET_NAME,
full_key,
ExtraArgs={'ContentType': content_type}
)
# 返回公开访问 URL
return f"{PUBLIC_URL_PREFIX}/{full_key}"
except Exception as e:
print(f"上传失败: {str(e)}")
raise
def download_image(url, temp_dir):
"""下载图片到临时目录"""
try:
# 添加请求头,模拟浏览器行为
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
# 从 URL 中提取文件名
parsed_url = urlparse(unquote(url))
filename = os.path.basename(parsed_url.path).split('?')[0]
# 如果文件名为空或没有扩展名,根据内容类型生成文件名
if not filename or '.' not in filename:
content_type = response.headers.get('content-type', '').lower()
ext = '.jpg' # 默认扩展名
if 'png' in content_type:
ext = '.png'
elif 'gif' in content_type:
ext = '.gif'
elif 'webp' in content_type:
ext = '.webp'
# 使用时间戳作为文件名前缀,确保唯一性
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
filename = f"image_{timestamp}_{hash(url)}{ext}"
# 确保文件名是唯一的
file_path = os.path.join(temp_dir, filename)
counter = 1
while os.path.exists(file_path):
name, ext = os.path.splitext(filename)
file_path = os.path.join(temp_dir, f"{name}_{counter}{ext}")
counter += 1
# 保存文件
with open(file_path, 'wb') as f:
f.write(response.content)
return file_path
except Exception as e:
print(f"下载图片失败 {url}: {str(e)}")
return None
def process_markdown_file(file_path):
"""处理 Markdown 文件中的图片"""
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"错误:找不到文件'{file_path}'")
print("请确保:")
print("1. 文件路径正确")
print("2. 使用完整的绝对路径")
print("3. 路径中的目录分隔符使用'/'或'\\\\'")
return False
# 检查文件是否是 Markdown 文件
if not file_path.lower().endswith('.md'):
print(f"错误:文件'{file_path}'不是 Markdown 文件")
return False
# 创建临时目录
temp_dir = "temp_images"
os.makedirs(temp_dir, exist_ok=True)
try:
# 读取 Markdown 文件
print(f"正在读取文件:{file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 查找所有图片链接
image_pattern = r'!\[([^\]]*)\]\((https?://[^)]+)\)'
matches = re.finditer(image_pattern, content)
matches_list = list(matches)
if not matches_list:
print("未找到任何图片链接")
return True
print(f"找到 {len(matches_list)} 个图片链接")
processed_count = 0
current_month = get_current_path_prefix()
print(f"图片将上传到 {current_month} 目录")
# 处理每个图片
for match in matches_list:
alt_text = match.group(1)
image_url = match.group(2)
print(f"\n 处理图片 [{processed_count + 1}/{len(matches_list)}]")
print(f"原始链接: {image_url}")
# 下载图片
local_path = download_image(image_url, temp_dir)
if not local_path:
continue
try:
# 上传到 R2
filename = os.path.basename(local_path)
r2_url = upload_to_r2(local_path, filename)
# 替换 Markdown 中的 URL
old_link = f"![{alt_text}]({image_url})"
new_link = f"![{alt_text}]({r2_url})"
content = content.replace(old_link, new_link)
print(f"新链接: {r2_url}")
processed_count += 1
finally:
# 删除临时文件
if os.path.exists(local_path):
os.remove(local_path)
# 保存更新后的 Markdown 文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"\n 处理完成!成功处理 {processed_count} 个图片")
print(f"所有图片已上传到 {PUBLIC_URL_PREFIX}/{current_month}/ 目录")
return True
except Exception as e:
print(f"处理文件时出错: {str(e)}")
return False
finally:
# 清理临时目录
if os.path.exists(temp_dir):
for file in os.listdir(temp_dir):
os.remove(os.path.join(temp_dir, file))
os.rmdir(temp_dir)
def main():
# 创建命令行参数解析器
parser = argparse.ArgumentParser(description='处理 Markdown 文件中的图片链接,上传到 Cloudflare R2')
parser.add_argument('file_path', help='Markdown 文件的路径')
# 解析命令行参数
args = parser.parse_args()
# 处理文件
success = process_markdown_file(args.file_path)
if not success:
exit(1)
if __name__ == "__main__":
main()
正文完