import streamlit as st
from langchain_openai import ChatOpenAI

from pkg.config import cfg

with st.sidebar:
    st.button("Clear Chat", on_click=lambda: st.session_state.pop("messages", None), width="stretch")

st.title("MCP Chatbot")
st.caption("🚀 A Streamlit chatbot powered by Qwen")

llm = ChatOpenAI(
    base_url=cfg.llm_base_url,
    model=cfg.llm_model,
    api_key=cfg.llm_api_key,
    temperature=0.3,
)

# Initialize chat history
if "messages" not in st.session_state:
    st.session_state["messages"] = []

# Display chat messages from history on app rerun
for msg in st.session_state.messages:
    st.chat_message(msg["role"]).markdown(msg["content"])

# React to user input
if prompt := st.chat_input(placeholder="What's up?"):
    st.session_state.messages.append({"role": "user", "content": prompt})
    # st.chat_message("user").write(prompt)
    with st.chat_message("user"):
        st.markdown(prompt)

    def stream_llm():
        for chunk in llm.stream(input=st.session_state.messages):
            yield chunk.content

    with st.chat_message("assistant"):
        msg = st.write_stream(stream_llm())
    st.session_state.messages.append({"role": "assistant", "content": msg})

In the code above, we build a basic chatbot application:
4. Streaming responses are implemented with the llm.stream method, so the AI reply is rendered in the UI token by token (the same pattern is sketched outside of Streamlit below).
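The generator pattern used above is not tied to Streamlit. The following is a minimal sketch, reusing the same pkg.config settings, that consumes llm.stream in a plain console loop; the demo prompt is made up for illustration.

from langchain_openai import ChatOpenAI

from pkg.config import cfg

llm = ChatOpenAI(
    base_url=cfg.llm_base_url,
    model=cfg.llm_model,
    api_key=cfg.llm_api_key,
    temperature=0.3,
)

# Each chunk is an AIMessageChunk; its .content attribute carries the next text fragment.
messages = [{"role": "user", "content": "Say hello in one sentence."}]  # hypothetical demo prompt
for chunk in llm.stream(input=messages):
    print(chunk.content, end="", flush=True)
print()

Calling the model directly from the Streamlit script works, but it couples the UI to the LLM credentials. The next step moves the model call behind a FastAPI service so the frontend only talks to our own backend.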
from http import HTTPStatus
from typing import Sequence

import uvicorn
from fastapi import FastAPI
from fastapi.responses import (JSONResponse, PlainTextResponse,
                               StreamingResponse)
from langchain_core.messages import AIMessage, HumanMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel

from pkg.config import cfg
from pkg.log import logger

app = FastAPI()


class Message(BaseModel):
    role: str
    content: str


class UserAsk(BaseModel):
    thread_id: str
    messages: Sequence[Message]


llm = ChatOpenAI(
    model=cfg.llm_model,
    api_key=cfg.llm_api_key,
    base_url=cfg.llm_base_url,
    temperature=0.3,
)


async def generate_response(messages: Sequence[Message]):
    """An async generator that yields text chunks as they are produced."""
    msgs = []
    for m in messages:
        if m.role in ("human", "user"):
            msgs.append(HumanMessage(content=m.content))
        elif m.role in ("ai", "assistant"):
            msgs.append(AIMessage(content=m.content))
        else:
            logger.warning(f"Unknown role: {m.role}")

    async for chunk in llm.astream(msgs):
        # Ensure only strings are yielded
        if hasattr(chunk, "content"):
            yield str(chunk.content)
        else:
            yield str(chunk)


@app.get("/health")
async def health():
    return PlainTextResponse(content="ok", status_code=HTTPStatus.OK)


@app.post("/stream")
async def post_ask_stream(user_ask: UserAsk):
    logger.info(f"user_ask: {user_ask}")
    if not user_ask.messages:
        return JSONResponse(content={"error": "query is empty"},
                            status_code=HTTPStatus.BAD_REQUEST)
    generator = generate_response(user_ask.messages)
    return StreamingResponse(generator, media_type="text/event-stream")


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

In the server-side code, the /stream endpoint validates the request, converts the incoming role/content pairs into LangChain HumanMessage and AIMessage objects, and streams the model output back through a StreamingResponse driven by the generate_response async generator, while /health provides a simple liveness probe.
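The bind address is hardcoded here even though pkg/config.py (shown later) already exposes server_host and server_port properties. A small, hedged variant of the entry point could read them from the config instead:

if __name__ == "__main__":
    # Sketch: take host/port from conf/config.json instead of hardcoding them.
    uvicorn.run(app, host=cfg.server_host, port=cfg.server_port)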
import asyncio

import httpx


def test_sync_stream():
    """Test the streaming endpoint synchronously."""
    print("=== Sync streaming test ===")
    with httpx.stream("POST", "http://127.0.0.1:8000/stream", json={
        "thread_id": "test_thread_1",
        "messages": [{"role": "user", "content": "Hello, please introduce yourself"}],
    }) as response:
        print("Status code:", response.status_code)
        for chunk in response.iter_text():
            if chunk:
                print(chunk, end='', flush=True)
    print("\n" + "=" * 50 + "\n")


async def test_async_stream():
    """Test the streaming endpoint asynchronously."""
    print("=== Async streaming test ===")
    async with httpx.AsyncClient() as client:
        async with client.stream("POST", "http://127.0.0.1:8000/stream", json={
            "thread_id": "test_thread_2",
            "messages": [{"role": "user", "content": "Write a modern poem about a rainy summer day"}]
        }) as response:
            print("Status code:", response.status_code)
            async for chunk in response.aiter_text():
                if chunk:
                    print(chunk, end='', flush=True)
    print("\n" + "=" * 50 + "\n")


def test_health_endpoint():
    """Test the health-check endpoint."""
    print("=== Health check test ===")
    response = httpx.get("http://127.0.0.1:8000/health")
    print("Health response:", response.text)
    print("Status code:", response.status_code)
    print("=" * 50 + "\n")


if __name__ == "__main__":
    test_health_endpoint()
    test_sync_stream()
    asyncio.run(test_async_stream())

The client example shows how to consume the streaming response with httpx:
Both the synchronous and asynchronous variants use httpx's streaming API to process the response content chunk by chunk; a multi-turn version of the same pattern is sketched below.
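Because the /stream endpoint receives the full message history on every call, the same client pattern extends naturally to multi-turn conversations: accumulate the streamed chunks into a reply, append it to the history, and send the history again. This is a minimal sketch under that assumption; the helper name ask and the demo prompts are invented for illustration.

import httpx


def ask(messages: list, thread_id: str = "demo_thread") -> str:
    """Send the running history to /stream and return the full reply text."""
    reply = ""
    with httpx.stream("POST", "http://127.0.0.1:8000/stream",
                      json={"thread_id": thread_id, "messages": messages}) as resp:
        for chunk in resp.iter_text():
            reply += chunk
    return reply


if __name__ == "__main__":
    history = [{"role": "user", "content": "Recommend a book about distributed systems."}]
    answer = ask(history)
    print(answer)

    # Append the assistant reply plus a follow-up question, then ask again.
    history.append({"role": "assistant", "content": answer})
    history.append({"role": "user", "content": "Summarize it in one sentence."})
    print(ask(history))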
import httpx
import streamlit as st


def stream_llm(messages: list = []):
    with httpx.stream("POST", "http://127.0.0.1:8000/stream",
                      json={"thread_id": "test_thread_1", "messages": messages}) as resp:
        for chunk in resp.iter_text():
            if chunk:
                yield chunk


with st.sidebar:
    st.button("Clear Chat", on_click=lambda: st.session_state.pop("messages", None), width="stretch")

st.title("MCP Chatbot")
st.caption("🚀 A Streamlit chatbot powered by Qwen")

# Initialize chat history
if "messages" not in st.session_state:
    st.session_state["messages"] = []

# Display chat messages from history on app rerun
for msg in st.session_state.messages:
    st.chat_message(msg["role"]).markdown(msg["content"])

# React to user input
if prompt := st.chat_input(placeholder="What's up?"):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    with st.chat_message("assistant"):
        msg = st.write_stream(stream_llm(st.session_state.messages))
    st.session_state.messages.append({"role": "assistant", "content": msg})

In the integration code, we define a stream_llm function that calls the FastAPI backend through httpx and feeds the streamed chunks to the Streamlit frontend for display, completing the end-to-end streaming pipeline from frontend to backend.
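One thing the integration above does not handle is a backend that is down or answers with an error status; st.write_stream would then surface a raw exception to the user. Below is a hedged sketch of a more defensive stream_llm, assuming the same endpoint; the timeout values and error message are illustrative choices, not part of the original code.

import httpx


def stream_llm(messages: list):
    """Yield chunks from the backend, degrading gracefully on HTTP errors."""
    try:
        with httpx.stream(
            "POST",
            "http://127.0.0.1:8000/stream",
            json={"thread_id": "test_thread_1", "messages": messages},
            timeout=httpx.Timeout(60.0, connect=5.0),  # illustrative timeouts
        ) as resp:
            resp.raise_for_status()  # treat 4xx/5xx as errors instead of streaming their body
            for chunk in resp.iter_text():
                if chunk:
                    yield chunk
    except httpx.HTTPError as exc:
        # Shown inside the assistant chat bubble instead of crashing the script run.
        yield f"⚠️ Backend error: {exc}"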
pkg/config.py

import json
from pathlib import Path


class Config:
    def __init__(self):
        p = Path(__file__).parent.parent / "conf" / "config.json"
        if not p.exists():
            raise FileNotFoundError(f"Config file not found: {p}")
        self.data = self.read_json(str(p))

    def read_json(self, filepath: str) -> dict:
        with open(filepath, "r") as f:
            return json.load(f)

    @property
    def llm_model(self) -> str:
        return self.data["llm"]["model"]

    @property
    def llm_api_key(self) -> str:
        return self.data["llm"]["api_key"]

    @property
    def llm_base_url(self) -> str:
        return self.data["llm"]["base_url"]

    @property
    def server_host(self) -> str:
        return self.data["server"]["host"]

    @property
    def server_port(self) -> int:
        return self.data["server"]["port"]


cfg = Config()

pkg/log.py

import logging
import sys


def set_formatter():
    """Build the log formatter."""
    fmt = "%(asctime)s | %(name)s | %(levelname)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"
    return logging.Formatter(fmt, datefmt=datefmt)


def set_stream_handler():
    return logging.StreamHandler(sys.stdout)


def set_file_handler():
    return logging.FileHandler("app.log", mode="a", encoding="utf-8")


def get_logger(name: str = "mylogger", level=logging.DEBUG):
    logger = logging.getLogger(name)
    formatter = set_formatter()
    # handler = set_stream_handler()
    handler = set_file_handler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(level)
    return logger


logger = get_logger()
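For reference, the Config class expects a conf/config.json in a conf/ directory at the project root (a sibling of pkg/) with llm and server sections. A minimal sketch of that file follows; every value is a placeholder to be replaced with your own model name, API key, and OpenAI-compatible base URL.

{
  "llm": {
    "model": "qwen-plus",
    "api_key": "sk-your-api-key",
    "base_url": "https://your-openai-compatible-endpoint/v1"
  },
  "server": {
    "host": "127.0.0.1",
    "port": 8000
  }
}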