import streamlit as st import requests import json from typing import Optional, List, Dict, Any import asyncio import threading from queue import Queue import time # 後端 API 端點 BACKEND_URL = "http://localhost:8000/generate" # 設定頁面為寬版,適合聊天室 st.set_page_config(page_title="RAG 聊天室", layout="wide") st.title("🤖 RAG 聊天室") # 初始化 session state if "messages" not in st.session_state: st.session_state.messages = [] # 側邊欄:設定 with st.sidebar: st.header("設定") backend_url = st.text_input("後端 URL", value=BACKEND_URL) mode_options = ["streaming", "progress"] selected_mode = st.selectbox("模式 (mode)", mode_options, index=0) # 預設 streaming 適合聊天 st.header("元數據 (metadata)") metadata_json = st.text_area( "JSON 格式 (可選): {'key': 'value'}", value="{}", height=100 ) try: metadata: Optional[Dict[str, Any]] = json.loads(metadata_json) if metadata_json.strip() else None except json.JSONDecodeError: st.error("metadata JSON 格式錯誤,請檢查!") metadata = None # 聊天歷史:自動從 messages 轉換為 chat_history 格式 chat_history = [{"role": msg["role"], "content": msg["content"]} for msg in st.session_state.messages] # 顯示聊天歷史 for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) # 輸入框:像聊天室一樣在底部 if prompt := st.chat_input("輸入您的問題..."): # 追加使用者訊息到歷史 st.session_state.messages.append({"role": "user", "content": prompt}) # 顯示使用者訊息 (右邊) with st.chat_message("user"): st.markdown(prompt) # 準備請求資料 request_data = { "query": prompt, "mode": selected_mode, "chat_history": chat_history, # 包含最新使用者訊息 "metadata": metadata } # 顯示機器人訊息 placeholder (左邊) with st.chat_message("assistant"): # 進度條 (僅 progress 模式明顯) if selected_mode == "progress": progress_bar = st.progress(0) status_placeholder = st.empty() # 回應內容 placeholder response_placeholder = st.empty() # 非同步處理串流 (使用 threading) queue = Queue() stop_event = threading.Event() def fetch_response(): try: response = requests.post( backend_url, json=request_data, stream=True, headers={"Content-Type": "application/json"}, timeout=60 # 較長 timeout 適合 LLM ) response.raise_for_status() full_response = "" if selected_mode == "streaming": # Streaming 模式: 逐字顯示 for chunk in response.iter_content(chunk_size=1, decode_unicode=True): if chunk: full_response += chunk queue.put(chunk) queue.put(None) # 結束標記 elif selected_mode == "progress": # Progress 模式: 解析 SSE 並更新進度 current_step = "" for line in response.iter_lines(decode_unicode=True): if line: line_str = line.decode('utf-8') if isinstance(line, bytes) else line if line_str.startswith("data: "): data = line_str[6:].strip() if data == "[DONE]": break # 解析步驟並更新 if "開始處理查詢" in data: current_step = "開始處理..." queue.put(("status", "開始處理查詢...")) elif "文件檢索完成" in data: current_step = "文件檢索完成" queue.put(("progress", 0.33)) queue.put(("status", "文件檢索完成")) elif "上下文建構完成" in data: current_step = "上下文建構完成" queue.put(("progress", 0.66)) queue.put(("status", "上下文建構完成")) elif "生成中" in data: current_step = "生成中..." queue.put(("status", "生成中...")) elif "生成完成" in data: current_step = "生成完成" queue.put(("progress", 1.0)) queue.put(("status", "生成完成")) elif "最終回答:" in data: full_response = data.split("最終回答: ", 1)[1].strip() queue.put(("content", full_response)) queue.put(("status", "完成!")) queue.put(("end", full_response)) except requests.exceptions.RequestException as e: error_msg = f"錯誤: {str(e)}" queue.put(("error", error_msg)) queue.put(("end", "")) # 啟動背景執行緒 thread = threading.Thread(target=fetch_response, daemon=True) thread.start() # 處理 queue 中的訊息 (模擬串流更新) full_display = "" while True: try: msg = queue.get(timeout=0.1) if msg is None or msg[0] == "end": # 結束 break msg_type, content = msg if msg_type == "content": # 追加內容 full_display += content response_placeholder.markdown(full_display) elif msg_type == "status" and selected_mode == "progress": status_placeholder.markdown(content) elif msg_type == "progress" and selected_mode == "progress": progress_bar.progress(content) elif msg_type == "error": response_placeholder.markdown(f"**錯誤:** {content}") # 強制重新渲染 time.sleep(0.01) st.rerun() except: pass # 逾時繼續 # 最終追加到歷史 (僅成功時) if full_display: st.session_state.messages.append({"role": "assistant", "content": full_display}) # 清除輸入 st.rerun() # 說明 with st.expander("使用說明"): st.write(""" - **聊天室介面**:左邊為機器人 (assistant) 回答,右邊為使用者 (user) 問題。 - **Streaming 模式**:逐字即時顯示回答,適合聊天體驗。 - **Progress 模式**:顯示步驟進度 (SSE),並在最後顯示完整回答。 - **聊天歷史**:自動維護多輪對話,傳送到後端。 - **Metadata**:側邊欄設定,任意 JSON 傳遞額外資訊。 - 確保後端在 localhost:8000 運行。 - 安裝依賴: `pip install streamlit requests` - 執行: `streamlit run chat_app.py` """)