【用 Laravel + FastAPI 打造事件驅動的多租戶 SaaS 架構】
教學目標
本教學旨在引導讀者深入了解如何整合 Laravel 10 (PHP) 與 FastAPI (Python),從零開始建立一個現代化的事件驅動微服務 SaaS 平台。核心目標包括:
掌握多租戶 (Multi-tenant) 架構的設計與實現,特別是基於數據列隔離 (Row-based Isolation) 的方法。
理解基於角色的訪問控制 (RBAC) 在多租戶環境下的應用。
學習如何透過訊息佇列 (RabbitMQ) 實現服務間的解耦通訊。
探索 IoT (物聯網) 模組與區塊鏈整合的實作細節,包括即時數據處理與交易可靠性。
了解前端 (Vue 3) 與後端微服務的協同工作方式,以及 CI/CD 流程在複雜系統中的作用。
適用對象
本教學適用於對 Laravel 或 Python 具有基本實作經驗,並希望進一步探索模組化、微服務化、多租戶架構設計的開發者。特別適合期望將新興技術如 IoT 和區塊鏈應用於實際商業場景的工程師。
1. 系統概覽:StaySync - 智慧酒店 SaaS 平台
StaySync 是一個專為酒店業打造的多租戶 SaaS 平台,旨在提升酒店營運效率並提供清晰的架構參考。它結合了 Laravel 的穩健、Vue 3 的響應式前端和 FastAPI 的高效微服務能力,實現了事件驅動、IoT 整合、動態定價及區塊鏈應用等多元功能。
專案亮點回顧:
多租戶架構:採用
stancl/tenancy
實現數據列隔離,確保各租戶數據獨立。事件驅動微服務:透過 RabbitMQ 實現 Laravel 與 FastAPI 服務的異步解耦通訊。
IoT 與區塊鏈整合:FastAPI 處理 IoT 設備通訊 (EMQX MQTT) 和區塊鏈交易 (Ganache,
web3.py
)。前端體驗:Vue 3 提供多語言響應式界面,並透過 WebSocket 實現即時狀態更新。
DevOps 就緒:完整的 Docker Compose 配置與 GitHub Actions CI/CD 流水線,支援自動化測試、構建和金絲雀部署。
2. 架構圖解析
StaySync 的系統架構設計旨在提供高內聚低耦合的模組化方案,以下圖示化各組件及其互動流程:
用戶端 (Client):包含瀏覽器中的 Vue 3 前端應用,提供旅客入口和後台管理介面。前端透過 API 請求和 WebSocket 連線與後端服務溝通。
API 網關 (API Gateway) - Nginx:
作為所有外部請求的單一入口點,負責反向代理、SSL 終止。
根據子域名 (例如
tenanta.localhost
) 進行多租戶路由,將請求導向正確的 Laravel 或 FastAPI 服務。處理
/health
健康檢查端點。
Laravel 核心後端 (Laravel Core Backend):
主要負責核心業務邏輯,包括 RBAC (角色權限管理)、訂房管理、房源管理、SEO 內容及文化模組等。
透過
stancl/tenancy
確保數據行級隔離 (Row-based Isolation)。與 MySQL 互動,儲存租戶資料、訂單、用戶等結構化數據。
使用 Redis 進行快取和 Session 管理,提升性能。
透過事件發布機制 (Event System) 將訊息發送到 RabbitMQ。
FastAPI 微服務 (FastAPI Microservices):
提供高效且專精的微服務,負責 IoT 設備控制、動態定價演算法和區塊鏈互動。
InfluxDB:專門用於儲存 IoT 設備傳感器的時間序列數據。
Ganache:作為本地區塊鏈模擬器,用於開發和測試區塊鏈應用。
EMQX:高效能的 MQTT 訊息代理,用於 IoT 設備的實時通訊。
FastAPI 服務會消費來自 RabbitMQ 的訊息,並根據業務邏輯執行相應操作。
訊息佇列層 (Message Queue Layer) - RabbitMQ:
作為 Laravel 和 FastAPI 微服務之間的核心溝通橋樑,實現服務的異步解耦。
採用 Topic Exchange 模型,允許服務根據不同的訊息主題進行發布和訂閱。
確保系統在高併發下的穩定性,並提升容錯能力。
3. Docker Compose 實作
為了方便開發與部署,StaySync 採用 Docker Compose 管理所有服務。以下是一個簡化的 docker-compose.yml
範例,突出關鍵服務與健康檢查:
version: '3.8'
services:
nginx:
image: nginx:latest
container_name: staysync_nginx
ports:
- "80:80"
- "443:443"
volumes:
- ./docker/nginx/nginx.conf:/etc/nginx/nginx.conf:ro # Nginx 配置
- ./docker/nginx/certs:/etc/nginx/certs:ro # SSL 憑證
depends_on:
laravel: { condition: service_healthy }
fastapi: { condition: service_healthy }
frontend: { condition: service_healthy }
healthcheck: # Nginx 自身健康檢查
test: ["CMD", "curl", "-f", "http://localhost/health"]
interval: 30s; timeout: 10s; retries: 3
networks: [webnet]
laravel:
build: { context: ., dockerfile: Dockerfile.laravel } # 使用自定義 Laravel Dockerfile
container_name: staysync_laravel
environment: # 關鍵環境變數
- APP_KEY=...
- JWT_SECRET=...
- DB_HOST=mysql
- RABBITMQ_HOST=rabbitmq
- FASTAPI_PRICING_URL=http://fastapi:8000 # 內部 FastAPi 服務網址
volumes:
- ./backend:/var/www/html # 掛載後端程式碼
depends_on:
mysql: { condition: service_healthy }
redis: { condition: service_healthy }
rabbitmq: { condition: service_healthy }
healthcheck: # Laravel 服務健康檢查
test: ["CMD", "curl", "-f", "http://localhost:9000/health"] # 檢查 PHP-FPM 端口
interval: 30s; timeout: 10s; retries: 3
networks: [webnet]
fastapi:
build: { context: ., dockerfile: Dockerfile.fastapi } # 使用自定義 FastAPI Dockerfile
container_name: staysync_fastapi
environment: # 關鍵環境變數
- RABBITMQ_HOST=rabbitmq
- INFLUXDB_URL=http://influxdb:8086
- MQTT_BROKER_HOST=emqx
- INFURA_URL=http://ganache:8545 # 本地 Ganache 網址
volumes:
- ./fastapi/app:/app/app # 掛載 FastAPI 程式碼
depends_on:
redis: { condition: service_healthy }
rabbitmq: { condition: service_healthy }
influxdb: { condition: service_healthy }
emqx: { condition: service_healthy }
ganache: { condition: service_healthy }
healthcheck: # FastAPI 服務健康檢查
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s; timeout: 10s; retries: 3
networks: [webnet]
frontend:
build: { context: ./frontend, dockerfile: Dockerfile } # 前端 Dockerfile
container_name: staysync_frontend
ports: ["3000:3000"] # 開發模式端口
volumes:
- ./frontend:/app # 掛載前端程式碼
- /app/node_modules # 確保 node_modules 在容器內
command: npm run dev -- --host 0.0.0.0 # 啟動開發伺服器
healthcheck: # 前端開發伺服器健康檢查
test: ["CMD", "curl", "-f", "http://localhost:3000"]
interval: 30s; timeout: 10s; retries: 3
networks: [webnet]
mysql: # MySQL 資料庫服務
image: mysql:8.0
container_name: staysync_mysql
environment: { MYSQL_ROOT_PASSWORD: secret, MYSQL_DATABASE: staysync }
volumes: [mysql_data:/var/lib/mysql]
healthcheck: { test: ["CMD", "mysqladmin", "ping", "-h", "localhost"], interval: 10s, timeout: 5s, retries: 3 }
networks: [webnet]
redis: # Redis 快取和訊息佇列服務
image: redis:7.0-alpine
container_name: staysync_redis
healthcheck: { test: ["CMD", "redis-cli", "ping"], interval: 30s, timeout: 10s, retries: 3 }
networks: [webnet]
rabbitmq: # RabbitMQ 訊息佇列服務
image: rabbitmq:3-management
container_name: staysync_rabbitmq
ports: ["5672:5672", "15672:15672"] # AMQP 和管理界面端口
healthcheck: { test: ["CMD", "rabbitmqctl", "status"], interval: 30s, timeout: 10s, retries: 3 }
networks: [webnet]
influxdb: # InfluxDB 時序資料庫
image: influxdb:2.7
container_name: staysync_influxdb
environment: { DOCKER_INFLUXDB_INIT_MODE: setup, DOCKER_INFLUXDB_INIT_USERNAME: admin, DOCKER_INFLUXDB_INIT_PASSWORD: secret, DOCKER_INFLUXDB_INIT_ORG: staysync, DOCKER_INFLUXDB_INIT_BUCKET: iot, DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: your-token }
ports: ["8086:8086"]
volumes: [influxdb_data:/var/lib/influxdb2]
healthcheck: { test: ["CMD", "curl", "-f", "http://localhost:8086/health"], interval: 30s, timeout: 10s, retries: 3 }
networks: [webnet]
emqx: # EMQX MQTT Broker
image: emqx/emqx:latest
container_name: staysync_emqx
ports: ["1883:1883", "8083:8083", "18083:18083"] # MQTT, WebSocket, Dashboard 端口
healthcheck: { test: ["CMD", "curl", "-f", "http://localhost:18083/status"], interval: 30s, timeout: 10s, retries: 3 }
networks: [webnet]
ganache: # 本地區塊鏈模擬器
image: trufflesuite/ganache-cli:latest
container_name: staysync_ganache
command: ["--port", "8545", "--accounts", "10", "--defaultBalanceEther", "100", "--mnemonic", "candy maple cake sugar pudding cream honey rich smooth crumble sweet treat"]
ports: ["8545:8545"]
healthcheck: { test: ["CMD", "curl", "-f", "http://localhost:8545"], interval: 30s, timeout: 10s, retries: 3 }
networks: [webnet]
volumes:
mysql_data:
influxdb_data:
networks:
webnet: { driver: bridge }
重點說明:
服務定義:每個服務都包含明確的
image
或build
指令、container_name
、ports
和volumes
配置。depends_on
:確保服務依賴關係,例如 Laravel 只有在 MySQL、Redis 和 RabbitMQ 啟動並健康後才會啟動。healthcheck
:關鍵的健康檢查配置,確保容器不僅是啟動,而且內部應用程式也正常響應。這對於 CI/CD 和生產環境的穩定性至關重要。網路 (
networks
):所有服務都位於同一個webnet
中,允許它們透過服務名稱互相通訊(例如laravel
可以直接連接mysql
)。
4. Laravel 租戶與權限設計
Laravel 作為核心後端,實現了多租戶和 RBAC,確保數據安全和權限精細控制。
4.1 多租戶架構 (Row-based Isolation)
StaySync 採用 stancl/tenancy
套件實現基於數據列的多租戶隔離。這表示每個租戶的數據會透過一個 tenant_id
欄位存儲在同一組資料表中。
核心概念:
Tenant
Model:定義租戶的基本資訊,如名稱、子域名。TenantScope
:一個 Eloquent 全域作用域,自動將所有查詢限制在當前租戶的tenant_id
下。
程式碼範例:App\Models\CultureContent.php
中的 TenantScope
<?php
namespace App\Models;
use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;
use App\Scopes\TenantScope; // 引入 TenantScope
class CultureContent extends Model
{
use HasFactory;
protected $fillable = [
'tenant_id', 'title', 'content', 'language', 'category', 'image_url'
];
// boot 方法會在模型啟動時調用
protected static function booted()
{
// 將 TenantScope 添加到模型上,確保所有查詢自動包含 tenant_id
static::addGlobalScope(new TenantScope);
}
public function tenant()
{
return $this->belongsTo(Tenant::class);
}
}
技術說明:TenantScope
的應用極大地簡化了多租戶數據過濾的複雜性,使得開發者無需在每個查詢中重複編寫 where('tenant_id', ...)
條件,提高了程式碼的可讀性和可維護性。stancl/tenancy
還提供了自動識別租戶(例如透過子域名)並初始化租戶環境的能力,確保了請求上下文中的數據隔離。
4.2 基於角色的訪問控制 (RBAC)
StaySync 使用 spatie/laravel-permission
套件實現 RBAC,並將其與多租戶結合,實現租戶級別的權限管理。
核心概念:
角色 (Roles):如
tenant_admin
,property_manager
,guest_user
。權限 (Permissions):原子性的操作,如
manage:users
,manage:bookings
。RoleModuleBinding
:自定義模型,用於更細粒度地定義每個角色在特定模組中擁有的權限(例如bookings
模組可以有read
,write
權限)。
程式碼範例:backend/database/seeders/RolesAndPermissionsSeeder.php
<?php
namespace Database\Seeders;
use Illuminate\Database\Seeder;
use Spatie\Permission\Models\Role;
use Spatie\Permission\Models\Permission;
use App\Models\RoleModuleBinding;
use Stancl\Tenancy\Facades\Tenancy; // 確保 Tenancy 門面可用
class RolesAndPermissionsSeeder extends Seeder
{
public function run(): void
{
app()[\Spatie\Permission\PermissionRegistrar::class]->forgetCachedPermissions();
// 獲取當前租戶 ID,確保角色和權限是租戶專屬的
$currentTenantId = Tenancy::tenant()->id;
// 創建權限,並與租戶 ID 綁定
$permissions = [
'manage:users', 'manage:roles', 'manage:properties',
'manage:bookings', 'manage:iot', 'manage:seo', 'manage:culture',
'view:reports',
];
foreach ($permissions as $permissionName) {
Permission::updateOrCreate(
['name' => $permissionName, 'guard_name' => 'web', 'tenant_id' => $currentTenantId], []
);
}
// 創建角色,並與租戶 ID 綁定
$adminRole = Role::updateOrCreate(
['name' => 'tenant_admin', 'guard_name' => 'web', 'tenant_id' => $currentTenantId], []
);
$managerRole = Role::updateOrCreate(
['name' => 'property_manager', 'guard_name' => 'web', 'tenant_id' => $currentTenantId], []
);
// ... 其他角色
// 將所有權限分配給 tenant_admin 角色
$adminRole->givePermissionTo(Permission::where('tenant_id', $currentTenantId)->get());
// 為 property_manager 角色分配特定權限
$managerRole->givePermissionTo([
'manage:properties', 'manage:bookings', 'manage:iot', 'manage:culture', 'view:reports',
]);
// 定義 RoleModuleBinding,提供更細粒度的模組級權限
$modules = [
'bookings' => ['read' => true, 'write' => true],
'iot' => ['read' => true, 'control' => true],
// ... 其他模組的權限
];
foreach ($modules as $moduleName => $modulePermissions) {
RoleModuleBinding::updateOrCreate(
['role_id' => $adminRole->id, 'module' => $moduleName, 'tenant_id' => $currentTenantId],
['permissions' => $modulePermissions]
);
}
}
}
程式碼範例:backend/app/Http/Controllers/BookingController.php
中間件使用
<?php
namespace App\Http\Controllers;
use App\Models\Booking;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Illuminate\Validation\ValidationException;
class BookingController extends Controller
{
public function __construct()
{
// 確保只有經過 Sanctum 認證且具備 'manage:bookings' 權限的用戶可以訪問此控制器
$this->middleware(['auth:sanctum', 'permission:manage:bookings']);
}
// ... store, update, delete 等方法
}
技術說明:spatie/laravel-permission
庫的優勢在於其高度的可擴展性,允許我們定義靈活的角色和權限。透過在角色和權限模型中加入 tenant_id
,我們能確保每個租戶擁有獨立的權限體系。RoleModuleBinding
則是一個額外的抽象層,用於將角色權限與特定的業務模組(如預訂、IoT)聯繫起來,提供更為細粒度的控制,這在大型多功能 SaaS 平台中非常有用。
5. FastAPI IoT 模組
FastAPI 作為 IoT 微服務的核心,負責與 MQTT Broker (EMQX) 通訊,並透過 WebSocket 提供即時設備狀態更新給前端。
程式碼範例:fastapi/app/routers/iot_router.py
from fastapi import APIRouter, WebSocket, HTTPException
from paho.mqtt import client as mqtt_client # MQTT 客戶端庫
import asyncio
import json
import os
from tenacity import retry, stop_after_attempt, wait_fixed # 引入 tenacity 進行重試
router = APIRouter()
# 從環境變數獲取 MQTT Broker 配置
mqtt_broker = os.getenv("MQTT_BROKER_HOST", "emqx")
mqtt_port = int(os.getenv("MQTT_BROKER_PORT", 1883))
mqtt_client_id = f"staysync-iot-backend-{os.getpid()}"
# 健康檢查端點
@router.get("/health")
async def health_check():
"""
返回 FastAPI 服務的健康狀態。
"""
return {"status": "ok", "service": "FastAPI IoT"}
@router.post("/control")
async def control_device(data: dict):
"""
接收來自前端或後端的設備控制指令,並透過 MQTT 發布。
範例 payload: {"device_id": "light-1", "command": "turn_on", "value": {}}
"""
device_id = data.get("device_id")
command = data.get("command")
value = data.get("value", {})
if not device_id or not command:
raise HTTPException(status_code=400, detail="Missing device_id or command")
try:
client = mqtt_client.Client(mqtt_client_id)
client.connect(mqtt_broker, mqtt_port, 60) # 連接 MQTT Broker,60秒超時
# 發布控制指令到特定的設備主題
client.publish(f"staysync/iot/{device_id}/control", json.dumps({"command": command, "value": value}))
client.disconnect()
return {"status": "success", "message": f"Command '{command}' sent to device '{device_id}'"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to send command: {str(e)}")
@router.websocket("/ws/status")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket 端點,用於向前端實時推送 IoT 設備狀態更新。
訂閱 MQTT 主題,並將收到的訊息轉發到 WebSocket 客戶端。
"""
await websocket.accept() # 接受 WebSocket 連線
print("WebSocket connection accepted.")
mqtt_ws_client_id = f"ws-iot-client-{os.getpid()}"
client = mqtt_client.Client(mqtt_ws_client_id)
def on_message(client, userdata, msg):
"""
當收到 MQTT 訊息時的回調函數。
將 MQTT 訊息解析後,透過 WebSocket 發送到前端。
"""
try:
# 解析 MQTT 訊息的 payload
status_data = json.loads(msg.payload.decode('utf-8'))
device_id_from_topic = msg.topic.split("/")[-2] # 從主題中提取 device_id
# 構建要發送到前端的 JSON 數據
asyncio.run(websocket.send_json({
"type": "iot_status_update",
"device_id": device_id_from_topic,
"status": status_data # 直接傳遞狀態數據,可以是字符串或對象
}))
print(f"Sent via WebSocket: Device {device_id_from_topic} status: {status_data}")
except json.JSONDecodeError:
print(f"Error decoding JSON from MQTT: {msg.payload.decode('utf-8')}")
except Exception as e:
print(f"Error processing MQTT message: {e}")
client.on_message = on_message
try:
# 連接 MQTT Broker 並訂閱狀態主題
client.connect(mqtt_broker, mqtt_port, 60)
client.subscribe("staysync/iot/+/status") # 訂閱所有設備的狀態更新
client.loop_start() # 在後台線程啟動 MQTT 循環處理
while True:
await asyncio.sleep(1) # 保持 WebSocket 連線活躍
except Exception as e:
print(f"WebSocket or MQTT client error: {e}")
finally:
# WebSocket 關閉時停止 MQTT 客戶端循環並斷開連接
client.loop_stop()
client.disconnect()
print("WebSocket connection closed and MQTT client disconnected.")
技術說明:FastAPI 的非同步特性使其成為處理即時 IoT 通訊的理想選擇。我們利用 paho-mqtt
庫連接 EMQX (MQTT Broker),實現設備控制指令的發布和狀態訊息的訂閱。透過 /ws/status
WebSocket 端點,FastAPI 能將從 MQTT 接收到的即時狀態更新廣播到所有連接的前端客戶端,實現流暢的用戶體驗。這種設計模式確保了 IoT 數據流的高效和低延遲。
6. RabbitMQ 實作:事件驅動通訊
RabbitMQ 作為 Laravel 與 FastAPI 微服務間的異步通訊核心。當 Laravel 產生重要業務事件時,會發布訊息到 RabbitMQ,FastAPI 服務則會訂閱並消費這些訊息以執行後續邏輯。
6.1 Laravel 發布事件
Laravel 透過其內建的事件系統 (Events & Listeners) 來觸發訊息發布。
程式碼範例:backend/app/Events/BookingCreated.php
<?php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use App\Models\Booking;
class BookingCreated
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public $booking; // 包含預訂資料
public function __construct(Booking $booking)
{
$this->booking = $booking;
}
// 這裡可以定義事件的廣播通道 (如果需要 WebSocket 廣播)
// public function broadcastOn(): array
// {
// return [
// new PrivateChannel('bookings.' . $this->booking->tenant_id),
// ];
// }
}
程式碼範例:backend/app/Listeners/SendBookingConfirmation.php
(假設發送訊息到 RabbitMQ)
<?php
namespace App\Listeners;
use App\Events\BookingCreated;
use Illuminate\Contracts\Queue\ShouldQueue; // 如果監聽器需要異步處理
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class SendBookingConfirmation implements ShouldQueue // 讓此監聽器進入隊列異步執行
{
use InteractsWithQueue;
public function handle(BookingCreated $event): void
{
Log::info('BookingCreated event received, attempting to send to RabbitMQ.');
$connection = null;
try {
// 建立 RabbitMQ 連接
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST', 'rabbitmq'),
env('RABBITMQ_PORT', 5672),
env('RABBITMQ_LOGIN', 'guest'),
env('RABBITMQ_PASSWORD', 'guest'),
env('RABBITMQ_VHOST', '/')
);
$channel = $connection->channel();
// 宣告一個 Topic Exchange
$exchangeName = 'booking_events';
$channel->exchange_declare($exchangeName, 'topic', false, true, false);
// 定義路由鍵 (routing key)
$routingKey = 'booking.created'; // 例如:booking.created, booking.cancelled
// 準備訊息內容
$messageBody = json_encode($event->booking->toArray()); // 將預訂數據轉換為 JSON
$msg = new AMQPMessage($messageBody, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 發布訊息
$channel->basic_publish($msg, $exchangeName, $routingKey);
Log::info("Message published to RabbitMQ: {$routingKey} - " . $messageBody);
$channel->close();
$connection->close();
} catch (\Exception | \PhpAmqpLib\Exception\AMQPProtocolChannelException | \PhpAmqpLib\Exception\AMQPTimeoutException $e) {
Log::error("Failed to send booking confirmation to RabbitMQ: " . $e->getMessage());
// 可以在這裡處理重試邏輯,或將失敗訊息記錄到資料庫
if ($connection) {
$connection->close();
}
}
}
}
技術說明:事件驅動架構的優勢在於其卓越的解耦能力和擴展性。透過 Laravel 的事件系統和 ShouldQueue
介面,我們可以將耗時的訊息發布操作異步化,避免阻塞用戶請求。php-amqplib
庫用於直接與 RabbitMQ 交互,發布到 Topic Exchange
允許消費者根據精確的 routing key
訂閱感興趣的事件,實現靈活的事件路由。
6.2 FastAPI 消費訊息
FastAPI 微服務會訂閱 RabbitMQ 的相關 Topic,並在收到訊息後執行相應的邏輯,例如動態定價或日誌記錄。
程式碼範例 (概念性):FastAPI 消費 RabbitMQ 訊息
# fastapi/app/services/price_consumer.py (或類似檔案)
import asyncio
import aio_pika # 非同步 RabbitMQ 客戶端
import json
import os
async def consume_booking_events():
"""
FastAPI 服務中消費 RabbitMQ 預訂事件的範例。
"""
rabbitmq_host = os.getenv("RABBITMQ_HOST", "rabbitmq")
rabbitmq_port = int(os.getenv("RABBITMQ_PORT", 5672))
rabbitmq_user = os.getenv("RABBITMQ_USER", "guest")
rabbitmq_password = os.getenv("RABBITMQ_PASSWORD", "guest")
connection = None
try:
connection = await aio_pika.connect_robust(
f"amqp://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_host}:{rabbitmq_port}/"
)
async with connection:
channel = await connection.channel()
# Declare the topic exchange
exchange = await channel.declare_exchange('staysync.events', 'topic', durable=True)
# Declare queue for price updates and bind to booking.created events
queue = await channel.declare_queue('price.queue', durable=True)
await queue.bind(exchange, routing_key='booking.created')
print("Started consuming 'booking.created' events from RabbitMQ.")
async for message in queue:
async with message.process():
try:
event_data = json.loads(message.body.decode())
event_type = event_data.get('event_type')
payload = event_data.get('payload', {})
if event_type == 'booking.created':
booking_id = payload.get('booking_id')
property_id = payload.get('property_id')
tenant_id = payload.get('tenant_id')
# ... 更多處理邏輯,例如調用 ML 模型進行動態定價
print(f" [x] Received booking: {booking_id} for property {property_id} (Tenant: {tenant_id})")
# 在這裡可以模擬一個耗時的ML模型調用
await asyncio.sleep(1)
# 處理完成後,可以發布一個新的事件,例如 `price.updated`
# await exchange.publish(...)
else:
print(f"Received unknown event type: {event_type}. Message body: {message.body.decode()}")
except json.JSONDecodeError:
print(f"Failed to decode JSON from RabbitMQ message: {message.body.decode()}")
except Exception as e:
print(f"Error processing RabbitMQ message: {e}. Message body: {message.body.decode()}")
# 建議在生產環境中進行更精細的錯誤處理,例如將訊息 NACK 並重新入隊
# message.nack(requeue=True)
except Exception as e:
print(f"RabbitMQ consumer startup failed: {e}")
# 在生產環境中,這裡應該實作適當的重試機制或警報。
finally:
if connection:
await connection.close()
# 在 FastAPI main.py 啟動時調用此消費者 (在真實應用中可能放在 app.on_event("startup"))
# @app.on_event("startup")
# async def startup_event():
# asyncio.create_task(consume_booking_events())
技術說明:FastAPI 利用 aio_pika
實現非同步的 RabbitMQ 訊息消費。這種模式確保了微服務能夠高效地並行處理訊息,而不會阻塞主事件循環。透過定義持久化隊列並綁定到特定的 routing key
,FastAPI 服務能夠可靠地接收並處理來自 Laravel 的事件,即使在服務重啟後也能從上次中斷的地方繼續處理訊息,保證數據一致性。
7. 常見問題與解決方案
在實際開發與部署 StaySync 這樣複雜的微服務系統時,會遇到許多挑戰。
7.1 常見問題與除錯技巧
環境變數未配置:
問題:
APP_KEY
,JWT_SECRET
,INFLUXDB_TOKEN
,WALLET_PRIVATE_KEY
等關鍵環境變數未設置或設置錯誤。解決方案:確保
.env
檔案存在且所有變數已正確配置。對於 Laravel 的APP_KEY
和JWT_SECRET
,務必在 Laravel 容器啟動後執行docker-compose exec laravel php artisan key:generate
和docker-compose exec laravel php artisan jwt:secret
。對於其他服務,檢查其各自的 Dockerfile 和啟動指令是否正確讀取環境變數。
Docker 服務健康檢查失敗:
問題:
docker-compose ps
顯示某個服務狀態為unhealthy
。解決方案:使用
docker-compose logs <服務名>
命令查看該服務的詳細日誌,這是診斷問題的第一步。常見原因包括程式碼錯誤、依賴服務未啟動、端口衝突或環境變數配置不正確。針對具體錯誤訊息進行分析。
Nginx 子域名無法訪問:
問題:無法透過
tenanta.localhost
等自定義子域名訪問應用。解決方案:
Host 檔案檢查:確認本地電腦的
/etc/hosts
(Linux/macOS) 或C:\Windows\System32\drivers\etc\hosts
(Windows) 已添加正確的127.0.0.1
映射,例如127.0.0.1 tenanta.localhost
。Nginx 配置檢查:檢查
docker/nginx/nginx.conf
中的server_name
指令是否包含所有預期的域名和通配符子域名 (例如*.localhost
)。同時,確認proxy_pass
指令指向了正確的後端服務地址和端口(例如http://laravel:9000
)。Nginx 日誌:查看 Nginx 容器的日誌 (
docker-compose logs nginx
),錯誤日誌通常會指出配置問題或上游服務連接問題。
Laravel 遷移或種子數據失敗:
問題:執行
php artisan migrate --seed
時遇到資料庫錯誤。解決方案:
資料庫容器狀態:確保
mysql
容器正在運行且健康。遷移檔案審查:仔細檢查所有遷移檔案的語法、欄位類型、約束條件以及外鍵定義。特別注意多租戶相關的
tenant_id
欄位是否存在。依賴順序:確保遷移檔案的執行順序正確,例如
users
表應在tenants
表之前創建(如果users
表依賴tenants
)。日誌:查看 Laravel 應用程式日誌 (
storage/logs/laravel.log
或docker-compose logs laravel
) 獲取更詳細的錯誤堆棧。
WebSocket 連線問題:
問題:前端無法接收 IoT 實時更新,或 WebSocket 連線建立失敗。
解決方案:
FastAPI WebSocket 端點:檢查 FastAPI
iot_router.py
中的/ws/status
WebSocket 端點是否正確實現,特別是websocket.accept()
和websocket.send_json()
。Nginx WebSocket 代理:確保
docker/nginx/nginx.conf
中location /ws/
塊正確配置了 WebSocket 代理頭部,如proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade";
。瀏覽器開發者工具:使用瀏覽器的開發者工具 (Network Tab) 監控 WebSocket 連線狀態。檢查連線是否成功建立 (狀態碼 101 Switching Protocols),以及是否有錯誤訊息或斷開事件。
7.2 測試策略與實踐
為確保 StaySync 平台的穩定性、可靠性和可維護性,實施全面的測試策略至關重要。
單元測試 (Unit Tests):
目標:驗證程式碼中最小可測試單元的正確性,例如函式、方法、類別。
實踐:
Laravel (
PHPUnit
):對控制器中的業務邏輯、模型的方法、服務層的邏輯、事件的觸發和監聽器的行為進行測試。例如,測試BookingController
的store
方法是否正確創建預訂,並發布事件。FastAPI (
pytest
):針對每個微服務的路由處理函式、內部服務邏輯和外部 API(如與 InfluxDB、EMQX 的互動)進行測試。模擬外部依賴,確保核心邏輯的隔離測試。Vue 3 (
Vitest
或Vue Test Utils
):對前端 Vue 組件的行為、計算屬性、生命週期掛鉤和狀態管理進行測試。確保 UI 響應用戶互動是否符合預期。
整合測試 (Integration Tests):
目標:驗證不同模組或服務之間協同工作的正確性。
實踐:
後端整合:測試 Laravel 後端與 MySQL 資料庫、Redis 快取、RabbitMQ 訊息佇列之間的互動。例如,發送一個 HTTP 請求到 Laravel API,驗證數據是否正確寫入 MySQL,並且相應的訊息是否被發布到 RabbitMQ。
微服務整合:測試 FastAPI 與 InfluxDB、EMQX、Ganache 之間的互動。例如,模擬一個 IoT 設備發送數據到 EMQX,驗證 FastAPI 是否能接收、處理並將數據寫入 InfluxDB。
事件流測試:這是 StaySync 的核心,需要測試端到端事件流的正確性:Laravel 發布事件 -> RabbitMQ 接收 -> FastAPI 消費 -> 執行業務邏輯 -> (可能)發布新事件。
端到端 (E2E) 測試 (End-to-End Tests):
目標:從用戶的角度測試整個應用程式流程,模擬真實用戶行為,涵蓋所有層面 (前端 UI、API 網關、各後端服務和資料庫)。
實踐:使用 Cypress 或 Playwright 等自動化測試工具,模擬用戶在瀏覽器中的點擊、輸入和導航,驗證從登錄到完成一個預訂,或控制一個 IoT 設備的完整業務流程是否順暢。這有助於發現集成層面的問題。
性能測試 (Performance Tests):
目標:評估系統在高併發和高負載下的響應時間、吞吐量和穩定性。
實踐:使用 Apache JMeter 或 Locust 等工具,模擬大量並發用戶對 API 網關和關鍵業務 API(例如訂房 API、IoT 控制 API)發送請求。監控 CPU、記憶體、資料庫連接等指標,找出潛在的性能瓶頸。
多租戶測試 (Multi-tenancy Tests):
目標:這是 SaaS 平台的關鍵,確保不同租戶的數據絕對隔離,且權限控制正確。
實踐:
數據隔離驗證:為多個租戶創建測試數據,然後以不同租戶的身份登錄,驗證每個租戶只能看到自己的數據,無法訪問或修改其他租戶的數據。
權限驗證:為每個租戶角色(如
tenant_admin
,property_manager
,guest_user
)編寫獨立的測試用例,驗證其是否只能執行其被授權的操作,而無權執行其他操作。跨租戶操作:如果系統有中央管理功能(例如 SaaS 營運商對所有租戶進行數據分析或維護),需要測試這些操作是否能在不破壞隔離的前提下正確執行。
安全測試 (Security Tests):
目標:識別應用程式中的安全漏洞,如 SQL 注入、XSS、CSRF、未授權訪問等。
實踐:採用靜態應用程式安全測試 (SAST) 和動態應用程式安全測試 (DAST) 工具。進行權限提升測試、數據篡改測試等。
希望這份詳細的技術指南能夠幫助您更好地理解 StaySync 專案的實作細節和背後的設計理念。
沒有留言:
張貼留言