Verified Commit f82da5c4 authored by 施乐存
Browse files

重构爬虫代码,增加账号池管理和WBI keys初始化,支持多账号轮询抓取



Signed-off-by: szdytom <szdytom@qq.com>
parent f7432fa0
Loading
Loading
Loading
Loading
+128 −30
Original line number Diff line number Diff line
@@ -22,21 +22,71 @@ sys.path.insert(0, str(HERE))
import biliapi
import time
import logging
import glob
from http.cookiejar import MozillaCookieJar


class Account:
    """Container for one account's cookie jar and WBI signing keys.

    Both the cookie jar and the WBI keys are loaded lazily via
    ``ensure_initialized`` so that constructing an ``Account`` never
    performs I/O or network requests.
    """

    def __init__(self, cookie_path: Path):
        # Path to a Netscape/Mozilla-format cookie file; the file stem
        # (name without extension) doubles as the account identifier.
        self.cookie_path = cookie_path
        self.name = cookie_path.stem
        self.cookiejar = None
        self.wbi_keys = None

    def _load_cookiejar(self):
        # Best-effort load: a missing or corrupt cookie file degrades to
        # an (empty) jar with a warning rather than aborting the crawl.
        jar = MozillaCookieJar(str(self.cookie_path))
        try:
            jar.load(ignore_discard=True, ignore_expires=True)
        except Exception as e:
            logging.warning(f"加载 cookie 文件 {self.cookie_path} 失败: {e}")
        self.cookiejar = jar

    def _fetch_wbi_keys(self):
        # WBI keys are required for signed API calls; failure here is
        # fatal for this account, so log and re-raise.
        try:
            self.wbi_keys = biliapi.WbiKeys.getWbiKeys(cookiejar=self.cookiejar)
            logging.info(f"账号 {self.name} WBI keys 初始化成功")
        except Exception as e:
            logging.error(f"账号 {self.name} 获取 WBI keys 失败: {e}")
            raise
    
    def ensure_initialized(self):
        """Idempotently initialize the cookie jar and WBI keys.

        Raises:
            Exception: whatever ``biliapi.WbiKeys.getWbiKeys`` raises when
                fetching WBI keys fails.
        """
        if self.cookiejar is None:
            self._load_cookiejar()
        if self.wbi_keys is None:
            self._fetch_wbi_keys()


class AccountPool:
    """Round-robin pool of crawler accounts loaded from cookie files."""

    def __init__(self, cookie_dir: Path):
        # Directory scanned for *.txt Mozilla-format cookie files.
        self.cookie_dir = cookie_dir
        self.accounts = []
        self.current_index = 0
        self._load_accounts()

    def _load_accounts(self):
        """Load every ``.txt`` cookie file under the cookie directory."""
        # Sorted for a deterministic rotation order across runs.
        cookie_files = sorted(self.cookie_dir.glob('*.txt'))
        if not cookie_files:
            logging.warning(f"{self.cookie_dir} 目录下未找到任何 .txt cookie 文件")
            return

        for cookie_file in cookie_files:
            account = Account(cookie_file)
            self.accounts.append(account)
            logging.info(f"加载账号: {account.name}")

    def get_next_account(self) -> Account:
        """Return the next account in round-robin order.

        The returned account is lazily initialized (cookies and WBI keys
        are fetched on first use).

        Raises:
            RuntimeError: if the pool contains no accounts.
        """
        if not self.accounts:
            raise RuntimeError("账号池为空,无法获取账号")

        account = self.accounts[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.accounts)

        # Ensure cookies/WBI keys are ready before handing the account out.
        account.ensure_initialized()
        return account

    def has_accounts(self) -> bool:
        """Return True when at least one account was loaded."""
        return len(self.accounts) > 0


def ensure_schema(conn: sqlite3.Connection, init_sql_path: Path):
@@ -132,15 +182,28 @@ def map_video_to_row(video: Dict[str, Any], author_id: int, weekly_n: int) -> Di
    }


def crawl_first_issue(db_path: str = 'data.db', number: int = 1, cookiejar=None):
    """抓取指定期次(默认第1期)并写入数据库(在单个事务中)。"""
    # WBI 签名/网络有时会失败,使用模块级 keys(只获取一次),重试请求3次
def crawl_first_issue(db_path: str = 'data.db', number: int = 1, account: Account = None):
    """抓取指定期次(默认第1期)并写入数据库(在单个事务中)。
    
    参数:
        db_path: 数据库路径
        number: 期次编号
        account: Account 对象,包含 cookiejar 和 wbi_keys
    """
    # 确保账号已初始化
    if account:
        account.ensure_initialized()
        cookiejar = account.cookiejar
        keys = account.wbi_keys
        account_name = account.name
    else:
        cookiejar = None
        keys = biliapi.WbiKeys.getWbiKeys(cookiejar=None)
        account_name = "无账号"
    
    # WBI 签名/网络有时会失败,重试请求3次
    resp = None
    last_exc = None
    try:
        keys = init_wbi_keys(cookiejar=cookiejar)
    except Exception as e:
        raise RuntimeError(f"获取 WbiKeys 失败: {e}")

    for attempt in range(1, 4):
        try:
@@ -206,7 +269,7 @@ def crawl_first_issue(db_path: str = 'data.db', number: int = 1, cookiejar=None)
            )

        conn.commit()
        print(f"成功写入期次 {n}{len(videos)} 个视频)到数据库 {db_path}")
        logging.info(f"[账号: {account_name}] 成功写入期次 {n}{len(videos)} 个视频)到数据库 {db_path}")
    except Exception:
        conn.rollback()
        raise
@@ -214,18 +277,28 @@ def crawl_first_issue(db_path: str = 'data.db', number: int = 1, cookiejar=None)
        conn.close()


def crawl_loop(db_path: str = 'data.db', start_number: int = None, delay: int = 30, cookiejar=None, max_retries_per_issue: int = 3):
def crawl_loop(db_path: str = 'data.db', start_number: int = None, delay: int = 30, cookie_dir: Path = None, max_retries_per_issue: int = 3):
    """自动循环爬取,从数据库记录的下一期或指定 start_number 开始。

    参数:
        db_path: sqlite 数据库路径
        start_number: 可选,若指定则从该期开始;否则从数据库中已存在最大期的下一期开始(若无记录则 1)
        delay: 每次请求间隔(秒)
        cookiejar: 可选的 requests cookiejar,传递给网络请求
        cookie_dir: cookie 文件目录,若为 None 则使用默认的 cookies 目录
        max_retries_per_issue: 每一期的最大重试次数,超过则跳到下一期
    """
    logging.info("启动 crawl_loop db=%s start=%s delay=%ss", db_path, start_number, delay)
    
    # 初始化账号池
    if cookie_dir is None:
        cookie_dir = HERE.parent / 'cookies'
    
    account_pool = AccountPool(cookie_dir)
    if not account_pool.has_accounts():
        logging.warning("未找到任何账号,将使用无账号模式")
    else:
        logging.info(f"账号池初始化完成,共 {len(account_pool.accounts)} 个账号")

    # 计算起始期次
    db_file = Path(db_path)
    if start_number is None:
@@ -256,14 +329,26 @@ def crawl_loop(db_path: str = 'data.db', start_number: int = None, delay: int =
            attempt = 0
            success = False
            last_exc = None

            while attempt < max_retries_per_issue and not success:
                attempt += 1
                
                # 获取下一个账号(轮询)
                account = account_pool.get_next_account() if account_pool.has_accounts() else None
                account_name = account.name if account else "无账号"
                
                try:
                    logging.info("爬取期次 %s(尝试 %s/%s)", current, attempt, max_retries_per_issue)
                    crawl_first_issue(db_path=db_path, number=current, cookiejar=cookiejar)
                    logging.info("爬取期次 %s(尝试 %s/%s,账号: %s)", current, attempt, max_retries_per_issue, account_name)
                    crawl_first_issue(db_path=db_path, number=current, account=account)
                    success = True
                except Exception as e:
                    last_exc = e
                    # 检查是否API code为-404
                    msg = str(e)
                    if "API returned code -404" in msg or "code -404" in msg:
                        logging.warning("期次 %s API返回404,自动跳过", current)
                        success = True  # 跳过当前期,推进到下一期
                        break
                    logging.exception("爬取期次 %s 失败(尝试 %s):%s", current, attempt, e)
                    # 指数退避:在失败后稍等一段时间再重试
                    backoff = min(60, 2 ** attempt)
@@ -274,7 +359,7 @@ def crawl_loop(db_path: str = 'data.db', start_number: int = None, delay: int =
                logging.error("期次 %s 达到最大重试次数 %s,程序终止", current, max_retries_per_issue)
                raise RuntimeError(f"期次 {current} 达到最大重试次数 {max_retries_per_issue},最后错误: {last_exc}")

            # 成功后推进到下一期
            # 成功(包括404跳过)后推进到下一期
            current += 1
            logging.info("等待 %s 秒准备爬取下一期 %s", delay, current)
            time.sleep(delay)
@@ -294,12 +379,25 @@ if __name__ == '__main__':
    parser.add_argument('--delay', type=int, default=30, help='循环模式下每次请求的延迟(秒),默认 30')
    parser.add_argument('--start', type=int, help='循环模式起始期次(若未指定则从数据库最大期次+1 开始)')
    parser.add_argument('--retries', type=int, default=3, help='每期的最大重试次数,超过则跳过到下一期,默认 3')
    parser.add_argument('--cookie-dir', type=str, help='cookie 文件目录(默认为项目根下的 cookies 目录)')
    parser.add_argument('--cookie', type=str, help='单次抓取模式下指定的 cookie 文件路径')
    args = parser.parse_args()

    # 配置简单日志
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')

    if args.loop:
        crawl_loop(db_path=args.db, start_number=args.start, delay=args.delay, max_retries_per_issue=args.retries)
        cookie_dir = Path(args.cookie_dir) if args.cookie_dir else None
        crawl_loop(db_path=args.db, start_number=args.start, delay=args.delay, cookie_dir=cookie_dir, max_retries_per_issue=args.retries)
    else:
        # 单次抓取模式
        account = None
        if args.cookie:
            cookie_path = Path(args.cookie)
            if cookie_path.exists():
                account = Account(cookie_path)
                logging.info(f"使用账号: {account.name}")
            else:
        crawl_first_issue(db_path=args.db, number=args.number)
                logging.error(f"Cookie 文件不存在: {args.cookie}")
                sys.exit(1)
        crawl_first_issue(db_path=args.db, number=args.number, account=account)