Verified Commit 56c788b9 authored by 施乐存's avatar 施乐存
Browse files

下载视频封面并进行OCR



Signed-off-by: szdytom <szdytom@qq.com>
parent 6df0ae92
Loading
Loading
Loading
Loading

.env.example

0 → 100644
+5 −0
Original line number Diff line number Diff line
LLM_API_KEY=your-api-key
LLM_BASE_URL=https://api.openai.com/v1

LLM_COVER_OCR_MODEL=Qwen/Qwen3-VL-8B-Instruct
#LLM_COVER_OCR_MODEL2=Qwen/Qwen2-VL-72B-Instruct
+2 −0
Original line number Diff line number Diff line
@@ -3,3 +3,5 @@ venv
cookie.txt
cookies/
__pycache__
cover/
.env

prompts/cover-ocr.txt

0 → 100644
+46 −0
Original line number Diff line number Diff line
请作为B站视频封面分析专家,专门提取封面上的**设计性文字**(即制作者故意添加的文字元素),忽略图像中非设计元素的文字。

## 分析要求:
1. **只提取设计文字**:即制作者为了传达信息而特意添加到封面上的文字,如标题、副标题、标签、口号等
2. **忽略非设计文字**:
   - 图像中自然出现的文字(如广告牌、书本封面、店铺招牌)
   - UI元素文字(如视频平台的水印、播放按钮文字)
   - 游戏界面文字(如分数、生命值、对话框)
3. **判断依据**:
   - 文字是否在图像焦点位置
   - 文字是否有明显的排版设计
   - 文字是否与封面整体设计风格一致

## 文字属性分析:
对于每个提取的文字块,请分析:
1. **内容**:准确的文字内容(保留原始标点符号)
2. **层级**:
   - "primary":主要文字,通常是最大、最显眼、位于视觉中心的文字
   - "secondary":次要文字,较小,起补充说明作用
3. **颜色特征**:
   - 主色调(可有多个)(red/orange/yellow/chartreuse/green/cyan/blue/violet/purple/magenta/pink/black/white)
   - 背景色调(dark/bright)
4. **艺术化处理**:
   - "plain":普通字体,无特殊效果
   - "styled":有艺术设计,如特殊字体、变形、装饰、描边
   - "heavily_styled":重度艺术化,如立体字、特效字
5. **位置占比**(估计文字区域占整个封面的比例,0-100%)

## 输出格式:
请以严格的JSON格式输出,示例如下:

[
  {
    "text": "文字内容",
    "hierarchy": "primary",
    "color": {
      "main_type": ["red", "white"],
      "bg_type": "dark",

    },
    "stylization": "plain",
    "size_percentage": 20,
  }
]

未识别到文字返回空列表即可。
+5 −0
Original line number Diff line number Diff line
@@ -2,3 +2,8 @@ requests==2.32.5
toml==0.10.2
tqdm==4.67.1
matplotlib==3.10.7
aiohttp==3.10.10
aiolimiter==1.1.0
openai==2.11.0
json-repair==0.54.2
python-dotenv==1.0.0

src/cover_ocr.py

0 → 100644
+223 −0
Original line number Diff line number Diff line
import argparse
import base64
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Semaphore
from typing import List, Optional

from dotenv import load_dotenv
from json_repair import repair_json
from openai import OpenAI
from tqdm import tqdm


# Load environment variables from a .env file (if present) into os.environ.
load_dotenv()

# LLM endpoint configuration; the first three are required (checked in main()).
API_KEY = os.getenv("LLM_API_KEY")
BASE_URL = os.getenv("LLM_BASE_URL")
MODEL = os.getenv("LLM_COVER_OCR_MODEL")
MODEL2 = os.getenv("LLM_COVER_OCR_MODEL2")  # Optional second model

# cover/ holds <id>.jpg inputs and <id>.json OCR results; the prompt file
# holds the system prompt. Both resolved relative to the repository root.
COVER_DIR = Path(__file__).resolve().parent.parent / "cover"
PROMPT_FILE = Path(__file__).resolve().parent.parent / "prompts" / "cover-ocr.txt"

# Rate limiting: requests per second
# NOTE(review): the semaphore caps *concurrent* in-flight requests at
# RATE_LIMIT; together with the per-request padding sleep in ocr_one_image
# this approximates — but does not strictly guarantee — RATE_LIMIT req/s.
RATE_LIMIT = 10
RATE_LIMIT_SEMAPHORE = Semaphore(RATE_LIMIT)
REQUEST_INTERVAL = 1 / RATE_LIMIT


def load_system_prompt() -> str:
    """Return the OCR system prompt read from prompts/cover-ocr.txt.

    Raises:
        FileNotFoundError: If the prompt file does not exist.
    """
    if PROMPT_FILE.exists():
        return PROMPT_FILE.read_text(encoding="utf-8")
    raise FileNotFoundError(f"System prompt file not found: {PROMPT_FILE}")


def encode_image_base64(image_path: Path) -> str:
    """Return the file at *image_path* encoded as a base64 ASCII string."""
    raw_bytes = image_path.read_bytes()
    return base64.b64encode(raw_bytes).decode("utf-8")


def get_pending_images(cover_dir: Path, limit: Optional[int] = None) -> List[int]:
    """Return numeric cover IDs that still need OCR, in ascending order.

    A cover ``<id>.jpg`` is pending unless a non-empty ``<id>.json`` result
    file already exists next to it. Zero-byte images (failed downloads) and
    files whose stem is not an integer are ignored.

    Args:
        cover_dir: Directory containing ``<id>.jpg`` covers and cached
            ``<id>.json`` OCR results.
        limit: Maximum number of IDs to return; ``None`` (or any falsy
            value, e.g. 0) means no limit.

    Returns:
        Sorted list of pending cover IDs; empty if the directory is missing
        or everything is already cached.
    """
    if not cover_dir.exists():
        return []

    # Collect every usable numeric cover ID first.
    all_ids: List[int] = []
    for jpg_file in cover_dir.glob("*.jpg"):
        if jpg_file.stat().st_size == 0:
            continue  # empty/failed download
        try:
            all_ids.append(int(jpg_file.stem))
        except ValueError:
            continue  # non-numeric filename, not a cover image

    # Walk IDs in numeric (not lexicographic) order, skipping cached results.
    pending: List[int] = []
    for vid in sorted(all_ids):
        json_file = cover_dir / f"{vid}.json"
        if json_file.exists() and json_file.stat().st_size > 0:
            continue  # result already cached
        pending.append(vid)
        if limit and len(pending) >= limit:  # falsy limit == unlimited
            break

    return pending


def ocr_one_image(
    vid: int,
    cover_dir: Path,
    system_prompt: str,
    model: str,
) -> bool:
    """OCR one cover image via the chat-completions API and cache the result.

    Sends ``cover_dir/<vid>.jpg`` as a base64 data URL together with
    ``system_prompt`` to ``model``, then writes ``{"id": vid, "result": ...}``
    to ``cover_dir/<vid>.json``.

    Returns:
        True when a JSON result was written (including an empty result for a
        blank model reply); False when the image is missing, the reply could
        not be repaired into JSON (the raw text is still saved), or the
        request raised an exception.
    """
    img_path = cover_dir / f"{vid}.jpg"
    json_path = cover_dir / f"{vid}.json"
    if not img_path.exists():
        #print(f"[DEBUG] {vid}: image file not found")
        return False
    try:
        #print(f"[DEBUG] {vid}: start processing")
        img_b64 = encode_image_base64(img_path)
        # Semaphore bounds how many requests are in flight at once.
        with RATE_LIMIT_SEMAPHORE:
            start_time = time.time()
            # Create a fresh client per call to avoid sharing one client
            # across threads (thread-safety concern).
            client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}}
                    ]},
                ],
                max_tokens=2000,
                temperature=0.6,
            )
            elapsed = time.time() - start_time
            # Pad each request up to REQUEST_INTERVAL to space calls out.
            if elapsed < REQUEST_INTERVAL:
                time.sleep(REQUEST_INTERVAL - elapsed)
        raw_content = response.choices[0].message.content if response.choices else ""
        if not raw_content:
            # Blank reply: cache an empty result so the image is not retried.
            output = {"id": vid, "result": []}
            json_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
            #print(f"[DEBUG] {vid}: empty result saved")
            return True
        try:
            # Model output may be almost-JSON; repair before parsing.
            repaired = repair_json(raw_content)
            parsed = json.loads(repaired)
        except Exception as e:
            print(f"[DEBUG] {vid}: JSON repair failed: {e}")
            # Keep the raw text so the failure can be inspected later.
            output = {"id": vid, "result": raw_content}
            json_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
            return False
        output = {"id": vid, "result": parsed}
        json_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
        #print(f"[DEBUG] {vid}: result saved")
        return True
    except Exception as e:
        print(f"[DEBUG] {vid}: Exception: {e}")
        return False


# NOTE(review): dead code — this async variant is immediately shadowed by the
# synchronous `def main()` defined right below, so it is never awaited or
# called. Its body is also truncated: it computes `pending_ids` and stops.
# Consider deleting this function entirely.
async def main() -> None:
    # Validate environment variables
    if not API_KEY or not BASE_URL or not MODEL:
        print("Error: Missing environment variables. Please check .env file.")
        print("Required: LLM_API_KEY, LLM_BASE_URL, LLM_COVER_OCR_MODEL")
        return
    
    # Load system prompt
    try:
        system_prompt = load_system_prompt()
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return
    
    # Get pending images
    pending_ids = get_pending_images(COVER_DIR)
def main() -> None:
    """CLI entry point: OCR every pending cover image with a thread pool."""
    parser = argparse.ArgumentParser(description="OCR processing for Bilibili video covers")
    parser.add_argument("--debug", action="store_true", help="Debug mode: only process first 20 images")
    args = parser.parse_args()

    # All three settings are required to reach the API.
    if not (API_KEY and BASE_URL and MODEL):
        print("Error: Missing environment variables. Please check .env file.")
        print("Required: LLM_API_KEY, LLM_BASE_URL, LLM_COVER_OCR_MODEL")
        return

    # System prompt is mandatory; bail out with the error message if missing.
    try:
        system_prompt = load_system_prompt()
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return

    # Debug mode restricts the run to the first 20 uncached images.
    pending_ids = get_pending_images(COVER_DIR, limit=20 if args.debug else None)
    if not pending_ids:
        print("No images need OCR processing.")
        return

    print(f"Total images to process: {len(pending_ids)}")
    if args.debug:
        print("DEBUG MODE: Processing only first 20 images")

    # When a second model is configured, requests alternate between the two.
    if MODEL2:
        models = [MODEL, MODEL2]
        print(f"Using models (alternating): {MODEL}, {MODEL2}")
    else:
        models = [MODEL]
        print(f"Using model: {MODEL}")

    print(f"API endpoint: {BASE_URL}")
    print(f"Rate limit: {RATE_LIMIT} requests/second")

    with ThreadPoolExecutor(max_workers=16) as pool:
        # Fan out one task per image, round-robining over the model list.
        task_map = {}
        for idx, vid in enumerate(pending_ids):
            fut = pool.submit(
                ocr_one_image,
                vid,
                COVER_DIR,
                system_prompt,
                models[idx % len(models)],
            )
            task_map[fut] = vid

        bar = tqdm(total=len(pending_ids), desc="OCR processing", unit="img")
        for fut in as_completed(task_map):
            try:
                fut.result()
            except Exception as e:
                print(f"\nUnexpected error for image {task_map[fut]}: {e}")
            finally:
                bar.update(1)
        bar.close()

    print("OCR processing completed.")


if __name__ == "__main__":
    main()
 No newline at end of file
Loading