diff --git a/README.md b/README.md
index 008c18b..c29842b 100644
--- a/README.md
+++ b/README.md
@@ -8198,7 +8198,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
| RedisStore | 高性能、支持分布式 | 需要运维Redis | 生产环境集群 |
| InMemoryStore | 最快速度 | 重启丢失数据 | 临时测试 |
-### 第三十九章 大模型必备技术Milvus向量数据库
+### 大模型必备技术Milvus向量数据库
#### 向量数据库介绍和技术选型思考
@@ -8214,7 +8214,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* **相似度计算**:无法高效处理余弦相似度/Euclidean距离等复杂运算
* **实时性要求**:亿级向量场景下传统方案响应延迟高达秒级,暴力搜索时间复杂度O(N)
- ```
+ ```python
// 传统关系型数据库查询(精确匹配)
SELECT * FROM products WHERE category = 'electronics';
@@ -8283,8 +8283,6 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* 其他:PostgreSQL、ElasticSearch等
-
-
* 综合选型对比
| 维度\产品 | Pinecone | Milvus | Qdrant | Chroma |
@@ -8294,7 +8292,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
| **扩展能力** | 自动扩展 | 手动分片 | 自动分片 | 单机 |
| **典型场景** | 生产级SaaS | 企业私有云 | 高性能需求 | 本地开发 |
| **成本模型** | 按用量付费 | 基础设施+运维 | 基础设施+运维 | 免费 |
-
+
* 技术选型建议
* **数据规模**
@@ -8313,29 +8311,13 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
- **国产化需求**:腾讯云VectorDB或华为云等国产方案。
- **现有技术栈**:PostgreSQL/MongoDB扩展适合渐进式改造。
-
-
* **总结(花钱的多数更轻松方便)**
* 向量数据库的选择需结合数据规模、业务场景、团队能力和成本预算综合评估。
* 对于AI驱动的应用(如RAG、多模态搜索),建议优先考虑云原生或分布式开源方案(如Milvus、Pinecone);
* 传统业务系统可尝试数据库扩展插件以降低迁移成本,具体案例可参考各数据库的官方文档
-
-
-
-
-
-
-
-
-
-
-
-
-#### 第2集 向量数据库Milvus介绍和架构讲解
-
-**简介: 向量数据库Milvus介绍和和架构讲解**
+#### 向量数据库Milvus介绍和架构讲解
* 什么是Milvus向量数据库
@@ -8382,8 +8364,6 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
| **Index Node** | 负责索引构建与优化 | 支持后台异步构建索引 |
| **Coordinator** | 集群元数据管理、任务调度 | 高可用部署(etcd存储元数据) |
-
-
* 极速认知存储内容
* Collection 是一个二维表,具有固定的列和变化的行。
@@ -8392,17 +8372,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){

-
-
-
-
-
-
-
-
-#### 第3集 Milvus核心概念和数据结构讲解
-
-**简介: Milvus核心概念和数据结构讲解**
+#### Milvus核心概念和数据结构讲解
* **向量数据库对比关系型数据库**
@@ -8420,7 +8390,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
- Schema 定义字段结构(主键、向量、标量字段),支持动态字段(Milvus 2.3+),自动生成唯一ID(可选)
- 例子
- ```
+ ```python
pip install pymilvus==2.5.5
from pymilvus import FieldSchema, CollectionSchema, DataType
@@ -8529,19 +8499,8 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
-
-
-
-
-
-
-
-
-
-#### 第4集 Milvus的分区-分片-段结构和最佳实践
-
-**简介: Milvus的分区-分片-段结构和最佳实践**
+#### Milvus的分区-分片-段结构和最佳实践
* 分区-分片-段 很多同学懵逼,用图书馆比喻理解三者关系
@@ -8560,8 +8519,6 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* **作用**:优化存储空间,旧书盒可压缩归档
* **类比**:数据库将数据分块存储,便于后台合并优化
-
-
* 三者协作关系
@@ -8582,7 +8539,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* 按业务维度划分(如用户ID、时间范围), 示例:`partition_2024Q1`, `vip_users`
- ```
+ ```python
# 按商品类别创建分区
# 电子产品存入electronics分区
@@ -8620,7 +8577,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* **合并优化**
- ```
+ ```python
# 自动将小段合并成大段(类似HBase Compaction)
[Segment1(100MB)] + [Segment2(100MB)] → [SegmentMerged(200MB)]
```
@@ -8637,7 +8594,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
- 按业务线分区:`user_profiles`, `product_info`
- **错误示范**:创建超过1000个分区(影响元数据性能)
- ```
+ ```python
# 好的实践:按时间分区
client.create_partition(
collection_name="logs",
@@ -8658,7 +8615,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
| 写入吞吐低 | 写入吞吐高 |
| 易成性能瓶颈 | 资源消耗大 |
- ```
+ ```python
# 创建集合时指定
collection = Collection(
name="product_images",
@@ -8682,28 +8639,15 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* 设置段容量阈值:`storage.segmentSize=1024` (单位MB)
* 根据数据特性调整
- ```
+ ```python
if 向量维度 > 1024:
maxSize = 512 # 降段大小缓解内存压力
else:
maxSize = 1024
```
-
-
-
-
-
-
-
-
-
-
-
-#### 第5集 Milvus部署架构选择和Docker部署实战
-
-**简介: Milvus部署架构选择和Docker部署实战 **
+#### Milvus部署架构选择和Docker部署实战
* 部署架构选择
@@ -8717,7 +8661,7 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* Milvus分层架构(Docker部署都包括了)
- ```
+ ```python
┌───────────────────────────────┐
│ Coordinator │ ← 管理元数据、负载均衡
├───────────────┬───────────────┤
@@ -8730,12 +8674,8 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* Milvus Standalone 是单机服务器部署,所有组件都打包到一个Docker 镜像中,部署方便。
* 此外,Milvus Standalone 通过主从复制支持高可用性。
-
-
* 有钱的当我没说,直接购买云厂商的服务:https://help.aliyun.com/zh/milvus
-
-
* LInux服务器部署Milvus实战
* 阿里云网络安全组记得开放端口 `2379`、`9091`, `19530`
@@ -8746,22 +8686,19 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* 下载相关资料 ,使用**【wget】或者【浏览器】远程下载相关依赖包(需要替换群里最新的)**
- ```
+ ```python
原生资料下载方式(账号 - 密码 - ip地址 - 端口 需要替换群里最新的,【其他路径不变】)
wget --http-user=用户名 --http-password=密码 http://ip:端口/dcloud_pan/standalone_embed.sh
-
-
#比如 命令行下
wget --http-user=admin --http-password=xdclass.net888 http://47.115.31.28:9088/dcloud_pan/standalone_embed.sh
-
# 比如 浏览器直接访问
http://47.115.31.28:9088/dcloud_pan/standalone_embed.sh
```
-
+
* 解压后执行【**依赖很多,版本差异大,务必按照下面执行,否则课程无法进行下去,加我微信 xdclass6**】
-
- ```
+
+ ```python
#启动
bash standalone_embed.sh start
@@ -8774,14 +8711,14 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
#升级
bash standalone_embed.sh upgrade
```
-
+
* 运行安装脚本后
-
+
- 一个名为 Milvus 的 docker 容器已在**19530** 端口启动。
- 嵌入式 etcd 与 Milvus 安装在同一个容器中,服务端口为**2379**。
- Milvus 数据卷被映射到当前文件夹中的**volumes/milvus**
- 访问 `http://${MILVUS_PROXY_IP}:9091/webui` 例子: http://47.119.128.20:9091/webui/
-
+
* 注意
* Milvus Web UI 与 Attu等可视化工具 不同,它是一个内置工具,只是提供简单直观的界面,查看系统的基本信息
@@ -8790,25 +8727,8 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
* 参考:https://milvus.io/docs/zh/milvus-webui.md
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-#### 第6集 Milvus可视化客户端安装实战
-
-**简介: Milvus可视化客户端安装实战 **
+#### Milvus可视化客户端安装实战
* Attu 可视化客户端介绍
@@ -8845,9 +8765,2424 @@ public JsonData list(@RequestParam(value = "parent_id")Long parentId){
+### Milvus向量数据库多案例实战
+
+#### Python整合向量数据库Milvus案例实战
+
+* Python操作Milvus实战
+
+ * 安装 Milvus Python SDK, 支持 Python、Node.js、GO 和 Java SDK。
+
+ * 建议安装与所安装 Milvus 服务器版本相匹配的 PyMilvus 版本
+
+ * 安装
+
+ ```
+ pip install pymilvus==2.5.5
+ ```
+
+ * 验证安装 如果 PyMilvus 安装正确,运行以下命令时不会出现异常
+
+ ```
+ python -c "from pymilvus import Collection"
+ ```
+
+ * 接口可分为以下几类:
+
+ * **DDL / DCL:**createCollection / createPartition / dropCollection / dropPartition / hasCollection / hasPartition
+ * **DML / Produce:**插入 / 删除 / 上移
+ * **DQL:**搜索/查询
+
+* 操作Milvus数据库
+
+ * 使用connect()连接 Milvus 服务器,进行相关操作
+
+ ```python
+ #也可以使用MilvusClient
+ #from pymilvus import MilvusClient
+ #client = MilvusClient("http://47.119.128.20:19530")
+
+ from pymilvus import connections, db
+ conn = connections.connect(host="47.119.128.20", port=19530)
+ # 创建数据库
+ #db.create_database("my_database")
+ # 使用数据库
+ db.using_database("my_database")
+ # 列出数据库
+ dbs = db.list_database()
+ print(dbs)
+ #['default', 'my_database']
+ # 删除数据库
+ db.drop_database("my_database")
+ ```
+
+* Collection与Schema的创建和管理
+
+ * Collection 是一个二维表,具有固定的列和变化的行,每列代表一个字段,每行代表一个实体。
+
+ * 要实现这样的结构化数据管理,需要一个 Schema定义 Collections 的表结构
+
+ * 每个Schema由多个`FieldSchema`组成:
+
+ ```python
+ from pymilvus import FieldSchema, DataType
+
+ # 字段定义示例
+ fields = [
+ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
+ FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
+ FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50)
+ ]
+ ```
+
+ * 字段类型详解
+
+ | 数据类型 | 说明 | 示例 |
+ | :-------------: | :----------: | :---------------: |
+ | `INT8/16/32/64` | 整型 | `DataType.INT64` |
+ | `FLOAT` | 单精度浮点数 | `DataType.FLOAT` |
+ | `DOUBLE` | 双精度浮点数 | `DataType.DOUBLE` |
+ | `VARCHAR` | 变长字符串 | `max_length=255` |
+ | `FLOAT_VECTOR` | 浮点向量 | `dim=768` |
+
+ * 创建collection实战
+
+ ```python
+ from pymilvus import connections
+ from pymilvus import FieldSchema, DataType
+ from pymilvus import CollectionSchema, Collection
+
+ conn = connections.connect(host="47.119.128.20", port=19530)
+ # 步骤1:定义字段
+ fields = [
+ FieldSchema("id", DataType.INT64, is_primary=True),
+ FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128),
+ FieldSchema("tag", DataType.VARCHAR, max_length=50)
+ ]
+
+ # 步骤2:创建Schema
+ schema = CollectionSchema(fields, description="示例集合")
+
+ # 步骤3:实例化Collection
+ collection = Collection(
+ name="demo_collection",
+ schema=schema,
+ shards_num=2 # 分片数(分布式扩展关键)
+ )
+ ```
+
+ * 关键参数解析
+
+ | 参数 | 说明 | 推荐值 |
+ | :-----------: | :------------------------: | :--------------: |
+ | `shards_num` | 分片数量(创建后不可修改) | 集群节点数×2 |
+ | `description` | 集合描述信息 | 建议填写业务用途 |
+
+ * 动态字段Schema
+
+ * 在集合中启用动态字段后,所有未在 Schema 中定义的字段及其值都将作为键值对存储在动态字段中
+
+ ```python
+ # 启用动态字段(Milvus 2.3+)
+ schema = CollectionSchema(
+ fields,
+ enable_dynamic_field=True
+ )
+ ```
+
+ * 案例讲解
+
+ * 假设 Collections Schema 只定义两个字段,名为`id` 和`vector` ,启用了动态字段,在 Collections 中插入以下数据集
+ * 数据集包含 多个实体,每个实体都包括字段`id`,`vector`, 和`color` ,Schema 中没有定义`color` 字段。
+ * 由于 Collections 启用了动态字段,因此字段`color` 将作为键值对存储在动态字段中。
+
+ ```python
+ [
+ {id: 0, vector: [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592], color: "pink_8682"},
+ {id: 7, vector: [-0.33445148015177995, -0.2567135004164067, 0.8987539745369246, 0.9402995886420709, 0.5378064918413052], color: "grey_8510"},
+ {id: 8, vector: [0.39524717779832685, 0.4000257286739164, -0.5890507376891594, -0.8650502298996872, -0.6140360785406336], color: "white_9381"},
+ {id: 9, vector: [0.5718280481994695, 0.24070317428066512, -0.3737913482606834, -0.06726932177492717, -0.6980531615588608], color: "purple_4976"}
+ ]
+ ```
+
+ | 类型 | 特点 | 适用场景 |
+ | :------------: | :---------------------------: | :----------------------------: |
+ | **静态Schema** | 严格字段定义 | 数据结构固定的业务(用户画像) |
+ | **动态Schema** | 允许灵活字段(需Milvus 2.3+) | 日志类多变数据 |
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第2集 Milvus索引操作和最佳实践避坑指南
+
+**简介: Milvus索引操作和最佳实践避坑指南**
+
+* 为什么需要索引?
+
+ * 加速查询:避免暴力比对,快速定位相似向量, 平衡召回率与查询速度
+ * 节省资源:减少内存占用和计算开销, 建议为经常访问的向量和标量创建索引
+
+
+
+* 常见的索引类型
+
+ | 索引类型 | 适用场景 | 内存占用 | 精度 | 构建速度 |
+ | :------: | :----------------------: | :------: | :-----: | :------: |
+ | FLAT | 小数据精确搜索(<100万) | 高 | 100% | 快 |
+ | IVF_FLAT | 大数据平衡场景(千万级) | 中 | 95%-98% | 较快 |
+ | HNSW | 高召回率需求 | 高 | 98%-99% | 慢 |
+ | DISKANN | 超大规模(10亿+) | 低 | 90%-95% | 最慢 |
+
+* Milvus索引操作
+
+ * 创建索引
+
+ ```
+ # 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型
+ from pymilvus import MilvusClient, DataType
+
+ # 实例化MilvusClient以连接到指定的Milvus服务器
+ client = MilvusClient(
+ uri="http://47.119.128.20:19530"
+ )
+
+ # 创建schema对象,设置自动ID生成和动态字段特性
+ schema = MilvusClient.create_schema(
+ auto_id=False,
+ enable_dynamic_field=True,
+ )
+
+ # 向schema中添加字段"id",数据类型为INT64,作为主键
+ schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
+ # 向schema中添加字段"vector",数据类型为FLOAT_VECTOR,维度为5
+ schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=5)
+
+ # 使用create_collection方法根据schema创建集合"customized_setup"
+ client.create_collection(
+ collection_name="customized_setup",
+ schema=schema,
+ )
+
+ # 准备索引参数,为"vector"字段创建索引
+ index_params = MilvusClient.prepare_index_params()
+
+ # 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数
+ index_params.add_index(
+ field_name="vector",
+ metric_type="COSINE", # 距离计算方式 (L2/IP/COSINE)
+ index_type="IVF_FLAT",
+ index_name="vector_index",
+ params={ "nlist": 128 } #聚类中心数 (建议值:sqrt(数据量))
+ )
+
+ # 创建索引,不等待索引创建完成即返回
+ client.create_index(
+ collection_name="customized_setup",
+ index_params=index_params,
+ sync=False # 是否等待索引创建完成后再返回。默认为True。
+ )
+ ```
+
+ * 参数说明
+
+ | 参数 | 参数 |
+ | ----------------- | ------------------------------------------------------------ |
+ | `field_name` | 指定字段名称 |
+ | `metric_type` | 用于衡量向量间相似性的算法。值有**IP**、**L2**、**COSINE**、**JACCARD**、**HAMMING**。只有当指定字段是向量字段时才可用。 |
+ | `index_type` | 索引类型 |
+ | `index_name` | 索引名称 |
+ | `params` | 指定索引类型的微调参数 |
+ | `collection_name` | Collections 的名称。 |
+ | `sync` | 控制与客户端请求相关的索引构建方式。有效值: `True` (默认):客户端等待索引完全建立后才返回。在该过程完成之前不会收到响应。`False`:客户端收到请求后立即返回,索引在后台建立 |
+
+ * 查看索引信息
+
+ ```
+ #列出索引名称
+ res = client.list_indexes(
+ collection_name="customized_setup"
+ )
+ print(res)
+
+ #获取索引详细信息
+ res = client.describe_index(
+ collection_name="customized_setup",
+ index_name="vector_index"
+ )
+ print(res)
+ ```
+
+ * 删除索引
+
+ * 删除前需确保无查询正在使用该索引
+ * 删除后需重新创建索引才能进行有效查询
+
+ ```
+ #如果不再需要索引,可以直接将其删除。
+ client.drop_index(
+ collection_name="customized_setup",
+ index_name="vector_index"
+ )
+ print("索引已删除")
+ ```
+
+* 最佳实践与避坑指南
+
+ * **Schema设计原则**
+
+ - 主键选择
+ - 推荐自增ID避免冲突
+ - 禁止使用向量字段作为主键
+ - **字段数量**:单个集合不超过32个字段
+ - **向量维度**:创建后不可修改,需提前规划
+
+ * **索引选择策略**:
+
+ - 百万级以下 → FLAT
+ - 百万到亿级 → IVF/HNSW
+ - 十亿级以上 → DISKANN
+
+ * **操作规范**:
+
+ - 数据插入完成后再建索引
+ - 定期重建索引(数据变更超过30%)
+ - 为高频查询字段建立独立索引
+ * **常见错误处理**
+
+ | 错误场景 | 解决方案 |
+ | :--------------: | :----------------------------------: |
+ | "字段类型不匹配" | 检查插入数据与Schema定义的一致性 |
+ | "主键冲突" | 插入前检查ID唯一性,或使用自动生成ID |
+ | "向量维度错误" | 校验dim参数与数据实际维度 |
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第3集 Milvus向量数据库的DML操作实战
+
+**简介: Milvus向量数据库的DML操作实战**
+
+* 核心DML操作实战
+
+ * 创建集合(Collection),集合是Milvus中数据存储的基本单位,需定义字段和索引
+
+ * `auto_id=True`时无需手动指定主键
+ * 动态字段(`enable_dynamic_field=True`)允许灵活扩展非预定义字段
+
+ ```
+ # 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型
+ from pymilvus import MilvusClient, DataType
+
+ # 实例化MilvusClient以连接到指定的Milvus服务器
+ client = MilvusClient(
+ uri="http://47.119.128.20:19530"
+ )
+ # 定义Schema
+ schema = client.create_schema(auto_id=False, enable_dynamic_field=True)
+ schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
+ schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=128)
+ schema.verify() # 验证Schema
+
+ # 定义索引参数
+ index_params = client.prepare_index_params()
+ index_params.add_index(
+ field_name="vector",
+ index_type="IVF_FLAT", # 量化索引,平衡速度与精度
+ metric_type="L2", # 相似性度量标准(欧式距离)
+ params={"nlist": 1024} # 聚类中心数
+ )
+
+ # 创建集合
+ client.create_collection(
+ collection_name="my_collection",
+ schema=schema,
+ index_params=index_params
+ )
+ ```
-####
+ * 插入数据(Insert)支持单条或批量插入【可视化工具那边需要加载,包括查询等都是需要加载状态才可以操作】
+
+ ```
+ data = [
+ {"id": 1, "vector": [0.1]*128, "text": "Sample text 1"},
+ {"id": 2, "vector": [0.2]*128, "text": "Sample text 2"}
+ ]
+
+ # 插入数据
+ insert_result = client.insert(
+ collection_name="my_collection",
+ data=data
+ )
+ print("插入ID列表:", insert_result["ids"]) # 返回主键ID
+ ```
+
+ * 删除数据(Delete)通过主键或条件表达式删除
+
+ ```
+ # 按主键删除
+ client.delete(
+ collection_name="my_collection",
+ ids=[1, 2] # 主键列表
+ )
+
+ # 按条件删除(如删除text字段为空的记录)
+ client.delete(
+ collection_name="my_collection",
+ filter="text == ''"
+ )
+ ```
+
+ * 更新数据(Update)Milvus不支持直接更新,需通过“删除+插入”实现:
+
+ ```
+ # 删除旧数据
+ client.delete(collection_name="my_collection", ids=[3])
+
+ # 插入新数据
+ client.insert(
+ collection_name="my_collection",
+ data=[{"id": 3, "vector": [0.3]*128, "text": "Updated text"}]
+ )
+ ```
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第4集 Milvus向量Search查询综合案例实战《上》
+
+**简介: Milvus向量Search查询综合案例实战**
+
+* 需求说明
+
+ * 创建包含混合数据类型(标量+向量)的集合
+ * 批量插入结构化和非结构化数据
+ * 实现带过滤条件的混合查询
+ * 验证端到端的向量搜索流程
+
+* Search语法深度解析
+
+ * 核心参数说明
+
+ ```
+ results = client 或 collection.search(
+ data=[[0.12, 0.23, ..., 0.88]], # 查询向量(必须)
+ anns_field="vector", # 要搜索的向量字段名(必须)
+ param={"metric_type": "L2", "params": {"nprobe": 10}}, # 搜索参数
+ limit=10, # 返回结果数量
+ expr="price > 50", # 过滤表达式(可选)
+ output_fields=["product_id", "price"], # 返回的字段
+ )
+ ```
+
+ | 参数 | 类型 | 说明 | 常用值 |
+ | :-----------: | :--: | :----------: | :--------------------------------------: |
+ | data | list | 查询向量列表 | 二维数组 |
+ | anns_field | str | 向量字段名 | 创建时定义的字段 |
+ | param | dict | 搜索参数 | 包含metric_type和params |
+ | limit | int | 返回结果数 | 5-100 |
+ | expr | str | 过滤条件 | price > 50 AND category == 'electronics' |
+ | output_fields | list | 返回字段 | ["field1", "field2"] |
+
+* 搜索案例实战(MilvusClient方式)
+
+ * 准备数据
+
+ ```
+ from pymilvus import (
+ connections,MilvusClient,
+ FieldSchema, CollectionSchema, DataType,
+ Collection, utility
+ )
+ import random
+
+ # # 创建Milvus客户端
+ client = MilvusClient(
+ uri="http://47.119.128.20:19530",
+ )
+
+ #删除已存在的同名集合
+ if client.has_collection("book"):
+ client.drop_collection("book")
+
+ # 定义字段
+ fields = [
+ FieldSchema(name="book_id", dtype=DataType.INT64, is_primary=True, auto_id=True),
+ FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=200),
+ FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50),
+ FieldSchema(name="price", dtype=DataType.DOUBLE),
+ FieldSchema(name="book_intro", dtype=DataType.FLOAT_VECTOR, dim=4)
+ ]
+
+ # 创建集合Schema
+ schema = CollectionSchema(
+ fields=fields,
+ description="Book search collection"
+ )
+
+ #创建集合
+ client.create_collection(collection_name="book", schema=schema)
+
+ # 生成测试数据
+ num_books = 1000
+ categories = ["科幻", "科技", "文学", "历史"]
+ titles = ["量子世界", "AI简史", "时光之轮", "文明起源", "未来简史", "数据科学"]
+
+ data = []
+ for i in range(num_books):
+ data.append({
+ "title": f"{random.choice(titles)}_{i}",
+ "category": random.choice(categories),
+ "price": round(random.uniform(10, 100), 2),
+ "book_intro": [random.random() for _ in range(4)] # 4维向量
+ })
+
+ # 批量插入
+ insert_result = client.insert(
+ collection_name="book",
+ data=data
+ )
+
+ print(f"插入数据量:{len(insert_result['ids'])}")
+ ```
+
+ * 创建索引
+
+ ```
+ # 准备索引参数,为"vector"字段创建索引
+ index_params = MilvusClient.prepare_index_params()
+
+ # 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数
+ index_params.add_index(
+ field_name="book_intro",
+ metric_type="L2", # 距离计算方式 (L2/IP/COSINE)
+ index_type="IVF_FLAT",
+ index_name="vector_index",
+ params={ "nlist": 128 } #聚类中心数 (建议值:sqrt(数据量))
+ )
+
+ # 创建索引,不等待索引创建完成即返回
+ client.create_index(
+ collection_name="book",
+ index_params=index_params
+ )
+ print("索引创建完成")
+ ```
+
+ * 执行查询【执行查询前需要加载才可以使用】
+
+ ```
+ client.load_collection(collection_name="book") # 加载集合到内存
+ # 生成查询向量
+ query_vector = [random.random() for _ in range(4)]
+
+ # 执行带过滤条件的向量搜索
+ results = client.search(
+ collection_name="book",
+ data=[query_vector], # 支持批量查询
+ filter="category == '科幻' and price < 50",
+ output_fields=["title", "category", "price"],
+ limit=3,
+ search_params={"nprobe": 10}
+ )
+
+ # 解析结果
+ print("\n科幻类且价格<50的搜索结果:")
+ for result in results[0]: # 第一个查询结果集
+ print(f"ID: {result['book_id']}")
+ print(f"距离: {result['distance']:.4f}")
+ print(f"标题: {result['entity']['title']}")
+ print(f"价格: {result['entity']['price']:.2f}")
+ print("-" * 30)
+ ```
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第5集 Milvus向量Search查询综合案例实战《下》
+
+**简介: Milvus向量Search查询综合案例实战**
+
+* 向量数据库完整工作流程示意图
+
+ ```
+ 1. 创建集合Schema
+ ↓
+ 2. 插入测试数据
+ ↓
+ 3. 创建向量索引
+ ↓
+ 4. 加载集合到内存
+ ↓
+ 5. 执行混合查询(向量+标量过滤)
+ ```
+
+* 全量查询案例演示
+
+ * 测试是否有 output_fields 字段,返回结果的差异
+
+ ```
+ # 案例1:基础向量查询
+ basic_res = client.search(
+ collection_name="book",
+ data=[query_vector],
+ limit=5
+ )
+
+
+ # 案例2:分页查询
+ page_res = client.search(
+ collection_name="book",
+ data=[query_vector],
+ offset=2,
+ limit=3
+ )
+
+ # 案例3:批量查询
+ batch_res = client.search(
+ collection_name="book",
+ data=[query_vector, [0.5]*4], # 同时查询两个向量,每个向量都会返回2条
+ limit=2
+ )
+ ```
+
+* 集合状态
+
+ ```
+ # 验证集合状态
+ print(client.describe_collection("book"))
+
+
+ # 索引状态检查
+ print(client.list_indexes("book"))
+ ```
+
+* 新旧版本对比表
+
+ | 功能 | PyMilvus旧版 | MilvusClient新版 |
+ | :----------- | :---------------------- | :---------------------------- |
+ | 连接管理 | 需要手动管理connections | 客户端自动管理 |
+ | 数据插入格式 | 多列表结构 | 字典列表 |
+ | 字段定义 | 使用FieldSchema | 在create_collection中直接定义 |
+ | 返回结果格式 | 对象属性访问 | 标准化字典格式 |
+ | 错误处理 | 异常类捕获 | 统一错误码系统 |
+ | 动态字段支持 | 需要额外配置 | 参数开启即可 |
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ **愿景:"IT路上的持续充电平台,让技术不再难学"**
+**更多高级课程请访问 xdclass.net**
+
+### 第四十一章 MMR搜索和LangChain整合Milvus实战
+
+
+
+#### 第1集 相似度Similarity和MMR最大边界相关搜索
+
+**简介: 相似度Similarity和MMR最大边界相关搜索**
+
+* 搜索的行业应用案例:电商推荐系统(明白两个差异)
+
+ * **相似度搜索**:
+
+ ```
+ "用户点击商品A → 推荐相似商品B、C"
+ ```
+
+ * **MMR搜索**:
+
+ ```
+ "用户浏览历史多样 → 推荐跨品类商品"
+ ```
+
+
+
+* 基础相似度搜索(Similarity Search)
+
+ * **原理**:通过向量空间中的距离计算(余弦相似度/L2距离等)找出最接近目标向量的结果
+
+
+
+ * 核心特点
+
+ * **纯向量驱动**:仅依赖向量空间中的几何距离,余弦相似度、L2距离
+ * **结果同质化**:返回最相似的连续区域数据
+ * **高性能**:时间复杂度 O(n + klogk)
+
+ * 参数配置模板,方法 `vector_store.similarity_search( )`
+
+ ```
+ vector_store.as_retriever(
+ search_type="similarity",
+ search_kwargs={
+ "k": 5, # 返回数量
+ "score_threshold": 0.65, # 相似度得分的最低要求,相似度≥65%的才考虑
+ "filter": "category == 'AI'", # 元数据过滤
+ "param": { # Milvus专属参数
+ "nprobe": 32, #nprobe是Milvus中用于控制搜索时访问的聚类数量的参数,nprobe越大,搜索越精确但耗时更长。
+ "radius": 0.8 #是在范围搜索中使用的参数,指定搜索的半径范围,结合score_threshold可用于限定结果的范围
+ }
+ }
+ )
+ ```
+
+ * 典型应用场景
+
+ * 精确语义匹配(专利检索、论文查重)
+
+ * 基于内容的推荐("更多类似内容")
+
+ * 敏感信息过滤(高阈值精准匹配)
+
+
+
+* 最大边界相关搜索(MMR Search)
+
+ * Maximal Marginal Relevance,最大边际相关性, 是一种信息检索和推荐系统中常用的算法
+ * 核心目标是 在返回的结果中平衡相关性与多样性,避免返回大量高度相似的内容。
+ * 设计初衷是解决传统相似性搜索(如余弦相似度)可能导致的“信息冗余”问题,在需要覆盖多角度信息或推荐多样化内容的场景中效果显著
+ * **原理**:在相似度和多样性之间进行权衡,避免结果冗余
+
+
+
+ * 算法原理图解
+
+ ```
+ 初始候选集(fetch_k=20)
+ │
+ ├── 相似度排序
+ │ [1, 2, 3, ..., 20]
+ │
+ └── 多样性选择(λ=0.5)
+ ↓
+ 最终结果(k=5)
+ [1, 5, 12, 3, 18] # 兼顾相似与差异
+ ```
+
+ * 参数配置模板, 方法 `vector_store.max_marginal_relevance_search( )`
+
+ ```
+ mmr_retriever = vector_store.as_retriever(
+ search_type="mmr",
+ search_kwargs={
+ "k": 3, #最终返回的结果数量,MMR 会从更大的候选集中筛选出 3 个最相关且多样化的结果
+ "fetch_k": 20, #指定 MMR 算法的候选集大小,fetch_k 越大,候选集越广,结果可能越多样,但计算成本更高
+ "lambda_mult": 0.6, #控制相关性与多样性之间的权衡系数。范围为 [0, 1]:接近1 结果更相似。接近0:结果差异更大
+ "param": {
+ "nprobe": 64, # 针对Milvus的IVF 类索引,控制搜索的聚类数量,搜索的聚类范围越广,召回率越高,但速度越慢
+ "ef": 128 # Milvus HNSW索引,控制搜索的深度,值越大搜索越精确,耗时增加;值越小速度更快,可能漏掉相关结果
+ }
+ }
+ )
+ ```
+
+ * 关键参数解析
+
+ | 参数 | 相似度搜索 | MMR搜索 | 影响效果 |
+ | :-------------: | :------------: | :--------------: | :------------------------------------: |
+ | k | 控制结果数量 | 控制最终结果数量 | 值越大返回越多但可能降低精度 |
+ | lambda_mult | 无 | 0-1之间 | 值越大越偏向相关性,值越小越强调多样性 |
+ | score_threshold | 过滤低质量结果 | 通常不使用 | 阈值设置需根据embedding模型调整 |
+ | filter | 元数据过滤 | 支持同左 | 可结合业务维度进行筛选 |
+
+ * 典型应用场景
+
+ * 多样化推荐:电商跨品类推荐
+ * 知识发现:科研文献探索
+ * 内容生成:生成多样化文案
+
+* 对比决策矩阵
+
+ | 维度 | Similarity Search | MMR Search |
+ | :----------: | :----------------: | :-----------------: |
+ | **结果质量** | 高相似度但可能重复 | 多样性更佳 |
+ | **响应速度** | 平均 120ms | 平均 200-300ms |
+ | **内存消耗** | 低(仅存储topK) | 高(需缓存fetch_k) |
+ | **适用场景** | 精确匹配、去重 | 推荐系统、知识发现 |
+ | **可解释性** | 直观的相似度排序 | 综合评分需二次解释 |
+
+* 企业推荐系统架构示例
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第2集 新版LangChain向量数据库VectorStore设计
+
+**简介: 新版LangChain向量数据库VectorStore设计**
+
+* LangChain 向量存储体系架构
+
+ * RAG系统核心设计模式
+
+ ```
+ Document
+ │
+ ▼
+ Text Splitter
+ │
+ ▼
+ Embedding Model
+ │
+ ▼
+ [Milvus|Chroma|Pinecone...] ←→ VectorStore
+ ```
+
+ * LangChain设计抽象类`VectorStore`,统一接口,具体的实现由各自数据库负责
+
+ * 文档(如果过期就忽略) https://python.langchain.com/docs/integrations/vectorstores/
+ * 安装依赖 `pip install langchain-milvus`
+
+ ```
+ from langchain_core.vectorstores import VectorStore
+ ```
+
+
+
+* VectorStore 核心方法详解
+
+ * 通用方法列表
+
+ | 方法名 | 作用描述 | 常用参数示例 |
+ | :------------------------------: | :--------------------: | :----------------------------: |
+ | from_documents() | 从文档创建向量库 | documents, embedding, **kwargs |
+ | add_documents() | 追加文档到已有库 | documents |
+ | similarity_search() | 相似度查询 | query, k=4 |
+ | similarity_search_with_score() | 带相似度得分的查询 | query, k=4 |
+ | max_marginal_relevance_search( ) | MMR最大边界搜索 | query, k=4 |
+ | as_retriever() | 转换为检索器供链式调用 | search_kwargs={} |
+
+ * 初始化方法
+
+ ```
+ @classmethod
+ def from_documents(
+ cls,
+ documents: List[Document],
+ embedding: Embeddings,
+ **kwargs
+ ) -> VectorStore:
+ """
+ 文档自动转换存储
+ :param documents: LangChain Document对象列表
+ :param embedding: 文本向量化模型
+ :param kwargs: 向量库特有参数
+ :return: 初始化的VectorStore实例
+ """
+ ```
+
+ * 和`add_documents()`区别
+
+ | 特性 | `from_documents()` | `add_documents()` |
+ | :----------: | :----------------------: | :--------------------: |
+ | **方法类型** | 类方法(静态方法) | 实例方法 |
+ | **主要用途** | 初始化集合并批量插入文档 | 向已存在的集合追加文档 |
+ | **集合创建** | 自动创建新集合 | 要求集合已存在 |
+ | **性能消耗** | 高(需建索引+数据迁移) | 低(仅数据插入) |
+ | **典型场景** | 首次数据入库 | 增量数据更新 |
+ | **连接参数** | 需要完整连接配置 | 复用已有实例的配置 |
+
+ * 数据插入方法
+
+ ```
+ def add_texts(
+ self,
+ texts: Iterable[str],
+ metadatas: Optional[List[dict]] = None,
+ **kwargs
+ ) -> List[str]:
+ """
+ 插入文本数据到向量库
+ :param texts: 文本内容列表
+ :param metadatas: 对应的元数据列表
+ :return: 插入文档的ID列表
+ """
+ ```
+
+ * 相似性搜索方法
+
+ ```
+ def similarity_search(
+ self,
+ query: str,
+ k: int = 4,
+ filter: Optional[dict] = None,
+ **kwargs
+ ) -> List[Document]:
+ """
+ 执行相似性搜索
+ :param query: 查询文本
+ :param k: 返回结果数量
+ :param filter: 元数据过滤条件
+ :return: 匹配的Document列表
+ """
+ ```
+
+ * 最大边界相关算法(MMR)
+
+ ```
+ def max_marginal_relevance_search(
+ self,
+ query: str,
+ k: int = 4,
+ fetch_k: int = 20,
+ lambda_mult: float = 0.5
+ ) -> List[Document]:
+ """
+ 多样性增强搜索
+ :param k: 返回结果数量
+ :param fetch_k: 初始获取数量
+ :param lambda_mult: 多样性权重(0-1)
+ """
+ ```
+
+* 不同向量数据库支持特性不一样
+
+ | 特性 | Milvus | FAISS | Pinecone | Chroma |
+ | :----------: | :----: | :---: | :------: | :----: |
+ | 分布式支持 | ✓ | ✗ | ✓ | ✗ |
+ | 元数据过滤 | ✓ | ✗ | ✓ | ✓ |
+ | 自动索引管理 | ✓ | ✗ | ✓ | ✓ |
+ | 本地运行 | ✓ | ✓ | ✗ | ✓ |
+ | 相似度算法 | 8种 | 4种 | 3种 | 2种 |
+
+* 场景:知识库冷启动
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第3集 LangChain整合Milvus新增和删除实战
+
+**简介: LangChain整合Milvus新增和删除实战**
+
+* 需求
+
+ * 使用LangChain整合向量数据库Milvus
+ * 实现新增和删除向量数据库的数据实战
+ * 文档(如果过期就忽略)
+ * https://python.langchain.com/docs/integrations/vectorstores/milvus/
+
+* 案例实战
+
+ * 准备数据
+
+ ```
+ from langchain_community.embeddings import DashScopeEmbeddings
+ #from langchain.vectorstores import Milvus
+ from langchain_milvus import Milvus
+ from langchain_core.documents import Document
+ from uuid import uuid4
+
+ # 初始化模型
+ embeddings = DashScopeEmbeddings(
+ model="text-embedding-v2", # 第二代通用模型
+ max_retries=3,
+ dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08"
+ )
+ vector_store = Milvus(
+ embeddings,
+ connection_args={"uri": "http://47.119.128.20:19530"},
+ collection_name="langchain_example",
+ )
+ document_1 = Document(
+ page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
+ metadata={"source": "tweet"},
+ )
+ document_2 = Document(
+ page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
+ metadata={"source": "news"},
+ )
+ document_3 = Document(
+ page_content="Building an exciting new project with LangChain - come check it out!",
+ metadata={"source": "tweet"},
+ )
+ document_4 = Document(
+ page_content="Robbers broke into the city bank and stole $1 million in cash.",
+ metadata={"source": "news"},
+ )
+ document_5 = Document(
+ page_content="Wow! That was an amazing movie. I can't wait to see it again.",
+ metadata={"source": "tweet"},
+ )
+ document_6 = Document(
+ page_content="Is the new iPhone worth the price? Read this review to find out.",
+ metadata={"source": "website"},
+ )
+ document_7 = Document(
+ page_content="The top 10 soccer players in the world right now.",
+ metadata={"source": "website"},
+ )
+ document_8 = Document(
+ page_content="LangGraph is the best framework for building stateful, agentic applications!",
+ metadata={"source": "tweet"},
+ )
+ document_9 = Document(
+ page_content="The stock market is down 500 points today due to fears of a recession.",
+ metadata={"source": "news"},
+ )
+ document_10 = Document(
+ page_content="I have a bad feeling I am going to get deleted :(",
+ metadata={"source": "tweet"},
+ )
+ documents = [
+ document_1,document_2,document_3,document_4,document_5,document_6,
+ document_7,document_8,document_9,document_10,
+ ]
+ ```
+
+ * 插入
+
+ ```
+ ids = [ str(i+1) for i in range(len(documents))]
+ print(ids)
+ result = vector_store.add_documents(documents=documents, ids=ids)
+ print(result)
+ ```
+
+ * 删除
+
+ ```
+ result = vector_store.delete(ids=["1"])
+ print(result)
+ #(insert count: 0, delete count: 1, upsert count: 0, timestamp: 456798840753225732, success count: 0, err count: 0
+ ```
+
+
+
+
+
+
+
+
+
+
+
+#### 第4集 LangChain实战MMR和相似性搜索实战
+
+**简介: LangChain实战MMR和相似性搜索实战**
+
+* 需求
+
+ * 使用LangChain整合向量数据库Milvus
+ * 测试相关搜索:相似度搜索和MMR搜索
+
+* 案例实战
+
+ * 准备数据【**执行多次有多条重复记录,向量数据库不会去重,方便测试MMR**】
+
+ ```
+ from langchain_community.embeddings import DashScopeEmbeddings
+ #from langchain.vectorstores import Milvus
+ from langchain_milvus import Milvus
+ from langchain_core.documents import Document
+
+ # 初始化模型
+ embeddings = DashScopeEmbeddings(
+ model="text-embedding-v2", # 第二代通用模型
+ max_retries=3,
+ dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08"
+ )
+
+ document_1 = Document(
+ page_content="LangChain支持多种数据库集成,小滴课堂的AI大课",
+ metadata={"source": "xdclass.net/doc1"},
+ )
+ document_2 = Document(
+ page_content="Milvus擅长处理向量搜索,老王的课程不错",
+ metadata={"source": "xdclass.net/doc2"},
+ )
+ document_3 = Document(
+ page_content="我要去学小滴课堂的架构大课",
+ metadata={"source": "xdclass.net/doc3"},
+ )
+ document_4 = Document(
+ page_content="今天天气不错,老王和老帆去按摩了",
+ metadata={"source": "xdclass.net/doc4"},
+ )
+ documents = [document_1,document_2,document_3,document_4]
+ vector_store = Milvus.from_documents(
+ documents=documents,
+ embedding=embeddings,
+ collection_name="mmr_test",
+ connection_args={"uri": "http://47.119.128.20:19530"}
+ )
+ ```
+
+ * 相似性搜索(向量数据库插入多个重复数据,看是否会返回一样的)
+
+ ```
+ # 相似性搜索
+ query = "如何进行数据库集成?"
+ results = vector_store.similarity_search(query, k=2)
+ for doc in results:
+ print(f"内容:{doc.page_content}\n元数据:{doc.metadata}\n")
+
+ # 混合搜索(结合元数据过滤)
+ results = vector_store.similarity_search(
+ query,
+ k=2,
+ expr='source == "xdclass.net/doc1"'
+ )
+
+ print(results)
+ ```
+
+ * MMR搜索(跨类搭配,向量数据库插入多个重复数据,看是否会返回一样的)
+
+ ```
+ # MMR推荐(跨类搭配)
+ diverse_results = vector_store.max_marginal_relevance_search(
+ query="如何进行数据库集成",
+ k=2,
+ fetch_k=10,
+ lambda_mult=0.4,
+ # expr="category in ['shoes', 'clothes', 'accessories']",
+ search_params={
+ "metric_type": "IP",
+ "params": {"nprobe": 32}
+ }
+ )
+
+ print(diverse_results)
+ ```
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ **愿景:"IT路上的持续充电平台,让技术不再难学"**
+**更多高级课程请访问 xdclass.net**
+
+### 第四十二章 Retrievers检索器+RAG文档助手项目实战
+
+
+
+#### 第1集 LangChain检索器Retrievers案例实战
+
+**简介: LangChain检索器Retrievers案例实战**
+
+* 什么是`Retriever`
+
+ * **统一接口**:标准化检索流程,无论数据来源如何,最终输出`Document`对象列表。
+
+ * **多源混合检索**:支持同时查询向量库、传统数据库和搜索引擎【提高召回率】
+
+ * **与VectorStore的关系**:Retriever不直接管理存储,依赖VectorStore(如FAISS、Chroma)实现向量化与检索。
+
+ * **RAG中的角色**:作为检索增强生成(RAG)流程的“数据入口”,为生成模型提供精准上下文
+
+
+
+ * 有多个实现:VectorStoreRetriever、MultiQueryRetriever、SelfQueryRetriever等
+
+ * 特点
+
+ * **模块化设计**:支持插件式扩展,可自定义检索算法(如混合搜索、重排序)。
+ * **异步支持**:通过`async_get_relevant_documents`实现高并发场景下的高效检索。
+ * **链式调用**:可与LangChain的其他组件(如Text Splitters、Memory)无缝集成。
+
+ ```
+ # from langchain_core.retrievers import BaseRetriever
+ ```
+
+ * 补充知识点:召回率(Recall)信息检索和机器学习中衡量模型找全相关结果能力的核心指标
+
+ * 比如
+ * 在文档检索中,如果有100篇相关文档,系统找出了80篇,那么召回率就是80%。
+ * 召回率高意味着系统漏掉的少,但可能夹杂了不相关的结果,这时候准确率可能低。
+
+* Retriever常见类型之基础检索器 `VectorStoreRetriever`
+
+ * 基础使用参考案例
+
+ ```
+ #将文档嵌入为向量,通过相似度计算(如余弦相似度)检索
+ from langchain_community.vectorstores import FAISS
+ retriever = FAISS.from_documents(docs, embeddings).as_retriever(
+ search_type="mmr", # 最大边际相关性
+ search_kwargs={"k": 5, "filter": {"category": "news"}}
+ )
+ ```
+
+ * `as_retriever()` 方法介绍
+
+ * 将向量库实例转换为检索器对象,实现与 LangChain 链式调用(如 `RetrievalQA`)的无缝对接。
+
+ * 源码
+
+ ```
+ def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
+ tags = kwargs.pop("tags", None) or [] + self._get_retriever_tags()
+ return VectorStoreRetriever(vectorstore=self, tags=tags, **kwargs)
+ """
+ 向量库实例转换为检索器对象,实现与 LangChain 链式调用
+ """
+ ```
+
+ * 关键参数详解
+
+ * search_type 搜索类型
+
+ | 类型 | 适用场景 | Milvus 对应操作 |
+ | :----------------------------: | :------------: | :-------------------------------: |
+ | `"similarity"` | 基础相似度检索 | `search()` |
+ | `"mmr"` | 多样性结果优化 | `max_marginal_relevance_search()` |
+ | `"similarity_score_threshold"` | 阈值过滤检索 | `search()` + `score_threshold` |
+
+ ```
+ # MMR 检索配置示例
+ mmr_retriever = vector_store.as_retriever(
+ search_type="mmr",
+ search_kwargs={
+ "k": 3,
+ "fetch_k": 20,
+ "lambda_mult": 0.5
+ }
+ )
+ ```
+
+ * search_kwargs 参数详解
+
+ | 参数 | 类型 | 默认值 | 说明 |
+ | :-------------: | :------: | :----: | :-----------------: |
+ | `k` | int | 4 | 返回结果数量 |
+ | `filter`/`expr` | str/dict | None | 元数据过滤条件 |
+ | `param` | dict | None | Milvus 搜索参数配置 |
+
+* 综合案例实战
+
+ * 默认是similarity search
+
+ ````
+ from langchain_community.embeddings import DashScopeEmbeddings
+ from langchain_milvus import Milvus
+ from langchain_core.documents import Document
+
+ # 初始化模型
+ embeddings = DashScopeEmbeddings(
+ model="text-embedding-v2", # 第二代通用模型
+ max_retries=3,
+ dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08"
+ )
+
+ document_1 = Document(
+ page_content="LangChain支持多种数据库集成,小滴课堂的AI大课",
+ metadata={"source": "xdclass.net/doc1"},
+ )
+ document_2 = Document(
+ page_content="Milvus擅长处理向量搜索,老王的课程不错",
+ metadata={"source": "xdclass.net/doc2"},
+ )
+ document_3 = Document(
+ page_content="我要去学小滴课堂的架构大课",
+ metadata={"source": "xdclass.net/doc3"},
+ )
+ document_4 = Document(
+ page_content="今天天气不错,老王和老帆去按摩了",
+ metadata={"source": "xdclass.net/doc4"},
+ )
+ documents = [document_1,document_2,document_3,document_4]
+ vector_store = Milvus.from_documents(
+ documents=documents,
+ embedding=embeddings,
+ collection_name="retriever_test1",
+ connection_args={"uri": "http://47.119.128.20:19530"}
+ )
+
+ #默认是 similarity search
+ retriever = vector_store.as_retriever(search_kwargs={"k": 2})
+
+ results = retriever.invoke("如何进行数据库操作")
+
+ for result in results:
+ print(f"内容 {result.page_content} 元数据 {result.metadata}")
+
+ ````
+
+ * 可以调整为MMR检索
+
+ ```
+ retriever = vector_store.as_retriever(search_type="mmr",search_kwargs={"k": 2})
+ ```
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第2集 大厂面试题-如何提升大模型召回率和实战
+
+**简介: 大厂面试题-如何提升大模型召回率和案例实战**
+
+* **LLM大模型开发高频面试题:如何提升大模型召回率和准确率?**
+
+
+
+* 需求背景
+
+ * 当原始查询不够明确时,或者当文档库中的内容使用不同的术语表达同一概念时
+ * 单一查询可能无法有效检案到所有相关内容;
+ * 或者用户的问题可能有不同的表达方式,导致的检索结果不理想,
+ * 需要从多个角度切入才能找到最相关的文档片段。这种情况下,生成多个变体查询可以提高召回率,确保覆盖更多相关文档。
+
+* `MultiQueryRetriever`
+
+ * 通过生成多个相关查询来增强检索效果,解决单一查询可能不够全面或存在歧义的问题。
+
+ * 原理:
+
+ * **查询扩展技术**:通过LLM生成N个相关查询(如改写、扩展、翻译),合并结果去重,生成多个变体查询
+ * **双重增强效果**:提升召回率(+25%↑)和准确率(+18%↑)的平衡
+
+ 
+
+ * 用法
+
+ ```
+ retriever = MultiQueryRetriever.from_llm(
+ retriever=base_retriever,
+ llm=ChatOpenAI()
+ )
+ ```
+
+ * **典型问题场景**
+
+ - **术语差异问题**:用户提问使用"SSL证书" vs 文档中使用"TLS证书"
+ - **表述模糊问题**:"怎么备份数据库" vs "数据库容灾方案实施步骤"
+ - **多语言混合**:中英文混杂查询(常见于技术文档检索)
+ - **专业领域知识**:医学问诊中的症状不同描述方式
+
+* 案例实战
+
+ ```
+ from langchain_community.embeddings import DashScopeEmbeddings
+ #from langchain.vectorstores import Milvus
+ from langchain_milvus import Milvus
+ from langchain_openai import ChatOpenAI
+
+ from langchain_community.document_loaders import TextLoader
+ from langchain.retrievers.multi_query import MultiQueryRetriever
+ from langchain_text_splitters import RecursiveCharacterTextSplitter
+
+ import logging
+
+ # 设置日志记录的基本配置
+ logging.basicConfig()
+ # 设置多查询检索器的日志记录级别为INFO
+ logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)
+
+ # 使用TextLoader加载文本数据
+ loader = TextLoader("data/qa.txt")
+ # 加载数据到变量中
+ data = loader.load()
+
+ # 初始化文本分割器,将文本分割成小块
+ text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10)
+ # 执行文本分割
+ splits = text_splitter.split_documents(data)
+
+ # 初始化模型
+ embedding = DashScopeEmbeddings(
+ model="text-embedding-v2", # 第二代通用模型
+ max_retries=3,
+ dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08"
+ )
+
+ # 初始化向量数据库
+ vector_store = Milvus.from_documents(
+ documents=splits,
+ embedding=embedding,
+ collection_name="mulit_retriever2",
+ connection_args={"uri": "http://47.119.128.20:19530"}
+ )
+
+ # 定义问题
+ question = "老王不知道为啥抽筋了"
+ # 初始化语言模型
+ llm = ChatOpenAI(
+ model_name = "qwen-plus",
+ base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+ api_key="sk-005c3c25f6d042848b29d75f2f020f08",
+ temperature=0.7
+ )
+
+ # 从语言模型中创建多查询检索器
+ retriever_from_llm = MultiQueryRetriever.from_llm(
+ retriever=vector_store.as_retriever(), llm=llm
+ )
+
+ # 使用检索器执行问题检索
+ results = retriever_from_llm.invoke(question)
+ # 打印检索到的结果数量
+ len(results)
+
+ # 遍历并打印每个检索结果的内容和元数据
+ for result in results:
+ print(f"内容 {result.page_content} 元数据 {result.metadata}")
+
+
+ # 以下是未使用的代码片段,已将其注释掉
+ # vector_store = Milvus(
+ # embeddings,
+ # connection_args={"uri": "http://47.119.128.20:19530"},
+ # collection_name="mmr_test",
+ # )
+ # print(vector_store)
+ ```
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第3集 RAG综合项目实战-AI文档问答助手《上》
+
+**简介: RAG综合项目实战-AI文档问答助手《上》**
+
+* **需求:在线文档的问答助手,方便查找相关手册和接口API**
+
+ * 主要功能包括
+ * 文档加载与切分、向量嵌入生成、向量存储与检索。
+ * 基于检索增强生成(Retrieval-Augmented Generation, RAG)的问答。
+ * 技术选型:LangChain框架+Milvus向量数据库
+
+* **实现的功能**
+
+ * 文档加载与切分
+ * 使用`WebBaseLoader`从指定URL加载文档。
+ * 使用`RecursiveCharacterTextSplitter`将加载的文档按照指定的块大小和重叠大小进行切分。
+
+ * 向量嵌入生成
+ - 使用`DashScopeEmbeddings`生成文档切片的向量嵌入,模型为`text-embedding-v2`,支持最大重试次数为3次。
+
+
+ * 向量存储与检索
+
+ - 使用`Milvus`作为向量数据库,创建名为`doc_qa_db`的Collection。
+
+ - 将生成的向量嵌入存储到Milvus中,并支持相似性检索。
+
+
+ * 基于RAG的问答
+
+ - 初始化`ChatOpenAI`模型,使用`qwen-plus`作为LLM模型。
+
+ - 定义`PromptTemplate`,用于构建输入给LLM的提示信息。
+
+ - 构建RAG链,结合相似性检索结果和LLM生成回答。
+
+
+* 编码实战
+
+ ```
+ from langchain_community.document_loaders import WebBaseLoader
+ from langchain.text_splitter import RecursiveCharacterTextSplitter
+ from langchain_milvus import Milvus
+
+ from langchain.schema.runnable import RunnablePassthrough
+ from langchain.prompts import PromptTemplate
+ from langchain_community.embeddings import DashScopeEmbeddings
+ from langchain_openai import ChatOpenAI
+
+ # 设置Milvus Collection名称。
+ COLLECTION_NAME = 'doc_qa_db'
+
+ # 初始化WebBaseLoader加载指定URL的文档。
+ loader = WebBaseLoader([
+ 'https://milvus.io/docs/overview.md',
+ 'https://milvus.io/docs/release_notes.md'
+ ])
+
+ # 加载文档。
+ docs = loader.load()
+
+ # 初始化RecursiveCharacterTextSplitter,用于切分文档。
+ text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=0)
+
+ # 使用LangChain将输入文档按照chunk_size切分。
+ all_splits = text_splitter.split_documents(docs)
+
+ # 初始化DashScopeEmbeddings,设置embedding模型为DashScope的text-embedding-v2。
+ embeddings = DashScopeEmbeddings(
+ model="text-embedding-v2", # 第二代通用模型
+ max_retries=3,
+ dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08"
+ )
+
+ # 创建connection,为阿里云Milvus的访问域名。
+ connection_args = {"uri": "http://47.119.128.20:19530"}
+ # 创建Collection。
+ vector_store = Milvus(
+ embedding_function=embeddings,
+ connection_args=connection_args,
+ collection_name=COLLECTION_NAME,
+ drop_old=True,
+ ).from_documents(
+ all_splits,
+ embedding=embeddings,
+ collection_name=COLLECTION_NAME,
+ connection_args=connection_args,
+ )
+
+ # vector_store = Milvus(
+ # embeddings,
+ # connection_args={"uri": "http://47.119.128.20:19530"},
+ # collection_name=COLLECTION_NAME,
+ # )
+
+ # 利用Milvus向量数据库进行相似性检索。
+ query = "What are the main components of Milvus?"
+ docs = vector_store.similarity_search(query)
+ print(len(docs))
+
+ # 初始化ChatOpenAI模型。
+ llm = ChatOpenAI(
+ model_name = "qwen-plus",
+ base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+ api_key="sk-005c3c25f6d042848b29d75f2f020f08",
+ temperature=0.7
+ )
+
+ # 将上述相似性检索的结果作为retriever,提出问题输入到LLM之后,获取检索增强之后的回答。
+ retriever = vector_store.as_retriever()
+ ```
+
+
+
+
+
+
+
+
+
+
+
+#### 第4集 RAG综合项目实战-AI文档问答助手《下》
+
+**简介: RAG综合项目实战-AI文档问答助手《下》**
+
+* 编码测试实战
+
+ ```
+ # 定义PromptTemplate,用于构建输入给LLM的prompt。
+ template = """你是AI文档助手,使用以下上下文来回答最后的问题。
+ 如果你不知道答案,就说你不知道,不要试图编造答案。
+ 最多使用10句话,并尽可能简洁地回答。总是在答案的末尾说“谢谢你的提问!”.
+ {context}
+ 问题: {question}
+ """
+
+ rag_prompt = PromptTemplate.from_template(template)
+
+ # 构建Retrieval-Augmented Generation链。
+ rag_chain = (
+ {"context": retriever, "question": RunnablePassthrough()}
+ | rag_prompt
+ | llm
+ )
+
+ # 调用rag_chain回答问题。
+ print(rag_chain.invoke("什么是Milvus."))
+ ```
+
+* 应用场景
+
+ * 教育领域:可用于备课笔记、课程内容总结等场景。
+ * 企业知识库:帮助企业快速构建基于内部文档的知识问答系统。
+ * 技术支持:提供技术文档的智能检索与问答服务。
+
+* 扩展方向
+
+ * 支持更多类型的文档加载器(如PDF、Word等)。
+ * 增加多语言支持。
+ * 优化向量嵌入生成与检索效率
+
+* 大家的疑惑点(下一章讲解)
+
+ ```
+ # 构建Retrieval-Augmented Generation链。
+ rag_chain = (
+ {"context": retriever, "question": RunnablePassthrough()}
+ | rag_prompt
+ | llm
+ )
+ ```
+
+ * 在 `rag_chain` 的定义中,`{"context": retriever, "question": RunnablePassthrough()}` 创建了一个输入字典。
+ * `context` 的值来自 `retriever`,它将使用向量存储检索与问题相关的文档片段。
+ * `question`键的值通过 `RunnablePassthrough()` 直接传递,用户输入的问题会被透传到后续的处理步骤。
+ * 输入字典随后会被传递给 `rag_prompt`,构建最终的提示(prompt)被传递给 `llm`(语言模型),生成最终的回答
+ * 总结:
+ * 用户输入的问题会同时传给`retriever`和`RunnablePassthrough()`
+ * `retriever`完成检索后,会自动把结果赋值给`context`。
+ * 检索结果`context`和用户输入`question`一并传给提示模板`prompt_template`。
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ **愿景:"IT路上的持续充电平台,让技术不再难学"**
+**更多高级课程请访问 xdclass.net**
+
+### 第四十三章 Runnable深度解析和多实现类案例实战
+
+
+
+#### 第1集 LangChain核心之Runnable接口底层实现
+
+**简介: LangChain核心之Runnable接口底层实现**
+
+* 什么是`Runnable`接口
+
+ * 是LangChain框架中所有组件的核心抽象接口,用于封装可执行的逻辑单元(如模型调用、数据处理、API集成等)。
+
+ * 通过实现统一的`invoke`、`batch`、`stream`等方法,支持模块化构建链式任务,允许开发者以声明式编程LCEL串联不同组件
+
+ ```
+ from langchain_core.runnables
+ ```
+
+ * 为什么要使用`Runnable`
+
+ - **统一接口**:所有组件(如Prompt模板、模型、解析器)均实现Runnable接口,确保类型兼容和链式调用的无缝衔接
+ - **灵活组合**:通过管道符`|`将多个Runnable串联成链,简化复杂逻辑的编排,类似数据流处理
+ - **动态配置**:支持运行时参数绑定、组件替换和错误恢复机制(如`with_retry()`),提升系统灵活性和鲁棒性
+ - **异步与性能优化**:内置异步方法(如`ainvoke`)和并行处理(如`RunnableParallel`),适用于高并发场景
+
+* 什么是`RunnableSequence`
+
+ * 是LangChain中用于构建**顺序执行链**的核心组件,通过管道符`|`将多个Runnable串联,形成线性执行流程,是`Runnable`子类
+
+ ```
+ from langchain_core.runnables import RunnableSequence
+ ```
+
+ * 执行 LCEL链调用的方法(invoke/stream/batch),链中的每个组件也调用对应的方法,将输出作为下一个组件的输入
+
+ ```
+ #RunnableSequence.invoke 的源码解读
+
+ def invoke(
+ self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
+ ) -> Output:
+ # invoke all steps in sequence
+ try:
+ for i, step in enumerate(self.steps):
+ # mark each step as a child run
+ config = patch_config(
+ config, callbacks=run_manager.get_child(f"seq:step:{i + 1}")
+ )
+ with set_config_context(config) as context:
+ if i == 0:
+ input = context.run(step.invoke, input, config, **kwargs)
+ else:
+ input = context.run(step.invoke, input, config)
+ ```
+
+* 从LECL表达式开始理解
+
+ ```
+ chain = prompt | model | output_parser # 通过|直接连接
+ ```
+
+ * **数据流传递**
+
+ * 每个Runnable的输出作为下一个Runnable的输入,形成单向数据流。
+ * 例如,若链为`A | B | C`,则执行流程为`A的输出 → B的输入 → B的输出 → C的输入`
+
+ * **统一接口**:
+
+ * 所有组件(如Prompt模板、模型、输出解析器)均实现`Runnable`接口,确保类型兼容性和链式调用的无缝衔接
+
+ * **延迟执行**:
+
+ * 链的构建仅定义逻辑关系,实际执行在调用`invoke`或`stream`时触发,支持动态参数绑定和运行时配置
+
+ * **底层实现**:
+
+ * 管道符`|`在Python中被重写为`__or__`方法,实际调用`RunnableSequence`构造函数,
+
+ * 将多个Runnable存入内部列表`steps`中, 执行时按顺序遍历列表并调用每个Runnable的`invoke`方法
+
+
+
+* Runnable接口定义了以下核心方法,支持多种执行模式
+
+ ```
+ class Runnable(Generic[Input, Output]):
+ #处理单个输入,返回输出。
+ def invoke(self, input: Input) -> Output: ...
+ #异步处理单个输入。
+ async def ainvoke(self, input: Input) -> Output: ...
+ #逐块生成输出,适用于实时响应。
+ def stream(self, input: Input) -> Iterator[Output]: ...
+ #批量处理输入列表,提升吞吐量。
+ def batch(self, inputs: List[Input]) -> List[Output]: ...
+ ```
+
+ | 方法 | 说明 | 使用场景 |
+ | :---------: | :----------: | :----------: |
+ | `invoke()` | 同步执行 | 单次调用 |
+ | `batch()` | 批量同步执行 | 处理数据集 |
+ | `stream()` | 流式输出 | 实时生成文本 |
+ | `ainvoke()` | 异步执行 | Web服务集成 |
+
+* 具有多个子类实现
+
+ | 组件 | 特点 | 适用场景 |
+ | :-------------------: | :------: | :------------: |
+ | `RunnableSequence` | 顺序执行 | 线性处理流水线 |
+ | `RunnableBranch` | 条件路由 | 分支选择逻辑 |
+ | `RunnableParallel` | 并行执行 | 多任务独立处理 |
+ | `RunnablePassthrough` | 数据透传 | 保留原始输入 |
+
+
+
+
+
+
+
+
+
+#### 第2集 RunnablePassthrough介绍和透传参数实战
+
+**简介: RunnablePassthrough介绍和透传参数实战**
+
+* `RunnablePassthrough`
+
+ * 核心功能:用于在链中直接传递输入数据,不进行任何修改,或通过 `.assign()` 扩展上下文字段
+
+ 
+
+ * 应用场景:
+
+ - 保留原始输入数据供后续步骤使用。
+ - 动态添加新字段到上下文中(如结合检索结果与用户问题)
+
+ * 基础用法
+
+ ```
+ from langchain_core.runnables import RunnablePassthrough
+
+ # 直接传递输入
+ chain = RunnablePassthrough() | model
+ output = chain.invoke("Hello")
+ ```
+
+ * 扩展字段案例
+
+ * 案例一
+
+ ```
+ # 使用 assign() 添加新字段
+ from langchain_core.runnables import RunnablePassthrough
+
+ # 使用 assign() 方法添加新字段,该方法接收一个关键字参数,其值是一个函数
+ # 这个函数定义了如何处理输入数据以生成新字段
+ # 在这个例子中,lambda 函数接收一个输入 x,并返回 x["num"] * 2 的结果
+ # 这将创建一个新的字段 'processed',其值是输入字段 'num' 的两倍
+ chain = RunnablePassthrough.assign(processed=lambda x: x["num"] * 2)
+
+ # 调用 chain 对象的 invoke 方法,传入一个包含 'num' 字段的字典
+ # 这将执行之前定义的 lambda 函数,并在输入字典的基础上添加 'processed' 字段
+ # 最后输出处理后的字典
+ output = chain.invoke({"num": 3}) # 输出 {'num':3, 'processed':6}
+ print(output)
+ ```
+
+ * 案例二(伪代码)
+
+ ```
+ # 构建包含原始问题和处理上下文的链
+ chain = (
+ RunnablePassthrough.assign(
+ context=lambda x: retrieve_documents(x["question"])
+ )
+ | prompt
+ | llm
+ )
+
+ # 输入结构
+ input_data = {"question": "LangChain是什么?"}
+ response = chain.invoke(input_data)
+ ```
+
+
+
+ * 透传参数LLM案例实战
+
+ * 用户输入的问题会同时传给`retriever`和`RunnablePassthrough()`
+ * `retriever`完成检索后,会自动把结果赋值给`context`。
+ * 检索结果`context`和用户输入`question`一并传给提示模板`prompt_template`。
+ * **输出**:模型根据检索到的上下文生成答案
+
+ ```
+ from langchain_community.embeddings import DashScopeEmbeddings
+ from langchain_milvus import Milvus
+ from langchain_core.documents import Document
+ from langchain_core.runnables import RunnablePassthrough
+ from langchain_core.prompts import ChatPromptTemplate
+ from langchain_openai import ChatOpenAI
+
+ # 初始化模型
+ embeddings = DashScopeEmbeddings(
+ model="text-embedding-v2", # 第二代通用模型
+ max_retries=3,
+ dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08"
+ )
+ document_1 = Document(
+ page_content="LangChain支持多种数据库集成,小滴课堂的AI大课",
+ metadata={"source": "xdclass.net/doc1"},
+ )
+ document_2 = Document(
+ page_content="Milvus擅长处理向量搜索,老王的课程不错",
+ metadata={"source": "xdclass.net/doc2"},
+ )
+
+ documents = [document_1,document_2]
+ vector_store = Milvus.from_documents(
+ documents=documents,
+ embedding=embeddings,
+ collection_name="runnable_test",
+ connection_args={"uri": "http://47.119.128.20:19530"}
+ )
+
+ #默认是 similarity search
+ retriever = vector_store.as_retriever(search_kwargs={"k": 2})
+
+ prompt = ChatPromptTemplate.from_template("基于上下文回答:{context}\n问题:{question}")
+
+ #定义模型
+ model = ChatOpenAI(
+ model_name = "qwen-plus",
+ base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+ api_key="sk-005c3c25f6d042848b29d75f2f020f08",
+ temperature=0.7
+ )
+
+ chain = {
+ "context": retriever,
+ "question": RunnablePassthrough() # 直接传递用户问题
+ } | prompt | model
+
+ result = chain.invoke("LangChain支持数据库吗")
+ print(result)
+ ```
+
+
+
+
+
+
+
+#### 第3集 AI智能推荐实战之RunnableParallel并行链
+
+**简介: AI智能推荐实战之RunnableParallel并行链**
+
+* `RunnableParallel` 介绍
+
+ * 并行执行多个 Runnable,合并结果为一个字典,键为子链名称,值为对应输出
+
+ ```
+ class RunnableParallel(Runnable[Input, Dict[str, Any]]):
+ """
+ 并行执行多个Runnable的容器类
+ 输出结果为字典结构:{key1: result1, key2: result2...}
+ """
+ ```
+
+ * 在 LCEL 链上,会将字典隐形转换为`RunnableParallel`
+
+ ```
+ multi_retrieval_chain = (
+ RunnableParallel({
+ "context1": retriever1, #数据源一
+ "context2": retriever2, #数据源二
+ "question": RunnablePassthrough()
+ })
+ | prompt_template
+ | llm
+ | outputParser
+ )
+ ======= 自动化转换为下面,写法一样 ========
+ multi_retrieval_chain = (
+ {
+ "context1": retriever1, #数据源一
+ "context2": retriever2, #数据源二
+ "question": RunnablePassthrough()
+ }
+ | prompt_template
+ | llm
+ | outputParser
+ )
+ ```
+
+ * 特点
+
+ | 特性 | 说明 | 示例 |
+ | :----------: | :--------------------: | :------------------------: |
+ | **并行执行** | 所有子Runnable同时运行 | 3个任务耗时2秒(而非累加) |
+ | **类型安全** | 强制校验输入输出类型 | 自动检测字典字段类型 |
+
+
+
+ * API 与用法, 构造函数所有子链接收相同的输入
+
+ ```
+ from langchain_core.runnables import RunnableParallel
+
+ runnable = RunnableParallel(
+ key1=chain1,
+ key2=chain2
+ )
+ ```
+
+ * 应用场景:
+
+ * **数据并行处理器**:同时处理多个数据流
+
+ * **结构化数据装配器**:构建标准化的输出格式
+
+ * **流水线分叉合并器**:实现Map-Reduce模式中的Map阶段
+
+ * 举例
+
+ * 多维度数据分析
+
+ ```
+ analysis_chain = RunnableParallel({
+ "sentiment": sentiment_analyzer,
+ "keywords": keyword_extractor,
+ "entities": ner_recognizer
+ })
+ ```
+
+ * 多模型对比系统
+
+ ```
+ model_comparison = RunnableParallel({
+ "gpt4": gpt4_chain,
+ "claude": claude_chain,
+ "gemini": gemini_chain
+ })
+ ```
+
+ * 智能文档处理系统
+
+ ```
+ document_analyzer = RunnableParallel({
+ "summary": summary_chain, # 摘要生成
+ "toc": toc_generator, # 目录提取
+ "stats": RunnableLambda(lambda doc: {
+ "char_count": len(doc),
+ "page_count": doc.count("PAGE_BREAK") + 1
+ })
+ })
+ # 处理200页PDF文本
+ analysis_result = document_analyzer.invoke(pdf_text)
+ ```
+
+* 案例实战
+
+ * 场景:并行生成景点与书籍推荐
+
+ ```
+ from langchain_core.runnables import RunnableParallel
+ from langchain_core.prompts import ChatPromptTemplate
+ from langchain_openai import ChatOpenAI
+ from langchain_core.output_parsers import JsonOutputParser
+
+ #定义模型
+ model = ChatOpenAI(
+ model_name = "qwen-plus",
+ base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+ api_key="sk-005c3c25f6d042848b29d75f2f020f08",
+ temperature=0.7
+ )
+ #构建解析器
+ parser = JsonOutputParser()
+
+ prompt_attractions = ChatPromptTemplate.from_template("""列出{city}的{num}个景点。返回 JSON 格式:
+ {{
+ "num": "编号",
+ "city": "城市",
+ "introduce": "景点介绍",
+ }}
+ """)
+ prompt_books = ChatPromptTemplate.from_template("""列出{city}相关的{num}本书返回 JSON 格式:
+ {{
+ "num": "编号",
+ "city": "城市",
+ "introduce": "书籍介绍",
+ }} """)
+
+ chain1 = prompt_attractions | model | parser
+
+ chain2 = prompt_books | model | parser
+
+ chain = RunnableParallel(
+ attractions = chain1 ,
+ books = chain2
+ )
+
+ output = chain.invoke({"city": "南京", "num": 3})
+
+ print(output)
+ ```
+
+
+
+
+
+
+
+
+
+#### 第4集 RunnableLambda介绍和包装链式函数实战
+
+**简介: RunnableLambda介绍和包装链式函数实战**
+
+* `RunnableLambda`
+
+ * 核心功能
+
+ * 将任意 Python 函数转换为 Runnable,将普通的 Python 函数或可调用对象转换为 `Runnable`对象,无缝集成到链中
+ * 把自己需要的功能通过自定义函数 + RunnableLambda的方式包装,可以跟任何外部系统打通,集成到 LCEL 链
+
+ ```
+ class RunnableLambda(Runnable[Input, Output]):
+ """
+ 将任意Python函数转换为符合Runnable协议的对象
+ 实现自定义逻辑与LangChain生态的无缝集成
+ """
+ ```
+
+ * 与普通函数的区别
+
+ | 特性 | 普通函数 | RunnableLambda |
+ | :------: | :-----------------: | :-------------------: |
+ | 可组合性 | ❌ 无法直接接入Chain | ✅ 支持` |
+ | 类型校验 | ❌ 动态类型 | ✅ 静态类型检查 |
+ | 异步支持 | ❌ 需手动实现 | ✅ 原生支持async/await |
+ | 批量处理 | ❌ 需循环调用 | ✅ 自动批量优化 |
+
+ * 适合场景:
+
+ * 插入自定义逻辑(如日志记录、数据清洗)
+ * 转换数据格式(如 JSON 解析)。
+
+ * API 与用法
+
+ ```
+ from langchain_core.runnables import RunnableLambda
+
+ def log_input(x):
+ print(f"Input: {x}")
+ return x
+
+ chain = prompt | RunnableLambda(log_input) | model
+ ```
+
+* 案例实战
+
+ * 基础文本处理链
+
+ ```
+ from langchain_core.runnables import RunnableLambda
+
+ text_clean_chain = (
+ RunnableLambda(lambda x: x.strip())
+ | RunnableLambda(str.lower)
+ )
+
+ result = text_clean_chain.invoke(" Hello123World ")
+ print(result) # 输出 "helloworld"
+
+ ```
+
+ * 打印中间结果并过滤敏感词(在链中插入自定义处理逻辑)
+
+ ```
+ from langchain_core.runnables import RunnableLambda
+ from langchain_openai import ChatOpenAI
+ def filter_content(text: str) -> str:
+ return text.replace("暴力", "***")
+
+ #定义模型
+ model = ChatOpenAI(
+ model_name = "qwen-plus",
+ base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+ api_key="sk-005c3c25f6d042848b29d75f2f020f08",
+ temperature=0.7
+ )
+
+ chain = (
+ RunnableLambda(lambda x: x["user_input"])
+ | RunnableLambda(filter_content)
+ | model
+ )
+
+ result = chain.invoke({"user_input": "暴力内容"})
+ print(result) # 输出过滤后的结果
+ ```
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#### 第5集 智能客服路由实战之RunnableBranch条件分支
+
+**简介: 智能客服路由实战之RunnableBranch条件分支**
+
+* `RunnableBranch`
+
+ * 核心功能:根据条件选择执行不同的子链,类似 if-else 路由
+
+ * API 与用法
+
+ ```
+ from langchain_core.runnables import RunnableBranch
+
+ #条件函数:接收输入,返回布尔值。
+ branch = RunnableBranch(
+ (condition1, chain1),
+ (condition2, chain2),
+ default_chain
+ )
+ """
+ 参数说明:
+ - Condition: 返回bool的可调用对象
+ - Runnable: 条件满足时执行的分支
+ - default: 所有条件不满足时执行的默认分支
+
+ 技术细节:
+ 1. 条件按声明顺序
+ 2. 第一个满足条件的分支会被执行
+ 3. 无默认分支且所有条件不满足时抛出异常
+ """
+ ```
+
+
+
+ * 适合场景:
+
+ * 多任务分类(如区分数学问题与物理问题)
+
+ * 错误处理分支(如主链失败时调用备用链)
+
+ * 多轮对话路由(根据对话历史选择回复策略)
+
+ ```
+ # 根据对话历史选择回复策略
+ branch = RunnableBranch(
+ (lambda x: "投诉" in x["history"], complaint_handler),
+ (lambda x: "咨询" in x["history"], inquiry_handler),
+ default_responder
+ )
+ ```
+
+ * 智能路由系统(根据输入类型路由处理方式)
+
+ ```
+ # 定义分类函数
+ def detect_topic(input_text):
+ if "天气" in input_text:
+ return "weather"
+ elif "新闻" in input_text:
+ return "news"
+ else:
+ return "general"
+
+ # 构建分支链
+ branch_chain = RunnableBranch(
+ (lambda x: detect_topic(x["input"]) == "weather", weather_chain),
+ (lambda x: detect_topic(x["input"]) == "news", news_chain),
+ general_chain
+ )
+
+ # 执行示例
+ branch_chain.invoke({"input": "北京今天天气怎么样?"})
+ ```
+
+
+
+* 案例实战:需要构建一个 **智能客服系统**,根据用户输入的请求类型自动路由到不同的处理流程:
+
+ * **技术问题**:路由到技术支持链。
+
+ * **账单问题**:路由到财务链。
+
+ * **默认问题**:路由到通用问答链。
+
+ * 步骤
+
+ * 导入依赖
+
+ ```
+ from langchain_core.runnables import RunnableBranch, RunnableLambda
+ from langchain_core.prompts import ChatPromptTemplate
+ from langchain_openai import ChatOpenAI
+ from langchain_core.output_parsers import StrOutputParser
+ ```
+
+ * 定义模型
+
+ ```
+ #定义模型
+ model = ChatOpenAI(
+ model_name = "qwen-plus",
+ base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+ api_key="sk-005c3c25f6d042848b29d75f2f020f08",
+ temperature=0.7
+ )
+ ```
+
+ * 定义子链
+
+ ```
+ # 技术支持链
+ tech_prompt = ChatPromptTemplate.from_template(
+ "你是一名技术支持专家,请回答以下技术问题:{input}"
+ )
+ tech_chain = tech_prompt | model | StrOutputParser()
+
+ # 财务链
+ billing_prompt = ChatPromptTemplate.from_template(
+ "你是一名财务专员,请处理以下账单问题:{input}"
+ )
+ billing_chain = billing_prompt | model | StrOutputParser()
+
+ # 默认通用链
+ default_prompt = ChatPromptTemplate.from_template(
+ "你是一名客服,请回答以下问题:{input}"
+ )
+ default_chain = default_prompt | model | StrOutputParser()
+ ```
+
+ * 定义路由条件函数
+
+ ```
+ def is_tech_question(input: dict) -> bool:
+ # 获取 "input" 键对应的值
+ input_value = input.get("input", "")
+ # 检查是否包含关键词
+ return "技术" in input_value or "故障" in input_value
+
+ def is_billing_question(input: dict) -> bool:
+ # 获取 "input" 键对应的值
+ input_value = input.get("input", "")
+ # 检查是否包含关键词
+ return "账单" in input_value or "支付" in input_value
+ ```
+
+ * 构建 RunnableBranch
+
+ ```
+ branch = RunnableBranch(
+ (is_tech_question, tech_chain), # 技术问题 → tech_chain
+ (is_billing_question, billing_chain), # 账单问题 → billing_chain
+ default_chain # 默认问题 → default_chain
+ )
+
+ full_chain = RunnableLambda(lambda x: {"input": x}) | branch
+ ```
+
+ * 测试案例
+
+ ```
+ # 测试技术问题
+ tech_response = full_chain.invoke("我的账号登录失败,提示技术故障")
+ print("技术问题响应:", tech_response)
+
+ # 测试账单问题
+ billing_response = full_chain.invoke("我的账单金额有误,请核对")
+ print("账单问题响应:", billing_response)
+
+ # 测试默认问题
+ default_response = full_chain.invoke("你们公司的地址在哪里?")
+ print("默认问题响应:", default_response)
+
+ #输出示例
+ #技术问题响应: 建议您尝试清除浏览器缓存或重置密码。若问题持续,请联系我们的技术支持团队。
+ #账单问题响应: 已记录您的账单问题,财务部门将在24小时内与您联系核实。
+ #默认问题响应: 我们的公司地址是北京市海淀区中关村大街1号。
+ ```
+
+ * 关键原理解析
+
+ * **条件路由逻辑**
+ * `RunnableBranch` 接收一个由 `(条件函数, Runnable)` 组成的列表。
+ * 按顺序检查条件,第一个满足条件的分支会被执行,若均不满足则执行默认分支
+
+ * **输入处理**:
+ * 输入需为字典格式(如 `{"input": "问题内容"}`),通过 `RunnableLambda` 包装原始输入为字典
+
+ * **链式组合**:
+ * 每个分支链(如 `tech_chain`)独立处理输入,输出结果直接返回给调用方
+
+ * **调试技巧**:
+
+ - 添加日志中间件(通过 `RunnableLambda`)记录路由决策过程
+
+ ```
+ def log_decision(input_data):
+ print(f"路由检查输入:{input_data}")
+ return input_data
+
+ log_chain_branch = RunnableLambda(log_decision) | branch
+
+ full_chain = RunnableLambda(lambda x: {"input": x}) | log_chain_branch
+ ```
+* 总结与最佳实践
+ * **组合使用**:通过 `|` 串联或嵌套 `Runnable` 类,构建复杂逻辑。
+ * **性能优化**:利用 `RunnableParallel` 减少 IO 等待时间。
+ * **调试技巧**:使用 `RunnableLambda` 插入日志或数据检查点。
+ * **容错设计**:结合 `RunnableBranch` 和 提升健壮性