读取markdown文件中的图片链接并上传cloudflare的R2

77次阅读
没有评论

blot.new 写了一个读取 markdown 文件中的图片链接并上传 cloudflare 的 R2 的 Python 脚本,代码中的 Cloudflare R2 配置需要替换成自己的配置

import os
import re
import requests
import boto3
import argparse
from datetime import datetime
from urllib.parse import urlparse, unquote
from botocore.config import Config

# Cloudflare R2 配置
R2_BUCKET_NAME = "xxx"
R2_ACCESS_KEY_ID = "xxx"
R2_SECRET_ACCESS_KEY = "xxx"
ACCOUNT_ID = "xxx"
R2_ENDPOINT_URL = f"https://{ACCOUNT_ID}.r2.cloudflarestorage.com"
PUBLIC_URL_PREFIX = "https://xxx"  # 修改为你的自定义域名

def get_current_path_prefix():
    """获取当前年月的路径前缀"""
    now = datetime.now()
    return f"{now.year}/{now.month:02d}"

def get_r2_client():
    """创建并返回 R2 客户端"""
    return boto3.client(
        's3',
        endpoint_url=R2_ENDPOINT_URL,
        aws_access_key_id=R2_ACCESS_KEY_ID,
        aws_secret_access_key=R2_SECRET_ACCESS_KEY,
        config=Config(signature_version='s3v4'),
    )

def upload_to_r2(file_path, key):
    """上传文件到 R2 并返回公开访问 URL"""
    try:
        client = get_r2_client()
        # 获取文件的 MIME 类型
        content_type = 'image/jpeg'  # 默认类型
        if file_path.lower().endswith('.png'):
            content_type = 'image/png'
        elif file_path.lower().endswith('.gif'):
            content_type = 'image/gif'
        elif file_path.lower().endswith('.webp'):
            content_type = 'image/webp'

        # 添加年月路径前缀
        path_prefix = get_current_path_prefix()
        full_key = f"{path_prefix}/{key}"

        # 上传文件
        client.upload_file(
            file_path, 
            R2_BUCKET_NAME, 
            full_key,
            ExtraArgs={'ContentType': content_type}
        )
        
        # 返回公开访问 URL
        return f"{PUBLIC_URL_PREFIX}/{full_key}"
    except Exception as e:
        print(f"上传失败: {str(e)}")
        raise

def download_image(url, temp_dir):
    """下载图片到临时目录"""
    try:
        # 添加请求头,模拟浏览器行为
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        
        # 从 URL 中提取文件名
        parsed_url = urlparse(unquote(url))
        filename = os.path.basename(parsed_url.path).split('?')[0]
        
        # 如果文件名为空或没有扩展名,根据内容类型生成文件名
        if not filename or '.' not in filename:
            content_type = response.headers.get('content-type', '').lower()
            ext = '.jpg'  # 默认扩展名
            if 'png' in content_type:
                ext = '.png'
            elif 'gif' in content_type:
                ext = '.gif'
            elif 'webp' in content_type:
                ext = '.webp'
            # 使用时间戳作为文件名前缀,确保唯一性
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
            filename = f"image_{timestamp}_{hash(url)}{ext}"
        
        # 确保文件名是唯一的
        file_path = os.path.join(temp_dir, filename)
        counter = 1
        while os.path.exists(file_path):
            name, ext = os.path.splitext(filename)
            file_path = os.path.join(temp_dir, f"{name}_{counter}{ext}")
            counter += 1
        
        # 保存文件
        with open(file_path, 'wb') as f:
            f.write(response.content)
        
        return file_path
    except Exception as e:
        print(f"下载图片失败 {url}: {str(e)}")
        return None

def process_markdown_file(file_path):
    """处理 Markdown 文件中的图片"""
    # 检查文件是否存在
    if not os.path.exists(file_path):
        print(f"错误:找不到文件'{file_path}'")
        print("请确保:")
        print("1. 文件路径正确")
        print("2. 使用完整的绝对路径")
        print("3. 路径中的目录分隔符使用'/'或'\\\\'")
        return False

    # 检查文件是否是 Markdown 文件
    if not file_path.lower().endswith('.md'):
        print(f"错误:文件'{file_path}'不是 Markdown 文件")
        return False

    # 创建临时目录
    temp_dir = "temp_images"
    os.makedirs(temp_dir, exist_ok=True)
    
    try:
        # 读取 Markdown 文件
        print(f"正在读取文件:{file_path}")
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # 查找所有图片链接
        image_pattern = r'!\[([^\]]*)\]\((https?://[^)]+)\)'
        matches = re.finditer(image_pattern, content)
        matches_list = list(matches)
        
        if not matches_list:
            print("未找到任何图片链接")
            return True
        
        print(f"找到 {len(matches_list)} 个图片链接")
        processed_count = 0
        current_month = get_current_path_prefix()
        print(f"图片将上传到 {current_month} 目录")
        
        # 处理每个图片
        for match in matches_list:
            alt_text = match.group(1)
            image_url = match.group(2)
            
            print(f"\n 处理图片 [{processed_count + 1}/{len(matches_list)}]")
            print(f"原始链接: {image_url}")
            
            # 下载图片
            local_path = download_image(image_url, temp_dir)
            if not local_path:
                continue
            
            try:
                # 上传到 R2
                filename = os.path.basename(local_path)
                r2_url = upload_to_r2(local_path, filename)
                
                # 替换 Markdown 中的 URL
                old_link = f"![{alt_text}]({image_url})"
                new_link = f"![{alt_text}]({r2_url})"
                content = content.replace(old_link, new_link)
                
                print(f"新链接: {r2_url}")
                processed_count += 1
            finally:
                # 删除临时文件
                if os.path.exists(local_path):
                    os.remove(local_path)
        
        # 保存更新后的 Markdown 文件
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
            
        print(f"\n 处理完成!成功处理 {processed_count} 个图片")
        print(f"所有图片已上传到 {PUBLIC_URL_PREFIX}/{current_month}/ 目录")
        return True
    except Exception as e:
        print(f"处理文件时出错: {str(e)}")
        return False
    finally:
        # 清理临时目录
        if os.path.exists(temp_dir):
            for file in os.listdir(temp_dir):
                os.remove(os.path.join(temp_dir, file))
            os.rmdir(temp_dir)

def main():
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description='处理 Markdown 文件中的图片链接,上传到 Cloudflare R2')
    parser.add_argument('file_path', help='Markdown 文件的路径')
    
    # 解析命令行参数
    args = parser.parse_args()
    
    # 处理文件
    success = process_markdown_file(args.file_path)
    if not success:
        exit(1)

if __name__ == "__main__":
    main()

正文完
 0
zhongli
版权声明:本站原创文章,由 zhongli 于2024-11-18发表,共计4383字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)