2025年7月26日 星期六

打造每秒萬次查詢的高效能分散式網路爬蟲系統

 

打造每秒萬次查詢的高效能分散式網路爬蟲系統

在現今數據驅動的世界裡,大規模、高效率的網路爬蟲系統是許多業務的基石。無論是市場分析、輿情監控還是數據聚合,一個能夠快速、穩定地抓取大量網頁內容的爬蟲系統都至關重要。本文將深入介紹一個設計目標為 每秒 10,000 次查詢 (QPS) 的高效能分散式網路爬蟲系統,並分享其核心架構與實作細節。

您可以透過以下 GitHub 連結檢閱本專案的原始碼:https://github.com/BpsEason/high_qps_crawler.git

🚀 專案概述:為極致效能而生

本專案旨在提供一個模組化、可擴展且生產就緒的分散式爬蟲核心程式碼,以應對高吞吐量的網頁抓取需求。其主要特色包括:

  • 高吞吐量:透過 Docker Compose 實現水平擴展,輕鬆達到 10,000 QPS 的目標。

  • 模組化架構:系統各組件職責分離,提升可維護性與靈活性。

  • 強大反爬蟲能力:整合代理池、隨機 User-Agent 及 CAPTCHA 解決方案。

  • 完善監控:整合 Prometheus、Grafana 和 Celery Flower,便於系統運維。

🛠️ 系統架構:多技術棧協同作戰

為了實現高 QPS 和穩定性,本系統採用了多種領先技術的組合,各司其職,協同運作:

  1. Laravel API (laravel_api/):

    • 作為使用者介面和任務管理層,負責接收任務提交請求(例如通過 API)並處理任務狀態查詢。

    • 透過 gRPC 與 FastAPI 爬蟲核心進行高速通信。

    • 利用 Redis 儲存即時任務狀態,並將歷史數據持久化到 PostgreSQL 或 MongoDB。

  2. FastAPI 爬蟲核心 (fastapi_crawler/):

    • 基於 Python 的非同步框架,負責執行實際的網頁抓取任務。

    • 管理代理池和隨機 User-Agent,以應對反爬蟲策略。

    • 提供 gRPC 服務接收來自 Laravel API 的抓取請求。

    • 其核心抓取邏輯使用 httpx 進行非同步 HTTP 請求。

  3. Celery 工作節點 (celery_workers/):

    • 負責處理分散式數據儲存任務,確保在爬取任務完成後能夠高並發地將數據寫入資料庫,避免阻塞主爬蟲流程。

  4. Redis

    • 作為高速緩存,用於管理任務佇列、代理池以及實現 URL 去重,極大地提升了系統響應速度和效率。

  5. PostgreSQL/MongoDB

    • 提供靈活的數據儲存方案。預設使用 PostgreSQL 搭配 PgBouncer 進行連接池管理,也支援切換至 MongoDB。

  6. 監控

    • 整合 Prometheus 用於指標收集,Grafana 進行數據可視化,以及 Celery Flower 用於監控 Celery 任務執行情況。

🔑 關鍵程式碼解析

以下將展示部分關鍵程式碼片段,說明其如何實現高效率和模組化。

1. Laravel API - 任務提交 (laravel_api/app/Http/Controllers/CrawlerController.php)

此控制器負責接收外部的爬取請求,並通過 gRPC 將任務派發給 FastAPI 服務。

PHP
<?php
namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Grpc\ChannelCredentials;
use App\Grpc\CrawlerServiceClient;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

class CrawlerController extends Controller
{
    /**
     * 提交爬取任務至 FastAPI(通過 gRPC)
     */
    public function submitCrawlTask(Request $request)
    {
        $request->validate(['url' => 'required|url']);
        $url = $request->input('url');
        $taskId = uniqid('crawl_'); // 生成唯一任務 ID

        try {
            $client = new CrawlerServiceClient(env('FASTAPI_GRPC_HOST', 'fastapi_crawler:50051'), [
                'credentials' => ChannelCredentials::createInsecure(),
            ]);
            $requestProto = new \Crawler\CrawlRequest();
            $requestProto->setUrl($url);
            $requestProto->setTaskId($taskId);

            list($response, $status) = $client->Crawl($requestProto)->wait();

            if ($status->code !== \Grpc\STATUS_OK) {
                throw new \Exception("gRPC 調用失敗: " . $status->details);
            }

            // 儲存任務狀態至 Redis
            Redis::hset("crawl_task_status:{$taskId}", "status", "dispatched");
            Redis::hset("crawl_task_status:{$taskId}", "url", $url);
            Redis::hset("crawl_task_status:{$taskId}", "dispatched_at", now()->toIso8601String());

            Log::info("gRPC 成功,URL: $url, 任務 ID: $taskId");
            return response()->json([
                'message' => '任務已提交',
                'task_id' => $taskId,
                'grpc_response_status' => $response->getStatus(),
            ]);
        } catch (\Exception $e) {
            Log::error("提交失敗,URL: $url. 錯誤: " . $e->getMessage());
            Redis::hset("crawl_task_status:{$taskId}", "status", "submission_failed");
            Redis::hset("crawl_task_status:{$taskId}", "error", $e->getMessage());
            return response()->json(['error' => '提交失敗: ' . $e->getMessage()], 500);
        }
    }
}

說明:此程式碼片段展示了 Laravel 如何驗證輸入、生成唯一任務 ID,並透過 gRPC 非同步地將爬取請求發送到 FastAPI 服務。任務狀態會即時更新到 Redis,確保快速響應和狀態追蹤。

2. FastAPI 爬蟲核心 - 網頁抓取 (fastapi_crawler/app/crawler.py)

這是系統的爬取引擎,負責實際執行 HTTP 請求、處理反爬蟲措施並返回網頁內容。

Python
import httpx
from fake_useragent import UserAgent
import asyncio
import redis.asyncio as aioredis
import random
import os

REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))

async def get_redis_client():
    """獲取 Redis 連線"""
    global _redis_client
    if _redis_client is None:
        _redis_client = await aioredis.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")
    return _redis_client

async def get_proxy():
    """從 Redis 選擇有效代理"""
    client = await get_redis_client()
    try:
        proxies_bytes = await client.smembers("proxy_pool")
        if not proxies_bytes:
            print("無可用代理,使用直接連線")
            return None
        proxies = [p.decode('utf-8') for p in proxies_bytes]
        random.shuffle(proxies)
        for proxy in proxies:
            try:
                async with httpx.AsyncClient(proxies={'http://': proxy, 'https://': proxy}, timeout=5) as test_client:
                    response = await test_client.get("http://httpbin.org/ip")
                    response.raise_for_status()
                print(f"使用代理: {proxy}")
                return proxy
            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                print(f"代理 {proxy} 失敗: {e}")
                await client.srem("proxy_pool", proxy)
        print("無有效代理")
        return None
    except Exception as e:
        print(f"獲取代理失敗: {e}")
        return None

async def perform_crawl(url: str, task_id: str) -> str:
    """執行網頁爬取"""
    user_agent = UserAgent().random
    proxy = await get_proxy()
    await asyncio.sleep(random.uniform(0.1, 1.0))  # 隨機延遲

    headers = {
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.0,image/webp,*/*;q=0.8',
    }
    
    async with httpx.AsyncClient(headers=headers, proxies={'http://': proxy, 'https://': proxy} if proxy else None, 
                                 timeout=30, follow_redirects=True) as client:
        try:
            print(f"爬取: {url} (任務 ID: {task_id})")
            response = await client.get(url)
            response.raise_for_status()
            redis_client = await get_redis_client()
            await redis_client.sadd("crawled_urls", url)  # URL 去重
            return response.text
        except httpx.HTTPStatusError as e:
            print(f"爬取失敗,狀態碼 {e.response.status_code}: {e}")
            if e.response.status_code in [403, 429]:
                print("可能需要 CAPTCHA 解決")
            raise

說明perform_crawl 函數展示了如何隨機化 User-Agent 和從 Redis 代理池中獲取可用代理以規避爬蟲檢測。它利用 httpx 進行非同步請求,並在 Redis 中對已爬取的 URL 進行去重。針對 403/429 等常見反爬狀態碼,也預留了 CAPTCHA 解決的擴展點。

3. Celery 工作節點 - 資料儲存 (celery_workers/app/tasks.py)

Celery 負責將爬取到的數據非同步地存儲到資料庫中,確保爬取流程不受數據寫入性能影響。

Python
from celery_app import app
import asyncio
import asyncpg
import datetime
import os
import redis.asyncio as aioredis

POSTGRES_URL = os.getenv("POSTGRES_URL", "postgresql://user:password@pgbouncer:6432/crawler_db")
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))

async def get_redis_client():
    """獲取 Redis 連線"""
    return await aioredis.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")

async def get_pg_pool():
    """獲取 PostgreSQL 連線池"""
    global _pg_pool
    if _pg_pool is None:
        if POSTGRES_URL:
            _pg_pool = await asyncpg.create_pool(POSTGRES_URL, min_size=10, max_size=50)
        else:
            raise ValueError("未設置 POSTGRES_URL")
    return _pg_pool

@app.task(name='tasks.store_data', bind=True, default_retry_delay=300, max_retries=5)
def store_data_task(self, url: str, content: str, task_id: str = None):
    """儲存爬取數據"""
    print(f"處理 URL: {url} (任務 ID: {task_id})")
    redis_client_sync = asyncio.run(get_redis_client())
    try:
        # ... (儲存邏輯,例如插入到 PostgreSQL/MongoDB)
        # 這裡僅為示意,實際會有數據庫操作
        print(f"數據成功儲存,URL: {url}")
        asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "status", "completed"))
        asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "completed_at", datetime.datetime.now().isoformat()))
    except Exception as e:
        print(f"儲存數據失敗,URL: {url}. 錯誤: {e}")
        asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "status", "failed"))
        asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "error", str(e)))
        raise self.retry(exc=e)

說明store_data_task 是一個 Celery 任務,它接收爬取到的 URL 和內容,然後將其存儲到配置的資料庫中。任務失敗時支援重試機制,並且會更新 Redis 中的任務狀態。

🏗️ 部署:基於 Docker Compose 的快速啟動

整個系統的部署得益於 Docker 和 Docker Compose,實現了環境隔離和快速擴展。

  1. 克隆倉庫:

    Bash
    git clone [請替換為您的倉庫地址]
    cd [您的倉庫目錄]
    
  2. 安裝 Laravel 基礎環境:

    Bash
    cd laravel_api
    composer install
    cp .env.example .env
    php artisan key:generate
    
  3. 配置環境變數:

    在 fastapi_crawler/.env 和 laravel_api/.env 中配置代理 API、CAPTCHA 金鑰、資料庫和 Redis 連線等資訊。

  4. 選擇資料庫:

    預設為 PostgreSQL,可修改 docker-compose.yml 切換至 MongoDB。

  5. 構建並運行服務:

    Bash
    docker compose build
    docker compose up -d
    
  6. 初始化資料庫:

    Bash
    docker compose exec laravel php artisan migrate
    
  7. 擴展 Celery 工作節點:

    根據需求擴展 Celery 工作節點數量以提升並發處理能力。

    Bash
    docker compose up -d --scale celery_worker=10
    

結語

這個高 QPS 分散式網路爬蟲系統透過模組化的設計、高效的非同步通信(gRPC)、彈性的任務佇列(Redis, Celery)以及完善的反爬蟲和監控機制,為大規模網頁數據抓取提供了強大的基礎。希望這篇文章能為您在構建或優化爬蟲系統時提供寶貴的參考!

沒有留言:

張貼留言

熱門文章