要在 Windows 11 底下開發一個基於 XAMPP + Waitress + FastAPI 的高併發爬蟲系統,同時使用 MySQL 和 Redis,這是一個相當複雜但也非常有挑戰性的任務。下面我會將整個開發流程拆解成幾個主要部分,並提供詳細的步驟和注意事項。
重要提醒:
- XAMPP 的角色: 在這個架構中,XAMPP 的 Apache 和 MySQL 部分仍然有用,但 PHP 不再是爬蟲系統的核心語言。FastAPI 將承擔大部分後端邏輯。
- 高併發: 高併發通常需要非同步處理。FastAPI 本身支援非同步,但在 Windows 上部署和優化需要特別注意。
- Waitress: Waitress 是一個純 Python WSGI 服務器,適合開發和中小型部署。對於真正的高併發生產環境,你可能會考慮更專業的 ASGI 服務器,如 Uvicorn + Gunicorn (但 Gunicorn 不直接支援 Windows,需要用其他方式組合)。不過,對於開發階段和中小型應用,Waitress 已經足夠。
- MySQL & Redis: 這些是數據存儲和快取層,它們的安裝和配置是基礎。
- 爬蟲: 爬蟲本身需要考慮反爬、代理、多線程/多進程/異步等。
系統架構概覽
- 數據庫層 (MySQL): 透過 XAMPP 提供,用於儲存爬取到的數據。
- 快取層 (Redis): 用於任務隊列、快取、分布式鎖等,以提升爬蟲效率和系統響應速度。
- FastAPI (Python): 這是爬蟲系統的核心後端服務,負責:
- 接收爬蟲任務請求。
- 管理爬蟲進程/協程。
- 處理數據儲存到 MySQL。
- 利用 Redis 進行任務調度、狀態管理。
- 提供 API 接口供前端或其他服務呼叫。
- Waitress: 作為 FastAPI 應用程序的 WSGI 服務器。
- 爬蟲模塊 (Python): 獨立的 Python 腳本或模塊,由 FastAPI 觸發或管理,執行實際的網頁爬取工作。
- XAMPP (Apache): 可選,如果你需要 PHPMyAdmin 來管理 MySQL,或者有其他 PHP 應用,XAMPP 仍然有用。但對於 FastAPI 服務,Apache 不會直接服務它。
開發環境搭建步驟
第一部分:基礎服務安裝 (MySQL & Redis)
-
安裝 XAMPP (MySQL 部分):
- 前往
下載並安裝 XAMPP。Apache Friends 官網 - 安裝時確保勾選
MySQL
。 - 安裝完成後,啟動 XAMPP 控制面板,啟動
Apache
和MySQL
服務。 - 你可以通過訪問
http://localhost/phpmyadmin
來驗證 MySQL 是否正常運行。 - 重要: 為 MySQL 創建一個專門的用戶和數據庫,供爬蟲系統使用,並賦予必要的權限。
- 前往
-
安裝 Redis for Windows (第三方適配版本):
- 官方 Redis 不直接支援 Windows。你可以使用
提供的預編譯版本。tporadowski/redis - 下載最新的
Redis-x64-版本號.zip
文件。 - 解壓縮到一個你喜歡的目錄,例如
C:\Redis
。 - 啟動 Redis 服務:
- 打開 CMD 或 PowerShell,進入 Redis 目錄。
- 執行
redis-server.exe redis.windows.conf
。 - 推薦: 將 Redis 配置為 Windows 服務,讓它開機自動啟動:
你也可以在 Windows 的服務管理工具中找到 "RedisService" 並設置其啟動類型為自動。Bashredis-server.exe --service-install redis.windows.conf --service-name "RedisService" redis-server.exe --service-start --service-name "RedisService"
- 官方 Redis 不直接支援 Windows。你可以使用
第二部分:Python 環境和依賴安裝
-
安裝 Python:
- 從
下載並安裝最新版本的 Python 3 (建議 3.9+)。Python 官網 - 確保在安裝時勾選 "Add Python to PATH",這樣你就可以在 CMD 或 PowerShell 中直接使用
python
或pip
命令。
- 從
-
創建虛擬環境 (Virtual Environment):
- 打開 CMD 或 PowerShell。
- 進入你的項目目錄,例如
cd D:\crawler_project
。 - 創建虛擬環境:
python -m venv venv
- 激活虛擬環境:
venv\Scripts\activate
(Windows) - 激活後,命令提示符前會出現
(venv)
,表示你已進入虛擬環境。所有後續的 pip 安裝都將在這個環境中。
-
安裝必要的 Python 庫:
- 在虛擬環境中執行:
Bash
pip install fastapi uvicorn waitress pymysql redis beautifulsoup4 requests httpx # httpx 用於異步請求 pip install "python-dotenv[cli]" # 用於管理環境變量 pip install "mysql-connector-python" # 也可以用 PyMySQL,看你習慣
fastapi
: Web 框架。uvicorn
: ASGI 服務器,儘管我們用 Waitress 部署,Uvicorn 在開發時很有用。waitress
: WSGI 服務器,用於在 Windows 上部署。pymysql
或mysql-connector-python
: Python 連接 MySQL 的庫。redis
: Python 連接 Redis 的庫。beautifulsoup4
,requests
,httpx
: 爬蟲相關庫。httpx
支援異步請求,適合高併發爬蟲。
- 在虛擬環境中執行:
第三部分:FastAPI 應用開發
這裡只提供基本結構,具體邏輯需要根據你的爬蟲需求實現。
-
項目結構示例:
crawler_project/ ├── venv/ ├── .env # 環境變量文件 ├── main.py # FastAPI 應用主入口 ├── database.py # MySQL 連接和操作 ├── redis_client.py # Redis 連接和操作 ├── scraper.py # 爬蟲核心邏輯 └── requirements.txt # 項目依賴
-
main.py
(FastAPI 應用主入口):Pythonfrom fastapi import FastAPI, BackgroundTasks, HTTPException from pydantic import BaseModel import asyncio from database import get_db_connection, insert_data_to_mysql from redis_client import get_redis_client, add_task_to_queue, get_task_status from scraper import run_scraper app = FastAPI(title="High-Concurrency Crawler System") class ScrapeRequest(BaseModel): url: str depth: int = 1 @app.on_event("startup") async def startup_event(): print("Application startup...") # 可以在這裡初始化數據庫連接池、Redis 連接等 # 例如:await database.init_db_pool() # 例如:await redis_client.init_redis_pool() @app.on_event("shutdown") async def shutdown_event(): print("Application shutdown...") # 清理資源,例如關閉數據庫連接、Redis 連接 # 例如:await database.close_db_pool() # 例如:await redis_client.close_redis_pool() @app.post("/scrape/", tags=["Scraping Tasks"]) async def start_scrape_task(request: ScrapeRequest, background_tasks: BackgroundTasks): task_id = add_task_to_queue(request.url, request.depth) # 將任務加入 Redis 隊列 # 在後台任務中執行爬蟲,不阻塞 API 響應 background_tasks.add_task(run_scraper_async, task_id, request.url, request.depth) return {"message": "Scraping task initiated", "task_id": task_id} @app.get("/task/{task_id}/status", tags=["Scraping Tasks"]) async def get_task_status_endpoint(task_id: str): status = get_task_status(task_id) if not status: raise HTTPException(status_code=404, detail="Task not found or expired") return {"task_id": task_id, "status": status} # 模擬異步執行爬蟲 async def run_scraper_async(task_id: str, url: str, depth: int): print(f"Starting async scraper for task_id: {task_id}, url: {url}") try: # 實際的爬蟲邏輯將在這裡被調用 scraped_data = await run_scraper(url, depth) # run_scraper 應該是異步的 # 將數據儲存到 MySQL conn = await get_db_connection() if conn: await insert_data_to_mysql(conn, scraped_data) conn.close() # 關閉連接 else: print("Failed to get DB connection for inserting data.") # 更新 Redis 中的任務狀態 # 例如:update_task_status(task_id, "completed") print(f"Scraping task {task_id} completed.") except Exception as e: print(f"Error in scraping task {task_id}: {e}") # 例如:update_task_status(task_id, f"failed: {e}") # For local development (optional, Waitress will handle in production-like) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
-
database.py
(MySQL 操作):Pythonimport pymysql.cursors import os from dotenv import load_dotenv load_dotenv() # 加載 .env 文件中的環境變量 async def get_db_connection(): try: conn = pymysql.connect( host=os.getenv("MYSQL_HOST", "localhost"), user=os.getenv("MYSQL_USER", "root"), password=os.getenv("MYSQL_PASSWORD", ""), database=os.getenv("MYSQL_DB", "crawler_db"), charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) return conn except pymysql.Error as e: print(f"Error connecting to MySQL: {e}") return None async def insert_data_to_mysql(conn, data): if not conn or not data: return # 這裡的邏輯需要根據你的爬取數據結構和數據庫表結構來實現 # 假設 data 是一個字典列表,每個字典代表一條記錄 try: with conn.cursor() as cursor: # 這裡是範例,你需要替換成實際的 INSERT 語句 sql = "INSERT INTO scraped_items (title, url, content) VALUES (%s, %s, %s)" # 假設 data 是一個字典,包含 'title', 'url', 'content' cursor.execute(sql, (data.get('title'), data.get('url'), data.get('content'))) conn.commit() print("Data inserted successfully.") except Exception as e: conn.rollback() print(f"Error inserting data: {e}") # 更多數據庫操作函數...
-
redis_client.py
(Redis 操作):Pythonimport redis import os from dotenv import load_dotenv import uuid load_dotenv() _redis_client = None def get_redis_client(): global _redis_client if _redis_client is None: try: _redis_client = redis.StrictRedis( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", 6379)), db=int(os.getenv("REDIS_DB", 0)), decode_responses=True # 自動解碼,方便操作字符串 ) _redis_client.ping() # 測試連接 print("Connected to Redis.") except redis.exceptions.ConnectionError as e: print(f"Error connecting to Redis: {e}") _redis_client = None return _redis_client def add_task_to_queue(url: str, depth: int): r = get_redis_client() if r: task_id = str(uuid.uuid4()) task_info = {"url": url, "depth": depth, "status": "pending"} r.hset(f"task:{task_id}", mapping=task_info) # 將任務信息儲存為 hash r.rpush("scraper_queue", task_id) # 將任務 ID 加入隊列 r.expire(f"task:{task_id}", 3600) # 設置任務信息過期時間 (1小時) return task_id return None def get_task_status(task_id: str): r = get_redis_client() if r: return r.hgetall(f"task:{task_id}") return None def update_task_status(task_id: str, status: str): r = get_redis_client() if r: r.hset(f"task:{task_id}", "status", status)
-
scraper.py
(爬蟲核心邏輯):Pythonimport httpx # 用於異步 HTTP 請求 from bs4 import BeautifulSoup import asyncio async def fetch_url(url: str): async with httpx.AsyncClient() as client: try: response = await client.get(url, timeout=30) response.raise_for_status() # 檢查 HTTP 響應狀態碼 return response.text except httpx.RequestError as e: print(f"An error occurred while requesting {url}: {e}") return None except httpx.HTTPStatusError as e: print(f"Error response {e.response.status_code} while requesting {url}: {e}") return None async def parse_html(html_content: str): if not html_content: return {} soup = BeautifulSoup(html_content, 'html.parser') # 這裡寫你的解析邏輯,例如提取標題、鏈接、內容等 title = soup.title.string if soup.title else "No Title" # 簡單範例:提取所有段落文本 paragraphs = [p.get_text() for p in soup.find_all('p')] content = "\n".join(paragraphs) return {"title": title, "content": content} async def run_scraper(url: str, depth: int = 1): """ 異步執行爬蟲邏輯 """ print(f"Scraping: {url} with depth {depth}") html_content = await fetch_url(url) if html_content: parsed_data = await parse_html(html_content) parsed_data['url'] = url # 將 URL 加入數據中 print(f"Scraped data for {url}: {parsed_data['title']}") # 在這裡,你可以將爬取到的數據返回,讓調用者(FastAPI)處理儲存 return parsed_data return None # 如果需要爬取多個鏈接,可以在此處實現遞歸或任務分發 # 例如: # async def crawl_recursive(url: str, current_depth: int, max_depth: int): # if current_depth > max_depth: # return # html = await fetch_url(url) # # 解析鏈接 # # 將新鏈接加入隊列或觸發新的爬蟲任務
-
.env
文件:MYSQL_HOST=localhost MYSQL_USER=root MYSQL_PASSWORD= MYSQL_DB=crawler_db REDIS_HOST=localhost REDIS_PORT=6379 REDIS_DB=0
-
requirements.txt
:fastapi uvicorn waitress pymysql redis beautifulsoup4 requests httpx python-dotenv
第四部分:運行和部署
-
開發階段運行 (使用 Uvicorn):
- 打開 CMD 或 PowerShell,進入你的項目目錄。
- 激活虛擬環境:
venv\Scripts\activate
- 運行 FastAPI 應用:
uvicorn main:app --reload --host 0.0.0.0 --port 8000
--reload
可以在代碼更改時自動重啟服務,方便開發。- 打開瀏覽器訪問
http://localhost:8000/docs
可以看到 FastAPI 的自動文檔 (Swagger UI)。 - 你可以通過 Postman 或其他工具向
/scrape/
接口發送 POST 請求來測試爬蟲任務。
-
生產環境部署 (使用 Waitress):
- Waitress 是一個簡單的 WSGI 服務器,適合 Windows 環境。
- 在項目根目錄下,創建一個啟動腳本,例如
run_waitress.py
(或者直接在 CMD 中運行):Python# run_waitress.py from waitress import serve from main import app # 從你的 main.py 中導入 FastAPI 應用 if __name__ == "__main__": print("Starting FastAPI application with Waitress...") # listen 參數設置為 0.0.0.0:8000 允許外部訪問 # threads 參數控制並發處理請求的線程數,可以根據需求調整 serve(app, host="0.0.0.0", port=8000, threads=10)
- 在 CMD 或 PowerShell 中,激活虛擬環境,然後運行:
python run_waitress.py
- 現在你的 FastAPI 應用就會通過 Waitress 服務器運行在
http://localhost:8000
。 - 注意: Waitress 雖然可以處理並發請求,但它是基於線程的。對於 CPU 密集型任務或非常高的 I/O 併發,你可能需要考慮更複雜的部署策略 (例如:Windows Server 上的 IIS 反向代理到 Uvicorn/Gevent/Asyncio workers)。
高併發爬蟲優化考慮
- 異步 I/O: FastAPI 和
httpx
本身就是異步的,這對於處理大量的網絡 I/O 非常關鍵。確保你的爬蟲邏輯也盡可能使用async/await
。 - 任務隊列 (Redis): 使用 Redis 作為任務隊列 (例如
RPUSH
/LPOP
) 可以實現任務的分發和解耦。你可以有多個爬蟲工作者 (可以是獨立的 Python 進程),它們從 Redis 隊列中拉取任務並執行。 - 多進程/多線程 (Python):
- Python 的 GIL (Global Interpreter Lock): 限制了單個 Python 進程的 CPU 密集型任務只能在一個 CPU 核上運行。
- I/O 密集型任務 (爬蟲): 由於大部分時間花在等待網絡響應上,GIL 的影響相對較小。異步 I/O 已經能提供很好的並發。
- 多進程: 如果你的爬蟲涉及到大量的 CPU 密集型解析或處理,可以考慮使用
multiprocessing
模塊來創建多個 Python 進程,每個進程運行獨立的爬蟲實例,並從 Redis 隊列中獲取任務。 - FastAPI 的
BackgroundTasks
: 適合處理輕量級的後台任務。對於長時間運行的爬蟲任務,更建議將任務加入 Redis 隊列,然後由獨立的 Worker 進程從隊列中拉取並執行。
- 代理池/IP 輪換: 避免被目標網站封鎖,使用代理服務。
- 用戶代理 (User-Agent) 輪換: 模擬不同的瀏覽器。
- 錯誤處理和重試機制: 針對網絡錯誤、解析錯誤等情況,實現健壯的重試邏輯。
- 數據持久化: 定期將 Redis 中的數據持久化到磁盤,或確保 MySQL 是主要數據存儲。
- 監控和日誌: 實時監控爬蟲進度、錯誤率、資源使用情況。使用 Python 的
logging
模塊記錄關鍵信息。 - 反爬機制: 根據目標網站的反爬策略,可能需要處理驗證碼、JS 渲染、Cookie 管理等。
Windows 部署注意事項
- 防火牆: 確保 Windows 防火牆允許你的 FastAPI 應用使用的端口 (例如 8000) 被外部訪問。
- 後台運行: 如果你希望爬蟲系統在 Windows 啟動時自動運行,並且即使你退出登錄也能繼續運行,你可以考慮將 Waitress 啟動的 Python 腳本註冊為 Windows 服務。有很多第三方工具可以幫助完成這個,例如
NSSM
(Non-Sucking Service Manager)。 - 資源管理: 監控 CPU、內存和網絡使用情況,確保系統資源充足。
補充與優化建議
1. 高併發爬蟲的核心優化
在高併發場景下,爬蟲的性能瓶頸主要來自於 I/O 等待(網絡請求)和 CPU 密集型任務(如 HTML 解析)。以下是一些針對高併發的具體優化建議:
- 非同步爬蟲的批量處理:
- 使用 asyncio.gather 同時處理多個 URL 的爬取,提升吞吐量。例如:python
async def fetch_multiple_urls(urls): async with aiohttp.ClientSession() as session: tasks = [fetch_page(url, session) for url in urls] return await asyncio.gather(*tasks, return_exceptions=True) async def fetch_page(url, session): try: async with session.get(url, timeout=30) as response: if response.status == 200: html = await response.text() soup = BeautifulSoup(html, 'html.parser') title = soup.title.string if soup.title else 'No Title' return {'url': url, 'title': title, 'content': html} except Exception as e: print(f"Error fetching {url}: {e}") return None
- 這種方式可以同時發送多個請求,減少等待時間。
- 限制併發數:
- 使用 asyncio.Semaphore 限制同時發送的請求數,避免壓垮目標網站或本地資源:python
async def fetch_page_with_semaphore(url, session, semaphore): async with semaphore: return await fetch_page(url, session) async def fetch_multiple_urls(urls, max_concurrent=10): semaphore = asyncio.Semaphore(max_concurrent) async with aiohttp.ClientSession() as session: tasks = [fetch_page_with_semaphore(url, session, semaphore) for url in urls] return await asyncio.gather(*tasks, return_exceptions=True)
- 代理池與反爬機制:
- 代理池:使用免費或付費代理服務(如 ScrapingBee、BrightData),並在 aiohttp 中配置代理:python
async def fetch_page_with_proxy(url, proxy=None): async with aiohttp.ClientSession() as session: async with session.get(url, proxy=proxy, timeout=30) as response: if response.status == 200: return await response.text() return None
- User-Agent 輪換:使用 fake-useragent 套件隨機生成 User-Agent:python
from fake_useragent import UserAgent ua = UserAgent() headers = {'User-Agent': ua.random} async with session.get(url, headers=headers) as response: ...
- 處理 JavaScript 渲染:如果目標網站需要 JavaScript 渲染,考慮使用 playwright 或 selenium(搭配無頭瀏覽器)。
- 重試機制:
- 使用 tenacity 套件為爬蟲請求添加自動重試:python
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def fetch_page(url, session): async with session.get(url, timeout=30) as response: response.raise_for_status() return await response.text()
2. Waitress 的高併發配置
- 線程數優化:
- Waitress 的 threads 參數決定了同時處理的請求數。根據 CPU 核心數和記憶體情況,建議設置為 4 * CPU核心數 到 8 * CPU核心數。例如,4 核心 CPU 可設置 threads=16 到 threads=32。
- 示例:python
serve(app, host="0.0.0.0", port=8000, threads=16)
- 限制:Waitress 是基於線程的 WSGI 伺服器,對於 CPU 密集型任務(如解析大規模 HTML),可能需要結合 multiprocessing 或分佈式 Worker。
- 替代方案:
- 如果 Waitress 的性能不足,考慮在 WSL2 中運行 gunicorn + uvicorn,因為 Gunicorn 支援多進程,而 Uvicorn 支援 ASGI 的非同步處理。例如:bash
gunicorn -k uvicorn.workers.UvicornWorker -w 4 main:app
- 在 Windows 上,uvicorn 單獨運行也是一個不錯的選擇,特別是在開發階段。
3. MySQL 與 Redis 的高效使用
- MySQL 優化:
- 連接池:使用 SQLAlchemy 的連接池來管理 MySQL 連接:python
from sqlalchemy import create_engine engine = create_engine( 'mysql+pymysql://root:your_password@localhost/crawler_db', pool_size=20, max_overflow=10, pool_timeout=30 )
- 批量插入:將爬取的數據收集到一定數量後批量插入 MySQL,減少資料庫 I/O:python
def insert_batch_to_mysql(session, data_list): session.bulk_insert_mappings(WebPage, data_list) session.commit()
- Redis 優化:
- 管道 (Pipeline):減少 Redis 的網絡往返時間:python
def add_multiple_tasks(urls): r = redis_client.pipeline() for url in urls: r.lpush('crawler_queue', url) r.execute()
- 持久化與備份:配置 Redis 的 save 指令,確保數據定期寫入磁盤(修改 redis.windows.conf 中的 save 設置)。
- 分片與集群:如果任務量極大,考慮部署 Redis Cluster(需更多伺服器資源)。
4. Windows 環境的特殊處理
- Redis 安裝:
- 如果 tporadowski/redis 不穩定,強烈建議使用 WSL2 運行 Ubuntu,然後在其中安裝官方 Redis:bash
sudo apt update sudo apt install redis sudo systemctl enable redis sudo systemctl start redis
- 在 Windows 與 WSL2 之間共享 Redis,可設置 Redis 的 bind 地址為 0.0.0.0,並在 Windows 防火牆中開放 6379 端口。
- Windows 服務:
- 使用 NSSM 將 Waitress 設置為 Windows 服務:bash
nssm install CrawlerService python run_waitress.py nssm start CrawlerService
- 防火牆與端口:
- 確保 Windows 防火牆允許 8000(FastAPI)、3306(MySQL)、6379(Redis)端口的入站連接。
5. 分佈式爬蟲(進階)
如果需要更高的併發性和可擴展性,可以引入 Celery 作為任務分發框架:
- 安裝 Celery:bash
pip install celery
- 配置 Celery 使用 Redis 作為消息代理:python
from celery import Celery app = Celery('crawler', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0') app.conf.task_serializer = 'json' app.conf.result_serializer = 'json' app.conf.accept_content = ['json'] app.conf.task_track_started = True @app.task def crawl_task(url, depth): return run_scraper(url, depth)
- 在 FastAPI 中調用 Celery 任務:python
@app.post("/scrape") async def start_scrape_task(request: ScrapeRequest): task = crawl_task.delay(request.url, request.depth) return {"task_id": task.id}
- 啟動 Celery Worker:bash
celery -A your_celery_module worker --loglevel=info --pool=eventlet
注意:Windows 上建議使用 eventlet 或 gevent 作為 Celery 的 Worker 池,因為 Windows 不支援 fork。
6. 監控與日誌
- 日誌:使用 logging 或 loguru 記錄爬蟲狀態:python
from loguru import logger logger.add("crawler.log", rotation="10 MB") logger.info(f"Starting crawl for {url}")
- 監控:
- 使用 prometheus_client 監控爬蟲性能,暴露指標給 Prometheus。
- 示例:python
from prometheus_client import Counter, start_http_server requests_total = Counter('crawler_requests_total', 'Total requests made') start_http_server(8001) # 監控端點 requests_total.inc() # 在爬蟲請求時增加計數
7. 程式碼整合與執行
以下是整合後的 main.py,包含所有主要功能:
python
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import redis
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import uuid
# FastAPI 應用
app = FastAPI(title="High-Concurrency Crawler System")
# MySQL 配置
Base = declarative_base()
class WebPage(Base):
__tablename__ = 'webpages'
id = Column(Integer, primary_key=True)
url = Column(String(255), unique=True)
title = Column(String(255))
content = Column(Text)
engine = create_engine('mysql+pymysql://root:your_password@localhost/crawler_db', pool_size=20, max_overflow=10)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
# Redis 配置
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Pydantic 模型
class ScrapeRequest(BaseModel):
url: str
depth: int = 1
# Redis 任務隊列
def add_task_to_queue(url: str, depth: int):
task_id = str(uuid.uuid4())
task_info = {"url": url, "depth": depth, "status": "pending"}
redis_client.hset(f"task:{task_id}", mapping=task_info)
redis_client.rpush("scraper_queue", task_id)
redis_client.expire(f"task:{task_id}", 3600)
return task_id
def get_task_status(task_id: str):
return redis_client.hgetall(f"task:{task_id}")
# 爬蟲邏輯
async def fetch_page(url):
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, timeout=30) as response:
if response.status == 200:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
title = soup.title.string if soup.title else 'No Title'
return {'url': url, 'title': title, 'content': html}
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
# API 端點
@app.post("/scrape")
async def start_scrape_task(request: ScrapeRequest, background_tasks: BackgroundTasks):
task_id = add_task_to_queue(request.url, request.depth)
background_tasks.add_task(run_scraper_async, task_id, request.url, request.depth)
return {"message": "Scraping task initiated", "task_id": task_id}
@app.get("/task/{task_id}/status")
async def get_task_status_endpoint(task_id: str):
status = get_task_status(task_id)
if not status:
return {"message": "Task not found or expired"}
return {"task_id": task_id, "status": status}
async def run_scraper_async(task_id: str, url: str, depth: int):
redis_client.hset(f"task:{task_id}", "status", "running")
try:
data = await fetch_page(url)
if data:
db = Session()
webpage = WebPage(url=data['url'], title=data['title'], content=data['content'])
db.add(webpage)
db.commit()
db.close()
redis_client.hset(f"task:{task_id}", "status", "completed")
redis_client.set(f"crawled:{url}", data['title'], ex=3600)
else:
redis_client.hset(f"task:{task_id}", "status", "failed")
except Exception as e:
redis_client.hset(f"task:{task_id}", "status", f"failed: {str(e)}")
print(f"Error in task {task_id}: {e}")
# Waitress 部署
if __name__ == "__main__":
from waitress import serve
serve(app, host="0.0.0.0", port=8000, threads=16)
8. 測試與部署
- 啟動服務:
- 啟動 XAMPP(MySQL)。
- 啟動 Redis(redis-server.exe 或 Windows 服務)。
- 運行 python main.py 啟動 Waitress。
- 測試 API:
- 添加任務:bash
curl -X POST "http://localhost:8000/scrape" -H "Content-Type: application/json" -d '{"url": "https://example.com", "depth": 1}'
- 檢查任務狀態:bash
curl "http://localhost:8000/task/<task_id>/status"
- 驗證數據:
- 在 phpMyAdmin 查看 webpages 表中的數據。
- 使用 redis-cli 檢查隊列和快取:bash
lrange scraper_queue 0 -1 hgetall task:<task_id>
9. 常見問題與解決方案
- Redis 連接失敗:檢查 Redis 是否正在運行,確保 redis.windows.conf 中的 bind 和 port 設置正確。
- MySQL 性能瓶頸:檢查慢查詢日誌,優化索引和查詢。
- Waitress 性能不足:考慮使用 WSL2 運行 Gunicorn + Uvicorn,或增加 Waitress 的 threads 數。
- 反爬限制:實現代理池、隨機延遲、User-Agent 輪換。
沒有留言:
張貼留言