每分鐘處理 10,000 筆資料,需要考慮以下幾個關鍵點:非同步處理、高併發請求管理、Redis 作為任務隊列與快取、以及爬蟲的效率和穩定性。以下是一個完整的設計和實現方案,包含程式碼範例和架構說明。
1. 系統架構設計
為了實現每分鐘處理 10,000 筆資料的高併發爬蟲系統,我們需要:
- FastAPI:作為 Web 服務框架,處理 API 請求並提供非同步支援。
- Redis:用於任務隊列(儲存待爬取的 URL 或任務)以及快取(儲存爬取結果,減少重複請求)。
- 非同步爬蟲:使用 aiohttp 或 httpx 進行非同步 HTTP 請求,提高爬取效率。
- 任務分發與併發控制:使用 Redis 作為任務隊列,配合 asyncio 控制併發數量,避免過載。
- 資料處理:將爬取結果存入 Redis 或其他資料庫(如 MongoDB 或 PostgreSQL),並進行後續處理。
假設每筆資料的爬取平均耗時 0.1 秒,10,000 筆資料需要 1,000 秒(約 16.67 分鐘)單執行緒處理。為了在 1 分鐘內完成,需透過併發處理,將併發數設置為合理的數值(如 100~200),並確保系統資源(CPU、記憶體、網路)能承受負載。
2. 環境準備
需要安裝的套件:
bash
pip install fastapi uvicorn aiohttp redis aioredis python-dotenv
假設 Redis 已在本機或雲端部署好,環境變數配置如下(.env 檔案):
env
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
3. 程式碼實現
以下是一個完整的 FastAPI + Redis 爬蟲系統範例,包含任務隊列、併發爬取、以及結果儲存。
3.1 FastAPI 主程式 (main.py)
python
import asyncio
import json
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import aiohttp
import aioredis
from dotenv import load_dotenv
import os
from typing import List
from contextlib import asynccontextmanager
# 初始化 FastAPI
app = FastAPI()
# 載入環境變數
load_dotenv()
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
# Redis 連線池
redis = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global redis
redis = await aioredis.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}")
yield
await redis.close()
app.router.lifespan_context = lifespan
# Pydantic 模型,定義任務結構
class CrawlTask(BaseModel):
urls: List[str]
# 爬蟲函數
async def crawl_url(url: str, session: aiohttp.ClientSession) -> dict:
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
data = await response.text()
return {"url": url, "status": "success", "data": data[:500]} # 假設只取前 500 字元
else:
return {"url": url, "status": "failed", "error": f"Status {response.status}"}
except Exception as e:
return {"url": url, "status": "failed", "error": str(e)}
# 批量爬取任務
async def process_crawl_tasks(urls: List[str]):
async with aiohttp.ClientSession() as session:
tasks = [crawl_url(url, session) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 將結果存入 Redis
for result in results:
if isinstance(result, dict):
await redis.set(f"crawl_result:{result['url']}", json.dumps(result))
# API 端點:接收爬蟲任務
@app.post("/crawl")
async def start_crawl(task: CrawlTask, background_tasks: BackgroundTasks):
# 將 URL 加入 Redis 任務隊列
for url in task.urls:
await redis.lpush("crawl_queue", url)
# 在背景執行爬蟲任務
background_tasks.add_task(process_queue)
return {"message": f"Added {len(task.urls)} URLs to crawl queue"}
# 背景任務:從 Redis 隊列中取出 URL 並爬取
async def process_queue():
batch_size = 100 # 每次處理 100 個 URL
while await redis.llen("crawl_queue") > 0:
# 從隊列中取出 batch_size 個 URL
urls = []
for _ in range(min(batch_size, await redis.llen("crawl_queue"))):
url = await redis.lpop("crawl_queue")
if url:
urls.append(url.decode("utf-8"))
# 執行爬蟲
await process_crawl_tasks(urls)
# 避免過快請求,稍微延遲
await asyncio.sleep(0.1)
# API 端點:查詢爬蟲結果
@app.get("/result/{url:path}")
async def get_result(url: str):
result = await redis.get(f"crawl_result:{url}")
if result:
return json.loads(result)
return {"message": "Result not found"}
3.2 執行方式
啟動 FastAPI 服務:
bash
uvicorn main:app --host 0.0.0.0 --port 8000
3.3 使用範例
- 提交爬蟲任務: 使用 curl 或 Postman 發送 POST 請求:bash
curl -X POST "http://localhost:8000/crawl" -H "Content-Type: application/json" -d '{"urls": ["https://example.com", "https://example.org"]}'
這會將 URL 加入 Redis 任務隊列,並在背景執行爬蟲。 - 查詢結果: 使用 GET 請求查詢爬取結果:bash
curl "http://localhost:8000/result/https://example.com"
4. 性能優化與併發控制
為了實現每分鐘 10,000 筆資料的目標,需進行以下優化:
4.1 併發數控制
- 限制併發數:aiohttp.ClientSession 的併發數透過 batch_size 控制,建議設為 100~200,具體數值需根據伺服器性能和目標網站的限制調整。
- Semaphore:若需更精細控制併發,可使用 asyncio.Semaphore:python
semaphore = asyncio.Semaphore(100) # 限制最大併發數為 100 async def crawl_url(url: str, session: aiohttp.ClientSession) -> dict: async with semaphore: # 原有爬蟲邏輯
4.2 Redis 優化
- 隊列管理:使用 Redis 的 LPUSH 和 LPOP 實現高效任務隊列。
- 快取:將已爬取的 URL 結果存入 Redis,避免重複爬取(設置 TTL 快取過期時間):python
await redis.setex(f"crawl_result:{url}", 3600, json.dumps(result)) # 快取 1 小時
- 連接池:使用 aioredis 的連接池,減少連線開銷。
4.3 爬蟲效率
- 非同步 HTTP 請求:使用 aiohttp 或 httpx 進行非同步請求,減少 I/O 等待時間。
- 重試機制:為失敗的請求添加重試邏輯:python
from tenacity import retry, stop_after_attempt, wait_fixed @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) async def crawl_url(url: str, session: aiohttp.ClientSession) -> dict: # 原有爬蟲邏輯
- 限速:為避免被目標網站封鎖,添加延遲或使用代理池。
4.4 資源考量
- 伺服器配置:假設每筆請求耗時 0.1 秒,100 併發下每秒可處理 100 筆資料,10,000 筆需約 100 秒。建議使用多核心伺服器(至少 8 核 CPU,16GB 記憶體)並部署多個 FastAPI 實例。
- 負載均衡:使用 Nginx 或 Kubernetes 部署多個 FastAPI 實例,均分任務。
- Redis 效能:確保 Redis 部署在高性能伺服器上,啟用持久化以防止資料遺失。
5. 監控與錯誤處理
- 日誌:使用 logging 模組記錄爬蟲狀態和錯誤:python
import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__)
- 監控:使用 Prometheus + Grafana 監控 FastAPI 和 Redis 的性能(如請求速率、隊列長度)。
- 錯誤處理:捕獲異常並記錄到 Redis 或日誌檔案:python
if isinstance(result, Exception): logger.error(f"Error crawling {url}: {str(result)}")
6. 擴展建議
- 分散式爬蟲:若單機無法滿足需求,可使用 Celery 或 Dask 分散任務到多台機器。
- 資料儲存:將爬取結果存入 MongoDB 或 PostgreSQL,支援更複雜的查詢。
- 代理與反爬蟲:整合代理池(如 scrapy-proxies)和隨機 User-Agent,應對反爬蟲機制。
- 任務優先級:在 Redis 隊列中使用優先級(如 ZADD)管理高優先級任務。
7. 性能估算
- 吞吐量:假設併發數 100,每筆請求 0.1 秒,則每秒處理 100 筆,1 分鐘可處理 6,000 筆。若需 10,000 筆,可提高併發數(例如 200)或部署多實例。
- 資源需求:
- CPU:8 核以上,支援多執行緒。
- 記憶體:16GB+,Redis 和 FastAPI 各需一定記憶體。
- 網路:穩定的高頻寬連線,確保 HTTP 請求順暢。
8. 測試與部署
- 壓力測試:使用 locust 模擬大量請求,測試系統吞吐量。
- 部署:使用 Docker 容器化 FastAPI 和 Redis,方便擴展:dockerfile
FROM python:3.9 WORKDIR /app COPY . . RUN pip install -r requirements.txt CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
9. 注意事項
- 目標網站限制:檢查目標網站的 robots.txt 和使用條款,避免違規。
- Redis 持久化:啟用 AOF 或 RDB,防止資料遺失。
- 安全:為 API 添加認證(如 JWT),防止未授權訪問。
使用 Nginx 部署多個 FastAPI 實例可以實現負載均衡和高可用性,特別適合高併發場景如每分鐘處理 10,000 筆資料的爬蟲系統。以下是詳細的部署步驟,包含 Nginx 配置、FastAPI 實例運行、以及容器化部署的建議。
1. 整體架構
- Nginx:作為反向代理和負載均衡器,將客戶端請求分發到多個 FastAPI 實例。
- FastAPI 實例:每個實例運行在獨立的進程或容器中,監聽不同端口(如 8000、8001、8002)。
- Redis:作為任務隊列和快取,確保多個 FastAPI 實例共享任務和結果。
- 部署方式:使用 Docker 容器化 FastAPI 和 Nginx,方便擴展和管理。
2. 環境準備
- 伺服器:一台或多台伺服器,建議至少 8 核 CPU、16GB 記憶體。
- 安裝軟體:
- Docker 和 Docker Compose(容器化部署)。
- Nginx(可直接安裝或使用 Docker 映像)。
- Python 環境(用於 FastAPI)。
- Redis:假設已部署並可透過環境變數連線。
3. 部署步驟
3.1 準備 FastAPI 應用
假設你已經有一個 FastAPI 應用(如前述爬蟲系統的 main.py),並確認它能正常運行。為了部署多個實例,每個實例需要監聽不同的端口。
修改 main.py(如果需要),確保端口可透過環境變數配置:
python
import os
import uvicorn
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "FastAPI instance running"}
if __name__ == "__main__":
port = int(os.getenv("PORT", 8000)) # 從環境變數讀取端口,預設 8000
uvicorn.run(app, host="0.0.0.0", port=port)
3.2 建立 Docker 映像
為 FastAPI 應用創建 Dockerfile:
dockerfile
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt 示例:
fastapi==0.115.0
uvicorn==0.30.6
aiohttp==3.10.5
aioredis==2.0.1
python-dotenv==1.0.1
3.3 使用 Docker Compose 運行多個 FastAPI 實例
創建 docker-compose.yml,定義多個 FastAPI 實例和 Redis:
yaml
version: "3.9"
services:
fastapi1:
build: .
environment:
- PORT=8000
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_DB=0
ports:
- "8000:8000"
depends_on:
- redis
fastapi2:
build: .
environment:
- PORT=8001
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_DB=0
ports:
- "8001:8001"
depends_on:
- redis
fastapi3:
build: .
environment:
- PORT=8002
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_DB=0
ports:
- "8002:8002"
depends_on:
- redis
redis:
image: redis:7.0
ports:
- "6379:6379"
volumes:
- redis-data:/data
nginx:
image: nginx:latest
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- fastapi1
- fastapi2
- fastapi3
volumes:
redis-data:
3.4 配置 Nginx 負載均衡
創建 nginx.conf 檔案,配置反向代理和負載均衡:
nginx
events {}
http {
upstream fastapi_backend {
# 定義 FastAPI 實例的負載均衡池
server fastapi1:8000;
server fastapi2:8001;
server fastapi3:8002;
}
server {
listen 80;
location / {
# 將請求分發到 FastAPI 實例
proxy_pass http://fastapi_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
}
- upstream:定義 FastAPI 實例的地址和端口。
- proxy_pass:將請求分發到 fastapi_backend,Nginx 會使用輪詢(round-robin)方式分配請求。
- 負載均衡策略:預設為輪詢,也可改為 least_conn(最少連線)或 ip_hash(基於客戶端 IP)。
3.5 啟動服務
在包含 docker-compose.yml 和 nginx.conf 的目錄下運行:
bash
docker-compose up -d
這會啟動:
- 3 個 FastAPI 實例(分別監聽 8000、8001、8002 端口)。
- 1 個 Redis 實例(監聽 6379 端口)。
- 1 個 Nginx 實例(監聽 80 端口,作為反向代理)。
4. 驗證部署
- 訪問 API: 透過瀏覽器或 curl 訪問 http://<伺服器IP>,Nginx 會將請求分發到某個 FastAPI 實例:bash
curl http://localhost
應返回 {"message": "FastAPI instance running"}。 - 提交爬蟲任務: 使用 POST 請求提交任務,確認任務被分發並存入 Redis:bash
curl -X POST "http://localhost/crawl" -H "Content-Type: application/json" -d '{"urls": ["https://example.com", "https://example.org"]}'
- 檢查 Redis 隊列: 使用 Redis CLI 確認任務是否正確加入:bash
docker exec -it <redis-container-name> redis-cli > LLEN crawl_queue
5. 性能優化
- 調整 FastAPI 實例數:根據伺服器資源,增加或減少 docker-compose.yml 中的 FastAPI 服務數量。例如,8 核 CPU 可運行 4~8 個實例。
- Nginx 負載均衡策略:
- 使用 least_conn 減少高負載實例的壓力:nginx
upstream fastapi_backend { least_conn; server fastapi1:8000; server fastapi2:8001; server fastapi3:8002; }
- 添加權重(weight)以分配不同性能的伺服器:nginx
server fastapi1:8000 weight=2; # 分配更多請求 server fastapi2:8001 weight=1;
- Gunicorn + Uvicorn:為每個 FastAPI 實例使用 Gunicorn 管理多個 Uvicorn 工作進程,提高單實例性能:dockerfile
CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "main:app", "--bind", "0.0.0.0:8000"]
(需在 requirements.txt 中添加 gunicorn) - Nginx 緩衝與快取:
- 啟用緩衝減少後端負載:nginx
proxy_buffering on; proxy_buffer_size 128k; proxy_buffers 4 256k;
- 啟用快取靜態響應(若適用):nginx
location / { proxy_cache my_cache; proxy_cache_valid 200 1h; proxy_pass http://fastapi_backend; }
6. 監控與擴展
- 監控:
- 使用 Prometheus 和 Grafana 監控 Nginx 和 FastAPI 的請求速率、延遲和錯誤率。
- 檢查 Docker 容器的 CPU 和記憶體使用情況:bash
docker stats
- 擴展:
- 水平擴展:在 docker-compose.yml 中添加更多 FastAPI 服務(例如 fastapi4、fastapi5),並更新 nginx.conf 的 upstream。
- 跨伺服器部署:若使用多台伺服器,可將 Nginx 和 FastAPI 分開部署,使用 DNS 或負載均衡器(如 AWS ELB)分發流量。
- 高可用性:
- 啟用 Nginx 的健康檢查:nginx
upstream fastapi_backend { server fastapi1:8000 max_fails=3 fail_timeout=30s; server fastapi2:8001 max_fails=3 fail_timeout=30s; server fastapi3:8002 max_fails=3 fail_timeout=30s; }
- 確保 Redis 使用主從複製或集群模式,防止單點故障。
7. 注意事項
- 端口衝突:確保每個 FastAPI 實例的端口不重複,且與 Nginx 的配置一致。
- Redis 連線:多個 FastAPI 實例共享同一個 Redis,需確保 Redis 性能足夠(考慮啟用持久化或集群)。
- 安全:
- 為 Nginx 添加 SSL/TLS(使用 Let’s Encrypt 或自簽證書)。
- 為 FastAPI API 添加認證(如 JWT)。
- 日誌:配置 Nginx 訪問和錯誤日誌,存儲到檔案或集中式日誌系統:nginx
access_log /var/log/nginx/access.log; error_log /var/log/nginx/error.log;
8. 性能估算
- 吞吐量:假設每個 FastAPI 實例每秒處理 50 筆請求(併發數 50,平均 0.1 秒/請求),3 個實例可達每秒 150 筆,1 分鐘約 9,000 筆。若需 10,000 筆,可增加實例數或優化爬蟲效率。
- 資源需求:
- 每個 FastAPI 實例(使用 Gunicorn + 4 個 Uvicorn 進程):約 1GB 記憶體、1~2 核 CPU。
- Nginx:約 512MB 記憶體、1 核 CPU。
- Redis:視資料量而定,建議 2GB+ 記憶體。
9. 測試
- 壓力測試:使用 locust 模擬高併發請求:python
from locust import HttpUser, task, between class CrawlUser(HttpUser): wait_time = between(1, 2) @task def submit_crawl(self): self.client.post("/crawl", json={"urls": ["https://example.com"]})
運行:bashlocust -f locustfile.py --host=http://localhost
- 驗證負載均衡:檢查 Nginx 日誌,確認請求是否均勻分發到各 FastAPI 實例:bash
docker logs <nginx-container-name>
這套方案透過 Nginx 和 Docker Compose 實現了多個 FastAPI 實例的部署,適合高併發爬蟲系統。
沒有留言:
張貼留言