diff --git a/README.md b/README.md index b845933..292d9d8 100644 --- a/README.md +++ b/README.md @@ -13222,9 +13222,6 @@ tools = [web_search, retriever_tool] print("当前窗口内的对话历史:", window_history) ``` - - - #### 【面试题】LLM存储优化-大量长对话如何解决 @@ -13612,27 +13609,8 @@ tools = [web_search, retriever_tool] print("当前对话历史:", memory.load_memory_variables({})["chat_history"], "\n") ``` - - - - - - - - - - - - - - - - - -#### 第6集 LLM复杂记忆存储-多会话隔离案例实战 - -**简介:LLM复杂记忆存储-多会话隔离案例实战** +#### LLM复杂记忆存储-多会话隔离案例实战 * 背景与需求 @@ -13680,7 +13658,7 @@ tools = [web_search, retriever_tool] * 案例实战 - ``` + ```python from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_openai.chat_models import ChatOpenAI from langchain_core.messages import HumanMessage @@ -13744,21 +13722,8 @@ tools = [web_search, retriever_tool] print(f"存储内容:{store}") ``` - - - - - - - - - - - -#### 第7集 再进阶复杂存储-多租户多会话隔离案例实战 - -**简介:再进阶复杂存储-多用户多会话隔离案例实战** +#### 再进阶复杂存储-多租户多会话隔离案例实战 * 需求背景 @@ -13806,7 +13771,7 @@ tools = [web_search, retriever_tool] * 案例实战 - ``` + ```python from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_openai.chat_models import ChatOpenAI from langchain_core.messages import HumanMessage @@ -13892,25 +13857,8 @@ tools = [web_search, retriever_tool] print(f"存储内容:{store}") ``` - - - - - - - - - - - - - - - -#### 第8集 大模型长期记忆解决方案和案例实战《上》 - -**简介:大模型长期记忆解决方案和案例实战** +#### 大模型长期记忆解决方案和案例实战 * 长期记忆的核心需求 @@ -13982,7 +13930,7 @@ tools = [web_search, retriever_tool] * 之前部署的Redis可以卸载,或者更改端口,也可以配置密码,使用方式一样,redis-stack 7.X版本都可以 - ``` + ```dockerfile docker run -d \ --name redis-stack \ -p 6379:6379 \ @@ -13990,30 +13938,6 @@ tools = [web_search, retriever_tool] redis/redis-stack:latest ``` - - - - - - - - - - - - - - - - - - - - -#### 第9集 大模型长期记忆解决方案和案例实战《下》 - -**简介:大模型长期记忆解决方案和案例实战** - * 案例实战 * 安装依赖 @@ -14024,7 +13948,7 @@ tools = [web_search, retriever_tool] * 案例测试 - ``` + ```python from langchain_redis import RedisChatMessageHistory import os @@ -14047,7 +13971,7 @@ tools = [web_search, retriever_tool] * 编码实战 - ``` + ```python from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import AIMessage, HumanMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder @@ -14102,7 +14026,7 @@ tools = [web_search, retriever_tool] * 注意: 安全起见,RedisStack还是需要配置密码,避免入侵挖矿 - ``` + ```dockerfile docker run -d \ --name redis-stack \ -p 6379:6379 \ @@ -14113,7 +14037,7 @@ tools = [web_search, retriever_tool] * 多租户多会话方案 - ``` + ```python from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_openai.chat_models import ChatOpenAI from langchain_core.messages import HumanMessage @@ -14205,26 +14129,10 @@ tools = [web_search, retriever_tool] print(get_session_history("user_2","session_1").messages) ``` - - - - - - - - - - -![logo](D:/学习/2025/AI智能化云盘学习笔记/笔记/img/image-20230918114907133-5008948.png) **愿景:"IT路上的持续充电平台,让技术不再难学"** -**更多高级课程请访问 xdclass.net** ### 第四十八章 大模型服务必备FastAPI Web框架 - - -#### 第1集 Python Web框架Fast API技术介绍 - -**简介: Python Web框架Fast API技术介绍** +#### Python Web框架Fast API技术介绍 * 需求背景:现代Web开发痛点 @@ -14243,7 +14151,7 @@ tools = [web_search, retriever_tool] - ✅ 强类型安全:Python类型提示 + Pydantic模型,减少Bug。 - 🚀 易学易用:代码简洁,适合快速开发API。 - 适用场景 - * 构建 RESTful API 或 大模型 服务,微服务架构中的独立服务模块。 + * 构建 RESTful API 或 大模型服务,微服务架构中的独立服务模块。 * 需要自动生成 API 文档的项目。 * **核心价值**:FastAPI = 高性能 + 强类型 + 自动文档。 - **优点**: @@ -14280,21 +14188,7 @@ tools = [web_search, retriever_tool] | **Spring Boot** | Java | 企业级功能,但配置复杂、启动慢 | 大型分布式系统 | | **FastAPI** | Python | 高性能、异步、类型安全、自动文档 | 高并发 API、微服务 | - - - - - - - - - - - - -#### 第2集 FastAPI环境安装和基础案例实战 - -**简介: FastAPI环境安装和基础案例实战** +#### FastAPI环境安装和基础案例实战 * 环境安装 @@ -14307,7 +14201,7 @@ tools = [web_search, retriever_tool] * 创建第一个Fast API应用 - ``` + ```python from typing import Union from fastapi import FastAPI @@ -14331,9 +14225,12 @@ tools = [web_search, retriever_tool] * 启动服务 - ``` - #语法:uvicorn 文件的相对路径:实例名 --reload + ```python + # 语法:uvicorn 文件的相对路径:实例名 --reload + # 多级目录使用.,例如: uvicorn xx.xx.main:app -- reload + #--reload:代码修改后自动重启(仅开发环境) + # main:当前目录下的main.py文件,app:文件下的app实例变量 uvicorn main:app --reload #指定程序监听端口 @@ -14342,36 +14239,18 @@ tools = [web_search, retriever_tool] #下面这个方式也可以 fastapi dev main.py ``` - + * 访问API与文档 - + * API地址:`http://localhost:8000` * 交互文档:`http://localhost:8000/docs` * 备用文档( 另一种交互式文档界面,具有清晰简洁的外观 ):`http://localhost:8000/redoc` - - - - - - - - - - - - - - - - -#### 第3集 进阶FastAPI Web多案例参数请求实战 - -**简介: 进阶FastAPI Web多案例参数请求实战** +#### 进阶FastAPI Web多案例参数请求实战 * 路由与 HTTP 方法 - ``` + ```python from fastapi import FastAPI app = FastAPI() @@ -14396,7 +14275,7 @@ tools = [web_search, retriever_tool] * 请求头Header操作 - ``` + ```python from fastapi import Header @app.get("/header") @@ -14407,7 +14286,7 @@ tools = [web_search, retriever_tool] * 获取多个请求头 - ``` + ```python from fastapi import Request @app.get("/all-headers/") def get_all_headers(request: Request): @@ -14416,7 +14295,7 @@ tools = [web_search, retriever_tool] * 自定义响应头 - ``` + ```python from fastapi.responses import JSONResponse @app.get("/custom-header/") def set_custom_header(): @@ -14431,7 +14310,7 @@ tools = [web_search, retriever_tool] * 自定义响应码 - ``` + ```python from fastapi import FastAPI, status,Response @app.get("/status_code", status_code=200) @@ -14441,19 +14320,8 @@ tools = [web_search, retriever_tool] return {"name": name} ``` - - - - - - - - - -#### 第4集 FastAPI异步编程async-await多案例实战 《上》 - -**简介: FastAPI异步编程async-await多案例实战** +#### FastAPI异步编程async-await多案例实战 * 需求背景 @@ -14461,7 +14329,7 @@ tools = [web_search, retriever_tool] * 🧋 **同步(Synchronous)就像传统奶茶店** - ``` + ```markdown 你: 老板,我要一杯珍珠奶茶 店员: 开始制作(5分钟) 你: 干站着等,不能做其他事 @@ -14472,7 +14340,7 @@ tools = [web_search, retriever_tool] * 🚀 异步(Asynchronous)像智能奶茶店 - ``` + ```markdown 你: 扫码下单珍珠奶茶 系统: 收到订单(生成取餐号) 你: 去旁边座位玩手机(不用干等) @@ -14483,7 +14351,7 @@ tools = [web_search, retriever_tool] * 对应到代码中 - ``` + ```markdown # 同步代码(传统奶茶店模式) def make_tea(): print("开始煮茶") # 👨🍳 店员开始工作 @@ -14503,7 +14371,7 @@ tools = [web_search, retriever_tool] * 同步版本(存在性能瓶颈) - ``` + ```python import requests @app.get("/sync-news") @@ -14516,7 +14384,7 @@ tools = [web_search, retriever_tool] * 异步版本(高效并发) - ``` + ```python import httpx @app.get("/async-news") @@ -14558,7 +14426,7 @@ tools = [web_search, retriever_tool] - 通过 `asyncio.run()` 或手动创建事件循环来启动。 - ``` + ```python # 启动事件循环并运行协程 asyncio.run(my_coroutine()) ``` @@ -14569,7 +14437,7 @@ tools = [web_search, retriever_tool] - 通过 `asyncio.create_task()` 创建任务 - ``` + ```python async def main(): task = asyncio.create_task(my_coroutine()) await task @@ -14589,7 +14457,7 @@ tools = [web_search, retriever_tool] * 同步请求案例 - ``` + ```python import httpx # GET 请求 response = httpx.get("https://httpbin.org/get") @@ -14599,7 +14467,7 @@ tools = [web_search, retriever_tool] * 异步请求案例 - ``` + ```python import httpx import asyncio @@ -14613,7 +14481,7 @@ tools = [web_search, retriever_tool] * 使用客户端实例(推荐) - ``` + ```python # 同步客户端 with httpx.Client() as client: response = client.get("https://httpbin.org/get") @@ -14623,27 +14491,12 @@ tools = [web_search, retriever_tool] response = await client.get("https://httpbin.org/get") ``` - - - - - - - - - - - - -#### 第5集 FastAPI异步编程async-await多案例实战 《下》 - -**简介: FastAPI异步编程async-await多案例实战** * 案例实战 * 基础同步和异步编程 - ``` + ```python from fastapi import FastAPI import asyncio import time @@ -14671,7 +14524,7 @@ tools = [web_search, retriever_tool] * 综合案例实战:并发调用外部API - ``` + ```python import asyncio import httpx from fastapi import FastAPI @@ -14719,11 +14572,9 @@ tools = [web_search, retriever_tool] return results ``` - - * 常见错误模式演示 - ``` + ```python # 错误示例:在async函数中使用同步阻塞 @app.get("/wrong") async def bad_example(): @@ -14735,50 +14586,32 @@ tools = [web_search, retriever_tool] await asyncio.sleep(5) ``` - - * 最佳实践指南 * **必须使用async的三种场景** - * 需要await调用的异步库操作 * WebSocket通信端点 * 需要后台任务处理的接口 - + * **以下情况适合同步路由** - + - 纯CPU计算(如数学运算) - 使用同步数据库驱动 - 快速返回的简单端点 - + * **路由声明规范** - + * 所有路由优先使用`async def` * 仅在必须使用同步操作时用普通 `def` - + * **性能优化建议** - + - 保持async函数轻量化 - 长时间CPU密集型任务使用后台线程 - 使用连接池管理数据库/HTTP连接 + - - - - - - - - - - - - - - -#### 第6集 进阶FastAPI模型Pydantic 案例实战 - -**简介: 进阶FastAPI模型Pydantic 案例实战** +#### 进阶FastAPI模型Pydantic 案例实战 * 什么是数据模型 @@ -14788,7 +14621,7 @@ tools = [web_search, retriever_tool] * 基础模型定义 - ``` + ```python from pydantic import BaseModel # 用户注册模型 @@ -14802,7 +14635,7 @@ tools = [web_search, retriever_tool] * 请求体(POST/PUT数据) - ``` + ```python from pydantic import BaseModel,Field class Product(BaseModel): name: str @@ -14824,7 +14657,7 @@ tools = [web_search, retriever_tool] * 返回 Pydantic 模型 - ``` + ```python from pydantic import BaseModel class Item(BaseModel): name: str @@ -14846,7 +14679,7 @@ tools = [web_search, retriever_tool] * 密码强度验证 - ``` + ```python from pydantic import BaseModel, field_validator class UserRegistration(BaseModel): @@ -14877,36 +14710,9 @@ tools = [web_search, retriever_tool] +### FastAPI框架高级技能和综合项目实战 - - - - - - - - - - - - - - - - - - - -![logo](D:/学习/2025/AI智能化云盘学习笔记/笔记/img/image-20230918114907133-5008948.png) **愿景:"IT路上的持续充电平台,让技术不再难学"** -**更多高级课程请访问 xdclass.net** - -### 第四十九章 FastAPI框架高级技能和综合项目实战 - - - -#### 第1集 FastAPI路由管理APIRouter案例实战 - -**简介: FastAPI 路由管理APIRouter案例实战** +#### FastAPI路由管理APIRouter案例实战 * 需求背景 @@ -14914,7 +14720,7 @@ tools = [web_search, retriever_tool] * 所有接口混杂在一个文件中,随着功能增加,文件会变得臃肿难维护。 - ``` + ```python # main.py from fastapi import FastAPI @@ -14937,7 +14743,7 @@ tools = [web_search, retriever_tool] * 每个接口都要重复写`/api/v1`前缀和`tags`,容易出错且难以统一修改 - ``` + ```python # 用户接口 @app.get("/api/v1/users", tags=["用户管理"]) def get_users_v1(): ... @@ -14951,7 +14757,7 @@ tools = [web_search, retriever_tool] * 需要在每个管理员接口重复添加认证依赖。 - ``` + ```python @app.get("/admin/stats", dependencies=[Depends(admin_auth)]) def get_stats(): ... @@ -14965,7 +14771,7 @@ tools = [web_search, retriever_tool] * 按业务模块拆分,每个文件职责单一。 - ``` + ```python # 文件结构 routers/ ├── users.py # 用户路由 @@ -14981,7 +14787,7 @@ tools = [web_search, retriever_tool] * 使用路由管理 - ``` + ```python # 创建管理员专属路由组 admin_router = APIRouter( prefix="/admin", @@ -15002,7 +14808,7 @@ tools = [web_search, retriever_tool] * 基础使用模板 - ``` + ```python from fastapi import APIRouter router = APIRouter( @@ -15029,7 +14835,7 @@ tools = [web_search, retriever_tool] * 创建文件` app/users.py` - ``` + ```python from fastapi import APIRouter, HTTPException from pydantic import BaseModel router = APIRouter( @@ -15065,7 +14871,7 @@ tools = [web_search, retriever_tool] * 创建文件 `app/products.py` - ``` + ```python # app/api/v1/products.py from fastapi import APIRouter router = APIRouter( @@ -15091,7 +14897,7 @@ tools = [web_search, retriever_tool] * 创建入口文件 `main.py` - ``` + ```python from fastapi import FastAPI from app import users, products import uvicorn @@ -15134,7 +14940,7 @@ tools = [web_search, retriever_tool] * 模块化结构参考 - ``` + ```python #整体项目结构 project/ ├── main.py @@ -15160,27 +14966,7 @@ tools = [web_search, retriever_tool] - - - - - - - - - - - - - - - - - - -#### 第2集 FastAPI依赖注入和常见项目结构设计 - -**简介: FastAPI依赖注入和常见项目结构设计** +#### FastAPI依赖注入和常见项目结构设计 * 什么是依赖注入 @@ -15207,7 +14993,7 @@ tools = [web_search, retriever_tool] * 依赖项可以是任何可调用对象(如函数、类),通过 `Depends()` 声明依赖关系 - ``` + ```python from fastapi import Depends, FastAPI app = FastAPI() @@ -15229,7 +15015,7 @@ tools = [web_search, retriever_tool] * 依赖项也可以是类,适合需要初始化或状态管理的场景, 通过依赖注入机制,将复杂逻辑解耦为可复用的模块 - ``` + ```python #案例一 class DatabaseSession: def __init__(self): @@ -15265,7 +15051,7 @@ tools = [web_search, retriever_tool] * 为所有路由添加公共依赖项(如统一认证) - ``` + ```python app = FastAPI(dependencies=[Depends(verify_token)]) # 或针对特定路由组: @@ -15285,7 +15071,7 @@ tools = [web_search, retriever_tool] * 基础分层结构(适合小型项目) - ``` + ```python myproject/ ├── main.py ├── routers/ @@ -15298,7 +15084,7 @@ tools = [web_search, retriever_tool] * 模块化拆分结构一(推荐中型项目) - ``` + ```python src/ ├── app/ │ ├── core/ # 核心配置 @@ -15320,7 +15106,7 @@ tools = [web_search, retriever_tool] * 模块化拆分结构二(推荐中型项目) - ``` + ```python myproject/ ├── app/ # 应用核心目录 │ ├── core/ # 全局配置和工具 @@ -15363,29 +15149,7 @@ tools = [web_search, retriever_tool] | 版本控制 | 通过URL路径实现API版本管理 | | 文档友好 | 为每个路由添加summary和description | - - - - - - - - - - - - - - - - - - - - -#### 第3集 FastAPI+大模型流式AI问答助手实战《上》 - -**简介: FastAPI+大模型流式AI问答助手实战《上》** +#### FastAPI+大模型流式AI问答助手实战 * 需求 @@ -15422,7 +15186,7 @@ tools = [web_search, retriever_tool] * 案例实操 - ``` + ```python async def ai_qa_stream_generator(query: str): """生成A回答的流式响应""" try: @@ -15451,7 +15215,7 @@ tools = [web_search, retriever_tool] * 编码实战 `app/ai_writer.py` - ``` + ```python from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser @@ -15511,31 +15275,9 @@ tools = [web_search, retriever_tool] - - - - - - - - - - - - - - - - - - -#### 第4集 FastAPI+大模型流式AI问答助手实战《下》 - -**简介: FastAPI+大模型流式AI问答助手实战《下》** - * 编码实战 `writer_app.py` - ``` + ```python import uvicorn from fastapi import FastAPI from fastapi.responses import StreamingResponse @@ -15579,3 +15321,1527 @@ tools = [web_search, retriever_tool] * Apifox 提供了专门的 SSE(Server-Sent Events)调试功能,适合处理 AI 大模型的流式响应场景 * 配置 SSE,选择接口的请求方法,在请求头中添加 `Accept: text/event-stream` 来启用 SSE。 + + + +### LLM大模型Web服务智能体中心开发实战 + +#### AI智能体中心Web服务架构和相关环境创建 + +* 需求说明 + + * 基于前面学的`langchain+fastapi`+多个知识点,综合开发大模型Web服务,包括多个智能体 + + * 部分智能体可以结合网盘业务进行,部分智能体可以独立运行 + + * 注意 + + * **智能体中心包括多个应用,非强行绑定,可以结合自己公司业务灵活切换包装简历** + * **要学会举一反三,结合前面的内容进行拓展更多,比如AI医生、AI律师、AI政务等** + + * 智能体业务架构图 + + image-20250418190055953 + +* 项目依赖环境创建 + + * 项目名称:`dcloud-ai-agent` + + * 创建虚拟环境 + + ``` + # 语法:python -m venv <环境目录名> + python -m venv myenv + ``` + + * 激活虚拟环境 + + ``` + source myenv/bin/activate + ``` + + * 安装核心依赖包 (**版本和课程保持一致,不然很多不兼容!!!**) + + * 下载相关资料 ,使用**【wget】或者【浏览器】远程下载相关依赖包(需要替换群里最新的)** + + ``` + 原生资料下载方式(账号 - 密码 - ip地址 - 端口 需要替换群里最新的,【其他路径不变】) + wget --http-user=用户名 --http-password=密码 http://ip:端口/dcloud_pan/aipan_agent_1.zip + + + #比如 命令行下 + wget --http-user=admin --http-password=xdclass.net888 http://47.115.31.28:9088/dcloud_pan/aipan_agent_1.zip + + + # 比如 浏览器直接访问 + http://47.115.31.28:9088/dcloud_pan/aipan_agent_1.zip + ``` + + * 解压后执行【**依赖很多,版本差异大,务必按照下面执行,否则课程无法进行下去,加我微信 xdclass6**】 + + ``` + # 安装依赖 + pip install -r requirements.txt + ``` + + +#### AI智能体中心Web项目搭建和基础配置 + +* 项目基本目录结构-创建对应的目录 + + ```python + dcloud-agent/ + ├── agents/ # 智能代理相关代码 + ├── app/ # 应用主目录 + │ └── main.py # 应用入口文件 + ├── core/ # 核心功能模块 + ├── models/ # 数据模型定义 + ├── routers/ # API路由定义 + ├── services/ # 业务服务层 + ├── tests/ # 测试代码 + ├── tools/ # 工具函数和辅助类 + ├── myenv/ # Python虚拟环境 + ├── Dockerfile # Docker容器配置 + ├── README.md # 项目说明文档 + └── requirements.txt # 项目依赖包列表【不提供,参考上集,进大课群下载】 + ``` + +* 编写主入口文件 + + ```python + import uvicorn + from fastapi import FastAPI + from core.config import settings + from routers import chat + app = FastAPI( + title=settings.APP_NAME, + description="AI智能体中心API服务", + version="1.0.0" + ) + + # 注册路由 + app.include_router(chat.router) + + @app.get("/") + async def root(): + return { + "message": "欢迎使用AI智能体中心API", + "version": "1.0.0", + "available_agents": ["chat"] # 列出可用的智能体 + } + + # 启动服务器的命令 uvicorn app.main:app --reload + if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) + ``` + + + +* 隐藏pychche文件 + + * 打开 首选项 --> 设置 --> 搜索框中 搜索 files.exclude --> 点击 "添加模式" 按钮 + * 输入想要隐藏的文件夹, 如python经常生成的缓存文件夹, 输入 `**/__pycache`__ 即可隐藏 `__pycache__` 文件夹. + + ![image-20250414104551285](D:/学习/2025/AI智能化云盘学习笔记/笔记/img/image-20250414104551285.png) + + + +#### AI智能体中心FastAPI自定义异常+统一响应 + +**简介: AI智能体中心FastAPI自定义异常+统一响应** + +* 需求 + + * Web响应一般都会进行自定义异常+统一响应 + * 前端方便对接和记录相关业务操作日志 + +* 编码实战 + + * 自定义异常,创建一个 `exceptions.py` 文件 + + * 定义一个继承自 `Exception` 的自定义异常类,可以添加额外的字段(如错误码、错误信息等) + * 定义一个接收 `Request` 和 `Exception` 参数的函数,返回标准的 `JSONResponse` + + ```python + from fastapi import HTTPException, status + from fastapi.responses import JSONResponse + from typing import Any + from models.json_response import JsonData + + class ApiException(Exception): + """API异常基类""" + def __init__( + self, + msg: str = "操作失败", + code: int = -1, + data: Any = None + ): + self.msg = msg + self.code = code + self.data = data + super().__init__(msg) + + async def api_exception_handler(request, exc: Exception) -> JSONResponse: + """统一异常处理器""" + print(f"❌ API异常: {str(exc)}") + if isinstance(exc, ApiException): + response = JsonData.error(msg=exc.msg, code=exc.code) + elif isinstance(exc, HTTPException): + response = JsonData.error(msg=str(exc.detail), code=exc.status_code) + else: + # 处理其他所有异常,包括验证错误 + response = JsonData.error(msg=str(exc), code=-1) + + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response.model_dump() + ) + + + + # main.py文件注册统一异常处理器 + app.add_exception_handler(ApiException, api_exception_handler) + ``` + + * 统一响应JsonData,创建`json_response.py`文件 + + ```python + from pydantic import BaseModel + from typing import Any, Optional, Literal + + class JsonData(BaseModel): + """通用响应数据模型""" + code: int = 0 # 状态码 + data: Optional[Any] = None # 响应数据 + msg: str = "" # 响应消息 + type: Literal["stream", "text"] = "text" # 响应类型 + + @classmethod + def success(cls, data: Any = None) -> "JsonData": + """创建成功响应""" + return cls(code=0, data=data, type="text") + + @classmethod + def error(cls, msg: str = "error", code: int = -1) -> "JsonData": + """创建错误响应""" + return cls(code=code, msg=msg, type="text") + + @classmethod + def stream_data(cls, data: Any, msg: str = "") -> "JsonData": + """创建流式数据响应""" + return cls(code=0, data=data, msg=msg, type="stream") + + ``` + + +#### AI智能体中心配置文件和模型方法开发实战 + +* 需求 + + * 统一管理配置文件,包括JWT配置、数据库、分布式缓存、大模型参数 + * 封装LLM方法,方便切换模型 + +* 编码实战 + + * 配置文件 `core/config.py` + + ```python + from pydantic_settings import BaseSettings + + class Settings(BaseSettings): + # Redis配置 + REDIS_HOST: str = "47.119.128.20" + REDIS_PORT: int = 6379 + REDIS_DB: int = 0 + REDIS_PASSWORD: str = "abc123456" + REDIS_MAX_CONNECTIONS: int = 10 + + # MySQL配置 + MYSQL_HOST: str = "39.108.115.28" + MYSQL_PORT: int = 3306 + MYSQL_USER: str = "root" + MYSQL_PASSWORD: str = "xdclass.net168" + MYSQL_DATABASE: str = "dcloud_aipan" + MYSQL_CHARSET: str = "utf8mb4" + + # 应用配置 + APP_NAME: str = "AI智能体中心API服务" + DEBUG: bool = False + + # JWT配置 + JWT_SECRET_KEY: str = "xdclass.net168xdclass.net168xdclass.net168xdclass.net168" + JWT_ALGORITHM: str = "HS256" + JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 + JWT_LOGIN_SUBJECT: str = "XDCLASS" + + # LLM配置 + LLM_MODEL_NAME: str = "qwen-plus" + LLM_BASE_URL: str = "https://dashscope.aliyuncs.com/compatible-mode/v1" + LLM_API_KEY: str = "sk-005c3c25f6d042848b29d75f2f020f08" + LLM_TEMPERATURE: float = 0.7 + LLM_STREAMING: bool = True + + class Config: + env_file = ".env" + case_sensitive = True + + settings = Settings() + ``` + + * 注意:也可以创建一个 .env 文件来存储敏感信息(如 API_KEY),并将其添加到 .gitignore 中,避免敏感信息泄露。 + + * 大模型方法获取 `core/llm.py` + + ```python + from langchain_openai import ChatOpenAI + from core.config import settings + + + def get_default_llm(): + """获取LLM模型""" + return ChatOpenAI( + name = settings.LLM_MODEL_NAME, + base_url = settings.LLM_BASE_URL, + api_key = settings.LLM_API_KEY, + temperature = settings.LLM_TEMPERATURE, + streaming = settings.LLM_STREAMING + ) + ``` + + +#### AI智能体中心登录JWT和日志处理实战 + +**简介: AI智能体中心登录JWT和日志处理实战** + +* 需求 + + * 部分智能体需要登录才可以进行使用,因此需要加入JWT解析的逻辑 + * 和Java采用一样的JWT方式进行,封装相关方法和密钥 + * 链路一:前端传递token->后端Java透传->fastapi处理 + * 链路二:前端传递token->fastapi处理 + * 快速掌握logging模块使用,替代print + +* 编码实战 + + * logging介绍 + + * 是 Python 标准库中用于记录日志的模块,提供了灵活的日志记录机制,支持多级别日志 + + * 替代 `print` 调试的工业级解决方案 + + ``` + import logging + + # 默认使用 WARNING 级别,输出到控制台 + logging.warning("This is a warning message") # 输出 + logging.info("This is an info message") # 不输出 + ``` + + * 通过 logging.basicConfig 配置 + + ```python + import logging + + logging.basicConfig( + filename="app.log", # 输出到文件 + level=logging.INFO, # 记录器级别 + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + + logging.info("Program started") # 写入 app.log + ``` + + * 推荐方案,为模块或组件创建独立的记录器 + + ```python + logger = logging.getLogger(__name__) # 推荐方式 + ``` + + + + * 创建 `auth.py`文件 + + ```python + from fastapi import Depends, Header + from jose import JWTError, jwt + from typing import Dict, Any, Optional + import logging + from datetime import datetime, timedelta, UTC + import os + import sys + from core.config import settings + from core.exceptions import ApiException + + logger = logging.getLogger(__name__) + + def remove_prefix(token: str) -> str: + """移除token的前缀""" + return token.replace(settings.JWT_LOGIN_SUBJECT, "").strip() if token.startswith(settings.JWT_LOGIN_SUBJECT) else token + + async def get_current_user(token: Optional[str] = Header(None)) -> Dict[str, Any]: + """ + 获取当前用户信息 + + Args: + token: JWT token字符串,从请求头中获取 + + Returns: + Dict[str, Any]: 包含用户信息的字典,格式为 {"account_id": int, "username": str} + + Raises: + ApiException: 当token无效、过期或解析失败时抛出异常 + """ + try: + # 验证token是否存在 + if not token or not token.strip(): + raise ApiException(msg="Token不能为空", code=-1) + + # 移除前缀并解析token + token = remove_prefix(token.strip()) + payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) + + # 验证token内容 + if payload.get("sub") != settings.JWT_LOGIN_SUBJECT or not payload.get("accountId"): + raise ApiException(msg="无效的认证凭据", code=-1) + + # 记录日志并返回用户信息 + logger.info(f"JWT解密成功,accountId: {payload.get('accountId')}, username: {payload.get('username')}") + return {"account_id": payload.get("accountId"), "username": payload.get("username")} + + except Exception as e: + logger.error(f"JWT处理失败: {str(e)}") + raise ApiException(msg="无效的认证凭据", code=-1) + ``` + + + + + +### AI智能体中心实战之聊天智能助理-可联网 + +#### AI聊天智能助理需求讲解和路由开发实战 + +* 需求 + + * 开发AI聊天智能助理, 基于 Redis 实现聊天历史存储,支持流式响应(Streaming Response) + + - 技术亮点: + + - 智能对话能力: + + - 支持多轮对话,保持上下文连贯性 + + - 可以理解用户意图并调用合适的工具 + + - 能够生成对话摘要,帮助理解对话历史 + + - 实时信息获取: + + - 可以回答需要实时数据的问题 + - 通过搜索工具获取最新信息,搜索结果会被智能整合到回答中 + + - 高性能设计: + + - 支持流式响应,提升用户体验 + + - 使用 Redis 缓存,提高响应速度,异步处理,提高并发能力 + + - 可扩展性: + + - 模块化设计,易于添加新功能 + + - 工具系统可扩展,支持添加新工具 + + * AI智能体业务使用场景(包装简历和项目): + + - 日常对话和问答、实时信息查询 + + - 多轮对话交互、知识获取和分享 + +* 编码实战 + + * 创建模型类 `chat_schemas.py` + + ```python + from pydantic import BaseModel + from typing import List + + class ChatMessage(BaseModel): + role: str + content: str + timestamp: str + + class ChatHistoryResponse(BaseModel): + messages: List[ChatMessage] + + class ChatRequest(BaseModel): + message: str + ``` + + * 创建 `chat_service.py` + + ```python + import json + from typing import List, Dict + import redis + from datetime import datetime + from core.config import settings + from core.llm import get_default_llm + from models.json_response import JsonData + import logging + + logger = logging.getLogger(__name__) + + class ChatService: + def __init__(self): + self.redis_client = redis.Redis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB, + password=settings.REDIS_PASSWORD, + decode_responses=True + ) + self.llm = get_default_llm() + ``` + + * 开发路由`chat.py` + + ```python + from fastapi import APIRouter, Depends + from fastapi.responses import StreamingResponse + from services.chat_service import ChatService + from models.chat_schemas import ChatRequest, ChatHistoryResponse + from models.json_response import JsonData + from core.auth import get_current_user + from core.exceptions import ApiException + from typing import Dict, Any + + router = APIRouter(prefix="/api/chat", tags=["聊天助手"]) + + @router.post("/stream") + async def chat_stream( + request: ChatRequest, + chat_service: ChatService = Depends(ChatService), + current_user: Dict[str, Any] = Depends(get_current_user) + ): + """聊天接口(流式响应)""" + # 使用当前用户的 account_id + account_id = current_user["account_id"] + ``` + + +#### AI聊天智能助理之聊天会话持久化设计 + +* 需求说明 + + * 采用Redis进行持久化用户的聊天记录 + + image-20250421203059141 + + * **数据结构**聊天历史:JSON数组,每个元素为一条消息,包含以下字段 + + ```json + { + "role": "user/assistant", + "content": "消息内容", + "timestamp": "ISO时间戳" + } + ``` + + * 设计相关方法 + + * **保存聊天历史** + * 将用户的聊天记录以JSON格式存储到Redis中。 + * Redis键格式:`chat_history:{account_id}`。 + * **获取聊天历史** + * 从Redis中读取指定用户的聊天记录, 如果Redis中无记录,则返回空列表。 + * **添加消息** + * 支持向用户的聊天历史中追加新的消息, 消息包含以下字段: + * `role`: 消息角色(如`user`或`assistant`)。 + * `content`: 消息内容。 + * `timestamp`: 消息时间戳(ISO格式)。 + * **清空聊天历史** + * 清除指定用户的聊天历史及相关摘要信息。 + +* 编码实战 `chat_service.py` + + ```python + def _get_chat_key(self, account_id: str) -> str: + """获取用户聊天记录的Redis key""" + return f"chat_history:{account_id}" + + + def save_chat_history(self, account_id: str, messages: List[Dict]): + """保存聊天历史到Redis""" + chat_key = self._get_chat_key(account_id) + self.redis_client.set(chat_key, json.dumps(messages)) + + def get_chat_history(self, account_id: str) -> List[Dict]: + """从Redis获取聊天历史""" + chat_key = self._get_chat_key(account_id) + history = self.redis_client.get(chat_key) + return json.loads(history) if history else [] + + def add_message(self, account_id: str, role: str, content: str): + """添加一条消息到聊天历史""" + messages = self.get_chat_history(account_id) + messages.append({ + "role": role, + "content": content, + "timestamp": datetime.now().isoformat() + }) + self.save_chat_history(account_id, messages) + + def clear_chat_history(self, account_id: str): + """清空用户的聊天历史""" + chat_key = self._get_chat_key(account_id) + summary_key = self._get_summary_key(account_id) + self.redis_client.delete(chat_key) + self.redis_client.delete(summary_key) + + def save_chat_messages( + self, + account_id: str, + user_message: str, + assistant_message: str + ): + """保存对话消息""" + self.add_message(account_id, "user", user_message) + self.add_message(account_id, "assistant", assistant_message) + ``` + +#### AI聊天智能助理之多轮长对话摘要生成 + +* 背景:为啥多轮对话要生成摘要 + + - 技术必要性:突破大模型token长度限制 + - 信息保留率:需保持85%以上的关键信息完整度 + - 推理成本控制:减少30-50%的重复计算 + - 延迟优化:缩短响应时间200-500ms + - 注意:摘要生成核心还是提示词工程 + +* 业务逻辑设计 + + * 基于用户的聊天历史,通过调用LLM(大语言模型)生成简洁的核心摘要。 + * 摘要要求: + * 突出对话的主要话题和关键信息。 + * 使用第三人称描述,提取重要数据、时间节点、待办事项等。 + * 保留原始对话中的重要细节。 + * 包含最新的对话内容。 + * **摘要合并** + - 如果用户已有旧摘要,需将新生成的摘要与旧摘要合并。 + - 合并要求: + - 保留两个摘要中的重要信息。 + - 突出对话的主要话题和关键信息。 + - 使用第三人称描述,提取重要数据、时间节点、待办事项等。 + - 保留原始对话中的重要细节。 + - 包含最新的对话内容。 + * **摘要存储** + - 将生成的摘要存储到Redis中。 + - Redis键格式:`chat_summary:{account_id}`。 + +* 编码实战 `chat_service.py` + + ```python + def _get_summary_key(self, account_id: str) -> str: + """获取聊天摘要的Redis key""" + return f"chat_summary:{account_id}" + + async def generate_summary(self, account_id: str) -> str: + """生成聊天历史的核心摘要""" + try: + # 获取最新的聊天历史 + messages = self.get_chat_history(account_id) + if not messages: + return "" + + # 构建提示词 + prompt = f"""请根据以下对话历史生成一个简洁的核心摘要,突出主要话题和关键信息: + + {json.dumps(messages, ensure_ascii=False, indent=2)} + + 摘要要求: + 1. 突出对话的主要话题和关键信息 + 2. 使用第三人称描述,提取重要数据/时间节点/待办事项 + 3. 保留原始对话中的重要细节 + 4. 确保包含最新的对话内容 + """ + + # 生成新的摘要 + response = await self.llm.ainvoke(prompt) + new_summary = response.content + + # 获取旧的摘要(如果有) + summary_key = self._get_summary_key(account_id) + old_summary = self.redis_client.get(summary_key) + + if old_summary: + # 如果存在旧摘要,生成一个合并的提示词 + merge_prompt = f"""请将以下两个摘要合并成一个新的摘要: + + 旧摘要: + {old_summary} + + 新摘要: + {new_summary} + + 合并要求: + 1. 保留两个摘要中的重要信息 + 2. 突出对话的主要话题和关键信息 + 3. 使用第三人称描述,提取重要数据/时间节点/待办事项 + 4. 保留原始对话中的重要细节 + 5. 确保包含最新的对话内容 + """ + + # 生成合并后的摘要 + merge_response = await self.llm.ainvoke(merge_prompt) + final_summary = merge_response.content + else: + final_summary = new_summary + + # 更新缓存中的摘要 + self.redis_client.set(summary_key, final_summary) + + return final_summary + + except Exception as e: + logger.error(f"生成摘要失败: {str(e)}") + return "" + ``` + + + +#### AI聊天智能助理之联网搜索工具开发 + +* 需求 + + * 聊天助理可以根据用户需求,实时查询最新信息 + * 封装搜索工具,参考前面的代码即可 + * 而且支持拓展更多工具,让AI聊天助理更加智能 + +* 注意工具的参数 `return_direct=True` + + * 当return_direct设置为True时,工具执行完毕后会直接将结果返回给用户,而不会经过Agent的后续处理。 + * Agent不会对工具的输出进行分析或生成进一步的响应,而是立即将结果呈现给用户。 + * 在需要快速返回简单结果或者不希望模型介入后续处理的情况下推荐使用 + + | 参数状态 | 数据流向 | 典型响应示例 | + | :--------------------------- | :-------------------------------------- | :----------------------------- | + | `return_direct=False` (默认) | 工具结果 → Agent → 模型生成自然语言响应 | "查询完成,当前北京气温是25℃" | + | `return_direct=True` | 工具结果 → 直接返回用户 | `{"city": "北京", "temp": 25}` | + +* 编码实战 `chat_tools.py` + + ```python + from langchain_core.tools import tool + from langchain_community.utilities import SearchApiAPIWrapper + import os + os.environ["SEARCHAPI_API_KEY"] = "qQBHMQo4Rk8SihmpJjCs7fML" + + + @tool("web_search", return_direct=False) + def web_search(query: str) -> str: + """ + 使用此工具搜索最新的互联网信息。当你需要获取实时信息或不确定某个事实时使用 + """ + try: + search = SearchApiAPIWrapper() + results = search.results(query) + return "\n\n".join([ + f"来源:{res['title']}\n内容:{res['snippet']}" + for res in results['organic_results'] + ]) + except Exception as e: + return f"搜索失败:{str(e)}" + + def get_chat_tools(): + """获取聊天助手可用的工具集""" + + tools = [ + web_search + ] + + return tools + ``` + + +#### AI聊天智能助理之Agent智能体开发实战 + +* 需求背景 + + * 创建聊天智能体,与智能体进行对话 + * 支持工具调用(如搜索工具) + * 对话历史摘要生成 + +* 知识点:`create_openai_functions_agent` + + * 专门适配 OpenAI 函数调用功能的代理生成器 , 将自然语言指令自动映射到预定义工具函数 + * 实现结构化数据输出与动态行为决策 , 构建符合 ReAct 模式的智能体架构 + +* 创建聊天智能体开发 + + * **功能描述** + + - 创建一个具备对话能力的智能体,支持日常对话、问答以及工具调用。 + - 智能体能够记住用户的对话历史,并在对话中使用摘要信息。 + + * **输入参数** + + - `tools`: 工具列表,用于扩展智能体的功能。 + + * **输出结果** + + - 返回一个`AgentExecutor`实例,代表创建好的聊天智能体。 + + * **实现细节** + + - 使用`create_openai_functions_agent`创建智能体。 + - 定义系统提示词,包含智能体的能力说明和对话摘要占位符。 + - 启用流式响应功能 + + * 编码实现 `chat_agent.py` + + ```python + from langchain.agents import AgentExecutor, create_openai_functions_agent + from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder + from langchain.tools import Tool + from core.llm import get_default_llm + from typing import List, AsyncIterator, Dict, AsyncGenerator + import asyncio + from services.chat_service import ChatService + from tools.chat_tools import get_chat_tools + from models.json_response import JsonData + + def create_chat_agent(tools: List[Tool]): + """创建聊天智能体""" + + system_prompt = """你是一个智能聊天助手。你可以: + 1. 进行日常对话和问答 + 2. 使用搜索工具获取最新信息 + 3. 记住与用户的对话历史 + + 请保持回答专业、友好且准确。如果用户的问题需要最新信息,请使用搜索工具。""" + + prompt = ChatPromptTemplate.from_messages([ + ("system", system_prompt), + ("system", "以下是之前的对话摘要: {summary}"), + ("human", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ]) + + llm = get_default_llm() + llm.streaming = True + + agent = create_openai_functions_agent(llm, tools, prompt) + + agent_executor = AgentExecutor( + agent=agent, + tools=tools, + verbose=True, + handle_parsing_errors=True + ) + + return agent_executo + ``` + + +#### AI聊天智能助理之对话和流式响应开发 + +* 知识点:`AsyncIterator `异步迭代器 + + * Python异步编程中的核心模块,用于在异步环境中逐项处理数据流。 + * 与普通迭代器不同,它能在迭代过程中暂停并执行其他异步操作,特别适合处理I/O密集型任务 + * 比如处理WebSocket连接、批量处理数据库查询结果等异步数据流 + * 注意: 比如忘记使用await 或在错误的地方调用异步迭代器,导致程序阻塞或错误 + + | 特性 | 普通Iterator | AsyncIterator | + | :----------- | :------------------ | :------------------------- | + | **迭代方法** | `__next__()` | `__anext__()` | + | **循环语法** | `for item in iter:` | `async for item in aiter:` | + | **执行模式** | 同步阻塞 | 异步非阻塞 | + | **适用场景** | 内存数据遍历 | 网络流/数据库等异步数据源 | + | **暂停机制** | 无 | 可await其他协程 | + +* **与智能体进行对话业务逻辑开发** + + - **功能描述** + + - 用户通过输入文本与智能体进行对话。 + - 智能体根据对话历史摘要生成回复,并保存对话记录。 + + - **输出结果** + + - 流式返回智能体的回复内容。 + + - **实现细节** + + - 获取用户的对话历史摘要。 + - 调用智能体的`astream`方法生成流式响应。 + - 保存用户输入和智能体回复到对话历史中。 + - 异常处理:如果发生错误,返回错误信息 + + - 编码实战 + + ```python + async def chat_with_agent( + agent_executor: AgentExecutor, + chat_service: ChatService, + account_id: str, + input_text: str + ) -> AsyncIterator[str]: + """与智能体进行对话""" + try: + # 获取最新的历史摘要 + summary = await chat_service.generate_summary(account_id) + + # 执行对话 + async for chunk in agent_executor.astream({ + "input": input_text, + "summary": summary + }): + if "output" in chunk: + response = chunk["output"] + # 保存对话消息(用户输入和助手回复) + await chat_service.save_chat_messages(account_id, input_text, response) + # 流式返回响应 + for token in response: + yield token + await asyncio.sleep(0.01) + + except Exception as e: + yield f"抱歉,处理您的请求时出现错误: {str(e)}" + ``` + +* 流式生成响应 + + - **功能描述** + + - 将智能体的回复内容以流式方式发送给用户。 + - 支持按标点符号或固定长度分割响应内容。 + + - **输入参数** + + - `agent`: 聊天智能体实例。 + - `chat_service`: 聊天服务实例。 + - `account_id`: 用户标识。 + - `message`: 用户输入的文本。 + + - **输出结果** + + - 以SSE(Server-Sent Events)格式返回流式响应。 + + - **实现细节** + + - 逐字符接收智能体的回复内容。 + - 当遇到标点符号或字符长度达到20时,发送当前分块内容。 + - 发送剩余内容后,添加结束标记`[DONE]`。 + + - 编码实战 + + ``` + async def generate_stream_response( + agent: AgentExecutor, + chat_service: ChatService, + account_id: str, + message: str + ) -> AsyncGenerator[str, None]: + """生成流式响应""" + current_chunk = "" + async for token in chat_with_agent(agent, chat_service, account_id, message): + current_chunk += token + # 当遇到标点符号或空格时,发送当前chunk + if token in "。,!?、;:,.!?;: " or len(current_chunk) >= 20: + response = JsonData.stream_data(data=current_chunk) + yield f"data: {response.model_dump_json()}\n\n" + current_chunk = "" + await asyncio.sleep(0.01) + + # 发送剩余的chunk + if current_chunk: + response = JsonData.stream_data(data=current_chunk) + yield f"data: {response.model_dump_json()}\n\n" + + # 发送结束标记 + yield "data: [DONE]\n\n" + ``` + + * 路由层面处理 `chat.py` + + ```python + @router.post("/stream") + async def chat_stream( + request: ChatRequest, + chat_service: ChatService = Depends(ChatService), + current_user: Dict[str, Any] = Depends(get_current_user) + ): + """聊天接口(流式响应)""" + # 使用当前用户的 account_id + account_id = current_user["account_id"] + + # 获取智能体 + agent = await get_chat_agent(chat_service) + + return StreamingResponse( + generate_stream_response(agent, chat_service, account_id, request.message), + media_type="text/event-stream" + ) + ``` + + +#### AI聊天智能助理之全链路测试综合实战 + +**简介: AI聊天智能助理之全链路测试综合实战** + +* 回顾需求 + + * 开发AI聊天智能助理, 基于 Redis 实现聊天历史存储,支持流式响应(Streaming Response) + + - 智能对话能力: + + - 支持多轮对话,保持上下文连贯性 + + - 可以理解用户意图并调用合适的工具 + + - 能够生成对话摘要,帮助理解对话历史 + + - 实时信息获取: + + - 可以回答需要实时数据的问题, + - 通过搜索工具获取最新信息,搜索结果会被智能整合到回答中 + +* 全链路测试实战 + + * 启动项目 `uvicorn app.main:app --reload` + * 准备token ,并且配置到HTTP请求头里面 + * APIFOX增加请求头 `Accept : text/event-stream ` + * 测试案例一 + * 告诉基本当前人员信息(进阶也可以初始化导入用户的历史画像数据) + * 进行基本信息问答,看智能体助理是否有记忆功能 + * 测试案例二 + * 询问当前当前需要联网信息内容,看大模型是否有调用工具进行处理 + * 查看Redis的数据存储:聊天记录和摘要 + * 拓展:不用每次都生成摘要,可以配置进阶策略,比如对话轮次阈值(默认5轮)、关键决策点标记、上下文长度超过XXX + + image-20250421213741268 + +### 第五十二章 AI智能体心实战之AI在线文档助手 + + + +#### AI在线文档助手需求文档和路由开发实战 + +* 需求 + + * AI在线文档助手,提供文档内容获取、解析和智能总结功能 + + * 支持多种文档格式(HTML、PDF等),并能够根据用户需求生成不同形式的文档总结。 + + * 大体流程和聊天助理类似,接下去就会加快速度了哈 + + * 技术亮点 + + * 多格式文档支持:HTML、PDF、纯文本等 + + - 智能文档处理:基于LLM的智能总结和关键点提取 + - 流式处理:支持大文件分块处理和流式响应 + - 多语言支持:支持多种语言的文档处理和总结 + + * 备注 + * 是否需要鉴权登录,取决文档的来源类型 + * 选择网盘里面的相关资源,则需要登录;选择好文件后调用存储接口获取临时访问地址 + * 智能体接受的是在线可以访问的地址,即可实现对应的在线文档解析 + + image-20250427120524719 + +* 编码实战 + + * 创建 请求文件 `doc_schemas.py` + + ```python + from pydantic import BaseModel, HttpUrl + from typing import Optional + + class DocumentRequest(BaseModel): + """文档处理请求""" + url: HttpUrl # 文档URL + summary_type: str = "brief" # 总结类型:brief(简要), detailed(详细), key_points(要点) + language: str = "zh" # 输出语言 + length: Optional[str] = None # 长度限制 + additional_instructions: Optional[str] = None # 额外要求 + ``` + + * 创建业务逻辑处理文件 `doc_service.py` + + ```python + from typing import Any, AsyncIterator + from agents.doc_agent import create_document_agent, process_document + from models.doc_schemas import DocumentRequest + from tools.document_tools import DocumentTools + import gc + + class DocumentService: + ``` + + * 创建路由文件`doc.py` + + ```python + from fastapi import APIRouter, Depends + from models.doc_schemas import DocumentRequest + from models.json_response import JsonData + from services.doc_service import DocumentService + from fastapi.responses import StreamingResponse + import asyncio + + router = APIRouter(prefix="/api/document", tags=["文档助手"]) + + #可以放到service层,也可以放到路由层,或者抽取统一方法 + async def generate_stream_response(request: DocumentRequest, doc_service: DocumentService): + """生成流式响应数据""" + async for chunk in doc_service.process_document_stream(request): + response = JsonData.stream_data(data=chunk) + yield f"data: {response.model_dump_json()}\n\n" + await asyncio.sleep(0.01) + + yield "data: [DONE]\n\n" + + @router.post("/stream") + async def process_document_stream( + request: DocumentRequest, + doc_service: DocumentService = Depends(DocumentService) + ): + """处理文档请求(流式响应)""" + return StreamingResponse( + generate_stream_response(request, doc_service), + media_type="text/event-stream" + ) + ``` + + +#### AI在线文档助手之文档处理工具集开发 + +* 需求背景 + + * 根据用户的需求,智能体提取的文档类型,选择合适的工具进行提取 + + * 实现文档处理服务,支持多种文档格式的处理(HTML、PDF、文本),提供高效的文档内容提取和清理功能 + + * 目前开发PDF和HTML类型,可以根据业务需求自己拓展更多 + + image-20250427125855187 + +* 拓展知识点 + + * `BeautifulSoup`库 + + * 主要是解析 HTML/XML 内容(`html_content`),生成一个结构化的 `BeautifulSoup 对象` + + * 解析器 `html.parser` + + * 是 Python 自带的 HTML 解析器,轻量、无需依赖第三方库。 + + * 方便地提取和操作 HTML 中的元素 + + ```python + from bs4 import BeautifulSoup + + # 假设有一段 HTML 内容 + html_content = "

你好,小滴课堂老王

" + + # 解析 HTML + soup = BeautifulSoup(html_content, 'html.parser') + + # 提取数据 + title = soup.h1.text # 输出:"你好,小滴课堂老王" + ``` + +* 编码实战 `document_tools.py` + + ```python + from typing import Optional, Dict, Any, Generator + import requests + from bs4 import BeautifulSoup + from langchain.tools import Tool + import re + from urllib.parse import urlparse + import time + import io + from PyPDF2 import PdfReader + from tqdm import tqdm + import gc + + class DocumentTools: + """文档处理工具集""" + + @staticmethod + def fetch_document(url: str) -> Dict[str, Any]: + """获取文档内容""" + try: + # 使用流式下载 + response = requests.get(url, stream=True, timeout=30) + response.raise_for_status() + + # 解析文档类型 + content_type = response.headers.get('content-type', '') + print(f"---Content-Type---: {content_type}") + if 'text/html' in content_type: + return DocumentTools._parse_html(response.text) + elif 'application/pdf' in content_type: + return DocumentTools._parse_pdf_stream(response) + else: + return { + "title": urlparse(url).path.split('/')[-1], + "content": response.text, + "type": "text" + } + except Exception as e: + raise Exception(f"获取文档失败: {str(e)}") + + @staticmethod + def _parse_html(html_content: str) -> Dict[str, Any]: + """解析HTML文档""" + soup = BeautifulSoup(html_content, 'html.parser') + + # 提取标题 + title = soup.title.string if soup.title else "未命名文档" + + # 提取正文内容 + content = "" + for p in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']): + content += p.get_text() + "\n" + + return { + "title": title, + "content": content.strip(), + "type": "html" + } + + @staticmethod + def _parse_pdf_stream(response: requests.Response) -> Dict[str, Any]: + """流式解析PDF文档""" + pass + ``` + + +* 知识点拓展 + + * `tqdm` + + * 是 Python 中用于在循环或长时间运行的任务中显示进度条的库 + + * 主要功能 + + * 实时进度条:显示当前进度百分比、已用时间、剩余时间、迭代速度等。 + * 低开销:对代码性能影响极小。 + + * 使用 + + ``` + #前面一开始创建项目的时候已经安装了这个依赖 + from tqdm import tqdm + import time + + # 在 for 循环外包裹 tqdm() + for i in tqdm(range(100)): + time.sleep(0.1) # 模拟耗时操作 + ``` + + * `PyPDF2` + + * 是 Python 中一个专门用于处理 PDF 文件的第三方库,支持 PDF 文件的读取、写入、合并、拆分、加密、解密等操作。 + * 功能相对基础,但在处理简单 PDF 任务时非常高效。 + * 建议 + * 对于复杂需求(如表格提取、内容编辑),可以结合其他库(如 `pdfplumber` + `PyPDF2`)等进行选择 + * **`pdfplumber`**: 更适合提取文本、表格和可视化元素。 + * **`PyMuPDF` **: 高性能,支持复杂操作(如 OCR 集成)。 + +* 编码实战 + + ```python + @staticmethod + def _parse_pdf_stream(response: requests.Response) -> Dict[str, Any]: + """流式解析PDF文档 + + Args: + response (requests.Response): 包含PDF文档的响应对象 + + Returns: + Dict[str, Any]: 包含文档标题、内容和类型的字典 + """ + # 创建内存缓冲区 + buffer = io.BytesIO() + + # 获取文件大小 + total_size = int(response.headers.get('content-length', 0)) + + # 使用tqdm显示下载进度 + with tqdm(total=total_size, unit='B', unit_scale=True, desc="下载PDF") as pbar: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + buffer.write(chunk) + pbar.update(len(chunk)) + + # 重置缓冲区位置 + buffer.seek(0) + + # 分块读取PDF内容 + content = "" + try: + # 使用PdfReader替代PdfFileReader + pdf_reader = PdfReader(buffer) + + # 获取文档信息 + info = pdf_reader.metadata + title = info.title if info and info.title else "PDF文档" + + # 分块处理PDF页面 + total_pages = len(pdf_reader.pages) + with tqdm(total=total_pages, desc="处理PDF页面") as pbar: + for page_num in range(total_pages): + page = pdf_reader.pages[page_num] + content += page.extract_text() + "\n" + + # 每处理10页就清理一次内存 + if (page_num + 1) % 10 == 0: + content = DocumentTools._clean_text(content) + gc.collect() + + pbar.update(1) + + except Exception as e: + raise ApiException(f"解析PDF失败: {str(e)}") + finally: + buffer.close() + + return { + "title": title, + "content": DocumentTools._clean_text(content), + "type": "pdf" + } + + @staticmethod + def _clean_text(text: str) -> str: + """清理文本内容 + + Args: + text (str): 待清理的文本内容 + + Returns: + str: 清理后的文本内容 + """ + # 移除多余的空行 + text = re.sub(r'\n\s*\n', '\n\n', text) + # 移除多余的空格 + text = re.sub(r' +', ' ', text) + return text.strip() + + @staticmethod + def create_tools() -> list[Tool]: + """创建文档处理工具集 + + Returns: + list[Tool]: 文档处理工具列表 + """ + return [ + Tool( + name="fetch_document", + func=DocumentTools.fetch_document, + description="获取并解析在线文档内容" + ) + ] + ``` + + +#### AI在线文档助手之文档Agent智能体开发实战 + +* 业务需求 + + * 智能体能够分析用户提供的文档,并根据用户需求生成高质量的总结。 + * 核心功能: + - 分析文档内容并提取关键要点,根据用户指定的总结类型(简要、详细或要点)生成相应的总结。 + - 支持流式输出,确保每个 token 实时返回给用户。 + - 也支持处理用户提出的额外要求。 + +* 编码实战 + + ```python + from langchain.agents import AgentExecutor, create_openai_functions_agent + from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder + from langchain.tools import Tool + from core.llm import get_default_llm + from typing import List, AsyncIterator + import asyncio + from datetime import datetime + + def create_document_agent(tools: List[Tool]): + """创建文档处理智能体""" + + # 定义系统提示词 + system_prompt = """你是一个专业的文档处理助手。你的任务是分析用户提供的文档,生成高质量的总结。 + 你需要: + 1. 仔细阅读并理解文档内容 + 2. 根据用户要求的总结类型(简要/详细/要点)生成相应的总结 + 3. 提取文档的关键要点 + 4. 确保总结准确、全面、易读 + + 如果用户提供了额外的要求,请尽量满足这些要求。""" + + # 创建提示词模板 + prompt = ChatPromptTemplate.from_messages([ + ("system", system_prompt), + ("human", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ]) + + # 获取LLM实例,并配置流式输出 + llm = get_default_llm() + + # 创建智能体 + agent = create_openai_functions_agent(llm, tools, prompt) + + # 创建执行器 + agent_executor = AgentExecutor( + agent=agent, + tools=tools, + verbose=True, + handle_parsing_errors=True + ) + + return agent_executor + + async def process_document(agent_executor: AgentExecutor, input_text: str) -> AsyncIterator[str]: + """处理文档并生成总结""" + print(f"正在处理 input_text: {input_text}") + + async for chunk in agent_executor.astream({"input": input_text}): + if "output" in chunk: + # 确保每个token都能实时返回 + for token in chunk["output"]: + yield token + await asyncio.sleep(0.01) # 控制输出速度 + ``` + + +#### AI在线文档助手之文档Service层处理实战 + +* 需求 + + * 文档处理服务,能够接收用户请求并调用智能体对文档进行分析和总结。 + * 核心功能: + - 接收文档请求并获取文档内容。 + - 根据文档大小决定是否进行分块处理。 + - 使用智能体生成高质量的文档总结,并支持流式输出。 + - 处理大文本时,通过内存管理优化性能。 + +* 编码实战 `doc_service.py` + + ```python + from typing import Any, AsyncIterator + from agents.doc_agent import create_document_agent, process_document + from models.doc_schemas import DocumentRequest + from tools.document_tools import DocumentTools + import gc + + class DocumentService: + """ + 文档处理服务类,负责初始化文档处理所需的工具和智能体,并提供文档处理的接口 + """ + def __init__(self): + # 初始化工具 + self.tools = DocumentTools.create_tools() + # 创建智能体 + self.agent_excutor = create_document_agent(self.tools) + + async def process_document_stream(self, request: DocumentRequest) -> AsyncIterator[str]: + """流式处理文档请求""" + try: + # 获取文档内容 + doc_content = DocumentTools.fetch_document(str(request.url)) + # 根据文档内容长度决定是否分割处理 + content = doc_content['content'] + chunks = [] + # 检查内容长度是否超过1,000,000个字符 + print(f"--------------len(content): {len(content)}") + if len(content) > 1000: + # 初始化当前块变量,用于累积不足1,000,000字符的内容 + current_chunk = "" + # 遍历内容中的每个段落,使用'\n\n'作为分隔符进行分割 + for paragraph in content.split('\n\n'): + # 如果当前块加上新段落后超过1,000,000字符,且当前块非空 + if len(current_chunk) + len(paragraph) > 1000000 and current_chunk: + # 将当前块添加到块列表中,并开始新的块 + chunks.append(current_chunk) + current_chunk = paragraph + else: + # 否则,将当前段落添加到当前块中 + current_chunk = f"{current_chunk}\n\n{paragraph}" if current_chunk else paragraph + # 如果循环结束后当前块非空,将其添加到块列表中 + if current_chunk: + chunks.append(current_chunk) + + else: + # 如果内容长度不超过1,000,000字符,直接将内容作为唯一一个块 + chunks = [content] + + # 处理每个文档片段 + for i, chunk in enumerate(chunks): + # 构建输入文本 + input_text = self._build_input_text( + doc_content['title'], + chunk, + request.summary_type, + request.language, + request.length or '无限制', + request.additional_instructions or '无' + ) + + # 异步处理文档并生成响应 + async for response_chunk in process_document(self.agent_excutor, input_text): + yield response_chunk + + # 如果不是最后一个片段,插入分隔符 + if i < len(chunks) - 1: + yield "\n\n--- 下一部分 ---\n\n" + + # 清理内存 + gc.collect() + + except Exception as e: + # 错误处理 + yield f"处理文档时发生错误: {str(e)}" + + def _build_input_text(self, title: str, content: str, summary_type: str, + language: str, length: str, additional_instructions: str) -> str: + """构建输入文本""" + return f""" + 文档标题: {title} + 文档内容: {content} + 总结类型: {summary_type} + 输出语言: {language} + 最大长度: {length} + 额外要求: {additional_instructions} + """ + ``` + + +#### AI在线文档助手之全链路测试综合实战 + +* 全链路测试,回顾需求 + + * AI在线文档助手,提供文档内容获取、解析和智能总结功能 + * 支持多种文档格式(HTML、PDF等),并能够根据用户需求生成不同形式的文档总结。 + * 注意 + * 不同智能体之间,实现方式也有多样,企业中开发遵循当前团队即可 + * 不用局限一定的格式,智能体工程化这个不像传统的后端和前端,新技术刚出来,还在持续发展中 + * 代码有一定的优化空间和逻辑修复,大家可以进一步跟进自己的需求进行拓展,比如doc文档、其他类型提取等 + * 异常打印堆栈信息:`logger.exception("获取文档失败")` + +* 链路测试一 + * 选择网盘的文件,可以调用下载接口,拿到临时文件访问地址 + * 也可以直接通过minio获取临时的访问地址 + +* 链路测试二 + * 选择在线网站,非动态网站即可,HTML结尾的静态网站才可以提取 + * 比如 + * 短文本:https://www.runoob.com/python3/python3-tutorial.html + * 长文本: https://file.xdclass.net/video/2024/selenium/biji.html