讓顧客心聲不再石沉大海:InsightFlow 專案的實用解方分享
哈囉,各位軟體同好們。
最近花了一點時間,整理了一個叫做「InsightFlow」的專案。說「小」,其實也揉合了前後端、AI 處理還有容器化部署,不敢說是什麼驚天動地的創新,但實際跑起來,對於處理顧客回饋,確實能省下不少力氣。想說既然都整理了,不如把一些心得和大家分享一下,或許對正在為類似問題煩惱的朋友,能提供一些參考。
本專案的所有程式碼已開源並託管於 GitHub:
為什麼會有 InsightFlow?解決什麼痛點?
在業界打滾久了,大家應該都遇過類似的狀況:顧客的聲音很多元,有的直接、有的隱晦,甚至有些可能只是一兩句話的牢騷。如果量體一大,要人工去篩選、歸納,不僅耗時耗力,還可能漏掉一些重要的洞察。市面上當然有許多專業工具,但有時候,我們只是需要一個能快速、自動化處理這些回饋的「基礎建設」,把最關鍵的資訊提取出來,後續再交由人工去細化判斷。
InsightFlow 就是為了解決這個痛點而生的。它不是什麼高深莫測的魔法,就是紮紮實實地把 AI 分析的流程給自動化了,讓顧客的回饋文字,能夠被有效率地轉換成摘要、情緒分數、關鍵字,甚至是意圖判斷和建議。聽起來好像有點厲害?其實拆開來看,都是大家熟悉的老朋友。
專案裡有什麼?架構與技術選型深度剖析
我們在建構這個專案時,主要考量就是「穩定」和「好維護」。畢竟東西做得再花俏,不能穩定運行,後續維護又費勁,那意義就不大了。所以,我們選擇了大家比較熟悉的技術棧,並採用了微服務的思路:
- 前端(Vue.js):這是和使用者直接互動的介面。用 Vue 寫起來輕巧、效率高,能快速打造一個乾淨俐落的儀表板。Vue 3 搭配 Pinia 作為狀態管理,讓資料流向更清晰。使用者只要輸入文字,點個按鈕,任務就提交出去了。狀態更新?前端會透過定時輪詢 (polling) 的方式,默默地每幾秒鐘檢查一次任務狀態,完成後就直接秀結果,省去你一直重新整理的麻煩。
- 後端(Laravel):扮演著專案的「管理中心」和 API Gateway。所有前端的請求都會先到這裡。它負責接收任務、把任務資料存進資料庫(MySQL),然後,重點來了,它會利用 Laravel 的 Queue 系統,把分析任務「丟」到 Redis 這個「任務等待區」裡。這麼做的好處是,後端不用等 AI 處理完才回應,可以很快地告訴前端「你的任務我收到了!」,實現請求/響應解耦,避免前端請求超時。
- AI 工作處理器(FastAPI):這才是真正做「粗活」的地方。它會一直盯著 Redis 的任務等待區,一有任務進來,就立刻把它領走,然後呼叫內部設計好的 AI 模組進行分析(比如摘要、情緒分析等等)。處理完後,再悄悄地透過 HTTP 內部請求 把結果送回 Laravel 那邊去更新資料庫。之所以選 FastAPI,是因為它基於 ASGI,處理 Python 的 AI 任務很拿手,效能也高,且內建 Swagger UI 方便 API 測試。
- Redis 佇列:可以想像成一個超有效率的「任務中轉站」。Laravel 把任務丟進去,FastAPI 從另一頭拿出來,兩邊互不干擾,大大提升了處理的彈性。這也為未來擴展多個 AI Worker 實例做好了準備。
- MySQL 資料庫:就是負責把所有任務的原始資料和分析結果好好地保存下來。
你看,這樣分工合作,每個環節都專注做自己的事,整個系統的穩定性就上來了。
核心設計理念:AI 模組的「換頭術」與異步處理的實踐
講到 AI 模組,大家可能都會想到什麼 GPT、HuggingFace 之類的。我們在設計時,特別考慮到未來可能會有不同的 AI 模型出現,或者現有的模型需要升級替換。所以,FastAPI 的 AI 處理器,我們採用了一種叫做**「策略模式 (Strategy Pattern)」**的設計。
簡單來說,就是把不同的 AI 功能(摘要、情緒分析)設計成一個個獨立的「模組」,它們都遵守一套共同的「遊戲規則」(例如都有一個 analyze
或 summarize
方法)。將來不管你想換成 GPT-4、LLaMA,還是自己訓練的某個特定模型,只要它符合這套規則,你不需要修改核心的處理邏輯,只需要把新模組「插」進去,系統就能直接識別並使用。這樣一來,系統的擴展性和維護彈性就非常好了。這部分其實蠻值得深究的,但我知道大家都很務實,直接看程式碼會更有感。
核心程式碼片段:
1. Laravel 任務分發:app/Jobs/ProcessAnalysisTask.php
Laravel 後端收到前端的分析請求後,會透過控制器將請求封裝成一個 Job,然後 dispatch 到 Laravel 自己的 Queue 系統。這個 Job 在 handle
方法中,會將任務資訊推送到 Redis 佇列,供 FastAPI Worker 消費。
// laravel-backend/app/Jobs/ProcessAnalysisTask.php
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Redis;
use App\Models\AnalysisTask;
use Illuminate\Support\Facades\Log;
class ProcessAnalysisTask implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $taskId;
protected $textContent;
public function __construct(string $taskId, string $textContent)
{
$this->taskId = $taskId;
$this->textContent = $textContent;
}
public function handle(): void
{
$task = AnalysisTask::where('uuid', $this->taskId)->first();
if (!$task) {
Log::error("Task {$this->taskId} not found for dispatch to FastAPI worker.");
return;
}
try {
// 更新任務狀態為排隊中,通知前端這個任務已進入處理流程
$task->update(['status' => 'queued_for_worker']);
// 將任務資料推送到 Redis 佇列,供 FastAPI Worker 消費
// env('REDIS_QUEUE_NAME_FASTAPI', 'fastapi_analysis_queue') 從 .env 讀取佇列名稱
Redis::rpush(env('REDIS_QUEUE_NAME_FASTAPI', 'fastapi_analysis_queue'), json_encode([
'task_id' => $this->taskId,
'text_content' => $this->textContent,
]));
Log::info("Task {$this->taskId} successfully pushed to FastAPI Redis queue.");
} catch (\Exception $e) {
// 任務分派失敗的處理
$task->update([
'status' => 'failed_dispatch',
'result' => ['error' => 'Failed to dispatch to worker queue', 'details' => $e->getMessage()],
]);
Log::error("Failed to push task {$this->taskId} to FastAPI Redis queue: " . $e->getMessage());
}
}
}
2. FastAPI Worker 核心消費邏輯:fastapi-worker/app/consumer.py
FastAPI 的 consumer.py
會作為一個獨立的程式運行,持續阻塞式地從 Redis 佇列中獲取任務。它使用了 redis.Redis
客戶端,並透過 brpop
(Blocking Right POP) 命令來實現阻塞式消費,這樣可以有效利用 CPU 資源,無需頻繁輪詢。
# fastapi-worker/app/consumer.py
import redis
import json
import logging
import os
from app.core.insight_flow_core import InsightFlowCore
from app.config import settings # 這裡會從環境變數讀取 Redis 配置
from app.models.request_models import AnalysisRequestPayload # Pydantic 模型
import requests
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 初始化 Redis 連線
r = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB)
# 初始化 InsightFlowCore,這裡是策略模式的載入點
# 根據 settings.py (由環境變數決定) 載入不同的 AI 模組
insight_flow = InsightFlowCore({
"summarizer_model_type": os.getenv("SUMMARIZER_MODEL_TYPE", "gpt_summarizer"), # 預設 GPT
"sentiment_model_type": os.getenv("SENTIMENT_MODEL_TYPE", "hf_sentiment_analyzer"), # 預設 HuggingFace
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
"HUGGINGFACE_API_TOKEN": os.getenv("HUGGINGFACE_API_TOKEN"),
"sentiment_model_name": os.getenv("HF_SENTIMENT_MODEL_NAME", "cardiffnlp/twitter-roberta-base-sentiment-latest"),
})
def update_laravel_task_status(task_id: str, status: str, result: dict = None):
# 這邊直接呼叫 Docker 內部服務名稱 `app`,不用外部 IP
# LARAVEL_INTERNAL_UPDATE_URL 確保 worker 知道如何找到 Laravel
laravel_update_url = os.getenv("LARAVEL_INTERNAL_UPDATE_URL", "http://app/api/internal/analysis/update")
payload = {"task_id": task_id, "status": status, "result": result}
try:
response = requests.post(laravel_update_url, json=payload, timeout=30)
response.raise_for_status() # 檢查 HTTP 請求是否成功 (2xx)
logger.info(f"Successfully updated Laravel for task {task_id} with status {status}")
except requests.exceptions.RequestException as e:
logger.error(f"Failed to update Laravel for task {task_id}: {e}")
def consume_tasks():
logger.info(f"FastAPI worker starting to consume tasks from Redis queue: {settings.REDIS_QUEUE_NAME}")
while True:
# 阻塞式地從 Redis 佇列右側取出一個任務,timeout=1 表示每秒檢查一次
task_data_raw = r.brpop(settings.REDIS_QUEUE_NAME, timeout=1)
if task_data_raw:
queue_name, task_json = task_data_raw
try:
payload = json.loads(task_json)
request_payload = AnalysisRequestPayload(**payload) # 用 Pydantic 驗證輸入
task_id = request_payload.task_id
text_content = request_payload.text_content
logger.info(f"Processing task: {task_id}")
# 通知 Laravel 任務狀態變為處理中
update_laravel_task_status(task_id, "processing")
analysis_output = insight_flow.process_customer_feedback(text_content) # 執行 AI 分析
# 將分析結果回傳給 Laravel
update_laravel_task_status(task_id, "completed", {"analysis_output": analysis_output})
logger.info(f"Task {task_id} completed successfully.")
except Exception as e:
logger.error(f"Error processing task {task_id}: {e}", exc_info=True)
# 處理失敗時也通知 Laravel
update_laravel_task_status(task_id, "failed", {"error": str(e), "details": "Worker processing failed"})
# 這是 worker 服務啟動時的入口點,在 Dockerfile 中會調用
if __name__ == "__main__":
consume_tasks()
3. FastAPI AI 模組設計:fastapi-worker/app/core/insight_flow_core.py
(策略模式實踐)
這個 InsightFlowCore
類別是整個 AI 處理的核心。它不直接包含 AI 模型的邏輯,而是作為一個「協調者」,根據配置動態地載入和使用不同的 AI 模組實例。每個模組(如 GPTSummarizer
, HFSentimentAnalyzer
)都遵循一個共同的介面或抽象類別,確保它們有相同的方法名稱(例如 summarize
或 analyze
),這樣 InsightFlowCore
就可以統一調用。
# fastapi-worker/app/core/insight_flow_core.py
import logging
from typing import Any, Dict
# 假設這些是您的 AI 模組實現,它們都繼承自一個抽象基類或有共同的方法
from app.modules.summarizers.gpt_summarizer import GPTSummarizer
from app.modules.sentiment_analyzers.hf_sentiment_analyzer import HFSentimentAnalyzer
from app.modules.keyword_extractors.nltk_keyword_extractor import NLTKKeywordExtractor
from app.modules.intent_recognizers.regex_intent_recognizer import RegexIntentRecognizer
from app.modules.recommenders.simple_rule_recommender import SimpleRuleRecommender
logger = logging.getLogger(__name__)
class InsightFlowCore:
def __init__(self, config: Dict[str, Any]):
# 這裡根據配置動態載入摘要模組
self.summarizer = self._load_module(
config.get("summarizer_model_type"), # 這裡會是 "gpt_summarizer" 或其他
{"gpt_summarizer": GPTSummarizer}, # 字典映射類型到實際類別
api_key=config.get("OPENAI_API_KEY")
)
# 載入情緒分析模組
self.sentiment_analyzer = self._load_module(
config.get("sentiment_model_type"),
{"hf_sentiment_analyzer": HFSentimentAnalyzer},
model_name=config.get("sentiment_model_name"),
hf_api_token=config.get("HUGGINGFACE_API_TOKEN") # 可能需要 HuggingFace token
)
# 其他模組,例如關鍵字提取、意圖辨識、推薦系統等,也是類似的載入邏輯
self.keyword_extractor = self._load_module(
config.get("keyword_extractor_type", "nltk_keyword_extractor"),
{"nltk_keyword_extractor": NLTKKeywordExtractor}
)
self.intent_recognizer = self._load_module(
config.get("intent_recognizer_type", "regex_intent_recognizer"),
{"regex_intent_recognizer": RegexIntentRecognizer}
)
self.recommender = self._load_module(
config.get("recommender_type", "simple_rule_recommender"),
{"simple_rule_recommender": SimpleRuleRecommender}
)
def _load_module(self, module_type: str, available_modules: Dict[str, Any], **kwargs):
"""通用模組載入器,實現策略模式彈性切換."""
if module_type in available_modules:
# 根據 module_type 實例化對應的模組類別
return available_modules[module_type](**kwargs)
else:
raise ValueError(f"Unsupported module type: {module_type}")
def process_customer_feedback(self, user_input: str) -> Dict[str, Any]:
"""處理顧客回饋,調用各個 AI 模組,並整合結果."""
try:
# 依序調用不同的分析模組
summary_output = self.summarizer.summarize(user_input)
sentiment_score = self.sentiment_analyzer.analyze(user_input)
keywords = self.keyword_extractor.extract(user_input)
intent = self.intent_recognizer.recognize(user_input)
# 推薦系統可能需要綜合前面分析的結果
recommendations = self.recommender.generate_recommendations(
summary=summary_output,
sentiment=sentiment_score,
keywords=keywords,
intent=intent
)
return {
"摘要": summary_output,
"推薦": recommendations,
"情緒分數": sentiment_score,
"關鍵字": keywords,
"意圖": intent,
}
except Exception as e:
logger.error(f"Error during feedback processing: {e}", exc_info=True)
raise # 重新拋出異常,讓上層調用者處理
4. 前端 Vue (Pinia) 狀態管理與 API 互動:vue-frontend/src/stores/analysis.js
前端使用 Pinia 進行狀態管理,負責發送請求給 Laravel API,並處理任務提交和狀態查詢。submitAnalysis
會發送 POST 請求,fetchTaskStatus
則負責定時 GET 請求,更新任務狀態。
// vue-frontend/src/stores/analysis.js
import { defineStore } from 'pinia';
import axios from 'axios';
// 從環境變數獲取 API 基礎 URL,確保開發與部署環境的彈性
const API_BASE_URL = import.meta.env.VITE_APP_API_URL || 'http://localhost:8000/api';
export const useAnalysisStore = defineStore('analysis', {
state: () => ({
currentTask: null, // 當前正在處理或剛提交的任務
tasks: [], // 所有歷史任務列表
loading: false, // 標示 API 請求是否正在進行
error: null, // 記錄錯誤訊息
}),
actions: {
/**
* 提交分析任務到 Laravel 後端
* @param {string} textContent 顧客回饋文本
* @returns {string} 任務 ID
*/
async submitAnalysis(textContent) {
this.loading = true;
this.error = null;
try {
const response = await axios.post(`${API_BASE_URL}/analysis/submit`, { text_content: textContent });
// 收到 Laravel 回應後,立即更新前端任務狀態為 pending
this.currentTask = {
id: response.data.task_id,
status: 'pending', // 初始狀態
input_data: textContent,
result: null,
createdAt: new Date().toISOString(),
};
this.tasks.unshift(this.currentTask); // 將新任務加到列表最前面
this.loading = false;
return this.currentTask.id;
} catch (err) {
this.error = '提交任務失敗: ' + (err.response?.data?.message || err.message);
this.loading = false;
console.error("Submit analysis error:", err);
throw err; // 拋出錯誤讓組件處理
}
},
/**
* 查詢指定任務的最新狀態
* @param {string} taskId 任務 ID
* @returns {object} 任務狀態與結果
*/
async fetchTaskStatus(taskId) {
// 這裡不設 loading = true 以免影響其他請求的 loading 狀態
try {
const response = await axios.get(`${API_BASE_URL}/analysis/${taskId}/status`);
const taskIndex = this.tasks.findIndex(t => t.id === taskId);
if (taskIndex !== -1) {
// 更新列表中對應任務的狀態和結果
this.tasks[taskIndex].status = response.data.status;
this.tasks[taskIndex].result = response.data.result;
this.tasks[taskIndex].input_data = response.data.input_data;
// 如果是當前任務,也更新 currentTask
if (this.currentTask && this.currentTask.id === taskId) {
this.currentTask.status = response.data.status;
this.currentTask.result = response.data.result;
}
}
return response.data;
} catch (err) {
this.error = '查詢任務狀態失敗: ' + (err.response?.data?.message || err.message);
console.error(`Fetch task status for ${taskId} error:`, err);
throw err;
}
},
},
});
實際跑起來看看?部署與常用指令
雖然這是一個教學專案,但我們也考量到大家想要快速上手的心情。所以,整個專案都用 Docker 容器化了。這代表什麼?你不需要在自己的電腦上安裝什麼 PHP、Python、Node.js,只要有 Docker,幾行指令就能把所有服務跑起來。
複製專案後,你只需要:
- 把專案根目錄下的
docker-compose.yml
和.env.example
檔案都看過一遍。 - 把
env.example
複製一份改名為.env
。 - 在
.env
檔案中,填入你的OPENAI_API_KEY
和HUGGINGFACE_API_TOKEN
(如果有的話)。切記,這個檔案不會被提交到版本控制,很安全。另外,LARAVEL_INTERNAL_UPDATE_URL
這個變數也很關鍵,它定義了 FastAPI Worker 如何在 Docker 內部找到 Laravel 服務。Ini, TOML# .env 範例 APP_NAME=InsightFlow APP_ENV=local APP_KEY= # 會由 make all 自動生成 APP_DEBUG=true APP_URL=http://localhost LOG_CHANNEL=stack LOG_DEPRECATIONS_CHANNEL=null LOG_LEVEL=debug DB_CONNECTION=mysql DB_HOST=mysql # Docker 內部服務名稱 DB_PORT=3306 DB_DATABASE=insight_flow DB_USERNAME=root DB_PASSWORD=root REDIS_HOST=redis # Docker 內部服務名稱 REDIS_PORT=6379 REDIS_CLIENT=phpredis REDIS_PASSWORD=null # 或你設定的密碼 # FastAPI Worker 會從這裡讀取任務 REDIS_QUEUE_NAME_FASTAPI=fastapi_analysis_queue # API Keys for AI Services (自行替換為你的真實金鑰) OPENAI_API_KEY="sk-YOUR_OPENAI_KEY" HUGGINGFACE_API_TOKEN="hf_YOUR_HF_TOKEN" # AI Model Configuration (可依需求調整,對應 InsightFlowCore 中的策略) SUMMARIZER_MODEL_TYPE=gpt_summarizer SENTIMENT_MODEL_TYPE=hf_sentiment_analyzer HF_SENTIMENT_MODEL_NAME=cardiffnlp/twitter-roberta-base-sentiment-latest # 範例模型 # Laravel Internal Update URL for FastAPI Worker to call back LARAVEL_INTERNAL_UPDATE_URL=http://app:80/api/internal/analysis/update # 注意這裡的 app:80 是 Docker 內部服務名稱和埠
- 最關鍵的一步:在專案根目錄下,敲入這行指令:
然後泡杯咖啡等一下,等它把所有服務都建置好、跑起來。這個指令會依序幫你建置 Docker 映像 (Bashmake all
docker-compose build
)、啟動所有容器 (docker-compose up -d
),並自動生成 Laravel 的APP_KEY
和執行資料庫遷移 (docker-compose exec app php artisan key:generate && docker-compose exec app php artisan migrate
)。
搞定之後,打開瀏覽器,連到 http://localhost:5173
,你就能看到前端介面了。隨便輸入一段顧客回饋文字,提交看看,體驗一下任務從提交到分析完成的整個過程。
常用 make
指令:
為了方便開發,我們在 Makefile
裡也包裝了一些常用指令,讓整個流程更順暢。
make build
:建構所有服務的 Docker 映像,只有當Dockerfile
或依賴有變動時才需要重新執行。make start
:啟動所有服務容器,並讓它們在後台運行。make stop
:停止所有服務容器。make down
:停止並移除所有服務容器和網路。make clean
:停止並徹底移除所有容器、網路、資料庫卷和映像檔,讓環境回到最乾淨的狀態。這對於清理測試環境很有用,可以從頭開始。make setup_laravel
:進入 Laravel 容器,執行php artisan key:generate
和php artisan migrate
,這在首次設定或資料庫需要重置時特別方便。make test_laravel
:執行 Laravel 後端的 PHPUnit 測試。make test_worker
:進入 FastAPI Worker 容器,執行 Pytest 測試。make test_frontend
:進入 Vue 前端容器,執行 ESLint 檢查,確保程式碼風格一致。
這些指令可以大大提高開發效率,減少手動輸入複雜 Docker 命令的麻煩。
遇到問題怎麼辦?
專案裡也盡量把日誌和錯誤處理機制給考慮進去了。如果任務處理失敗,Laravel 會記錄錯誤,FastAPI 也會把詳細的失敗原因回傳,方便大家排查問題。
你也可以直接訪問 FastAPI Worker 的 API 文件頁面 http://localhost:8001/docs
,看看健康檢查或同步測試的端點說明,有時候除錯會用得上。透過 docker-compose logs <service_name>
指令,也能查看各個服務的即時日誌,這對於定位問題非常有幫助。
一點小結
InsightFlow 專案是個麻雀雖小、五臟俱全的範例,它展示了如何透過微服務、異步處理(Laravel Queue + Redis)、模組化設計(FastAPI 策略模式)以及容器化部署,來構建一個高效且具備彈性的 AI 應用。當然,目前的 AI 模組還是「模擬」性質的,實際應用時需要對接真實的商業 API。但其背後的架構思路和實作細節,相信還是能給大家帶來一些啟發。
技術之路漫漫,能跟大家互相交流、分享,就是最大的收穫。希望這篇文章能為您的專案帶來一點點靈感。如果大家有什麼想法或建議,也歡迎不吝指教。
沒有留言:
張貼留言