老胡茶室
老胡茶室

无废话 RAG:实践篇

胡键

简单复习

理论篇中,我们已经了解:

  • embedding 和向量数据库相关概念
  • RAG 的两个阶段:indexing 和 querying
  • 每个阶段典型的组件:
    • indexing 阶段:data loader 、data spliter、embedding model 和 vector db
    • querying 阶段:llm、embedding model、vector db、 retriever 和 reranker(注:如果一篇 RAG 文章不介绍什么是 reranker,大可忽略不读。)
  • langchain.js 基础:
    • 用于 RAG 实现的相关组件
    • 与 pgvector 配合时,相关的数据库表结构,以及如何实现增量更新向量数据。

下面,本文将重点介绍实现 RAG 时需要注意的细节和实践经验。

注:本文假设你已经具备一定的开发经验和了解 RAG 的基本知识,基于此,本文从一开始就没有打算写一篇手把手的扫盲教程,而是将重点放在:实现一个面向真实用户的 RAG 系统时需要注意的细节和实践经验。

当然,你会在文中看到核心数据库表设计、关键函数代码示例,以及诸如 CAG和权限这类典型的高级场景,以便更好地理解这些细节和经验。

通用实现

要实现前文中介绍的通用 RAG 应用架构,一般需要以下功能:

  • 数据源管理
  • Indexing Pipeline
  • Querying
  • Setting

高级一点的,可能会提一提 eval,但这又是一大主题,再说容易跑题,故而在此略过不提,我将另行撰文说明。

有兴趣的可自行先用 deep research 去研究以建立整体概念,若想了解工具,则可以看看 langchain 自己的解决方案:langsmith,或前往老胡茶室的用物志了解其他类似工具。

下面的讨论依旧采用 langchain.js + pgvector 技术选型。

数据库设计

对于 langchain.js + pgvector 组合,实现 RAG 的核心数据库表就两个:

  • documents:存储向量数据和原始文档及其元数据
  • upsertion_records:类似一般 db migration 实现中记录数据库 ddl 变更的表,用于记录原始文档是否改变,实现增量更新。

理论篇已经详细介绍过两张表的结构,在此不再赘述。只提醒一点:除非你明确指明,langchain.js 并不会启用增量更新!

同时,你也无需在项目代码中维护这两张表的定义,因为它们会在第一次使用时自动创建。

然而,光有这两张表并不足以支撑一个实际的应用。根据我们的经验,建议在此基础上,增加以下表:

  • knowledges,存储 RAG 应用的原始文档信息
  • tasks,存储异步任务执行状态
  • settings,存储应用的配置
  • sessions,存储用户会话信息

其中:

  • 强烈建议使用 knowledgestasks
  • 如果不打算给用户太多自定义的选择,如切换 model 或自定义 prompt,则 settings 可以不使用。但我个人建议保留,至少允许高级用户可以自行定义 prompt。
  • 如果不打算把 RAG 做成 chatbot 形式,则 sessions 可以不使用。并且,即使是 chatbot 形式,你还有一个选择就是将会话信息存储在用户本地浏览器中。

并且,关于会话历史,这里再啰嗦一句:虽然 langchain.js 提供了 checkpointer,但在看完其表结构和代码后,个人不太喜欢其目前的设计。同时,实现符合自己口味的会话历史功能也并不费事。

因此,我的结论是:虽然直接用 checkpointer 可以获得 langchain.js 一些内置好处,但好处有限,故而对于当前版本我不推荐它。各位在实现时可自行决定。

当然,还有一种选择就是,根据它的接口完成自己的实现,将两者适配起来。

关于这些表的设计细节,我会在后续章节介绍。

至于其他常用表(如用户权限)和业务表,这基本上都是 case by case,各位可结合具体情形自行设计。

数据源管理

好的 RAG 应用需要有好的支撑其 querying 阶段的知识库(即 documents 表)。但知识库并不能凭空产生,同时它也需要推陈出新、与时俱进。这便是数据源管理功能存在的原因。

在设计该功能前,首先需要明确你的 RAG 应用的目的是什么,基于此来决定需要支持哪些数据源。

关于数据源:

  • 按位置分:本地和远程
  • 按类型分:
    • 文档:如 PDF、Word、Markdown 等
    • 网页:如 HTML、RSS 等
    • 目录:如文件夹、Git 仓库等
    • 数据库:如 SQL、NoSQL 等
    • API:如 RESTful API、GraphQL 等
  • 按内容分:结构化和非结构化
  • 按更新频率分:静态和动态

此外,跟设计服务于传统应用的数据库一样,你还需要了解你的 RAG 应用的典型查询场景。比如:

  • 权限,是否某些数据源仅对某些用户开放?
  • 类别,用户使用时是否会提到诸如“我想了解关于 X 的知识”的查询?
  • 地点,用户是否会提到诸如“给我介绍一下 X 地区的情况”的查询?

诸如此类的场景,都导向一个对于 RAG 新手来说可能有些陌生的概念:数据源的元数据。这便是在 documents 表中你会发现有 metadata 字段的原因。

此外,单独设计一张记录数据源的表(即 knowledges),还可以帮你更好地以整个数据源为单位进行管理,因为 documents 相当于记录的是数据源的 chunk。这两张表之间是一对多的关系。

但是,为了让你的代码与 langchain.js 的代码保持松耦合,不要直接去在 documents 中加一个外键指向 knowledges:利用 metadata 字段来存储数据源 ID 即可。

典型的 knowledges 表结构如下(drizzle orm 代码):

export const knowledge = pgTable("knowledge", {
  id: bigserial({ mode: "bigint" }).primaryKey(),
  uuid: uuid().defaultRandom().notNull().unique(),
  uri: varchar("uri").notNull(),
  // uri + metadata => metadata in documents
  metadata: jsonb("metadata").notNull().$type<Record<string, unknown>>(),
  processed: boolean().default(false).notNull(),
  createdAt: timestamp("created_at", { withTimezone: true })
    .defaultNow()
    .notNull(),
});

代码不言自明,只说明重要字段:

  • uri:标名数据源的位置,它可以是远程 url、也可以是本地文件上传之后的远程存储路径等。
  • metadata:保存附加的元数据,该字段在 indexing 阶段会被用来合并到 documents 表中的 metadata 字段。
  • processed:标记该数据源是否已经被处理过,用于 indexing 阶段的任务处理。

看到这里,有经验的读者应该已经脑补最基本的数据源管理功能了:

  • knowledges CRUD
  • 在其编辑页面,用户至少:
    • 可以上传本地文件或输入远程 url
    • 可以添加或编辑元数据

对于本地文件上传功能,我推荐 flydrive

It provides a unified API to interact with the local file system and cloud storage solutions like S3, R2, and GCS.

接下来,让我们讨论一下 indexing pipeline 的设计。

Indexing Pipeline

Indexing Pipeline 负责将数据源转换为向量数据并存储到 documents 表中。它通常包括以下几个步骤:

  1. 数据加载:从数据源中加载原始数据。
  2. 数据分割:将原始数据分割成适合向量化的 chunk。
  3. 向量化:使用 embedding model 将 chunk 转换为向量。
  4. 存储:将向量数据和原始数据存储到 documents 表中。

如何触发 Pipeline 执行?

首先,该何时执行这个 pipeline 呢?无外乎两种:

  • 同步方式:用户在数据源 CRUD 操作完成后,立即启动针对该数据源的 indexing。
  • 异步方式:用户在数据源 CRUD 操作完成后,创建一个异步任务,稍后再执行 indexing。

我建议使用异步方式,原因:可以使得 indexing 更可控,避免资源挤兑。因为 indexing 是一个耗时操作,假如多个用户同时更新不同的数据源,采用同步方式的话会瞬间启动多个 indexing 任务,严重消耗资源。

前面提到的 tasks 表便是为这些异步任务而设计的,对应的 drizzle orm 代码如下:

export const TaskStatus = {
  PENDING: 0,
  EXECUTED: 1,
  ERROR: 2,
};

export const TaskType = {
  createDocument: 0,
  refreshDocument: 1,
  removeDocument: 2,
};

export const tasks = pgTable(
  "tasks",
  {
    id: serial("id").primaryKey(),
    uuid: uuid().defaultRandom().notNull().unique(),
    type: integer("task_type").notNull(), // task type
    taskParams: jsonb("task_params").notNull(),
    errorMsg: varchar("error_msg"), // if error, update it
    createdAt: timestamp("created_at", { withTimezone: true })
      .defaultNow()
      .notNull(),
    status: integer("status").notNull().default(TaskStatus.PENDING),
    executedAt: timestamp("executed_at", { withTimezone: true }),
  },
  (table) => [index("idx_tasks_status").on(table.status)],
);

注意以下几个字段:

  • taskParams:存储任务参数,如数据源 ID。
  • errorMsg:存储任务执行错误信息,方便排查。

你可以将 tasksknowledges 的更新放在一个事务中,以确保数据一致性。

剩下的就简单了,你可以创建一个定时任务,定期检查 tasks 表中状态为 PENDING 的任务,并执行相应的任务。在每个任务中,你可以加载数据源,分割数据,向量化,并将结果存储到 documents 表中,同时将之前定义的 metadata 合并到 documentsmetadata 字段中。

从数据源到向量数据

关于这部分的基础知识,请参考理论篇。这里仅列出 indexing 的关键框架代码:

export async function index(
  documents: Document[],  // 可通过 langchain.js 的 Document Loader 加载
  myMetadata?: Record<string, unknown>, // 自行根据需求定义
) {
  const { EMBEDDING_CHUNK_SIZE, EMBEDDING_CHUNK_OVERLAP, DATABASE_URL } = env();

  try {
    if (!documents || documents.length === 0) {
      throw new Error("No documents provided for indexing");
    }

    const processedDocuments = documents.map((doc) => ({
      ...doc,
      metadata: {
        ...doc.metadata,
        ...myMetadata,
      },
    }));

    // 最常用,在不确定的情况下,先用它尝试,看效果再决定是否需要更复杂的分割逻辑
    // 详见:理论篇
    const splitter = new RecursiveCharacterTextSplitter({
      chunkSize: EMBEDDING_CHUNK_SIZE,
      chunkOverlap: EMBEDDING_CHUNK_OVERLAP,
    });

    const splitDocuments = await splitter.splitDocuments(processedDocuments);

    // Ensure we have documents after splitting
    if (!splitDocuments || splitDocuments.length === 0) {
      throw new Error("No documents remaining after splitting");
    }

    // 用于跟踪数据源的 namespace,启用增量更新时需要
    const namespace = splitDocuments[0].metadata?.source ?? "default-namespace";
    const recordManager = new PostgresRecordManager(namespace, {
      postgresConnectionOptions: {
        connectionString: DATABASE_URL,
      },
    });
    await recordManager.createSchema();
    // 注意确保 embedding model 支持多语言
    const embeddings = await getEmbeddings();
    const vectorStore = await PGVectorStore.initialize(embeddings, {
      postgresConnectionOptions: {
        connectionString: DATABASE_URL,
      },
      tableName: "documents",
      columns: {
        contentColumnName: "content",
      },
    });
    // import { index as langchainIndex } from "langchain/indexes";
    const result = await langchainIndex({
      docsSource: splitDocuments,
      recordManager,
      vectorStore,
      options: {
        cleanup: "full",
        sourceIdKey: "source",
      },
    });
    await Promise.all([recordManager.end(), vectorStore.end()]);
    return result;
  } catch (e: unknown) {
    throw new Error(`index failed: ${e}`, { cause: e });
  }
}

到此,indexing 阶段的核心设计和实现就完成了。

Querying

相比于 indexing pipeline,Querying 最多的时候会需要调用三个 llm 相关的 api:

  • embedding model:用于将用户查询转换为向量。
  • retriever:用于从向量数据库中检索相关的文档。
  • reranker:用于对检索到的文档进行排序。
    • 注:llm provider 一般不会提供免费的 reranker,即使是量大管饱的 Gemini,该服务也不在免费的套餐内。

理论篇曾提到两种典型的 RAG 实现方式并推荐直接使用:Agentic RAG + Hybrid Search + Reranking。整体框架代码如下:

export async function query(
  messages: BaseMessage[], // 历史消息,最后一条是用户提问
  filter: Record<string, unknown> = {}, // 过滤条件,用于 metadata 过滤
  signal?: AbortSignal, // 方便用户在 ui 终止
  prompt?: string, // 可选的自定义系统提示
  temperature?: number,
): Promise<IterableReadableStream<StreamEvent>> {
  const chatModel = newChatModel({
    temperature: temperature ?? DEFAULT_TEMPERATURE, // 默认值推荐:0.1,让 llm 不要瞎想
  });
  const systemPrompt = new SystemMessage(prompt ?? DEFAULT_SYSTEM_PROMPT);

  // assume the last message is the user's question
  const question = messages[messages.length - 1].content as string;
  // 生成 retriever 供 agent 使用
  const retrieve = getRetrieveTool(question, filter);

  const agent = createReactAgent({
    llm: chatModel,
    tools: [retrieve],
    prompt: systemPrompt,
  });

  // 确保 context 不要过长
  const trimmer = trimMessages({
    maxTokens: 5,
    strategy: "last",
    tokenCounter: (msgs) => msgs.length,
    includeSystem: true,
    allowPartial: false,
    startOn: "human",
  });

  return agent.streamEvents(
    { messages: await trimmer.invoke(messages) },
    { version: "v2", signal },
  );
}

在这里,我们用到了 langgraph 缺省的 ReactAgent,然后基于用户的问题创建一个 Retriever 工具,供 agent 使用。该工具的框架代码如下:

export function getRetrieveTool(question: string, filter?: DocFilter) {
  const maxDocuments = 10;
  return tool(
    async ({ query }) => {
      // 1. 将用户问题转换为向量
      // 2. hybrid search:先用向量数据库检索相关文档,再用 PG 的 full text search 功能搜索文档
      const  retrievedDocs = await matchDocuments(query, filter);
      if (retrievedDocs.length > maxDocuments) {
        // 3. 如果有 reranker,则对检索到的文档进行排序
        // 注:请自行初始化 reranker
        if (reranker) {
          console.log(
            new Date(),
            `Got ${retrievedDocs.length} documents, reranking by query: '${question}'...`,
          );
          retrievedDocs = await reranker.compressDocuments(
            retrievedDocs,
            question,
          );
          console.log(
            new Date(),
            `After reranking, got ${retrievedDocs.length} documents.`,
            retrievedDocs,
          );
        } else {
          retrievedDocs = retrievedDocs.slice(0, maxDocuments);
        }
      }
    
      const serialized =
        retrievedDocs.length > 0
          ? retrievedDocs
              .map(
                (doc) =>
                  `Source: ${doc.metadata.source}\nContent: ${doc.pageContent}`,
              )
              .join("\n")
          : "No relevant knowledge found.";
      return [serialized, retrievedDocs];
    },
    {
      name: "retrieve",
      description:
        "This is a knowledge retrieval tool. Before answering any questions, you must use this tool to search for relevant information. Even if the question seems simple, please search first to ensure accuracy. Input the user's question or keywords to retrieve relevant knowledge. But this tool may return an empty result if no knowledge is found.",
      schema: retrieveSchema,
      responseFormat: "content_and_artifact",
    },
  );
}

在这里,我仍然要重复强调一下在理论篇中提到的:不要使用 langchain 缺省的 retriever 实现,直接上手 sql ,这样会给你带来更大的灵活性。结合 drizzle 的 magic sql 可以让你实现更方便。

至于 reranker,则可以通过实现自己的 BaseDocumentCompressor 来完成,在此略过。

Setting

Setting 不算是 RAG 的核心功能,对于简单的 RAG 应用,你甚至可以完全忽略。但我个人建议保留该功能,或者将其与应用本身的 Setting 功能合并。这样将给高级用户带来更大的自由度。

典型的 Setting 项包括:

  • models:llm、embedding model、reranker 等
  • prompt:系统提示、用户提示等
  • temperature:控制 llm 的输出随机性

这里面,建议至少保留后两者。如果不想让每个用户自行设置,那么至少让管理员可以设置。

关于它的表结构,略。

Session

前面说过,若非你打算做成 chatbot 形式,Session 功能可以不使用。如果要用,建议抛弃或重头实现 langchain.js 的 checkpointer,这里列一下我们的调研结果,方便读者自行判断:

  • 优点:
    • 可以轻易实现会话历史的存储和恢复。
    • 支持多种消息类型,并且支持 fork。
  • 缺点:
    • 数据库表的设计无法支持灵活的 sql 查询,对于 sql 把控力强的团队来说,它限制了灵活性。
    • 整体实现只能说是半成品,虽然有 PostgresSaver,但却没有 PostgresStore
    • 数据库设计是以 session(thread) 为中心,而为以 user 为中心,在实际中仍需关联一张额外的 user 表。

既然如此,何必作茧自缚?不如自行实现。

关于它的表结构,略。

高级特性

实现了以上功能之后,只能说你的 RAG 应用已经达到了 60 分的水准。如果要想进一步提高,那么需要考虑一些高级特性。本节将介绍一些常见的高级特性和实现方式。至于其他高级特性,则留给各位自行探索了。

外部数据源

一般企业虽然会有内网和外网之分,但是使用上却要求两者统一。对于知识库也不例外:

  • 内部知识库来源于内网,即上面通过 indexing pipeline 处理生成的知识库。
  • 外部知识库则泛指互联网的各类知识库。

如何结合两者可以有多种选择,这里没有对错,需求根据具体需求而定:

  • 直接使用 langchain.js tools 中现成的 tools,如:DuckDuckGoSearchExaSearchResultsSerpAPI 等。
    • 这是结合我们现有的 Agentic RAG 最简单的实现方式,但请注意,它是将搜索的决定权交给 agent 来自行决定,因此有一定的不确定性。
  • 仿造 Hybrid Search,将它与内网搜索结果合并,一起传给 llm。
    • 你可以通过扩展上面的 getRetrieveTool 来实现。这样的好处是,结果是确定的,它一定会被调用。

同时,这里还需要注意一点:假如你搜索到的结果是一个外部文档或视频链接,该怎么办?虽然搜索引擎会多少返回一些数据,如 summary,但面对这类数据源,这些数据并不足以支撑 RAG 的查询。是否读取其实际内容完全取决于你的需求:

  • 简单场景,这类搜索引擎返回结果可能就足够了。
  • 如果期望得到更专业的结果,可能就涉及到读取其实际内容。

CAG

由于 LLM 支持 long context 是大势所趋,如果你知道你选用的 LLM 支持 long context,那么你可以考虑实现 CAG(Contextual Agentic Generation)。说白了,就是直接将文档内容连同用户问题和 prompt 一起传给 LLM,既不需要 embedding,也不需要 reranking。

这样的好处显而易见:即简单且效果好。

但缺点也很明显:将 llm 的上下文限定在了该文档内。除非你有足够的把握确信用户的问题一定就是跟该文档相关且仅限于该文档就可以了,否则需要三思而行。

但是对于某些应用场景,选择 CAG 完全是最佳选择,比如:

  • 网页问答
  • 电子书问答
  • 主题文档较小时,即该主题下所有文档加起来也不超过 llm 的 long context 限制时。

通过设计,你也可以有意造就一些使用 CAG 的场景,实现更好的用户体验。同时还降低了 llm 的调用成本,因为此时只需调用一次 llm 而非一般的三次。

Permission 和 Filter

关于查询权限和过滤条件,它们本质上是一回事:都需要借助 metadata 字段来实现,前者算是后者的特例。

在数据源管理部分,我们提到在 CRUD 时让用户自行定义 metadata:

  • 对于权限控制,你可以在 metadata 中添加一个 permission 字段,存储用户或用户组的权限信息。
  • 对于过滤条件,则先搜集可能的过滤条件,然后将其设计成 metadata 的字段即可。

最后,在查询时,你可以在 matchDocuments 函数中根据用户的权限和过滤条件来过滤结果。剩下的就简单了,都是 sql 查询的技巧。

简单说说 Graph RAG

相比本文介绍的 RAG,Graph RAG 的主要区别在于:它构建的是了一个知识图谱(实体、属性、关系),并将其存储在图数据库中。相比传统的向量数据,图的表现力更强,因而会带来更好的查询效果,进而得到更准确的答案。典型的开源实现有微软的 Graph RAG

但 Graph RAG 同样也带来了更多的复杂性和实现成本:它通常需要额外的图数据库支持(如 Neo4j 等),且大多数开发者对图数据库的理解和使用不如关系型数据库熟悉。

并且,在 indexing 阶段,Graph RAG 比传统 RAG 更耗时。同时在更新知识库时,也需要更多的工作来维护图的结构和关系。

从这一点上看,Graph RAG 和本文介绍的 RAG 并不是非此即彼的关系,是可以互为补充的。在实际应用中完全可以将两者结合起来,强调准确性的地方使用 Graph RAG,强调效率的地方使用传统 RAG。

结语

通过两篇 RAG 文章的介绍,我们从理论到实践,全面了解了 RAG 的理论基础、实现方式和关键技术决策。相信它们将成为你未来 RAG 实现方案的有力基础。

付费内容

本文包含付费内容,需要会员权限才能查看完整内容。