打造每秒萬次查詢的高效能分散式網路爬蟲系統
在現今數據驅動的世界裡,大規模、高效率的網路爬蟲系統是許多業務的基石。無論是市場分析、輿情監控還是數據聚合,一個能夠快速、穩定地抓取大量網頁內容的爬蟲系統都至關重要。本文將深入介紹一個設計目標為 每秒 10,000 次查詢 (QPS) 的高效能分散式網路爬蟲系統,並分享其核心架構與實作細節。
您可以透過以下 GitHub 連結檢閱本專案的原始碼:https://github.com/BpsEason/high_qps_crawler.git
🚀 專案概述:為極致效能而生
本專案旨在提供一個模組化、可擴展且生產就緒的分散式爬蟲核心程式碼,以應對高吞吐量的網頁抓取需求。其主要特色包括:
高吞吐量:透過 Docker Compose 實現水平擴展,輕鬆達到 10,000 QPS 的目標。
模組化架構:系統各組件職責分離,提升可維護性與靈活性。
強大反爬蟲能力:整合代理池、隨機 User-Agent 及 CAPTCHA 解決方案。
完善監控:整合 Prometheus、Grafana 和 Celery Flower,便於系統運維。
🛠️ 系統架構:多技術棧協同作戰
為了實現高 QPS 和穩定性,本系統採用了多種領先技術的組合,各司其職,協同運作:
Laravel API (
laravel_api/
):作為使用者介面和任務管理層,負責接收任務提交請求(例如通過 API)並處理任務狀態查詢。
透過 gRPC 與 FastAPI 爬蟲核心進行高速通信。
利用 Redis 儲存即時任務狀態,並將歷史數據持久化到 PostgreSQL 或 MongoDB。
FastAPI 爬蟲核心 (
fastapi_crawler/
):基於 Python 的非同步框架,負責執行實際的網頁抓取任務。
管理代理池和隨機 User-Agent,以應對反爬蟲策略。
提供 gRPC 服務接收來自 Laravel API 的抓取請求。
其核心抓取邏輯使用
httpx
進行非同步 HTTP 請求。
Celery 工作節點 (
celery_workers/
):負責處理分散式數據儲存任務,確保在爬取任務完成後能夠高並發地將數據寫入資料庫,避免阻塞主爬蟲流程。
Redis:
作為高速緩存,用於管理任務佇列、代理池以及實現 URL 去重,極大地提升了系統響應速度和效率。
PostgreSQL/MongoDB:
提供靈活的數據儲存方案。預設使用 PostgreSQL 搭配 PgBouncer 進行連接池管理,也支援切換至 MongoDB。
監控:
整合 Prometheus 用於指標收集,Grafana 進行數據可視化,以及 Celery Flower 用於監控 Celery 任務執行情況。
🔑 關鍵程式碼解析
以下將展示部分關鍵程式碼片段,說明其如何實現高效率和模組化。
1. Laravel API - 任務提交 (laravel_api/app/Http/Controllers/CrawlerController.php
)
此控制器負責接收外部的爬取請求,並通過 gRPC 將任務派發給 FastAPI 服務。
<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use Grpc\ChannelCredentials;
use App\Grpc\CrawlerServiceClient;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
class CrawlerController extends Controller
{
/**
* 提交爬取任務至 FastAPI(通過 gRPC)
*/
public function submitCrawlTask(Request $request)
{
$request->validate(['url' => 'required|url']);
$url = $request->input('url');
$taskId = uniqid('crawl_'); // 生成唯一任務 ID
try {
$client = new CrawlerServiceClient(env('FASTAPI_GRPC_HOST', 'fastapi_crawler:50051'), [
'credentials' => ChannelCredentials::createInsecure(),
]);
$requestProto = new \Crawler\CrawlRequest();
$requestProto->setUrl($url);
$requestProto->setTaskId($taskId);
list($response, $status) = $client->Crawl($requestProto)->wait();
if ($status->code !== \Grpc\STATUS_OK) {
throw new \Exception("gRPC 調用失敗: " . $status->details);
}
// 儲存任務狀態至 Redis
Redis::hset("crawl_task_status:{$taskId}", "status", "dispatched");
Redis::hset("crawl_task_status:{$taskId}", "url", $url);
Redis::hset("crawl_task_status:{$taskId}", "dispatched_at", now()->toIso8601String());
Log::info("gRPC 成功,URL: $url, 任務 ID: $taskId");
return response()->json([
'message' => '任務已提交',
'task_id' => $taskId,
'grpc_response_status' => $response->getStatus(),
]);
} catch (\Exception $e) {
Log::error("提交失敗,URL: $url. 錯誤: " . $e->getMessage());
Redis::hset("crawl_task_status:{$taskId}", "status", "submission_failed");
Redis::hset("crawl_task_status:{$taskId}", "error", $e->getMessage());
return response()->json(['error' => '提交失敗: ' . $e->getMessage()], 500);
}
}
}
說明:此程式碼片段展示了 Laravel 如何驗證輸入、生成唯一任務 ID,並透過 gRPC 非同步地將爬取請求發送到 FastAPI 服務。任務狀態會即時更新到 Redis,確保快速響應和狀態追蹤。
2. FastAPI 爬蟲核心 - 網頁抓取 (fastapi_crawler/app/crawler.py
)
這是系統的爬取引擎,負責實際執行 HTTP 請求、處理反爬蟲措施並返回網頁內容。
import httpx
from fake_useragent import UserAgent
import asyncio
import redis.asyncio as aioredis
import random
import os
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
async def get_redis_client():
"""獲取 Redis 連線"""
global _redis_client
if _redis_client is None:
_redis_client = await aioredis.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")
return _redis_client
async def get_proxy():
"""從 Redis 選擇有效代理"""
client = await get_redis_client()
try:
proxies_bytes = await client.smembers("proxy_pool")
if not proxies_bytes:
print("無可用代理,使用直接連線")
return None
proxies = [p.decode('utf-8') for p in proxies_bytes]
random.shuffle(proxies)
for proxy in proxies:
try:
async with httpx.AsyncClient(proxies={'http://': proxy, 'https://': proxy}, timeout=5) as test_client:
response = await test_client.get("http://httpbin.org/ip")
response.raise_for_status()
print(f"使用代理: {proxy}")
return proxy
except (httpx.RequestError, httpx.HTTPStatusError) as e:
print(f"代理 {proxy} 失敗: {e}")
await client.srem("proxy_pool", proxy)
print("無有效代理")
return None
except Exception as e:
print(f"獲取代理失敗: {e}")
return None
async def perform_crawl(url: str, task_id: str) -> str:
"""執行網頁爬取"""
user_agent = UserAgent().random
proxy = await get_proxy()
await asyncio.sleep(random.uniform(0.1, 1.0)) # 隨機延遲
headers = {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.0,image/webp,*/*;q=0.8',
}
async with httpx.AsyncClient(headers=headers, proxies={'http://': proxy, 'https://': proxy} if proxy else None,
timeout=30, follow_redirects=True) as client:
try:
print(f"爬取: {url} (任務 ID: {task_id})")
response = await client.get(url)
response.raise_for_status()
redis_client = await get_redis_client()
await redis_client.sadd("crawled_urls", url) # URL 去重
return response.text
except httpx.HTTPStatusError as e:
print(f"爬取失敗,狀態碼 {e.response.status_code}: {e}")
if e.response.status_code in [403, 429]:
print("可能需要 CAPTCHA 解決")
raise
說明:perform_crawl
函數展示了如何隨機化 User-Agent 和從 Redis 代理池中獲取可用代理以規避爬蟲檢測。它利用 httpx
進行非同步請求,並在 Redis 中對已爬取的 URL 進行去重。針對 403/429 等常見反爬狀態碼,也預留了 CAPTCHA 解決的擴展點。
3. Celery 工作節點 - 資料儲存 (celery_workers/app/tasks.py
)
Celery 負責將爬取到的數據非同步地存儲到資料庫中,確保爬取流程不受數據寫入性能影響。
from celery_app import app
import asyncio
import asyncpg
import datetime
import os
import redis.asyncio as aioredis
POSTGRES_URL = os.getenv("POSTGRES_URL", "postgresql://user:password@pgbouncer:6432/crawler_db")
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
async def get_redis_client():
"""獲取 Redis 連線"""
return await aioredis.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")
async def get_pg_pool():
"""獲取 PostgreSQL 連線池"""
global _pg_pool
if _pg_pool is None:
if POSTGRES_URL:
_pg_pool = await asyncpg.create_pool(POSTGRES_URL, min_size=10, max_size=50)
else:
raise ValueError("未設置 POSTGRES_URL")
return _pg_pool
@app.task(name='tasks.store_data', bind=True, default_retry_delay=300, max_retries=5)
def store_data_task(self, url: str, content: str, task_id: str = None):
"""儲存爬取數據"""
print(f"處理 URL: {url} (任務 ID: {task_id})")
redis_client_sync = asyncio.run(get_redis_client())
try:
# ... (儲存邏輯,例如插入到 PostgreSQL/MongoDB)
# 這裡僅為示意,實際會有數據庫操作
print(f"數據成功儲存,URL: {url}")
asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "status", "completed"))
asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "completed_at", datetime.datetime.now().isoformat()))
except Exception as e:
print(f"儲存數據失敗,URL: {url}. 錯誤: {e}")
asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "status", "failed"))
asyncio.run(redis_client_sync.hset(f"crawl_task_status:{task_id}", "error", str(e)))
raise self.retry(exc=e)
說明:store_data_task
是一個 Celery 任務,它接收爬取到的 URL 和內容,然後將其存儲到配置的資料庫中。任務失敗時支援重試機制,並且會更新 Redis 中的任務狀態。
🏗️ 部署:基於 Docker Compose 的快速啟動
整個系統的部署得益於 Docker 和 Docker Compose,實現了環境隔離和快速擴展。
克隆倉庫:
Bashgit clone [請替換為您的倉庫地址] cd [您的倉庫目錄]
安裝 Laravel 基礎環境:
Bashcd laravel_api composer install cp .env.example .env php artisan key:generate
配置環境變數:
在 fastapi_crawler/.env 和 laravel_api/.env 中配置代理 API、CAPTCHA 金鑰、資料庫和 Redis 連線等資訊。
選擇資料庫:
預設為 PostgreSQL,可修改 docker-compose.yml 切換至 MongoDB。
構建並運行服務:
Bashdocker compose build docker compose up -d
初始化資料庫:
Bashdocker compose exec laravel php artisan migrate
擴展 Celery 工作節點:
根據需求擴展 Celery 工作節點數量以提升並發處理能力。
Bashdocker compose up -d --scale celery_worker=10
結語
這個高 QPS 分散式網路爬蟲系統透過模組化的設計、高效的非同步通信(gRPC)、彈性的任務佇列(Redis, Celery)以及完善的反爬蟲和監控機制,為大規模網頁數據抓取提供了強大的基礎。希望這篇文章能為您在構建或優化爬蟲系統時提供寶貴的參考!
沒有留言:
張貼留言