2025年6月7日 星期六

撰寫一個使用 FastAPI 和 Redis 的高併發爬蟲系統

每分鐘處理 10,000 筆資料,需要考慮以下幾個關鍵點:非同步處理、高併發請求管理、Redis 作為任務隊列與快取、以及爬蟲的效率和穩定性。以下是一個完整的設計和實現方案,包含程式碼範例和架構說明。


1. 系統架構設計
為了實現每分鐘處理 10,000 筆資料的高併發爬蟲系統,我們需要:
  • FastAPI:作為 Web 服務框架,處理 API 請求並提供非同步支援。
  • Redis:用於任務隊列(儲存待爬取的 URL 或任務)以及快取(儲存爬取結果,減少重複請求)。
  • 非同步爬蟲:使用 aiohttphttpx 進行非同步 HTTP 請求,提高爬取效率。
  • 任務分發與併發控制:使用 Redis 作為任務隊列,配合 asyncio 控制併發數量,避免過載。
  • 資料處理:將爬取結果存入 Redis 或其他資料庫(如 MongoDB 或 PostgreSQL),並進行後續處理。
假設每筆資料的爬取平均耗時 0.1 秒,10,000 筆資料需要 1,000 秒(約 16.67 分鐘)單執行緒處理。為了在 1 分鐘內完成,需透過併發處理,將併發數設置為合理的數值(如 100~200),並確保系統資源(CPU、記憶體、網路)能承受負載。

2. 環境準備
需要安裝的套件:
bash
pip install fastapi uvicorn aiohttp redis aioredis python-dotenv
假設 Redis 已在本機或雲端部署好,環境變數配置如下(.env 檔案):
env
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0

3. 程式碼實現
以下是一個完整的 FastAPI + Redis 爬蟲系統範例,包含任務隊列、併發爬取、以及結果儲存。
3.1 FastAPI 主程式 (main.py)
python
import asyncio
import json
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import aiohttp
import aioredis
from dotenv import load_dotenv
import os
from typing import List
from contextlib import asynccontextmanager

# 初始化 FastAPI
app = FastAPI()

# 載入環境變數
load_dotenv()
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))

# Redis 連線池
redis = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis
    redis = await aioredis.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}")
    yield
    await redis.close()

app.router.lifespan_context = lifespan

# Pydantic 模型,定義任務結構
class CrawlTask(BaseModel):
    urls: List[str]

# 爬蟲函數
async def crawl_url(url: str, session: aiohttp.ClientSession) -> dict:
    try:
        async with session.get(url, timeout=10) as response:
            if response.status == 200:
                data = await response.text()
                return {"url": url, "status": "success", "data": data[:500]}  # 假設只取前 500 字元
            else:
                return {"url": url, "status": "failed", "error": f"Status {response.status}"}
    except Exception as e:
        return {"url": url, "status": "failed", "error": str(e)}

# 批量爬取任務
async def process_crawl_tasks(urls: List[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [crawl_url(url, session) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 將結果存入 Redis
        for result in results:
            if isinstance(result, dict):
                await redis.set(f"crawl_result:{result['url']}", json.dumps(result))

# API 端點:接收爬蟲任務
@app.post("/crawl")
async def start_crawl(task: CrawlTask, background_tasks: BackgroundTasks):
    # 將 URL 加入 Redis 任務隊列
    for url in task.urls:
        await redis.lpush("crawl_queue", url)
    
    # 在背景執行爬蟲任務
    background_tasks.add_task(process_queue)
    return {"message": f"Added {len(task.urls)} URLs to crawl queue"}

# 背景任務:從 Redis 隊列中取出 URL 並爬取
async def process_queue():
    batch_size = 100  # 每次處理 100 個 URL
    while await redis.llen("crawl_queue") > 0:
        # 從隊列中取出 batch_size 個 URL
        urls = []
        for _ in range(min(batch_size, await redis.llen("crawl_queue"))):
            url = await redis.lpop("crawl_queue")
            if url:
                urls.append(url.decode("utf-8"))
        
        # 執行爬蟲
        await process_crawl_tasks(urls)
        
        # 避免過快請求,稍微延遲
        await asyncio.sleep(0.1)

# API 端點:查詢爬蟲結果
@app.get("/result/{url:path}")
async def get_result(url: str):
    result = await redis.get(f"crawl_result:{url}")
    if result:
        return json.loads(result)
    return {"message": "Result not found"}
3.2 執行方式
啟動 FastAPI 服務:
bash
uvicorn main:app --host 0.0.0.0 --port 8000
3.3 使用範例
  1. 提交爬蟲任務: 使用 curl 或 Postman 發送 POST 請求:
    bash
    curl -X POST "http://localhost:8000/crawl" -H "Content-Type: application/json" -d '{"urls": ["https://example.com", "https://example.org"]}'
    這會將 URL 加入 Redis 任務隊列,並在背景執行爬蟲。
  2. 查詢結果: 使用 GET 請求查詢爬取結果:
    bash
    curl "http://localhost:8000/result/https://example.com"

4. 性能優化與併發控制
為了實現每分鐘 10,000 筆資料的目標,需進行以下優化:
4.1 併發數控制
  • 限制併發數aiohttp.ClientSession 的併發數透過 batch_size 控制,建議設為 100~200,具體數值需根據伺服器性能和目標網站的限制調整。
  • Semaphore:若需更精細控制併發,可使用 asyncio.Semaphore
    python
    semaphore = asyncio.Semaphore(100)  # 限制最大併發數為 100
    async def crawl_url(url: str, session: aiohttp.ClientSession) -> dict:
        async with semaphore:
            # 原有爬蟲邏輯
4.2 Redis 優化
  • 隊列管理:使用 Redis 的 LPUSHLPOP 實現高效任務隊列。
  • 快取:將已爬取的 URL 結果存入 Redis,避免重複爬取(設置 TTL 快取過期時間):
    python
    await redis.setex(f"crawl_result:{url}", 3600, json.dumps(result))  # 快取 1 小時
  • 連接池:使用 aioredis 的連接池,減少連線開銷。
4.3 爬蟲效率
  • 非同步 HTTP 請求:使用 aiohttphttpx 進行非同步請求,減少 I/O 等待時間。
  • 重試機制:為失敗的請求添加重試邏輯:
    python
    from tenacity import retry, stop_after_attempt, wait_fixed
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
    async def crawl_url(url: str, session: aiohttp.ClientSession) -> dict:
        # 原有爬蟲邏輯
  • 限速:為避免被目標網站封鎖,添加延遲或使用代理池。
4.4 資源考量
  • 伺服器配置:假設每筆請求耗時 0.1 秒,100 併發下每秒可處理 100 筆資料,10,000 筆需約 100 秒。建議使用多核心伺服器(至少 8 核 CPU,16GB 記憶體)並部署多個 FastAPI 實例。
  • 負載均衡:使用 Nginx 或 Kubernetes 部署多個 FastAPI 實例,均分任務。
  • Redis 效能:確保 Redis 部署在高性能伺服器上,啟用持久化以防止資料遺失。

5. 監控與錯誤處理
  • 日誌:使用 logging 模組記錄爬蟲狀態和錯誤:
    python
    import logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
  • 監控:使用 Prometheus + Grafana 監控 FastAPI 和 Redis 的性能(如請求速率、隊列長度)。
  • 錯誤處理:捕獲異常並記錄到 Redis 或日誌檔案:
    python
    if isinstance(result, Exception):
        logger.error(f"Error crawling {url}: {str(result)}")

6. 擴展建議
  • 分散式爬蟲:若單機無法滿足需求,可使用 Celery 或 Dask 分散任務到多台機器。
  • 資料儲存:將爬取結果存入 MongoDB 或 PostgreSQL,支援更複雜的查詢。
  • 代理與反爬蟲:整合代理池(如 scrapy-proxies)和隨機 User-Agent,應對反爬蟲機制。
  • 任務優先級:在 Redis 隊列中使用優先級(如 ZADD)管理高優先級任務。

7. 性能估算
  • 吞吐量:假設併發數 100,每筆請求 0.1 秒,則每秒處理 100 筆,1 分鐘可處理 6,000 筆。若需 10,000 筆,可提高併發數(例如 200)或部署多實例。
  • 資源需求
    • CPU:8 核以上,支援多執行緒。
    • 記憶體:16GB+,Redis 和 FastAPI 各需一定記憶體。
    • 網路:穩定的高頻寬連線,確保 HTTP 請求順暢。

8. 測試與部署
  • 壓力測試:使用 locust 模擬大量請求,測試系統吞吐量。
  • 部署:使用 Docker 容器化 FastAPI 和 Redis,方便擴展:
    dockerfile
    FROM python:3.9
    WORKDIR /app
    COPY . .
    RUN pip install -r requirements.txt
    CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

9. 注意事項
  • 目標網站限制:檢查目標網站的 robots.txt 和使用條款,避免違規。
  • Redis 持久化:啟用 AOF 或 RDB,防止資料遺失。
  • 安全:為 API 添加認證(如 JWT),防止未授權訪問。

使用 Nginx 部署多個 FastAPI 實例可以實現負載均衡和高可用性,特別適合高併發場景如每分鐘處理 10,000 筆資料的爬蟲系統。以下是詳細的部署步驟,包含 Nginx 配置、FastAPI 實例運行、以及容器化部署的建議。


1. 整體架構
  • Nginx:作為反向代理和負載均衡器,將客戶端請求分發到多個 FastAPI 實例。
  • FastAPI 實例:每個實例運行在獨立的進程或容器中,監聽不同端口(如 8000、8001、8002)。
  • Redis:作為任務隊列和快取,確保多個 FastAPI 實例共享任務和結果。
  • 部署方式:使用 Docker 容器化 FastAPI 和 Nginx,方便擴展和管理。

2. 環境準備
  • 伺服器:一台或多台伺服器,建議至少 8 核 CPU、16GB 記憶體。
  • 安裝軟體
    • Docker 和 Docker Compose(容器化部署)。
    • Nginx(可直接安裝或使用 Docker 映像)。
    • Python 環境(用於 FastAPI)。
  • Redis:假設已部署並可透過環境變數連線。

3. 部署步驟
3.1 準備 FastAPI 應用
假設你已經有一個 FastAPI 應用(如前述爬蟲系統的 main.py),並確認它能正常運行。為了部署多個實例,每個實例需要監聽不同的端口。
修改 main.py(如果需要),確保端口可透過環境變數配置:
python
import os
import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "FastAPI instance running"}

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8000))  # 從環境變數讀取端口,預設 8000
    uvicorn.run(app, host="0.0.0.0", port=port)
3.2 建立 Docker 映像
為 FastAPI 應用創建 Dockerfile
dockerfile
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt 示例:
fastapi==0.115.0
uvicorn==0.30.6
aiohttp==3.10.5
aioredis==2.0.1
python-dotenv==1.0.1
3.3 使用 Docker Compose 運行多個 FastAPI 實例
創建 docker-compose.yml,定義多個 FastAPI 實例和 Redis:
yaml
version: "3.9"
services:
  fastapi1:
    build: .
    environment:
      - PORT=8000
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_DB=0
    ports:
      - "8000:8000"
    depends_on:
      - redis

  fastapi2:
    build: .
    environment:
      - PORT=8001
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_DB=0
    ports:
      - "8001:8001"
    depends_on:
      - redis

  fastapi3:
    build: .
    environment:
      - PORT=8002
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_DB=0
    ports:
      - "8002:8002"
    depends_on:
      - redis

  redis:
    image: redis:7.0
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  nginx:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - fastapi1
      - fastapi2
      - fastapi3

volumes:
  redis-data:
3.4 配置 Nginx 負載均衡
創建 nginx.conf 檔案,配置反向代理和負載均衡:
nginx
events {}

http {
    upstream fastapi_backend {
        # 定義 FastAPI 實例的負載均衡池
        server fastapi1:8000;
        server fastapi2:8001;
        server fastapi3:8002;
    }

    server {
        listen 80;

        location / {
            # 將請求分發到 FastAPI 實例
            proxy_pass http://fastapi_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }
    }
}
  • upstream:定義 FastAPI 實例的地址和端口。
  • proxy_pass:將請求分發到 fastapi_backend,Nginx 會使用輪詢(round-robin)方式分配請求。
  • 負載均衡策略:預設為輪詢,也可改為 least_conn(最少連線)或 ip_hash(基於客戶端 IP)。
3.5 啟動服務
在包含 docker-compose.ymlnginx.conf 的目錄下運行:
bash
docker-compose up -d
這會啟動:
  • 3 個 FastAPI 實例(分別監聽 8000、8001、8002 端口)。
  • 1 個 Redis 實例(監聽 6379 端口)。
  • 1 個 Nginx 實例(監聽 80 端口,作為反向代理)。

4. 驗證部署
  1. 訪問 API: 透過瀏覽器或 curl 訪問 http://<伺服器IP>,Nginx 會將請求分發到某個 FastAPI 實例:
    bash
    curl http://localhost
    應返回 {"message": "FastAPI instance running"}
  2. 提交爬蟲任務: 使用 POST 請求提交任務,確認任務被分發並存入 Redis:
    bash
    curl -X POST "http://localhost/crawl" -H "Content-Type: application/json" -d '{"urls": ["https://example.com", "https://example.org"]}'
  3. 檢查 Redis 隊列: 使用 Redis CLI 確認任務是否正確加入:
    bash
    docker exec -it <redis-container-name> redis-cli
    > LLEN crawl_queue

5. 性能優化
  • 調整 FastAPI 實例數:根據伺服器資源,增加或減少 docker-compose.yml 中的 FastAPI 服務數量。例如,8 核 CPU 可運行 4~8 個實例。
  • Nginx 負載均衡策略
    • 使用 least_conn 減少高負載實例的壓力:
      nginx
      upstream fastapi_backend {
          least_conn;
          server fastapi1:8000;
          server fastapi2:8001;
          server fastapi3:8002;
      }
    • 添加權重(weight)以分配不同性能的伺服器:
      nginx
      server fastapi1:8000 weight=2;  # 分配更多請求
      server fastapi2:8001 weight=1;
  • Gunicorn + Uvicorn:為每個 FastAPI 實例使用 Gunicorn 管理多個 Uvicorn 工作進程,提高單實例性能:
    dockerfile
    CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "main:app", "--bind", "0.0.0.0:8000"]
    (需在 requirements.txt 中添加 gunicorn
  • Nginx 緩衝與快取
    • 啟用緩衝減少後端負載:
      nginx
      proxy_buffering on;
      proxy_buffer_size 128k;
      proxy_buffers 4 256k;
    • 啟用快取靜態響應(若適用):
      nginx
      location / {
          proxy_cache my_cache;
          proxy_cache_valid 200 1h;
          proxy_pass http://fastapi_backend;
      }

6. 監控與擴展
  • 監控
    • 使用 Prometheus 和 Grafana 監控 Nginx 和 FastAPI 的請求速率、延遲和錯誤率。
    • 檢查 Docker 容器的 CPU 和記憶體使用情況:
      bash
      docker stats
  • 擴展
    • 水平擴展:在 docker-compose.yml 中添加更多 FastAPI 服務(例如 fastapi4fastapi5),並更新 nginx.confupstream
    • 跨伺服器部署:若使用多台伺服器,可將 Nginx 和 FastAPI 分開部署,使用 DNS 或負載均衡器(如 AWS ELB)分發流量。
  • 高可用性
    • 啟用 Nginx 的健康檢查:
      nginx
      upstream fastapi_backend {
          server fastapi1:8000 max_fails=3 fail_timeout=30s;
          server fastapi2:8001 max_fails=3 fail_timeout=30s;
          server fastapi3:8002 max_fails=3 fail_timeout=30s;
      }
    • 確保 Redis 使用主從複製或集群模式,防止單點故障。

7. 注意事項
  • 端口衝突:確保每個 FastAPI 實例的端口不重複,且與 Nginx 的配置一致。
  • Redis 連線:多個 FastAPI 實例共享同一個 Redis,需確保 Redis 性能足夠(考慮啟用持久化或集群)。
  • 安全
    • 為 Nginx 添加 SSL/TLS(使用 Let’s Encrypt 或自簽證書)。
    • 為 FastAPI API 添加認證(如 JWT)。
  • 日誌:配置 Nginx 訪問和錯誤日誌,存儲到檔案或集中式日誌系統:
    nginx
    access_log /var/log/nginx/access.log;
    error_log /var/log/nginx/error.log;

8. 性能估算
  • 吞吐量:假設每個 FastAPI 實例每秒處理 50 筆請求(併發數 50,平均 0.1 秒/請求),3 個實例可達每秒 150 筆,1 分鐘約 9,000 筆。若需 10,000 筆,可增加實例數或優化爬蟲效率。
  • 資源需求
    • 每個 FastAPI 實例(使用 Gunicorn + 4 個 Uvicorn 進程):約 1GB 記憶體、1~2 核 CPU。
    • Nginx:約 512MB 記憶體、1 核 CPU。
    • Redis:視資料量而定,建議 2GB+ 記憶體。

9. 測試
  • 壓力測試:使用 locust 模擬高併發請求:
    python
    from locust import HttpUser, task, between
    
    class CrawlUser(HttpUser):
        wait_time = between(1, 2)
    
        @task
        def submit_crawl(self):
            self.client.post("/crawl", json={"urls": ["https://example.com"]})
    運行:
    bash
    locust -f locustfile.py --host=http://localhost
  • 驗證負載均衡:檢查 Nginx 日誌,確認請求是否均勻分發到各 FastAPI 實例:
    bash
    docker logs <nginx-container-name>

這套方案透過 Nginx 和 Docker Compose 實現了多個 FastAPI 實例的部署,適合高併發爬蟲系統。

沒有留言:

張貼留言

網誌存檔