2025年6月9日 星期一

如何在 Windows 11 底下開發一個基於 XAMPP + Waitress + FastAPI 的高併發爬蟲系統?

要在 Windows 11 底下開發一個基於 XAMPP + Waitress + FastAPI 的高併發爬蟲系統,同時使用 MySQL 和 Redis,這是一個相當複雜但也非常有挑戰性的任務。下面我會將整個開發流程拆解成幾個主要部分,並提供詳細的步驟和注意事項。

重要提醒:

  • XAMPP 的角色: 在這個架構中,XAMPP 的 Apache 和 MySQL 部分仍然有用,但 PHP 不再是爬蟲系統的核心語言。FastAPI 將承擔大部分後端邏輯。
  • 高併發: 高併發通常需要非同步處理。FastAPI 本身支援非同步,但在 Windows 上部署和優化需要特別注意。
  • Waitress: Waitress 是一個純 Python WSGI 服務器,適合開發和中小型部署。對於真正的高併發生產環境,你可能會考慮更專業的 ASGI 服務器,如 Uvicorn + Gunicorn (但 Gunicorn 不直接支援 Windows,需要用其他方式組合)。不過,對於開發階段和中小型應用,Waitress 已經足夠。
  • MySQL & Redis: 這些是數據存儲和快取層,它們的安裝和配置是基礎。
  • 爬蟲: 爬蟲本身需要考慮反爬、代理、多線程/多進程/異步等。

系統架構概覽

  1. 數據庫層 (MySQL): 透過 XAMPP 提供,用於儲存爬取到的數據。
  2. 快取層 (Redis): 用於任務隊列、快取、分布式鎖等,以提升爬蟲效率和系統響應速度。
  3. FastAPI (Python): 這是爬蟲系統的核心後端服務,負責:
    • 接收爬蟲任務請求。
    • 管理爬蟲進程/協程。
    • 處理數據儲存到 MySQL。
    • 利用 Redis 進行任務調度、狀態管理。
    • 提供 API 接口供前端或其他服務呼叫。
  4. Waitress: 作為 FastAPI 應用程序的 WSGI 服務器。
  5. 爬蟲模塊 (Python): 獨立的 Python 腳本或模塊,由 FastAPI 觸發或管理,執行實際的網頁爬取工作。
  6. XAMPP (Apache): 可選,如果你需要 PHPMyAdmin 來管理 MySQL,或者有其他 PHP 應用,XAMPP 仍然有用。但對於 FastAPI 服務,Apache 不會直接服務它。

開發環境搭建步驟

第一部分:基礎服務安裝 (MySQL & Redis)

  1. 安裝 XAMPP (MySQL 部分):

    • 前往 Apache Friends 官網 下載並安裝 XAMPP。
    • 安裝時確保勾選 MySQL
    • 安裝完成後,啟動 XAMPP 控制面板,啟動 ApacheMySQL 服務。
    • 你可以通過訪問 http://localhost/phpmyadmin 來驗證 MySQL 是否正常運行。
    • 重要: 為 MySQL 創建一個專門的用戶和數據庫,供爬蟲系統使用,並賦予必要的權限。
  2. 安裝 Redis for Windows (第三方適配版本):

    • 官方 Redis 不直接支援 Windows。你可以使用 tporadowski/redis 提供的預編譯版本。
    • 下載最新的 Redis-x64-版本號.zip 文件。
    • 解壓縮到一個你喜歡的目錄,例如 C:\Redis
    • 啟動 Redis 服務:
      • 打開 CMD 或 PowerShell,進入 Redis 目錄。
      • 執行 redis-server.exe redis.windows.conf
      • 推薦: 將 Redis 配置為 Windows 服務,讓它開機自動啟動:
        Bash
        redis-server.exe --service-install redis.windows.conf --service-name "RedisService"
        redis-server.exe --service-start --service-name "RedisService"
        
        你也可以在 Windows 的服務管理工具中找到 "RedisService" 並設置其啟動類型為自動。

第二部分:Python 環境和依賴安裝

  1. 安裝 Python:

    • Python 官網 下載並安裝最新版本的 Python 3 (建議 3.9+)。
    • 確保在安裝時勾選 "Add Python to PATH",這樣你就可以在 CMD 或 PowerShell 中直接使用 pythonpip 命令。
  2. 創建虛擬環境 (Virtual Environment):

    • 打開 CMD 或 PowerShell。
    • 進入你的項目目錄,例如 cd D:\crawler_project
    • 創建虛擬環境:python -m venv venv
    • 激活虛擬環境:venv\Scripts\activate (Windows)
    • 激活後,命令提示符前會出現 (venv),表示你已進入虛擬環境。所有後續的 pip 安裝都將在這個環境中。
  3. 安裝必要的 Python 庫:

    • 在虛擬環境中執行:
      Bash
      pip install fastapi uvicorn waitress pymysql redis beautifulsoup4 requests httpx  # httpx 用於異步請求
      pip install "python-dotenv[cli]" # 用於管理環境變量
      pip install "mysql-connector-python" # 也可以用 PyMySQL,看你習慣
      
      • fastapi: Web 框架。
      • uvicorn: ASGI 服務器,儘管我們用 Waitress 部署,Uvicorn 在開發時很有用。
      • waitress: WSGI 服務器,用於在 Windows 上部署。
      • pymysqlmysql-connector-python: Python 連接 MySQL 的庫。
      • redis: Python 連接 Redis 的庫。
      • beautifulsoup4, requests, httpx: 爬蟲相關庫。httpx 支援異步請求,適合高併發爬蟲。

第三部分:FastAPI 應用開發

這裡只提供基本結構,具體邏輯需要根據你的爬蟲需求實現。

  1. 項目結構示例:

    crawler_project/
    ├── venv/
    ├── .env                # 環境變量文件
    ├── main.py             # FastAPI 應用主入口
    ├── database.py         # MySQL 連接和操作
    ├── redis_client.py     # Redis 連接和操作
    ├── scraper.py          # 爬蟲核心邏輯
    └── requirements.txt    # 項目依賴
    
  2. main.py (FastAPI 應用主入口):

    Python
    from fastapi import FastAPI, BackgroundTasks, HTTPException
    from pydantic import BaseModel
    import asyncio
    
    from database import get_db_connection, insert_data_to_mysql
    from redis_client import get_redis_client, add_task_to_queue, get_task_status
    from scraper import run_scraper
    
    app = FastAPI(title="High-Concurrency Crawler System")
    
    class ScrapeRequest(BaseModel):
        url: str
        depth: int = 1
    
    @app.on_event("startup")
    async def startup_event():
        print("Application startup...")
        # 可以在這裡初始化數據庫連接池、Redis 連接等
        # 例如:await database.init_db_pool()
        # 例如:await redis_client.init_redis_pool()
    
    @app.on_event("shutdown")
    async def shutdown_event():
        print("Application shutdown...")
        # 清理資源,例如關閉數據庫連接、Redis 連接
        # 例如:await database.close_db_pool()
        # 例如:await redis_client.close_redis_pool()
    
    @app.post("/scrape/", tags=["Scraping Tasks"])
    async def start_scrape_task(request: ScrapeRequest, background_tasks: BackgroundTasks):
        task_id = add_task_to_queue(request.url, request.depth) # 將任務加入 Redis 隊列
        # 在後台任務中執行爬蟲,不阻塞 API 響應
        background_tasks.add_task(run_scraper_async, task_id, request.url, request.depth)
        return {"message": "Scraping task initiated", "task_id": task_id}
    
    @app.get("/task/{task_id}/status", tags=["Scraping Tasks"])
    async def get_task_status_endpoint(task_id: str):
        status = get_task_status(task_id)
        if not status:
            raise HTTPException(status_code=404, detail="Task not found or expired")
        return {"task_id": task_id, "status": status}
    
    # 模擬異步執行爬蟲
    async def run_scraper_async(task_id: str, url: str, depth: int):
        print(f"Starting async scraper for task_id: {task_id}, url: {url}")
        try:
            # 實際的爬蟲邏輯將在這裡被調用
            scraped_data = await run_scraper(url, depth) # run_scraper 應該是異步的
            # 將數據儲存到 MySQL
            conn = await get_db_connection()
            if conn:
                await insert_data_to_mysql(conn, scraped_data)
                conn.close() # 關閉連接
            else:
                print("Failed to get DB connection for inserting data.")
            # 更新 Redis 中的任務狀態
            # 例如:update_task_status(task_id, "completed")
            print(f"Scraping task {task_id} completed.")
        except Exception as e:
            print(f"Error in scraping task {task_id}: {e}")
            # 例如:update_task_status(task_id, f"failed: {e}")
    
    # For local development (optional, Waitress will handle in production-like)
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)
    
  3. database.py (MySQL 操作):

    Python
    import pymysql.cursors
    import os
    from dotenv import load_dotenv
    
    load_dotenv() # 加載 .env 文件中的環境變量
    
    async def get_db_connection():
        try:
            conn = pymysql.connect(
                host=os.getenv("MYSQL_HOST", "localhost"),
                user=os.getenv("MYSQL_USER", "root"),
                password=os.getenv("MYSQL_PASSWORD", ""),
                database=os.getenv("MYSQL_DB", "crawler_db"),
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            return conn
        except pymysql.Error as e:
            print(f"Error connecting to MySQL: {e}")
            return None
    
    async def insert_data_to_mysql(conn, data):
        if not conn or not data:
            return
    
        # 這裡的邏輯需要根據你的爬取數據結構和數據庫表結構來實現
        # 假設 data 是一個字典列表,每個字典代表一條記錄
        try:
            with conn.cursor() as cursor:
                # 這裡是範例,你需要替換成實際的 INSERT 語句
                sql = "INSERT INTO scraped_items (title, url, content) VALUES (%s, %s, %s)"
                # 假設 data 是一個字典,包含 'title', 'url', 'content'
                cursor.execute(sql, (data.get('title'), data.get('url'), data.get('content')))
            conn.commit()
            print("Data inserted successfully.")
        except Exception as e:
            conn.rollback()
            print(f"Error inserting data: {e}")
    
    # 更多數據庫操作函數...
    
  4. redis_client.py (Redis 操作):

    Python
    import redis
    import os
    from dotenv import load_dotenv
    import uuid
    
    load_dotenv()
    
    _redis_client = None
    
    def get_redis_client():
        global _redis_client
        if _redis_client is None:
            try:
                _redis_client = redis.StrictRedis(
                    host=os.getenv("REDIS_HOST", "localhost"),
                    port=int(os.getenv("REDIS_PORT", 6379)),
                    db=int(os.getenv("REDIS_DB", 0)),
                    decode_responses=True # 自動解碼,方便操作字符串
                )
                _redis_client.ping() # 測試連接
                print("Connected to Redis.")
            except redis.exceptions.ConnectionError as e:
                print(f"Error connecting to Redis: {e}")
                _redis_client = None
        return _redis_client
    
    def add_task_to_queue(url: str, depth: int):
        r = get_redis_client()
        if r:
            task_id = str(uuid.uuid4())
            task_info = {"url": url, "depth": depth, "status": "pending"}
            r.hset(f"task:{task_id}", mapping=task_info) # 將任務信息儲存為 hash
            r.rpush("scraper_queue", task_id) # 將任務 ID 加入隊列
            r.expire(f"task:{task_id}", 3600) # 設置任務信息過期時間 (1小時)
            return task_id
        return None
    
    def get_task_status(task_id: str):
        r = get_redis_client()
        if r:
            return r.hgetall(f"task:{task_id}")
        return None
    
    def update_task_status(task_id: str, status: str):
        r = get_redis_client()
        if r:
            r.hset(f"task:{task_id}", "status", status)
    
  5. scraper.py (爬蟲核心邏輯):

    Python
    import httpx # 用於異步 HTTP 請求
    from bs4 import BeautifulSoup
    import asyncio
    
    async def fetch_url(url: str):
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(url, timeout=30)
                response.raise_for_status() # 檢查 HTTP 響應狀態碼
                return response.text
            except httpx.RequestError as e:
                print(f"An error occurred while requesting {url}: {e}")
                return None
            except httpx.HTTPStatusError as e:
                print(f"Error response {e.response.status_code} while requesting {url}: {e}")
                return None
    
    async def parse_html(html_content: str):
        if not html_content:
            return {}
        soup = BeautifulSoup(html_content, 'html.parser')
        # 這裡寫你的解析邏輯,例如提取標題、鏈接、內容等
        title = soup.title.string if soup.title else "No Title"
        # 簡單範例:提取所有段落文本
        paragraphs = [p.get_text() for p in soup.find_all('p')]
        content = "\n".join(paragraphs)
        return {"title": title, "content": content}
    
    async def run_scraper(url: str, depth: int = 1):
        """
        異步執行爬蟲邏輯
        """
        print(f"Scraping: {url} with depth {depth}")
        html_content = await fetch_url(url)
        if html_content:
            parsed_data = await parse_html(html_content)
            parsed_data['url'] = url # 將 URL 加入數據中
            print(f"Scraped data for {url}: {parsed_data['title']}")
            # 在這裡,你可以將爬取到的數據返回,讓調用者(FastAPI)處理儲存
            return parsed_data
        return None
    
    # 如果需要爬取多個鏈接,可以在此處實現遞歸或任務分發
    # 例如:
    # async def crawl_recursive(url: str, current_depth: int, max_depth: int):
    #     if current_depth > max_depth:
    #         return
    #     html = await fetch_url(url)
    #     # 解析鏈接
    #     # 將新鏈接加入隊列或觸發新的爬蟲任務
    
  6. .env 文件:

    MYSQL_HOST=localhost
    MYSQL_USER=root
    MYSQL_PASSWORD=
    MYSQL_DB=crawler_db
    
    REDIS_HOST=localhost
    REDIS_PORT=6379
    REDIS_DB=0
    
  7. requirements.txt

    fastapi
    uvicorn
    waitress
    pymysql
    redis
    beautifulsoup4
    requests
    httpx
    python-dotenv
    

第四部分:運行和部署

  1. 開發階段運行 (使用 Uvicorn):

    • 打開 CMD 或 PowerShell,進入你的項目目錄。
    • 激活虛擬環境:venv\Scripts\activate
    • 運行 FastAPI 應用:uvicorn main:app --reload --host 0.0.0.0 --port 8000
    • --reload 可以在代碼更改時自動重啟服務,方便開發。
    • 打開瀏覽器訪問 http://localhost:8000/docs 可以看到 FastAPI 的自動文檔 (Swagger UI)。
    • 你可以通過 Postman 或其他工具向 /scrape/ 接口發送 POST 請求來測試爬蟲任務。
  2. 生產環境部署 (使用 Waitress):

    • Waitress 是一個簡單的 WSGI 服務器,適合 Windows 環境。
    • 在項目根目錄下,創建一個啟動腳本,例如 run_waitress.py (或者直接在 CMD 中運行):
      Python
      # run_waitress.py
      from waitress import serve
      from main import app # 從你的 main.py 中導入 FastAPI 應用
      
      if __name__ == "__main__":
          print("Starting FastAPI application with Waitress...")
          # listen 參數設置為 0.0.0.0:8000 允許外部訪問
          # threads 參數控制並發處理請求的線程數,可以根據需求調整
          serve(app, host="0.0.0.0", port=8000, threads=10)
      
    • 在 CMD 或 PowerShell 中,激活虛擬環境,然後運行:python run_waitress.py
    • 現在你的 FastAPI 應用就會通過 Waitress 服務器運行在 http://localhost:8000
    • 注意: Waitress 雖然可以處理並發請求,但它是基於線程的。對於 CPU 密集型任務或非常高的 I/O 併發,你可能需要考慮更複雜的部署策略 (例如:Windows Server 上的 IIS 反向代理到 Uvicorn/Gevent/Asyncio workers)。

高併發爬蟲優化考慮

  1. 異步 I/O: FastAPI 和 httpx 本身就是異步的,這對於處理大量的網絡 I/O 非常關鍵。確保你的爬蟲邏輯也盡可能使用 async/await
  2. 任務隊列 (Redis): 使用 Redis 作為任務隊列 (例如 RPUSH/LPOP) 可以實現任務的分發和解耦。你可以有多個爬蟲工作者 (可以是獨立的 Python 進程),它們從 Redis 隊列中拉取任務並執行。
  3. 多進程/多線程 (Python):
    • Python 的 GIL (Global Interpreter Lock): 限制了單個 Python 進程的 CPU 密集型任務只能在一個 CPU 核上運行。
    • I/O 密集型任務 (爬蟲): 由於大部分時間花在等待網絡響應上,GIL 的影響相對較小。異步 I/O 已經能提供很好的並發。
    • 多進程: 如果你的爬蟲涉及到大量的 CPU 密集型解析或處理,可以考慮使用 multiprocessing 模塊來創建多個 Python 進程,每個進程運行獨立的爬蟲實例,並從 Redis 隊列中獲取任務。
    • FastAPI 的 BackgroundTasks 適合處理輕量級的後台任務。對於長時間運行的爬蟲任務,更建議將任務加入 Redis 隊列,然後由獨立的 Worker 進程從隊列中拉取並執行。
  4. 代理池/IP 輪換: 避免被目標網站封鎖,使用代理服務。
  5. 用戶代理 (User-Agent) 輪換: 模擬不同的瀏覽器。
  6. 錯誤處理和重試機制: 針對網絡錯誤、解析錯誤等情況,實現健壯的重試邏輯。
  7. 數據持久化: 定期將 Redis 中的數據持久化到磁盤,或確保 MySQL 是主要數據存儲。
  8. 監控和日誌: 實時監控爬蟲進度、錯誤率、資源使用情況。使用 Python 的 logging 模塊記錄關鍵信息。
  9. 反爬機制: 根據目標網站的反爬策略,可能需要處理驗證碼、JS 渲染、Cookie 管理等。

Windows 部署注意事項

  • 防火牆: 確保 Windows 防火牆允許你的 FastAPI 應用使用的端口 (例如 8000) 被外部訪問。
  • 後台運行: 如果你希望爬蟲系統在 Windows 啟動時自動運行,並且即使你退出登錄也能繼續運行,你可以考慮將 Waitress 啟動的 Python 腳本註冊為 Windows 服務。有很多第三方工具可以幫助完成這個,例如 NSSM (Non-Sucking Service Manager)。
  • 資源管理: 監控 CPU、內存和網絡使用情況,確保系統資源充足。


補充與優化建議
1. 高併發爬蟲的核心優化
在高併發場景下,爬蟲的性能瓶頸主要來自於 I/O 等待(網絡請求)和 CPU 密集型任務(如 HTML 解析)。以下是一些針對高併發的具體優化建議:
  • 非同步爬蟲的批量處理
    • 使用 asyncio.gather 同時處理多個 URL 的爬取,提升吞吐量。例如:
      python
      async def fetch_multiple_urls(urls):
          async with aiohttp.ClientSession() as session:
              tasks = [fetch_page(url, session) for url in urls]
              return await asyncio.gather(*tasks, return_exceptions=True)
      
      async def fetch_page(url, session):
          try:
              async with session.get(url, timeout=30) as response:
                  if response.status == 200:
                      html = await response.text()
                      soup = BeautifulSoup(html, 'html.parser')
                      title = soup.title.string if soup.title else 'No Title'
                      return {'url': url, 'title': title, 'content': html}
          except Exception as e:
              print(f"Error fetching {url}: {e}")
              return None
    • 這種方式可以同時發送多個請求,減少等待時間。
  • 限制併發數
    • 使用 asyncio.Semaphore 限制同時發送的請求數,避免壓垮目標網站或本地資源:
      python
      async def fetch_page_with_semaphore(url, session, semaphore):
          async with semaphore:
              return await fetch_page(url, session)
      
      async def fetch_multiple_urls(urls, max_concurrent=10):
          semaphore = asyncio.Semaphore(max_concurrent)
          async with aiohttp.ClientSession() as session:
              tasks = [fetch_page_with_semaphore(url, session, semaphore) for url in urls]
              return await asyncio.gather(*tasks, return_exceptions=True)
  • 代理池與反爬機制
    • 代理池:使用免費或付費代理服務(如 ScrapingBee、BrightData),並在 aiohttp 中配置代理:
      python
      async def fetch_page_with_proxy(url, proxy=None):
          async with aiohttp.ClientSession() as session:
              async with session.get(url, proxy=proxy, timeout=30) as response:
                  if response.status == 200:
                      return await response.text()
          return None
    • User-Agent 輪換:使用 fake-useragent 套件隨機生成 User-Agent:
      python
      from fake_useragent import UserAgent
      ua = UserAgent()
      headers = {'User-Agent': ua.random}
      async with session.get(url, headers=headers) as response:
          ...
    • 處理 JavaScript 渲染:如果目標網站需要 JavaScript 渲染,考慮使用 playwrightselenium(搭配無頭瀏覽器)。
  • 重試機制
    • 使用 tenacity 套件為爬蟲請求添加自動重試:
      python
      from tenacity import retry, stop_after_attempt, wait_exponential
      
      @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
      async def fetch_page(url, session):
          async with session.get(url, timeout=30) as response:
              response.raise_for_status()
              return await response.text()
2. Waitress 的高併發配置
  • 線程數優化
    • Waitress 的 threads 參數決定了同時處理的請求數。根據 CPU 核心數和記憶體情況,建議設置為 4 * CPU核心數8 * CPU核心數。例如,4 核心 CPU 可設置 threads=16threads=32
    • 示例:
      python
      serve(app, host="0.0.0.0", port=8000, threads=16)
  • 限制:Waitress 是基於線程的 WSGI 伺服器,對於 CPU 密集型任務(如解析大規模 HTML),可能需要結合 multiprocessing 或分佈式 Worker。
  • 替代方案
    • 如果 Waitress 的性能不足,考慮在 WSL2 中運行 gunicorn + uvicorn,因為 Gunicorn 支援多進程,而 Uvicorn 支援 ASGI 的非同步處理。例如:
      bash
      gunicorn -k uvicorn.workers.UvicornWorker -w 4 main:app
    • 在 Windows 上,uvicorn 單獨運行也是一個不錯的選擇,特別是在開發階段。
3. MySQL 與 Redis 的高效使用
  • MySQL 優化
    • 連接池:使用 SQLAlchemy 的連接池來管理 MySQL 連接:
      python
      from sqlalchemy import create_engine
      engine = create_engine(
          'mysql+pymysql://root:your_password@localhost/crawler_db',
          pool_size=20,
          max_overflow=10,
          pool_timeout=30
      )
    • 批量插入:將爬取的數據收集到一定數量後批量插入 MySQL,減少資料庫 I/O:
      python
      def insert_batch_to_mysql(session, data_list):
          session.bulk_insert_mappings(WebPage, data_list)
          session.commit()
  • Redis 優化
    • 管道 (Pipeline):減少 Redis 的網絡往返時間:
      python
      def add_multiple_tasks(urls):
          r = redis_client.pipeline()
          for url in urls:
              r.lpush('crawler_queue', url)
          r.execute()
    • 持久化與備份:配置 Redis 的 save 指令,確保數據定期寫入磁盤(修改 redis.windows.conf 中的 save 設置)。
    • 分片與集群:如果任務量極大,考慮部署 Redis Cluster(需更多伺服器資源)。
4. Windows 環境的特殊處理
  • Redis 安裝
    • 如果 tporadowski/redis 不穩定,強烈建議使用 WSL2 運行 Ubuntu,然後在其中安裝官方 Redis:
      bash
      sudo apt update
      sudo apt install redis
      sudo systemctl enable redis
      sudo systemctl start redis
    • 在 Windows 與 WSL2 之間共享 Redis,可設置 Redis 的 bind 地址為 0.0.0.0,並在 Windows 防火牆中開放 6379 端口。
  • Windows 服務
    • 使用 NSSM 將 Waitress 設置為 Windows 服務:
      bash
      nssm install CrawlerService python run_waitress.py
      nssm start CrawlerService
  • 防火牆與端口
    • 確保 Windows 防火牆允許 8000(FastAPI)、3306(MySQL)、6379(Redis)端口的入站連接。
5. 分佈式爬蟲(進階)
如果需要更高的併發性和可擴展性,可以引入 Celery 作為任務分發框架:
  • 安裝 Celery:
    bash
    pip install celery
  • 配置 Celery 使用 Redis 作為消息代理:
    python
    from celery import Celery
    
    app = Celery('crawler', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
    app.conf.task_serializer = 'json'
    app.conf.result_serializer = 'json'
    app.conf.accept_content = ['json']
    app.conf.task_track_started = True
    
    @app.task
    def crawl_task(url, depth):
        return run_scraper(url, depth)
  • 在 FastAPI 中調用 Celery 任務:
    python
    @app.post("/scrape")
    async def start_scrape_task(request: ScrapeRequest):
        task = crawl_task.delay(request.url, request.depth)
        return {"task_id": task.id}
  • 啟動 Celery Worker:
    bash
    celery -A your_celery_module worker --loglevel=info --pool=eventlet
    注意:Windows 上建議使用 eventletgevent 作為 Celery 的 Worker 池,因為 Windows 不支援 fork
6. 監控與日誌
  • 日誌:使用 loggingloguru 記錄爬蟲狀態:
    python
    from loguru import logger
    
    logger.add("crawler.log", rotation="10 MB")
    logger.info(f"Starting crawl for {url}")
  • 監控
    • 使用 prometheus_client 監控爬蟲性能,暴露指標給 Prometheus。
    • 示例:
      python
      from prometheus_client import Counter, start_http_server
      
      requests_total = Counter('crawler_requests_total', 'Total requests made')
      start_http_server(8001)  # 監控端點
      requests_total.inc()  # 在爬蟲請求時增加計數

7. 程式碼整合與執行
以下是整合後的 main.py,包含所有主要功能:
python
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import redis
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import uuid

# FastAPI 應用
app = FastAPI(title="High-Concurrency Crawler System")

# MySQL 配置
Base = declarative_base()
class WebPage(Base):
    __tablename__ = 'webpages'
    id = Column(Integer, primary_key=True)
    url = Column(String(255), unique=True)
    title = Column(String(255))
    content = Column(Text)

engine = create_engine('mysql+pymysql://root:your_password@localhost/crawler_db', pool_size=20, max_overflow=10)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

# Redis 配置
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Pydantic 模型
class ScrapeRequest(BaseModel):
    url: str
    depth: int = 1

# Redis 任務隊列
def add_task_to_queue(url: str, depth: int):
    task_id = str(uuid.uuid4())
    task_info = {"url": url, "depth": depth, "status": "pending"}
    redis_client.hset(f"task:{task_id}", mapping=task_info)
    redis_client.rpush("scraper_queue", task_id)
    redis_client.expire(f"task:{task_id}", 3600)
    return task_id

def get_task_status(task_id: str):
    return redis_client.hgetall(f"task:{task_id}")

# 爬蟲邏輯
async def fetch_page(url):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=30) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    title = soup.title.string if soup.title else 'No Title'
                    return {'url': url, 'title': title, 'content': html}
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

# API 端點
@app.post("/scrape")
async def start_scrape_task(request: ScrapeRequest, background_tasks: BackgroundTasks):
    task_id = add_task_to_queue(request.url, request.depth)
    background_tasks.add_task(run_scraper_async, task_id, request.url, request.depth)
    return {"message": "Scraping task initiated", "task_id": task_id}

@app.get("/task/{task_id}/status")
async def get_task_status_endpoint(task_id: str):
    status = get_task_status(task_id)
    if not status:
        return {"message": "Task not found or expired"}
    return {"task_id": task_id, "status": status}

async def run_scraper_async(task_id: str, url: str, depth: int):
    redis_client.hset(f"task:{task_id}", "status", "running")
    try:
        data = await fetch_page(url)
        if data:
            db = Session()
            webpage = WebPage(url=data['url'], title=data['title'], content=data['content'])
            db.add(webpage)
            db.commit()
            db.close()
            redis_client.hset(f"task:{task_id}", "status", "completed")
            redis_client.set(f"crawled:{url}", data['title'], ex=3600)
        else:
            redis_client.hset(f"task:{task_id}", "status", "failed")
    except Exception as e:
        redis_client.hset(f"task:{task_id}", "status", f"failed: {str(e)}")
        print(f"Error in task {task_id}: {e}")

# Waitress 部署
if __name__ == "__main__":
    from waitress import serve
    serve(app, host="0.0.0.0", port=8000, threads=16)

8. 測試與部署
  1. 啟動服務
    • 啟動 XAMPP(MySQL)。
    • 啟動 Redis(redis-server.exe 或 Windows 服務)。
    • 運行 python main.py 啟動 Waitress。
  2. 測試 API
    • 添加任務:
      bash
      curl -X POST "http://localhost:8000/scrape" -H "Content-Type: application/json" -d '{"url": "https://example.com", "depth": 1}'
    • 檢查任務狀態:
      bash
      curl "http://localhost:8000/task/<task_id>/status"
  3. 驗證數據
    • 在 phpMyAdmin 查看 webpages 表中的數據。
    • 使用 redis-cli 檢查隊列和快取:
      bash
      lrange scraper_queue 0 -1
      hgetall task:<task_id>

9. 常見問題與解決方案
  • Redis 連接失敗:檢查 Redis 是否正在運行,確保 redis.windows.conf 中的 bindport 設置正確。
  • MySQL 性能瓶頸:檢查慢查詢日誌,優化索引和查詢。
  • Waitress 性能不足:考慮使用 WSL2 運行 Gunicorn + Uvicorn,或增加 Waitress 的 threads 數。
  • 反爬限制:實現代理池、隨機延遲、User-Agent 輪換。
     

沒有留言:

張貼留言

網誌存檔