From 69277bff4a68fc0749bd737f6e5d049c579672e3 Mon Sep 17 00:00:00 2001 From: Administrator Date: Wed, 14 May 2025 22:19:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=A6=96=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2541 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 2438 insertions(+), 103 deletions(-) diff --git a/README.md b/README.md index 008c18b..c29842b 100644 --- a/README.md +++ b/README.md @@ -8198,7 +8198,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ | RedisStore | 高性能、支持分布式 | 需要运维Redis | 生产环境集群 | | InMemoryStore | 最快速度 | 重启丢失数据 | 临时测试 | -### 第三十九章 大模型必备技术Milvus向量数据库 +### 大模型必备技术Milvus向量数据库 #### 向量数据库介绍和技术选型思考 @@ -8214,7 +8214,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * **相似度计算**:无法高效处理余弦相似度/Euclidean距离等复杂运算 * **实时性要求**:亿级向量场景下传统方案响应延迟高达秒级,暴力搜索时间复杂度O(N) - ``` + ```python // 传统关系型数据库查询(精确匹配) SELECT * FROM products WHERE category = 'electronics'; @@ -8283,8 +8283,6 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * 其他:PostgreSQL、ElasticSearch等 - - * 综合选型对比 | 维度\产品 | Pinecone | Milvus | Qdrant | Chroma | @@ -8294,7 +8292,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ | **扩展能力** | 自动扩展 | 手动分片 | 自动分片 | 单机 | | **典型场景** | 生产级SaaS | 企业私有云 | 高性能需求 | 本地开发 | | **成本模型** | 按用量付费 | 基础设施+运维 | 基础设施+运维 | 免费 | - + * 技术选型建议 * **数据规模** @@ -8313,29 +8311,13 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ - **国产化需求**:腾讯云VectorDB或华为云等国产方案。 - **现有技术栈**:PostgreSQL/MongoDB扩展适合渐进式改造。 - - * **总结(花钱的多数更轻松方便)** * 向量数据库的选择需结合数据规模、业务场景、团队能力和成本预算综合评估。 * 对于AI驱动的应用(如RAG、多模态搜索),建议优先考虑云原生或分布式开源方案(如Milvus、Pinecone); * 传统业务系统可尝试数据库扩展插件以降低迁移成本,具体案例可参考各数据库的官方文档 - - - - - - - - - - - - -#### 第2集 向量数据库Milvus介绍和架构讲解 - -**简介: 向量数据库Milvus介绍和和架构讲解** +#### 向量数据库Milvus介绍和架构讲解 * 什么是Milvus向量数据库 @@ -8382,8 +8364,6 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ | **Index Node** | 负责索引构建与优化 | 支持后台异步构建索引 | | **Coordinator** | 集群元数据管理、任务调度 | 高可用部署(etcd存储元数据) | - - * 极速认知存储内容 * Collection 是一个二维表,具有固定的列和变化的行。 @@ -8392,17 +8372,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ ![Collection explained](/img/collection-explained.png) - - - - - - - - -#### 第3集 Milvus核心概念和数据结构讲解 - -**简介: Milvus核心概念和数据结构讲解** +#### Milvus核心概念和数据结构讲解 * **向量数据库对比关系型数据库** @@ -8420,7 +8390,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ - Schema 定义字段结构(主键、向量、标量字段),支持动态字段(Milvus 2.3+),自动生成唯一ID(可选) - 例子 - ``` + ```python pip install pymilvus==2.5.5 from pymilvus import FieldSchema, CollectionSchema, DataType @@ -8529,19 +8499,8 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ image-20250313170520819 - - - - - - - - - -#### 第4集 Milvus的分区-分片-段结构和最佳实践 - -**简介: Milvus的分区-分片-段结构和最佳实践** +#### Milvus的分区-分片-段结构和最佳实践 * 分区-分片-段 很多同学懵逼,用图书馆比喻理解三者关系 @@ -8560,8 +8519,6 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * **作用**:优化存储空间,旧书盒可压缩归档 * **类比**:数据库将数据分块存储,便于后台合并优化 - - * 三者协作关系 0 @@ -8582,7 +8539,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * 按业务维度划分(如用户ID、时间范围), 示例:`partition_2024Q1`, `vip_users` - ``` + ```python # 按商品类别创建分区 # 电子产品存入electronics分区 @@ -8620,7 +8577,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * **合并优化** - ``` + ```python # 自动将小段合并成大段(类似HBase Compaction) [Segment1(100MB)] + [Segment2(100MB)] → [SegmentMerged(200MB)] ``` @@ -8637,7 +8594,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ - 按业务线分区:`user_profiles`, `product_info` - **错误示范**:创建超过1000个分区(影响元数据性能) - ``` + ```python # 好的实践:按时间分区 client.create_partition( collection_name="logs", @@ -8658,7 +8615,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ | 写入吞吐低 | 写入吞吐高 | | 易成性能瓶颈 | 资源消耗大 | - ``` + ```python # 创建集合时指定 collection = Collection( name="product_images", @@ -8682,28 +8639,15 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * 设置段容量阈值:`storage.segmentSize=1024` (单位MB) * 根据数据特性调整 - ``` + ```python if 向量维度 > 1024: maxSize = 512 # 降段大小缓解内存压力 else: maxSize = 1024 ``` - - - - - - - - - - - -#### 第5集 Milvus部署架构选择和Docker部署实战 - -**简介: Milvus部署架构选择和Docker部署实战 ** +#### Milvus部署架构选择和Docker部署实战 * 部署架构选择 @@ -8717,7 +8661,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * Milvus分层架构(Docker部署都包括了) - ``` + ```python ┌───────────────────────────────┐ │ Coordinator │ ← 管理元数据、负载均衡 ├───────────────┬───────────────┤ @@ -8730,12 +8674,8 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * Milvus Standalone 是单机服务器部署,所有组件都打包到一个Docker 镜像中,部署方便。 * 此外,Milvus Standalone 通过主从复制支持高可用性。 - - * 有钱的当我没说,直接购买云厂商的服务:https://help.aliyun.com/zh/milvus - - * LInux服务器部署Milvus实战 * 阿里云网络安全组记得开放端口 `2379`、`9091`, `19530` @@ -8746,22 +8686,19 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * 下载相关资料 ,使用**【wget】或者【浏览器】远程下载相关依赖包(需要替换群里最新的)** - ``` + ```python 原生资料下载方式(账号 - 密码 - ip地址 - 端口 需要替换群里最新的,【其他路径不变】) wget --http-user=用户名 --http-password=密码 http://ip:端口/dcloud_pan/standalone_embed.sh - - #比如 命令行下 wget --http-user=admin --http-password=xdclass.net888 http://47.115.31.28:9088/dcloud_pan/standalone_embed.sh - # 比如 浏览器直接访问 http://47.115.31.28:9088/dcloud_pan/standalone_embed.sh ``` - + * 解压后执行【**依赖很多,版本差异大,务必按照下面执行,否则课程无法进行下去,加我微信 xdclass6**】 - - ``` + + ```python #启动 bash standalone_embed.sh start @@ -8774,14 +8711,14 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ #升级 bash standalone_embed.sh upgrade ``` - + * 运行安装脚本后 - + - 一个名为 Milvus 的 docker 容器已在**19530** 端口启动。 - 嵌入式 etcd 与 Milvus 安装在同一个容器中,服务端口为**2379**。 - Milvus 数据卷被映射到当前文件夹中的**volumes/milvus** - 访问 `http://${MILVUS_PROXY_IP}:9091/webui` 例子: http://47.119.128.20:9091/webui/ - + * 注意 * Milvus Web UI 与 Attu等可视化工具 不同,它是一个内置工具,只是提供简单直观的界面,查看系统的基本信息 @@ -8790,25 +8727,8 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ * 参考:https://milvus.io/docs/zh/milvus-webui.md - - - - - - - - - - - - - - - -#### 第6集 Milvus可视化客户端安装实战 - -**简介: Milvus可视化客户端安装实战 ** +#### Milvus可视化客户端安装实战 * Attu 可视化客户端介绍 @@ -8845,9 +8765,2424 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){ +### Milvus向量数据库多案例实战 + +#### Python整合向量数据库Milvus案例实战 + +* Python操作Milvus实战 + + * 安装 Milvus Python SDK, 支持 Python、Node.js、GO 和 Java SDK。 + + * 建议安装与所安装 Milvus 服务器版本相匹配的 PyMilvus 版本 + + * 安装 + + ``` + pip install pymilvus==2.5.5 + ``` + + * 验证安装 如果 PyMilvus 安装正确,运行以下命令时不会出现异常 + + ``` + python -c "from pymilvus import Collection" + ``` + + * 接口可分为以下几类: + + * **DDL / DCL:**createCollection / createPartition / dropCollection / dropPartition / hasCollection / hasPartition + * **DML / Produce:**插入 / 删除 / 上移 + * **DQL:**搜索/查询 + +* 操作Milvus数据库 + + * 使用connect()连接 Milvus 服务器,进行相关操作 + + ```python + #也可以使用MilvusClient + #from pymilvus import MilvusClient + #client = MilvusClient("http://47.119.128.20:19530") + + from pymilvus import connections, db + conn = connections.connect(host="47.119.128.20", port=19530) + # 创建数据库 + #db.create_database("my_database") + # 使用数据库 + db.using_database("my_database") + # 列出数据库 + dbs = db.list_database() + print(dbs) + #['default', 'my_database'] + # 删除数据库 + db.drop_database("my_database") + ``` + +* Collection与Schema的创建和管理 + + * Collection 是一个二维表,具有固定的列和变化的行,每列代表一个字段,每行代表一个实体。 + + * 要实现这样的结构化数据管理,需要一个 Schema定义 Collections 的表结构 + + * 每个Schema由多个`FieldSchema`组成: + + ```python + from pymilvus import FieldSchema, DataType + + # 字段定义示例 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128), + FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50) + ] + ``` + + * 字段类型详解 + + | 数据类型 | 说明 | 示例 | + | :-------------: | :----------: | :---------------: | + | `INT8/16/32/64` | 整型 | `DataType.INT64` | + | `FLOAT` | 单精度浮点数 | `DataType.FLOAT` | + | `DOUBLE` | 双精度浮点数 | `DataType.DOUBLE` | + | `VARCHAR` | 变长字符串 | `max_length=255` | + | `FLOAT_VECTOR` | 浮点向量 | `dim=768` | + + * 创建collection实战 + + ```python + from pymilvus import connections + from pymilvus import FieldSchema, DataType + from pymilvus import CollectionSchema, Collection + + conn = connections.connect(host="47.119.128.20", port=19530) + # 步骤1:定义字段 + fields = [ + FieldSchema("id", DataType.INT64, is_primary=True), + FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128), + FieldSchema("tag", DataType.VARCHAR, max_length=50) + ] + + # 步骤2:创建Schema + schema = CollectionSchema(fields, description="示例集合") + + # 步骤3:实例化Collection + collection = Collection( + name="demo_collection", + schema=schema, + shards_num=2 # 分片数(分布式扩展关键) + ) + ``` + + * 关键参数解析 + + | 参数 | 说明 | 推荐值 | + | :-----------: | :------------------------: | :--------------: | + | `shards_num` | 分片数量(创建后不可修改) | 集群节点数×2 | + | `description` | 集合描述信息 | 建议填写业务用途 | + + * 动态字段Schema + + * 在集合中启用动态字段后,所有未在 Schema 中定义的字段及其值都将作为键值对存储在动态字段中 + + ```python + # 启用动态字段(Milvus 2.3+) + schema = CollectionSchema( + fields, + enable_dynamic_field=True + ) + ``` + + * 案例讲解 + + * 假设 Collections Schema 只定义两个字段,名为`id` 和`vector` ,启用了动态字段,在 Collections 中插入以下数据集 + * 数据集包含 多个实体,每个实体都包括字段`id`,`vector`, 和`color` ,Schema 中没有定义`color` 字段。 + * 由于 Collections 启用了动态字段,因此字段`color` 将作为键值对存储在动态字段中。 + + ```python + [ + {id: 0, vector: [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592], color: "pink_8682"}, + {id: 7, vector: [-0.33445148015177995, -0.2567135004164067, 0.8987539745369246, 0.9402995886420709, 0.5378064918413052], color: "grey_8510"}, + {id: 8, vector: [0.39524717779832685, 0.4000257286739164, -0.5890507376891594, -0.8650502298996872, -0.6140360785406336], color: "white_9381"}, + {id: 9, vector: [0.5718280481994695, 0.24070317428066512, -0.3737913482606834, -0.06726932177492717, -0.6980531615588608], color: "purple_4976"} + ] + ``` + + | 类型 | 特点 | 适用场景 | + | :------------: | :---------------------------: | :----------------------------: | + | **静态Schema** | 严格字段定义 | 数据结构固定的业务(用户画像) | + | **动态Schema** | 允许灵活字段(需Milvus 2.3+) | 日志类多变数据 | + + + + + + + + + + + + + + + + + + + +#### 第2集 Milvus索引操作和最佳实践避坑指南 + +**简介: Milvus索引操作和最佳实践避坑指南** + +* 为什么需要索引? + + * 加速查询:避免暴力比对,快速定位相似向量, 平衡召回率与查询速度 + * 节省资源:减少内存占用和计算开销, 建议为经常访问的向量和标量创建索引 + + + +* 常见的索引类型 + + | 索引类型 | 适用场景 | 内存占用 | 精度 | 构建速度 | + | :------: | :----------------------: | :------: | :-----: | :------: | + | FLAT | 小数据精确搜索(<100万) | 高 | 100% | 快 | + | IVF_FLAT | 大数据平衡场景(千万级) | 中 | 95%-98% | 较快 | + | HNSW | 高召回率需求 | 高 | 98%-99% | 慢 | + | DISKANN | 超大规模(10亿+) | 低 | 90%-95% | 最慢 | + +* Milvus索引操作 + + * 创建索引 + + ``` + # 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型 + from pymilvus import MilvusClient, DataType + + # 实例化MilvusClient以连接到指定的Milvus服务器 + client = MilvusClient( + uri="http://47.119.128.20:19530" + ) + + # 创建schema对象,设置自动ID生成和动态字段特性 + schema = MilvusClient.create_schema( + auto_id=False, + enable_dynamic_field=True, + ) + + # 向schema中添加字段"id",数据类型为INT64,作为主键 + schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True) + # 向schema中添加字段"vector",数据类型为FLOAT_VECTOR,维度为5 + schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=5) + + # 使用create_collection方法根据schema创建集合"customized_setup" + client.create_collection( + collection_name="customized_setup", + schema=schema, + ) + + # 准备索引参数,为"vector"字段创建索引 + index_params = MilvusClient.prepare_index_params() + + # 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数 + index_params.add_index( + field_name="vector", + metric_type="COSINE", # 距离计算方式 (L2/IP/COSINE) + index_type="IVF_FLAT", + index_name="vector_index", + params={ "nlist": 128 } #聚类中心数 (建议值:sqrt(数据量)) + ) + + # 创建索引,不等待索引创建完成即返回 + client.create_index( + collection_name="customized_setup", + index_params=index_params, + sync=False # 是否等待索引创建完成后再返回。默认为True。 + ) + ``` + + * 参数说明 + + | 参数 | 参数 | + | ----------------- | ------------------------------------------------------------ | + | `field_name` | 指定字段名称 | + | `metric_type` | 用于衡量向量间相似性的算法。值有**IP**、**L2**、**COSINE**、**JACCARD**、**HAMMING**。只有当指定字段是向量字段时才可用。 | + | `index_type` | 索引类型 | + | `index_name` | 索引名称 | + | `params` | 指定索引类型的微调参数 | + | `collection_name` | Collections 的名称。 | + | `sync` | 控制与客户端请求相关的索引构建方式。有效值: `True` (默认):客户端等待索引完全建立后才返回。在该过程完成之前不会收到响应。`False`:客户端收到请求后立即返回,索引在后台建立 | + + * 查看索引信息 + + ``` + #列出索引名称 + res = client.list_indexes( + collection_name="customized_setup" + ) + print(res) + + #获取索引详细信息 + res = client.describe_index( + collection_name="customized_setup", + index_name="vector_index" + ) + print(res) + ``` + + * 删除索引 + + * 删除前需确保无查询正在使用该索引 + * 删除后需重新创建索引才能进行有效查询 + + ``` + #如果不再需要索引,可以直接将其删除。 + client.drop_index( + collection_name="customized_setup", + index_name="vector_index" + ) + print("索引已删除") + ``` + +* 最佳实践与避坑指南 + + * **Schema设计原则** + + - 主键选择 + - 推荐自增ID避免冲突 + - 禁止使用向量字段作为主键 + - **字段数量**:单个集合不超过32个字段 + - **向量维度**:创建后不可修改,需提前规划 + + * **索引选择策略**: + + - 百万级以下 → FLAT + - 百万到亿级 → IVF/HNSW + - 十亿级以上 → DISKANN + + * **操作规范**: + + - 数据插入完成后再建索引 + - 定期重建索引(数据变更超过30%) + - 为高频查询字段建立独立索引 + * **常见错误处理** + + | 错误场景 | 解决方案 | + | :--------------: | :----------------------------------: | + | "字段类型不匹配" | 检查插入数据与Schema定义的一致性 | + | "主键冲突" | 插入前检查ID唯一性,或使用自动生成ID | + | "向量维度错误" | 校验dim参数与数据实际维度 | + + + + + + + + + + + + + + + + + + + +#### 第3集 Milvus向量数据库的DML操作实战 + +**简介: Milvus向量数据库的DML操作实战** + +* 核心DML操作实战 + + * 创建集合(Collection),集合是Milvus中数据存储的基本单位,需定义字段和索引 + + * `auto_id=True`时无需手动指定主键 + * 动态字段(`enable_dynamic_field=True`)允许灵活扩展非预定义字段 + + ``` + # 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型 + from pymilvus import MilvusClient, DataType + + # 实例化MilvusClient以连接到指定的Milvus服务器 + client = MilvusClient( + uri="http://47.119.128.20:19530" + ) + # 定义Schema + schema = client.create_schema(auto_id=False, enable_dynamic_field=True) + schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True) + schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=128) + schema.verify() # 验证Schema + + # 定义索引参数 + index_params = client.prepare_index_params() + index_params.add_index( + field_name="vector", + index_type="IVF_FLAT", # 量化索引,平衡速度与精度 + metric_type="L2", # 相似性度量标准(欧式距离) + params={"nlist": 1024} # 聚类中心数 + ) + + # 创建集合 + client.create_collection( + collection_name="my_collection", + schema=schema, + index_params=index_params + ) + ``` -#### + * 插入数据(Insert)支持单条或批量插入【可视化工具那边需要加载,包括查询等都是需要加载状态才可以操作】 + + ``` + data = [ + {"id": 1, "vector": [0.1]*128, "text": "Sample text 1"}, + {"id": 2, "vector": [0.2]*128, "text": "Sample text 2"} + ] + + # 插入数据 + insert_result = client.insert( + collection_name="my_collection", + data=data + ) + print("插入ID列表:", insert_result["ids"]) # 返回主键ID + ``` + + * 删除数据(Delete)通过主键或条件表达式删除 + + ``` + # 按主键删除 + client.delete( + collection_name="my_collection", + ids=[1, 2] # 主键列表 + ) + + # 按条件删除(如删除text字段为空的记录) + client.delete( + collection_name="my_collection", + filter="text == ''" + ) + ``` + + * 更新数据(Update)Milvus不支持直接更新,需通过“删除+插入”实现: + + ``` + # 删除旧数据 + client.delete(collection_name="my_collection", ids=[3]) + + # 插入新数据 + client.insert( + collection_name="my_collection", + data=[{"id": 3, "vector": [0.3]*128, "text": "Updated text"}] + ) + ``` + + + + + + + + + + + + + + + + + + + + + +#### 第4集 Milvus向量Search查询综合案例实战《上》 + +**简介: Milvus向量Search查询综合案例实战** + +* 需求说明 + + * 创建包含混合数据类型(标量+向量)的集合 + * 批量插入结构化和非结构化数据 + * 实现带过滤条件的混合查询 + * 验证端到端的向量搜索流程 + +* Search语法深度解析 + + * 核心参数说明 + + ``` + results = client 或 collection.search( + data=[[0.12, 0.23, ..., 0.88]], # 查询向量(必须) + anns_field="vector", # 要搜索的向量字段名(必须) + param={"metric_type": "L2", "params": {"nprobe": 10}}, # 搜索参数 + limit=10, # 返回结果数量 + expr="price > 50", # 过滤表达式(可选) + output_fields=["product_id", "price"], # 返回的字段 + ) + ``` + + | 参数 | 类型 | 说明 | 常用值 | + | :-----------: | :--: | :----------: | :--------------------------------------: | + | data | list | 查询向量列表 | 二维数组 | + | anns_field | str | 向量字段名 | 创建时定义的字段 | + | param | dict | 搜索参数 | 包含metric_type和params | + | limit | int | 返回结果数 | 5-100 | + | expr | str | 过滤条件 | price > 50 AND category == 'electronics' | + | output_fields | list | 返回字段 | ["field1", "field2"] | + +* 搜索案例实战(MilvusClient方式) + + * 准备数据 + + ``` + from pymilvus import ( + connections,MilvusClient, + FieldSchema, CollectionSchema, DataType, + Collection, utility + ) + import random + + # # 创建Milvus客户端 + client = MilvusClient( + uri="http://47.119.128.20:19530", + ) + + #删除已存在的同名集合 + if client.has_collection("book"): + client.drop_collection("book") + + # 定义字段 + fields = [ + FieldSchema(name="book_id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=200), + FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50), + FieldSchema(name="price", dtype=DataType.DOUBLE), + FieldSchema(name="book_intro", dtype=DataType.FLOAT_VECTOR, dim=4) + ] + + # 创建集合Schema + schema = CollectionSchema( + fields=fields, + description="Book search collection" + ) + + #创建集合 + client.create_collection(collection_name="book", schema=schema) + + # 生成测试数据 + num_books = 1000 + categories = ["科幻", "科技", "文学", "历史"] + titles = ["量子世界", "AI简史", "时光之轮", "文明起源", "未来简史", "数据科学"] + + data = [] + for i in range(num_books): + data.append({ + "title": f"{random.choice(titles)}_{i}", + "category": random.choice(categories), + "price": round(random.uniform(10, 100), 2), + "book_intro": [random.random() for _ in range(4)] # 4维向量 + }) + + # 批量插入 + insert_result = client.insert( + collection_name="book", + data=data + ) + + print(f"插入数据量:{len(insert_result['ids'])}") + ``` + + * 创建索引 + + ``` + # 准备索引参数,为"vector"字段创建索引 + index_params = MilvusClient.prepare_index_params() + + # 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数 + index_params.add_index( + field_name="book_intro", + metric_type="L2", # 距离计算方式 (L2/IP/COSINE) + index_type="IVF_FLAT", + index_name="vector_index", + params={ "nlist": 128 } #聚类中心数 (建议值:sqrt(数据量)) + ) + + # 创建索引,不等待索引创建完成即返回 + client.create_index( + collection_name="book", + index_params=index_params + ) + print("索引创建完成") + ``` + + * 执行查询【执行查询前需要加载才可以使用】 + + ``` + client.load_collection(collection_name="book") # 加载集合到内存 + # 生成查询向量 + query_vector = [random.random() for _ in range(4)] + + # 执行带过滤条件的向量搜索 + results = client.search( + collection_name="book", + data=[query_vector], # 支持批量查询 + filter="category == '科幻' and price < 50", + output_fields=["title", "category", "price"], + limit=3, + search_params={"nprobe": 10} + ) + + # 解析结果 + print("\n科幻类且价格<50的搜索结果:") + for result in results[0]: # 第一个查询结果集 + print(f"ID: {result['book_id']}") + print(f"距离: {result['distance']:.4f}") + print(f"标题: {result['entity']['title']}") + print(f"价格: {result['entity']['price']:.2f}") + print("-" * 30) + ``` + + + + + + + + + + + + + + + +#### 第5集 Milvus向量Search查询综合案例实战《下》 + +**简介: Milvus向量Search查询综合案例实战** + +* 向量数据库完整工作流程示意图 + + ``` + 1. 创建集合Schema + ↓ + 2. 插入测试数据 + ↓ + 3. 创建向量索引 + ↓ + 4. 加载集合到内存 + ↓ + 5. 执行混合查询(向量+标量过滤) + ``` + +* 全量查询案例演示 + + * 测试是否有 output_fields 字段,返回结果的差异 + + ``` + # 案例1:基础向量查询 + basic_res = client.search( + collection_name="book", + data=[query_vector], + limit=5 + ) + + + # 案例2:分页查询 + page_res = client.search( + collection_name="book", + data=[query_vector], + offset=2, + limit=3 + ) + + # 案例3:批量查询 + batch_res = client.search( + collection_name="book", + data=[query_vector, [0.5]*4], # 同时查询两个向量,每个向量都会返回2条 + limit=2 + ) + ``` + +* 集合状态 + + ``` + # 验证集合状态 + print(client.describe_collection("book")) + + + # 索引状态检查 + print(client.list_indexes("book")) + ``` + +* 新旧版本对比表 + + | 功能 | PyMilvus旧版 | MilvusClient新版 | + | :----------- | :---------------------- | :---------------------------- | + | 连接管理 | 需要手动管理connections | 客户端自动管理 | + | 数据插入格式 | 多列表结构 | 字典列表 | + | 字段定义 | 使用FieldSchema | 在create_collection中直接定义 | + | 返回结果格式 | 对象属性访问 | 标准化字典格式 | + | 错误处理 | 异常类捕获 | 统一错误码系统 | + | 动态字段支持 | 需要额外配置 | 参数开启即可 | + + + + + + + + + + + + + + + +![logo](D:/小滴课堂AI/AI智能化云盘/笔记/img/image-20230918114907133-5008948.png) **愿景:"IT路上的持续充电平台,让技术不再难学"** +**更多高级课程请访问 xdclass.net** + +### 第四十一章 MMR搜索和LangChain整合Milvus实战 + + + +#### 第1集 相似度Similarity和MMR最大边界相关搜索 + +**简介: 相似度Similarity和MMR最大边界相关搜索** + +* 搜索的行业应用案例:电商推荐系统(明白两个差异) + + * **相似度搜索**: + + ``` + "用户点击商品A → 推荐相似商品B、C" + ``` + + * **MMR搜索**: + + ``` + "用户浏览历史多样 → 推荐跨品类商品" + ``` + + + +* 基础相似度搜索(Similarity Search) + + * **原理**:通过向量空间中的距离计算(余弦相似度/L2距离等)找出最接近目标向量的结果 + + 1 + + * 核心特点 + + * **纯向量驱动**:仅依赖向量空间中的几何距离,余弦相似度、L2距离 + * **结果同质化**:返回最相似的连续区域数据 + * **高性能**:时间复杂度 O(n + klogk) + + * 参数配置模板,方法 `vector_store.similarity_search( )` + + ``` + vector_store.as_retriever( + search_type="similarity", + search_kwargs={ + "k": 5, # 返回数量 + "score_threshold": 0.65, # 相似度得分的最低要求,相似度≥65%的才考虑 + "filter": "category == 'AI'", # 元数据过滤 + "param": { # Milvus专属参数 + "nprobe": 32, #nprobe是Milvus中用于控制搜索时访问的聚类数量的参数,nprobe越大,搜索越精确但耗时更长。 + "radius": 0.8 #是在范围搜索中使用的参数,指定搜索的半径范围,结合score_threshold可用于限定结果的范围 + } + } + ) + ``` + + * 典型应用场景 + + * 精确语义匹配(专利检索、论文查重) + + * 基于内容的推荐("更多类似内容") + + * 敏感信息过滤(高阈值精准匹配) + + + +* 最大边界相关搜索(MMR Search) + + * Maximal Marginal Relevance,最大边际相关性, 是一种信息检索和推荐系统中常用的算法 + * 核心目标是 在返回的结果中平衡相关性与多样性,避免返回大量高度相似的内容。 + * 设计初衷是解决传统相似性搜索(如余弦相似度)可能导致的“信息冗余”问题,在需要覆盖多角度信息或推荐多样化内容的场景中效果显著 + * **原理**:在相似度和多样性之间进行权衡,避免结果冗余 + + 2 + + * 算法原理图解 + + ``` + 初始候选集(fetch_k=20) + │ + ├── 相似度排序 + │ [1, 2, 3, ..., 20] + │ + └── 多样性选择(λ=0.5) + ↓ + 最终结果(k=5) + [1, 5, 12, 3, 18] # 兼顾相似与差异 + ``` + + * 参数配置模板, 方法 `vector_store.max_marginal_relevance_search( )` + + ``` + mmr_retriever = vector_store.as_retriever( + search_type="mmr", + search_kwargs={ + "k": 3, #最终返回的结果数量,MMR 会从更大的候选集中筛选出 3 个最相关且多样化的结果 + "fetch_k": 20, #指定 MMR 算法的候选集大小,fetch_k 越大,候选集越广,结果可能越多样,但计算成本更高 + "lambda_mult": 0.6, #控制相关性与多样性之间的权衡系数。范围为 [0, 1]:接近1 结果更相似。接近0:结果差异更大 + "param": { + "nprobe": 64, # 针对Milvus的IVF 类索引,控制搜索的聚类数量,搜索的聚类范围越广,召回率越高,但速度越慢 + "ef": 128 # Milvus HNSW索引,控制搜索的深度,值越大搜索越精确,耗时增加;值越小速度更快,可能漏掉相关结果 + } + } + ) + ``` + + * 关键参数解析 + + | 参数 | 相似度搜索 | MMR搜索 | 影响效果 | + | :-------------: | :------------: | :--------------: | :------------------------------------: | + | k | 控制结果数量 | 控制最终结果数量 | 值越大返回越多但可能降低精度 | + | lambda_mult | 无 | 0-1之间 | 值越大越偏向相关性,值越小越强调多样性 | + | score_threshold | 过滤低质量结果 | 通常不使用 | 阈值设置需根据embedding模型调整 | + | filter | 元数据过滤 | 支持同左 | 可结合业务维度进行筛选 | + + * 典型应用场景 + + * 多样化推荐:电商跨品类推荐 + * 知识发现:科研文献探索 + * 内容生成:生成多样化文案 + +* 对比决策矩阵 + + | 维度 | Similarity Search | MMR Search | + | :----------: | :----------------: | :-----------------: | + | **结果质量** | 高相似度但可能重复 | 多样性更佳 | + | **响应速度** | 平均 120ms | 平均 200-300ms | + | **内存消耗** | 低(仅存储topK) | 高(需缓存fetch_k) | + | **适用场景** | 精确匹配、去重 | 推荐系统、知识发现 | + | **可解释性** | 直观的相似度排序 | 综合评分需二次解释 | + +* 企业推荐系统架构示例 + +3 + + + + + + + + + + + + + + + +#### 第2集 新版LangChain向量数据库VectorStore设计 + +**简介: 新版LangChain向量数据库VectorStore设计** + +* LangChain 向量存储体系架构 + + * RAG系统核心设计模式 + + ``` + Document + │ + ▼ + Text Splitter + │ + ▼ + Embedding Model + │ + ▼ + [Milvus|Chroma|Pinecone...] ←→ VectorStore + ``` + + * LangChain设计抽象类`VectorStore`,统一接口,具体的实现由各自数据库负责 + + * 文档(如果过期就忽略) https://python.langchain.com/docs/integrations/vectorstores/ + * 安装依赖 `pip install langchain-milvus` + + ``` + from langchain_core.vectorstores import VectorStore + ``` + + 1 + +* VectorStore 核心方法详解 + + * 通用方法列表 + + | 方法名 | 作用描述 | 常用参数示例 | + | :------------------------------: | :--------------------: | :----------------------------: | + | from_documents() | 从文档创建向量库 | documents, embedding, **kwargs | + | add_documents() | 追加文档到已有库 | documents | + | similarity_search() | 相似度查询 | query, k=4 | + | similarity_search_with_score() | 带相似度得分的查询 | query, k=4 | + | max_marginal_relevance_search( ) | MMR最大边界搜索 | query, k=4 | + | as_retriever() | 转换为检索器供链式调用 | search_kwargs={} | + + * 初始化方法 + + ``` + @classmethod + def from_documents( + cls, + documents: List[Document], + embedding: Embeddings, + **kwargs + ) -> VectorStore: + """ + 文档自动转换存储 + :param documents: LangChain Document对象列表 + :param embedding: 文本向量化模型 + :param kwargs: 向量库特有参数 + :return: 初始化的VectorStore实例 + """ + ``` + + * 和`add_documents()`区别 + + | 特性 | `from_documents()` | `add_documents()` | + | :----------: | :----------------------: | :--------------------: | + | **方法类型** | 类方法(静态方法) | 实例方法 | + | **主要用途** | 初始化集合并批量插入文档 | 向已存在的集合追加文档 | + | **集合创建** | 自动创建新集合 | 要求集合已存在 | + | **性能消耗** | 高(需建索引+数据迁移) | 低(仅数据插入) | + | **典型场景** | 首次数据入库 | 增量数据更新 | + | **连接参数** | 需要完整连接配置 | 复用已有实例的配置 | + + * 数据插入方法 + + ``` + def add_texts( + self, + texts: Iterable[str], + metadatas: Optional[List[dict]] = None, + **kwargs + ) -> List[str]: + """ + 插入文本数据到向量库 + :param texts: 文本内容列表 + :param metadatas: 对应的元数据列表 + :return: 插入文档的ID列表 + """ + ``` + + * 相似性搜索方法 + + ``` + def similarity_search( + self, + query: str, + k: int = 4, + filter: Optional[dict] = None, + **kwargs + ) -> List[Document]: + """ + 执行相似性搜索 + :param query: 查询文本 + :param k: 返回结果数量 + :param filter: 元数据过滤条件 + :return: 匹配的Document列表 + """ + ``` + + * 最大边界相关算法(MMR) + + ``` + def max_marginal_relevance_search( + self, + query: str, + k: int = 4, + fetch_k: int = 20, + lambda_mult: float = 0.5 + ) -> List[Document]: + """ + 多样性增强搜索 + :param k: 返回结果数量 + :param fetch_k: 初始获取数量 + :param lambda_mult: 多样性权重(0-1) + """ + ``` + +* 不同向量数据库支持特性不一样 + + | 特性 | Milvus | FAISS | Pinecone | Chroma | + | :----------: | :----: | :---: | :------: | :----: | + | 分布式支持 | ✓ | ✗ | ✓ | ✗ | + | 元数据过滤 | ✓ | ✗ | ✓ | ✓ | + | 自动索引管理 | ✓ | ✗ | ✓ | ✓ | + | 本地运行 | ✓ | ✓ | ✗ | ✓ | + | 相似度算法 | 8种 | 4种 | 3种 | 2种 | + +* 场景:知识库冷启动 + +export_j1u9v + + + + + + + + + + + + + + + + + +#### 第3集 LangChain整合Milvus新增和删除实战 + +**简介: LangChain整合Milvus新增和删除实战** + +* 需求 + + * 使用LangChain整合向量数据库Milvus + * 实现新增和删除向量数据库的数据实战 + * 文档(如果过期就忽略) + * https://python.langchain.com/docs/integrations/vectorstores/milvus/ + +* 案例实战 + + * 准备数据 + + ``` + from langchain_community.embeddings import DashScopeEmbeddings + #from langchain.vectorstores import Milvus + from langchain_milvus import Milvus + from langchain_core.documents import Document + from uuid import uuid4 + + # 初始化模型 + embeddings = DashScopeEmbeddings( + model="text-embedding-v2", # 第二代通用模型 + max_retries=3, + dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08" + ) + vector_store = Milvus( + embeddings, + connection_args={"uri": "http://47.119.128.20:19530"}, + collection_name="langchain_example", + ) + document_1 = Document( + page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.", + metadata={"source": "tweet"}, + ) + document_2 = Document( + page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.", + metadata={"source": "news"}, + ) + document_3 = Document( + page_content="Building an exciting new project with LangChain - come check it out!", + metadata={"source": "tweet"}, + ) + document_4 = Document( + page_content="Robbers broke into the city bank and stole $1 million in cash.", + metadata={"source": "news"}, + ) + document_5 = Document( + page_content="Wow! That was an amazing movie. I can't wait to see it again.", + metadata={"source": "tweet"}, + ) + document_6 = Document( + page_content="Is the new iPhone worth the price? Read this review to find out.", + metadata={"source": "website"}, + ) + document_7 = Document( + page_content="The top 10 soccer players in the world right now.", + metadata={"source": "website"}, + ) + document_8 = Document( + page_content="LangGraph is the best framework for building stateful, agentic applications!", + metadata={"source": "tweet"}, + ) + document_9 = Document( + page_content="The stock market is down 500 points today due to fears of a recession.", + metadata={"source": "news"}, + ) + document_10 = Document( + page_content="I have a bad feeling I am going to get deleted :(", + metadata={"source": "tweet"}, + ) + documents = [ + document_1,document_2,document_3,document_4,document_5,document_6, + document_7,document_8,document_9,document_10, + ] + ``` + + * 插入 + + ``` + ids = [ str(i+1) for i in range(len(documents))] + print(ids) + result = vector_store.add_documents(documents=documents, ids=ids) + print(result) + ``` + + * 删除 + + ``` + result = vector_store.delete(ids=["1"]) + print(result) + #(insert count: 0, delete count: 1, upsert count: 0, timestamp: 456798840753225732, success count: 0, err count: 0 + ``` + + + + + + + + + + + +#### 第4集 LangChain实战MMR和相似性搜索实战 + +**简介: LangChain实战MMR和相似性搜索实战** + +* 需求 + + * 使用LangChain整合向量数据库Milvus + * 测试相关搜索:相似度搜索和MMR搜索 + +* 案例实战 + + * 准备数据【**执行多次有多条重复记录,向量数据库不会去重,方便测试MMR**】 + + ``` + from langchain_community.embeddings import DashScopeEmbeddings + #from langchain.vectorstores import Milvus + from langchain_milvus import Milvus + from langchain_core.documents import Document + + # 初始化模型 + embeddings = DashScopeEmbeddings( + model="text-embedding-v2", # 第二代通用模型 + max_retries=3, + dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08" + ) + + document_1 = Document( + page_content="LangChain支持多种数据库集成,小滴课堂的AI大课", + metadata={"source": "xdclass.net/doc1"}, + ) + document_2 = Document( + page_content="Milvus擅长处理向量搜索,老王的课程不错", + metadata={"source": "xdclass.net/doc2"}, + ) + document_3 = Document( + page_content="我要去学小滴课堂的架构大课", + metadata={"source": "xdclass.net/doc3"}, + ) + document_4 = Document( + page_content="今天天气不错,老王和老帆去按摩了", + metadata={"source": "xdclass.net/doc4"}, + ) + documents = [document_1,document_2,document_3,document_4] + vector_store = Milvus.from_documents( + documents=documents, + embedding=embeddings, + collection_name="mmr_test", + connection_args={"uri": "http://47.119.128.20:19530"} + ) + ``` + + * 相似性搜索(向量数据库插入多个重复数据,看是否会返回一样的) + + ``` + # 相似性搜索 + query = "如何进行数据库集成?" + results = vector_store.similarity_search(query, k=2) + for doc in results: + print(f"内容:{doc.page_content}\n元数据:{doc.metadata}\n") + + # 混合搜索(结合元数据过滤) + results = vector_store.similarity_search( + query, + k=2, + expr='source == "xdclass.net/doc1"' + ) + + print(results) + ``` + + * MMR搜索(跨类搭配,向量数据库插入多个重复数据,看是否会返回一样的) + + ``` + # MMR推荐(跨类搭配) + diverse_results = vector_store.max_marginal_relevance_search( + query="如何进行数据库集成", + k=2, + fetch_k=10, + lambda_mult=0.4, + # expr="category in ['shoes', 'clothes', 'accessories']", + search_params={ + "metric_type": "IP", + "params": {"nprobe": 32} + } + ) + + print(diverse_results) + ``` + + + + + + + + + + + + + + + + + +![logo](D:/小滴课堂AI/AI智能化云盘/笔记/img/image-20230918114907133-5008948.png) **愿景:"IT路上的持续充电平台,让技术不再难学"** +**更多高级课程请访问 xdclass.net** + +### 第四十二章 Retrievers检索器+RAG文档助手项目实战 + + + +#### 第1集 LangChain检索器Retrievers案例实战 + +**简介: LangChain检索器Retrievers案例实战** + +* 什么是`Retriever` + + * **统一接口**:标准化检索流程,无论数据来源如何,最终输出`Document`对象列表。 + + * **多源混合检索**:支持同时查询向量库、传统数据库和搜索引擎【提高召回率】 + + * **与VectorStore的关系**:Retriever不直接管理存储,依赖VectorStore(如FAISS、Chroma)实现向量化与检索。 + + * **RAG中的角色**:作为检索增强生成(RAG)流程的“数据入口”,为生成模型提供精准上下文 + + data_connection_diagram + + * 有多个实现:VectorStoreRetriever、MultiQueryRetriever、SelfQueryRetriever等 + + * 特点 + + * **模块化设计**:支持插件式扩展,可自定义检索算法(如混合搜索、重排序)。 + * **异步支持**:通过`async_get_relevant_documents`实现高并发场景下的高效检索。 + * **链式调用**:可与LangChain的其他组件(如Text Splitters、Memory)无缝集成。 + + ``` + # from langchain_core.retrievers import BaseRetriever + ``` + + * 补充知识点:召回率(Recall)信息检索和机器学习中衡量模型找全相关结果能力的核心指标 + + * 比如 + * 在文档检索中,如果有100篇相关文档,系统找出了80篇,那么召回率就是80%。 + * 召回率高意味着系统漏掉的少,但可能夹杂了不相关的结果,这时候准确率可能低。 + +* Retriever常见类型之基础检索器 `VectorStoreRetriever` + + * 基础使用参考案例 + + ``` + #将文档嵌入为向量,通过相似度计算(如余弦相似度)检索 + from langchain_community.vectorstores import FAISS + retriever = FAISS.from_documents(docs, embeddings).as_retriever( + search_type="mmr", # 最大边际相关性 + search_kwargs={"k": 5, "filter": {"category": "news"}} + ) + ``` + + * `as_retriever()` 方法介绍 + + * 将向量库实例转换为检索器对象,实现与 LangChain 链式调用(如 `RetrievalQA`)的无缝对接。 + + * 源码 + + ``` + def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever: + tags = kwargs.pop("tags", None) or [] + self._get_retriever_tags() + return VectorStoreRetriever(vectorstore=self, tags=tags, **kwargs) + """ + 向量库实例转换为检索器对象,实现与 LangChain 链式调用 + """ + ``` + + * 关键参数详解 + + * search_type 搜索类型 + + | 类型 | 适用场景 | Milvus 对应操作 | + | :----------------------------: | :------------: | :-------------------------------: | + | `"similarity"` | 基础相似度检索 | `search()` | + | `"mmr"` | 多样性结果优化 | `max_marginal_relevance_search()` | + | `"similarity_score_threshold"` | 阈值过滤检索 | `search()` + `score_threshold` | + + ``` + # MMR 检索配置示例 + mmr_retriever = vector_store.as_retriever( + search_type="mmr", + search_kwargs={ + "k": 3, + "fetch_k": 20, + "lambda_mult": 0.5 + } + ) + ``` + + * search_kwargs 参数详解 + + | 参数 | 类型 | 默认值 | 说明 | + | :-------------: | :------: | :----: | :-----------------: | + | `k` | int | 4 | 返回结果数量 | + | `filter`/`expr` | str/dict | None | 元数据过滤条件 | + | `param` | dict | None | Milvus 搜索参数配置 | + +* 综合案例实战 + + * 默认是similarity search + + ```` + from langchain_community.embeddings import DashScopeEmbeddings + from langchain_milvus import Milvus + from langchain_core.documents import Document + + # 初始化模型 + embeddings = DashScopeEmbeddings( + model="text-embedding-v2", # 第二代通用模型 + max_retries=3, + dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08" + ) + + document_1 = Document( + page_content="LangChain支持多种数据库集成,小滴课堂的AI大课", + metadata={"source": "xdclass.net/doc1"}, + ) + document_2 = Document( + page_content="Milvus擅长处理向量搜索,老王的课程不错", + metadata={"source": "xdclass.net/doc2"}, + ) + document_3 = Document( + page_content="我要去学小滴课堂的架构大课", + metadata={"source": "xdclass.net/doc3"}, + ) + document_4 = Document( + page_content="今天天气不错,老王和老帆去按摩了", + metadata={"source": "xdclass.net/doc4"}, + ) + documents = [document_1,document_2,document_3,document_4] + vector_store = Milvus.from_documents( + documents=documents, + embedding=embeddings, + collection_name="retriever_test1", + connection_args={"uri": "http://47.119.128.20:19530"} + ) + + #默认是 similarity search + retriever = vector_store.as_retriever(search_kwargs={"k": 2}) + + results = retriever.invoke("如何进行数据库操作") + + for result in results: + print(f"内容 {result.page_content} 元数据 {result.metadata}") + + ```` + + * 可以调整为MMR检索 + + ``` + retriever = vector_store.as_retriever(search_type="mmr",search_kwargs={"k": 2}) + ``` + + + + + + + + + + + + + + + +#### 第2集 大厂面试题-如何提升大模型召回率和实战 + +**简介: 大厂面试题-如何提升大模型召回率和案例实战** + +* **LLM大模型开发高频面试题:如何提升大模型召回率和准确率?** + + image-20250325144937629 + +* 需求背景 + + * 当原始查询不够明确时,或者当文档库中的内容使用不同的术语表达同一概念时 + * 单一查询可能无法有效检案到所有相关内容; + * 或者用户的问题可能有不同的表达方式,导致的检索结果不理想, + * 需要从多个角度切入才能找到最相关的文档片段。这种情况下,生成多个变体查询可以提高召回率,确保覆盖更多相关文档。 + +* `MultiQueryRetriever` + + * 通过生成多个相关查询来增强检索效果,解决单一查询可能不够全面或存在歧义的问题。 + + * 原理: + + * **查询扩展技术**:通过LLM生成N个相关查询(如改写、扩展、翻译),合并结果去重,生成多个变体查询 + * **双重增强效果**:提升召回率(+25%↑)和准确率(+18%↑)的平衡 + + ![1](D:/小滴课堂AI/AI智能化云盘/笔记/img/1-2885071.png) + + * 用法 + + ``` + retriever = MultiQueryRetriever.from_llm( + retriever=base_retriever, + llm=ChatOpenAI() + ) + ``` + + * **典型问题场景** + + - **术语差异问题**:用户提问使用"SSL证书" vs 文档中使用"TLS证书" + - **表述模糊问题**:"怎么备份数据库" vs "数据库容灾方案实施步骤" + - **多语言混合**:中英文混杂查询(常见于技术文档检索) + - **专业领域知识**:医学问诊中的症状不同描述方式 + +* 案例实战 + + ``` + from langchain_community.embeddings import DashScopeEmbeddings + #from langchain.vectorstores import Milvus + from langchain_milvus import Milvus + from langchain_openai import ChatOpenAI + + from langchain_community.document_loaders import TextLoader + from langchain.retrievers.multi_query import MultiQueryRetriever + from langchain_text_splitters import RecursiveCharacterTextSplitter + + import logging + + # 设置日志记录的基本配置 + logging.basicConfig() + # 设置多查询检索器的日志记录级别为INFO + logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO) + + # 使用TextLoader加载文本数据 + loader = TextLoader("data/qa.txt") + # 加载数据到变量中 + data = loader.load() + + # 初始化文本分割器,将文本分割成小块 + text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10) + # 执行文本分割 + splits = text_splitter.split_documents(data) + + # 初始化模型 + embedding = DashScopeEmbeddings( + model="text-embedding-v2", # 第二代通用模型 + max_retries=3, + dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08" + ) + + # 初始化向量数据库 + vector_store = Milvus.from_documents( + documents=splits, + embedding=embedding, + collection_name="mulit_retriever2", + connection_args={"uri": "http://47.119.128.20:19530"} + ) + + # 定义问题 + question = "老王不知道为啥抽筋了" + # 初始化语言模型 + llm = ChatOpenAI( + model_name = "qwen-plus", + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + api_key="sk-005c3c25f6d042848b29d75f2f020f08", + temperature=0.7 + ) + + # 从语言模型中创建多查询检索器 + retriever_from_llm = MultiQueryRetriever.from_llm( + retriever=vector_store.as_retriever(), llm=llm + ) + + # 使用检索器执行问题检索 + results = retriever_from_llm.invoke(question) + # 打印检索到的结果数量 + len(results) + + # 遍历并打印每个检索结果的内容和元数据 + for result in results: + print(f"内容 {result.page_content} 元数据 {result.metadata}") + + + # 以下是未使用的代码片段,已将其注释掉 + # vector_store = Milvus( + # embeddings, + # connection_args={"uri": "http://47.119.128.20:19530"}, + # collection_name="mmr_test", + # ) + # print(vector_store) + ``` + + + + + + + + + + + + + +#### 第3集 RAG综合项目实战-AI文档问答助手《上》 + +**简介: RAG综合项目实战-AI文档问答助手《上》** + +* **需求:在线文档的问答助手,方便查找相关手册和接口API** + + * 主要功能包括 + * 文档加载与切分、向量嵌入生成、向量存储与检索。 + * 基于检索增强生成(Retrieval-Augmented Generation, RAG)的问答。 + * 技术选型:LangChain框架+Milvus向量数据库 + +* **实现的功能** + + * 文档加载与切分 + * 使用`WebBaseLoader`从指定URL加载文档。 + * 使用`RecursiveCharacterTextSplitter`将加载的文档按照指定的块大小和重叠大小进行切分。 + + * 向量嵌入生成 + - 使用`DashScopeEmbeddings`生成文档切片的向量嵌入,模型为`text-embedding-v2`,支持最大重试次数为3次。 + + + * 向量存储与检索 + + - 使用`Milvus`作为向量数据库,创建名为`doc_qa_db`的Collection。 + + - 将生成的向量嵌入存储到Milvus中,并支持相似性检索。 + + + * 基于RAG的问答 + + - 初始化`ChatOpenAI`模型,使用`qwen-plus`作为LLM模型。 + + - 定义`PromptTemplate`,用于构建输入给LLM的提示信息。 + + - 构建RAG链,结合相似性检索结果和LLM生成回答。 + + +* 编码实战 + + ``` + from langchain_community.document_loaders import WebBaseLoader + from langchain.text_splitter import RecursiveCharacterTextSplitter + from langchain_milvus import Milvus + + from langchain.schema.runnable import RunnablePassthrough + from langchain.prompts import PromptTemplate + from langchain_community.embeddings import DashScopeEmbeddings + from langchain_openai import ChatOpenAI + + # 设置Milvus Collection名称。 + COLLECTION_NAME = 'doc_qa_db' + + # 初始化WebBaseLoader加载指定URL的文档。 + loader = WebBaseLoader([ + 'https://milvus.io/docs/overview.md', + 'https://milvus.io/docs/release_notes.md' + ]) + + # 加载文档。 + docs = loader.load() + + # 初始化RecursiveCharacterTextSplitter,用于切分文档。 + text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=0) + + # 使用LangChain将输入文档按照chunk_size切分。 + all_splits = text_splitter.split_documents(docs) + + # 初始化DashScopeEmbeddings,设置embedding模型为DashScope的text-embedding-v2。 + embeddings = DashScopeEmbeddings( + model="text-embedding-v2", # 第二代通用模型 + max_retries=3, + dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08" + ) + + # 创建connection,为阿里云Milvus的访问域名。 + connection_args = {"uri": "http://47.119.128.20:19530"} + # 创建Collection。 + vector_store = Milvus( + embedding_function=embeddings, + connection_args=connection_args, + collection_name=COLLECTION_NAME, + drop_old=True, + ).from_documents( + all_splits, + embedding=embeddings, + collection_name=COLLECTION_NAME, + connection_args=connection_args, + ) + + # vector_store = Milvus( + # embeddings, + # connection_args={"uri": "http://47.119.128.20:19530"}, + # collection_name=COLLECTION_NAME, + # ) + + # 利用Milvus向量数据库进行相似性检索。 + query = "What are the main components of Milvus?" + docs = vector_store.similarity_search(query) + print(len(docs)) + + # 初始化ChatOpenAI模型。 + llm = ChatOpenAI( + model_name = "qwen-plus", + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + api_key="sk-005c3c25f6d042848b29d75f2f020f08", + temperature=0.7 + ) + + # 将上述相似性检索的结果作为retriever,提出问题输入到LLM之后,获取检索增强之后的回答。 + retriever = vector_store.as_retriever() + ``` + + + + + + + + + + + +#### 第4集 RAG综合项目实战-AI文档问答助手《下》 + +**简介: RAG综合项目实战-AI文档问答助手《下》** + +* 编码测试实战 + + ``` + # 定义PromptTemplate,用于构建输入给LLM的prompt。 + template = """你是AI文档助手,使用以下上下文来回答最后的问题。 + 如果你不知道答案,就说你不知道,不要试图编造答案。 + 最多使用10句话,并尽可能简洁地回答。总是在答案的末尾说“谢谢你的提问!”. + {context} + 问题: {question} + """ + + rag_prompt = PromptTemplate.from_template(template) + + # 构建Retrieval-Augmented Generation链。 + rag_chain = ( + {"context": retriever, "question": RunnablePassthrough()} + | rag_prompt + | llm + ) + + # 调用rag_chain回答问题。 + print(rag_chain.invoke("什么是Milvus.")) + ``` + +* 应用场景 + + * 教育领域:可用于备课笔记、课程内容总结等场景。 + * 企业知识库:帮助企业快速构建基于内部文档的知识问答系统。 + * 技术支持:提供技术文档的智能检索与问答服务。 + +* 扩展方向 + + * 支持更多类型的文档加载器(如PDF、Word等)。 + * 增加多语言支持。 + * 优化向量嵌入生成与检索效率 + +* 大家的疑惑点(下一章讲解) + + ``` + # 构建Retrieval-Augmented Generation链。 + rag_chain = ( + {"context": retriever, "question": RunnablePassthrough()} + | rag_prompt + | llm + ) + ``` + + * 在 `rag_chain` 的定义中,`{"context": retriever, "question": RunnablePassthrough()}` 创建了一个输入字典。 + * `context` 的值来自 `retriever`,它将使用向量存储检索与问题相关的文档片段。 + * `question`键的值通过 `RunnablePassthrough()` 直接传递,用户输入的问题会被透传到后续的处理步骤。 + * 输入字典随后会被传递给 `rag_prompt`,构建最终的提示(prompt)被传递给 `llm`(语言模型),生成最终的回答 + * 总结: + * 用户输入的问题会同时传给`retriever`和`RunnablePassthrough()` + * `retriever`完成检索后,会自动把结果赋值给`context`。 + * 检索结果`context`和用户输入`question`一并传给提示模板`prompt_template`。 + + + + + + + + + + + + + + + +![logo](D:/小滴课堂AI/AI智能化云盘/笔记/img/image-20230918114907133-5008948.png) **愿景:"IT路上的持续充电平台,让技术不再难学"** +**更多高级课程请访问 xdclass.net** + +### 第四十三章 Runnable深度解析和多实现类案例实战 + + + +#### 第1集 LangChain核心之Runnable接口底层实现 + +**简介: LangChain核心之Runnable接口底层实现** + +* 什么是`Runnable`接口 + + * 是LangChain框架中所有组件的核心抽象接口,用于封装可执行的逻辑单元(如模型调用、数据处理、API集成等)。 + + * 通过实现统一的`invoke`、`batch`、`stream`等方法,支持模块化构建链式任务,允许开发者以声明式编程LCEL串联不同组件 + + ``` + from langchain_core.runnables + ``` + + * 为什么要使用`Runnable` + + - **统一接口**:所有组件(如Prompt模板、模型、解析器)均实现Runnable接口,确保类型兼容和链式调用的无缝衔接 + - **灵活组合**:通过管道符`|`将多个Runnable串联成链,简化复杂逻辑的编排,类似数据流处理 + - **动态配置**:支持运行时参数绑定、组件替换和错误恢复机制(如`with_retry()`),提升系统灵活性和鲁棒性 + - **异步与性能优化**:内置异步方法(如`ainvoke`)和并行处理(如`RunnableParallel`),适用于高并发场景 + +* 什么是`RunnableSequence` + + * 是LangChain中用于构建**顺序执行链**的核心组件,通过管道符`|`将多个Runnable串联,形成线性执行流程,是`Runnable`子类 + + ``` + from langchain_core.runnables import RunnableSequence + ``` + + * 执行 LCEL链调用的方法(invoke/stream/batch),链中的每个组件也调用对应的方法,将输出作为下一个组件的输入 + + ``` + #RunnableSequence.invoke 的源码解读 + + def invoke( + self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any + ) -> Output: + # invoke all steps in sequence + try: + for i, step in enumerate(self.steps): + # mark each step as a child run + config = patch_config( + config, callbacks=run_manager.get_child(f"seq:step:{i + 1}") + ) + with set_config_context(config) as context: + if i == 0: + input = context.run(step.invoke, input, config, **kwargs) + else: + input = context.run(step.invoke, input, config) + ``` + +* 从LECL表达式开始理解 + + ``` + chain = prompt | model | output_parser # 通过|直接连接 + ``` + + * **数据流传递** + + * 每个Runnable的输出作为下一个Runnable的输入,形成单向数据流。 + * 例如,若链为`A | B | C`,则执行流程为`A的输出 → B的输入 → B的输出 → C的输入` + + * **统一接口**: + + * 所有组件(如Prompt模板、模型、输出解析器)均实现`Runnable`接口,确保类型兼容性和链式调用的无缝衔接 + + * **延迟执行**: + + * 链的构建仅定义逻辑关系,实际执行在调用`invoke`或`stream`时触发,支持动态参数绑定和运行时配置 + + * **底层实现**: + + * 管道符`|`在Python中被重写为`__or__`方法,实际调用`RunnableSequence`构造函数, + + * 将多个Runnable存入内部列表`steps`中, 执行时按顺序遍历列表并调用每个Runnable的`invoke`方法 + + + +* Runnable接口定义了以下核心方法,支持多种执行模式 + + ``` + class Runnable(Generic[Input, Output]): + #处理单个输入,返回输出。 + def invoke(self, input: Input) -> Output: ... + #异步处理单个输入。 + async def ainvoke(self, input: Input) -> Output: ... + #逐块生成输出,适用于实时响应。 + def stream(self, input: Input) -> Iterator[Output]: ... + #批量处理输入列表,提升吞吐量。 + def batch(self, inputs: List[Input]) -> List[Output]: ... + ``` + + | 方法 | 说明 | 使用场景 | + | :---------: | :----------: | :----------: | + | `invoke()` | 同步执行 | 单次调用 | + | `batch()` | 批量同步执行 | 处理数据集 | + | `stream()` | 流式输出 | 实时生成文本 | + | `ainvoke()` | 异步执行 | Web服务集成 | + +* 具有多个子类实现 + + | 组件 | 特点 | 适用场景 | + | :-------------------: | :------: | :------------: | + | `RunnableSequence` | 顺序执行 | 线性处理流水线 | + | `RunnableBranch` | 条件路由 | 分支选择逻辑 | + | `RunnableParallel` | 并行执行 | 多任务独立处理 | + | `RunnablePassthrough` | 数据透传 | 保留原始输入 | + + + + + + + + + +#### 第2集 RunnablePassthrough介绍和透传参数实战 + +**简介: RunnablePassthrough介绍和透传参数实战** + +* `RunnablePassthrough` + + * 核心功能:用于在链中直接传递输入数据,不进行任何修改,或通过 `.assign()` 扩展上下文字段 + + ![2](D:/小滴课堂AI/AI智能化云盘/笔记/img/2-2902256.png) + + * 应用场景: + + - 保留原始输入数据供后续步骤使用。 + - 动态添加新字段到上下文中(如结合检索结果与用户问题) + + * 基础用法 + + ``` + from langchain_core.runnables import RunnablePassthrough + + # 直接传递输入 + chain = RunnablePassthrough() | model + output = chain.invoke("Hello") + ``` + + * 扩展字段案例 + + * 案例一 + + ``` + # 使用 assign() 添加新字段 + from langchain_core.runnables import RunnablePassthrough + + # 使用 assign() 方法添加新字段,该方法接收一个关键字参数,其值是一个函数 + # 这个函数定义了如何处理输入数据以生成新字段 + # 在这个例子中,lambda 函数接收一个输入 x,并返回 x["num"] * 2 的结果 + # 这将创建一个新的字段 'processed',其值是输入字段 'num' 的两倍 + chain = RunnablePassthrough.assign(processed=lambda x: x["num"] * 2) + + # 调用 chain 对象的 invoke 方法,传入一个包含 'num' 字段的字典 + # 这将执行之前定义的 lambda 函数,并在输入字典的基础上添加 'processed' 字段 + # 最后输出处理后的字典 + output = chain.invoke({"num": 3}) # 输出 {'num':3, 'processed':6} + print(output) + ``` + + * 案例二(伪代码) + + ``` + # 构建包含原始问题和处理上下文的链 + chain = ( + RunnablePassthrough.assign( + context=lambda x: retrieve_documents(x["question"]) + ) + | prompt + | llm + ) + + # 输入结构 + input_data = {"question": "LangChain是什么?"} + response = chain.invoke(input_data) + ``` + + + + * 透传参数LLM案例实战 + + * 用户输入的问题会同时传给`retriever`和`RunnablePassthrough()` + * `retriever`完成检索后,会自动把结果赋值给`context`。 + * 检索结果`context`和用户输入`question`一并传给提示模板`prompt_template`。 + * **输出**:模型根据检索到的上下文生成答案 + + ``` + from langchain_community.embeddings import DashScopeEmbeddings + from langchain_milvus import Milvus + from langchain_core.documents import Document + from langchain_core.runnables import RunnablePassthrough + from langchain_core.prompts import ChatPromptTemplate + from langchain_openai import ChatOpenAI + + # 初始化模型 + embeddings = DashScopeEmbeddings( + model="text-embedding-v2", # 第二代通用模型 + max_retries=3, + dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08" + ) + document_1 = Document( + page_content="LangChain支持多种数据库集成,小滴课堂的AI大课", + metadata={"source": "xdclass.net/doc1"}, + ) + document_2 = Document( + page_content="Milvus擅长处理向量搜索,老王的课程不错", + metadata={"source": "xdclass.net/doc2"}, + ) + + documents = [document_1,document_2] + vector_store = Milvus.from_documents( + documents=documents, + embedding=embeddings, + collection_name="runnable_test", + connection_args={"uri": "http://47.119.128.20:19530"} + ) + + #默认是 similarity search + retriever = vector_store.as_retriever(search_kwargs={"k": 2}) + + prompt = ChatPromptTemplate.from_template("基于上下文回答:{context}\n问题:{question}") + + #定义模型 + model = ChatOpenAI( + model_name = "qwen-plus", + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + api_key="sk-005c3c25f6d042848b29d75f2f020f08", + temperature=0.7 + ) + + chain = { + "context": retriever, + "question": RunnablePassthrough() # 直接传递用户问题 + } | prompt | model + + result = chain.invoke("LangChain支持数据库吗") + print(result) + ``` + + + + + + + +#### 第3集 AI智能推荐实战之RunnableParallel并行链 + +**简介: AI智能推荐实战之RunnableParallel并行链** + +* `RunnableParallel` 介绍 + + * 并行执行多个 Runnable,合并结果为一个字典,键为子链名称,值为对应输出 + + ``` + class RunnableParallel(Runnable[Input, Dict[str, Any]]): + """ + 并行执行多个Runnable的容器类 + 输出结果为字典结构:{key1: result1, key2: result2...} + """ + ``` + + * 在 LCEL 链上,会将字典隐形转换为`RunnableParallel` + + ``` + multi_retrieval_chain = ( + RunnableParallel({ + "context1": retriever1, #数据源一 + "context2": retriever2, #数据源二 + "question": RunnablePassthrough() + }) + | prompt_template + | llm + | outputParser + ) + ======= 自动化转换为下面,写法一样 ======== + multi_retrieval_chain = ( + { + "context1": retriever1, #数据源一 + "context2": retriever2, #数据源二 + "question": RunnablePassthrough() + } + | prompt_template + | llm + | outputParser + ) + ``` + + * 特点 + + | 特性 | 说明 | 示例 | + | :----------: | :--------------------: | :------------------------: | + | **并行执行** | 所有子Runnable同时运行 | 3个任务耗时2秒(而非累加) | + | **类型安全** | 强制校验输入输出类型 | 自动检测字典字段类型 | + + 1 + + * API 与用法, 构造函数所有子链接收相同的输入 + + ``` + from langchain_core.runnables import RunnableParallel + + runnable = RunnableParallel( + key1=chain1, + key2=chain2 + ) + ``` + + * 应用场景: + + * **数据并行处理器**:同时处理多个数据流 + + * **结构化数据装配器**:构建标准化的输出格式 + + * **流水线分叉合并器**:实现Map-Reduce模式中的Map阶段 + + * 举例 + + * 多维度数据分析 + + ``` + analysis_chain = RunnableParallel({ + "sentiment": sentiment_analyzer, + "keywords": keyword_extractor, + "entities": ner_recognizer + }) + ``` + + * 多模型对比系统 + + ``` + model_comparison = RunnableParallel({ + "gpt4": gpt4_chain, + "claude": claude_chain, + "gemini": gemini_chain + }) + ``` + + * 智能文档处理系统 + + ``` + document_analyzer = RunnableParallel({ + "summary": summary_chain, # 摘要生成 + "toc": toc_generator, # 目录提取 + "stats": RunnableLambda(lambda doc: { + "char_count": len(doc), + "page_count": doc.count("PAGE_BREAK") + 1 + }) + }) + # 处理200页PDF文本 + analysis_result = document_analyzer.invoke(pdf_text) + ``` + +* 案例实战 + + * 场景:并行生成景点与书籍推荐 + + ``` + from langchain_core.runnables import RunnableParallel + from langchain_core.prompts import ChatPromptTemplate + from langchain_openai import ChatOpenAI + from langchain_core.output_parsers import JsonOutputParser + + #定义模型 + model = ChatOpenAI( + model_name = "qwen-plus", + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + api_key="sk-005c3c25f6d042848b29d75f2f020f08", + temperature=0.7 + ) + #构建解析器 + parser = JsonOutputParser() + + prompt_attractions = ChatPromptTemplate.from_template("""列出{city}的{num}个景点。返回 JSON 格式: + {{ + "num": "编号", + "city": "城市", + "introduce": "景点介绍", + }} + """) + prompt_books = ChatPromptTemplate.from_template("""列出{city}相关的{num}本书返回 JSON 格式: + {{ + "num": "编号", + "city": "城市", + "introduce": "书籍介绍", + }} """) + + chain1 = prompt_attractions | model | parser + + chain2 = prompt_books | model | parser + + chain = RunnableParallel( + attractions = chain1 , + books = chain2 + ) + + output = chain.invoke({"city": "南京", "num": 3}) + + print(output) + ``` + + + + + + + + + +#### 第4集 RunnableLambda介绍和包装链式函数实战 + +**简介: RunnableLambda介绍和包装链式函数实战** + +* `RunnableLambda` + + * 核心功能 + + * 将任意 Python 函数转换为 Runnable,将普通的 Python 函数或可调用对象转换为 `Runnable`对象,无缝集成到链中 + * 把自己需要的功能通过自定义函数 + RunnableLambda的方式包装,可以跟任何外部系统打通,集成到 LCEL 链 + + ``` + class RunnableLambda(Runnable[Input, Output]): + """ + 将任意Python函数转换为符合Runnable协议的对象 + 实现自定义逻辑与LangChain生态的无缝集成 + """ + ``` + + * 与普通函数的区别 + + | 特性 | 普通函数 | RunnableLambda | + | :------: | :-----------------: | :-------------------: | + | 可组合性 | ❌ 无法直接接入Chain | ✅ 支持` | + | 类型校验 | ❌ 动态类型 | ✅ 静态类型检查 | + | 异步支持 | ❌ 需手动实现 | ✅ 原生支持async/await | + | 批量处理 | ❌ 需循环调用 | ✅ 自动批量优化 | + + * 适合场景: + + * 插入自定义逻辑(如日志记录、数据清洗) + * 转换数据格式(如 JSON 解析)。 + + * API 与用法 + + ``` + from langchain_core.runnables import RunnableLambda + + def log_input(x): + print(f"Input: {x}") + return x + + chain = prompt | RunnableLambda(log_input) | model + ``` + +* 案例实战 + + * 基础文本处理链 + + ``` + from langchain_core.runnables import RunnableLambda + + text_clean_chain = ( + RunnableLambda(lambda x: x.strip()) + | RunnableLambda(str.lower) + ) + + result = text_clean_chain.invoke(" Hello123World ") + print(result) # 输出 "helloworld" + + ``` + + * 打印中间结果并过滤敏感词(在链中插入自定义处理逻辑) + + ``` + from langchain_core.runnables import RunnableLambda + from langchain_openai import ChatOpenAI + def filter_content(text: str) -> str: + return text.replace("暴力", "***") + + #定义模型 + model = ChatOpenAI( + model_name = "qwen-plus", + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + api_key="sk-005c3c25f6d042848b29d75f2f020f08", + temperature=0.7 + ) + + chain = ( + RunnableLambda(lambda x: x["user_input"]) + | RunnableLambda(filter_content) + | model + ) + + result = chain.invoke({"user_input": "暴力内容"}) + print(result) # 输出过滤后的结果 + ``` + + + + + + + + + + + + + + + +#### 第5集 智能客服路由实战之RunnableBranch条件分支 + +**简介: 智能客服路由实战之RunnableBranch条件分支** + +* `RunnableBranch` + + * 核心功能:根据条件选择执行不同的子链,类似 if-else 路由 + + * API 与用法 + + ``` + from langchain_core.runnables import RunnableBranch + + #条件函数:接收输入,返回布尔值。 + branch = RunnableBranch( + (condition1, chain1), + (condition2, chain2), + default_chain + ) + """ + 参数说明: + - Condition: 返回bool的可调用对象 + - Runnable: 条件满足时执行的分支 + - default: 所有条件不满足时执行的默认分支 + + 技术细节: + 1. 条件按声明顺序 + 2. 第一个满足条件的分支会被执行 + 3. 无默认分支且所有条件不满足时抛出异常 + """ + ``` + + 3 + + * 适合场景: + + * 多任务分类(如区分数学问题与物理问题) + + * 错误处理分支(如主链失败时调用备用链) + + * 多轮对话路由(根据对话历史选择回复策略) + + ``` + # 根据对话历史选择回复策略 + branch = RunnableBranch( + (lambda x: "投诉" in x["history"], complaint_handler), + (lambda x: "咨询" in x["history"], inquiry_handler), + default_responder + ) + ``` + + * 智能路由系统(根据输入类型路由处理方式) + + ``` + # 定义分类函数 + def detect_topic(input_text): + if "天气" in input_text: + return "weather" + elif "新闻" in input_text: + return "news" + else: + return "general" + + # 构建分支链 + branch_chain = RunnableBranch( + (lambda x: detect_topic(x["input"]) == "weather", weather_chain), + (lambda x: detect_topic(x["input"]) == "news", news_chain), + general_chain + ) + + # 执行示例 + branch_chain.invoke({"input": "北京今天天气怎么样?"}) + ``` + + + +* 案例实战:需要构建一个 **智能客服系统**,根据用户输入的请求类型自动路由到不同的处理流程: + + * **技术问题**:路由到技术支持链。 + + * **账单问题**:路由到财务链。 + + * **默认问题**:路由到通用问答链。 + + * 步骤 + + * 导入依赖 + + ``` + from langchain_core.runnables import RunnableBranch, RunnableLambda + from langchain_core.prompts import ChatPromptTemplate + from langchain_openai import ChatOpenAI + from langchain_core.output_parsers import StrOutputParser + ``` + + * 定义模型 + + ``` + #定义模型 + model = ChatOpenAI( + model_name = "qwen-plus", + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + api_key="sk-005c3c25f6d042848b29d75f2f020f08", + temperature=0.7 + ) + ``` + + * 定义子链 + + ``` + # 技术支持链 + tech_prompt = ChatPromptTemplate.from_template( + "你是一名技术支持专家,请回答以下技术问题:{input}" + ) + tech_chain = tech_prompt | model | StrOutputParser() + + # 财务链 + billing_prompt = ChatPromptTemplate.from_template( + "你是一名财务专员,请处理以下账单问题:{input}" + ) + billing_chain = billing_prompt | model | StrOutputParser() + + # 默认通用链 + default_prompt = ChatPromptTemplate.from_template( + "你是一名客服,请回答以下问题:{input}" + ) + default_chain = default_prompt | model | StrOutputParser() + ``` + + * 定义路由条件函数 + + ``` + def is_tech_question(input: dict) -> bool: + # 获取 "input" 键对应的值 + input_value = input.get("input", "") + # 检查是否包含关键词 + return "技术" in input_value or "故障" in input_value + + def is_billing_question(input: dict) -> bool: + # 获取 "input" 键对应的值 + input_value = input.get("input", "") + # 检查是否包含关键词 + return "账单" in input_value or "支付" in input_value + ``` + + * 构建 RunnableBranch + + ``` + branch = RunnableBranch( + (is_tech_question, tech_chain), # 技术问题 → tech_chain + (is_billing_question, billing_chain), # 账单问题 → billing_chain + default_chain # 默认问题 → default_chain + ) + + full_chain = RunnableLambda(lambda x: {"input": x}) | branch + ``` + + * 测试案例 + + ``` + # 测试技术问题 + tech_response = full_chain.invoke("我的账号登录失败,提示技术故障") + print("技术问题响应:", tech_response) + + # 测试账单问题 + billing_response = full_chain.invoke("我的账单金额有误,请核对") + print("账单问题响应:", billing_response) + + # 测试默认问题 + default_response = full_chain.invoke("你们公司的地址在哪里?") + print("默认问题响应:", default_response) + + #输出示例 + #技术问题响应: 建议您尝试清除浏览器缓存或重置密码。若问题持续,请联系我们的技术支持团队。 + #账单问题响应: 已记录您的账单问题,财务部门将在24小时内与您联系核实。 + #默认问题响应: 我们的公司地址是北京市海淀区中关村大街1号。 + ``` + + * 关键原理解析 + + * **条件路由逻辑** + * `RunnableBranch` 接收一个由 `(条件函数, Runnable)` 组成的列表。 + * 按顺序检查条件,第一个满足条件的分支会被执行,若均不满足则执行默认分支 + + * **输入处理**: + * 输入需为字典格式(如 `{"input": "问题内容"}`),通过 `RunnableLambda` 包装原始输入为字典 + + * **链式组合**: + * 每个分支链(如 `tech_chain`)独立处理输入,输出结果直接返回给调用方 + + * **调试技巧**: + + - 添加日志中间件(通过 `RunnableLambda`)记录路由决策过程 + + ``` + def log_decision(input_data): + print(f"路由检查输入:{input_data}") + return input_data + + log_chain_branch = RunnableLambda(log_decision) | branch + + full_chain = RunnableLambda(lambda x: {"input": x}) | log_chain_branch + ``` +* 总结与最佳实践 + * **组合使用**:通过 `|` 串联或嵌套 `Runnable` 类,构建复杂逻辑。 + * **性能优化**:利用 `RunnableParallel` 减少 IO 等待时间。 + * **调试技巧**:使用 `RunnableLambda` 插入日志或数据检查点。 + * **容错设计**:结合 `RunnableBranch` 和 提升健壮性