
How to Do RAG

Welcome to the magical world of knowledge augmentation: RAG! 🧠📚

1. Why Does AI Lie?

  Large AI models are essentially "probability generators" that rely on what they memorized from training data. When a question falls outside that knowledge, they may fabricate answers that look plausible but are wrong.

Key idea

Purely generative AI is like a "closed-book exam": it cannot access external knowledge in real time.

2. Why Do We Need RAG?

  Imagine a very smart friend who has not read any news or new material since 2021. No matter how smart he is, ask him about 2023 and he can only guess or make things up. This is the limitation of large language models: their knowledge is "frozen" at the training cutoff date.

RAG addresses the following problems:

  • Knowledge freshness: access the latest information and facts
  • Domain expertise: tap specialized knowledge in particular fields
  • Fewer hallucinations: cite actual documents instead of inventing answers
  • Transparency: provide sources and citations for the information
  • Private data access: work with internal or proprietary material

3. What Is RAG?

  RAG stands for Retrieval-Augmented Generation. It is a powerful technique that combines a retrieval system with a generative model, giving large language models access to external knowledge bases so they can produce more accurate, up-to-date, and specialized content.

Key idea

Let the AI take an "open-book exam": look things up first, then answer, combining generative ability with real-time retrieval.

4. How RAG Works

A RAG system consists of three core steps:

  1. Retrieval: given the user's question, find the most relevant documents or passages in the knowledge base
  2. Augmentation: send the retrieved information to the language model together with the user's question
  3. Generation: the language model produces an answer based on the question and the retrieved information
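Before wiring up real components, the data flow can be shown with a deliberately toy, self-contained sketch (the stub "embedding" here is just a bag of words, and the LLM call is faked; every name is illustrative):

# A toy end-to-end RAG loop with stub components, just to show the three steps.
def embed(text: str) -> set:
    return set(text.lower().split())  # stub "embedding": a bag of words

def retrieve(question: str, kb: list, k: int = 2) -> list:
    # 1. Retrieval: rank knowledge-base chunks by word overlap with the question
    qv = embed(question)
    return sorted(kb, key=lambda chunk: -len(qv & embed(chunk)))[:k]

def rag_answer(question: str, kb: list) -> str:
    # 2. Augmentation: pack the retrieved chunks into the prompt
    context = "\n".join(retrieve(question, kb))
    prompt = f"Context:\n{context}\n\nQuestion: {question}"
    # 3. Generation: a real system would send `prompt` to an LLM here
    return prompt

kb = ["The weekly dungeon resets every Monday.", "Gold can be traded at the market."]
print(rag_answer("When does the weekly dungeon reset?", kb))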

Example

Scenario: a player asks about a game mechanic

Step 1: Retrieve

  • The player's question and the knowledge-base content are converted into vectors.

  • Using vector similarity, retrieve the k items most relevant to the question from a vector knowledge base (storing game guides, patch notes, and so on).

(figure: retrieval step illustration)
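Under the hood, this lookup is a nearest-neighbor search. A minimal NumPy sketch of cosine-similarity top-k retrieval (the vectors below are random stand-ins for real embeddings):

import numpy as np

def top_k(query_vec, kb_vecs, k=3):
    # Cosine similarity between the query vector and every knowledge-base vector
    sims = kb_vecs @ query_vec / (
        np.linalg.norm(kb_vecs, axis=1) * np.linalg.norm(query_vec)
    )
    return np.argsort(sims)[::-1][:k]  # indices of the k most similar entries

kb_vecs = np.random.rand(100, 384)  # 100 stand-in chunk embeddings, 384-dim
query_vec = np.random.rand(384)     # stand-in question embedding
print(top_k(query_vec, kb_vecs))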

Step 2: Generate

  • Combine the question, the retrieved knowledge context, and the prompt template, and feed them to the large model

  • The model generates a detailed explanation of the game mechanic

(figure: generation step illustration)
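The augmentation step is plain string assembly: the question and the retrieved context are stitched into one prompt before the model call. A sketch, where `retrieved_chunks` stands in for the output of step 1:

retrieved_chunks = [
    "Patch 2.1: the weekly dungeon now resets on Mondays.",
    "Guide: dungeon keys are consumed on entry.",
]
question = "When does the weekly dungeon reset?"

# Combine the question, the retrieved knowledge, and instructions into one prompt
prompt = (
    "Answer the player's question using only the context below.\n\n"
    "Context:\n" + "\n".join(retrieved_chunks) +
    f"\n\nQuestion: {question}\nAnswer:"
)
print(prompt)  # this string is what gets sent to the LLM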

5. Building a Simple RAG Application

5.1 Indexing

  A pipeline that ingests data from a source and indexes it. This usually happens offline.

1. Load

  First, load the blog post content. This example uses WebBaseLoader, a LangChain document loader that fetches data from web pages.

import bs4
from langchain_community.document_loaders import WebBaseLoader
# Only keep post title, headers, and content from the full HTML.
bs4_strainer = bs4.SoupStrainer(class_=("post-title", "post-header", "post-content"))
loader = WebBaseLoader(
   web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
   bs_kwargs={"parse_only": bs4_strainer},
)
docs = loader.load()
assert len(docs) == 1
print(f"Total characters: {len(docs[0].page_content)}")
  Output:
Total characters: 43131
2. Split
from langchain_text_splitters import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
   chunk_size=1000,  # chunk size (characters)
   chunk_overlap=200,  # chunk overlap (characters)
   add_start_index=True,  # track index in original document
)
all_splits = text_splitter.split_documents(docs)
print(f"Split blog post into {len(all_splits)} sub-documents.")
  Output:
Split blog post into 66 sub-documents.
3. Store

  Index the 66 chunks so they can be searched at query time.

from langchain_openai import OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_store = InMemoryVectorStore(embeddings)
document_ids = vector_store.add_documents(documents=all_splits)
print(document_ids[:3])

  Output:

['be2fadc8-bebd-462f-90a9-ed7045e22530', '8a318bfc-a44d-4d5d-8c49-1cb0ee2d9fe2', 'a8dab8c4-7177-4a2f-a520-3e6a6fcc6a33']

5.2 Retrieval and Generation

1. Prompt

  Use a ready-made RAG prompt from the LangChain Hub:

from langchain import hub
prompt = hub.pull("rlm/rag-prompt")
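If you prefer not to depend on the Hub, a similar prompt can be defined locally (a sketch that approximates rlm/rag-prompt rather than reproducing its exact wording):

from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template(
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer the question. "
    "If you don't know the answer, just say that you don't know. "
    "Keep the answer concise.\n\n"
    "Question: {question}\n\nContext: {context}\n\nAnswer:"
)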

2. Use LangGraph to tie the retrieval and generation steps into one application

  Three things need to be defined: the application's state, the application's steps, and the application's "control flow".

  Application state: controls the input, how data is passed between steps, and the output

from langchain_core.documents import Document
from typing_extensions import List, TypedDict


class State(TypedDict):
   question: str
   context: List[Document]
   answer: str
  Application steps: retrieval and generation

def retrieve(state: State):
   retrieved_docs = vector_store.similarity_search(state["question"])
   return {"context": retrieved_docs}


def generate(state: State):
   docs_content = "\n\n".join(doc.page_content for doc in state["context"])
   messages = prompt.invoke({"question": state["question"], "context": docs_content})
   response = llm.invoke(messages)
   return {"answer": response.content}
  Application "control flow": connect the steps into a sequence
from langgraph.graph import START, StateGraph


graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()

5.3 Invocation

response = graph.invoke({"question": "What is Task Decomposition?"})
print(response["answer"])
Putting everything together, here is the complete program (replace the API key with your own):

import os
os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-..."  # replace with your own API key

from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4.1-nano")

from langchain_openai import OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_store = InMemoryVectorStore(embeddings)

import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict

loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)

_ = vector_store.add_documents(documents=all_splits)

prompt = hub.pull("rlm/rag-prompt")

class State(TypedDict):
    question: str
    context: List[Document]
    answer: str

def retrieve(state: State):
    retrieved_docs = vector_store.similarity_search(state["question"])
    return {"context": retrieved_docs}


def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}

# Compile application and test
graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()    

response = graph.invoke({"question": "What is Task Decomposition?"})
print(response["answer"])

  Output:

Task Decomposition is the process of breaking down complex tasks into smaller, manageable sub-steps or subgoals. It can be performed using simple prompting with LLMs, task-specific instructions, or external planning tools like classical planners with PDDL. This approach helps in organizing and solving complex problems more effectively.

6. Advanced RAG Optimization Techniques

Naive RAG faces challenges in three main areas:

  • Low retrieval quality
  • Poor generation quality
  • A difficult augmentation process

6.1 Pre-Retrieval Processing

1. Query transformation

Query transformation is an advanced RAG technique that improves retrieval quality through:

  • Complex query decomposition: splitting a multi-part question into independent sub-queries
  • Parallel retrieval: running multiple simple queries at once for efficiency
  • Result synthesis: merging sub-query results into a final answer

Example: "Which has more GitHub stars, LangChain or mem0?"

Query decomposition: "How many stars does LangChain have on GitHub?" + "How many stars does mem0 have on GitHub?"

Sample result:

Based on the retrieved information, LangChain has 109,000 stars on GitHub and mem0 has 33,500, so LangChain has more stars.

Install the BM25 dependency, then run the full implementation below:

pip install rank_bm25
import asyncio
import json
import os
from typing import List, Dict
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-..."  # replace with your own API key

class QueryTransformer:
    def __init__(self, llm):
        self.llm = llm

    async def decompose_query(self, query: str) -> List[str]:
        """Use the LLM to decompose a complex query into sub-queries."""
        decomposition_prompt = ChatPromptTemplate.from_template(
            "Decompose the following query into sub-queries that can be answered independently. "
            "Each sub-query should be simple enough for direct retrieval. "
            "If the query is already simple, no decomposition is needed.\n"
            "Query: {query}\n"
            "Output the sub-queries as a JSON list and nothing else. "
            "Example output: [\"sub-query 1\", \"sub-query 2\"]"
        )

        chain = decomposition_prompt | self.llm | StrOutputParser()
        result = await chain.ainvoke({"query": query})

        try:
            sub_queries = json.loads(result)
            if not sub_queries:  # the LLM decided no decomposition was needed
                return [query]
            return sub_queries
        except json.JSONDecodeError:
            # If parsing fails, fall back to the original query
            return [query]

class ParallelRetriever:
    def __init__(self, retriever: BaseRetriever):
        self.retriever = retriever

    async def retrieve_for_subquery(self, query: str) -> List[Document]:
        """Run retrieval for a single sub-query."""
        return await self.retriever.ainvoke(query)

    async def parallel_retrieve(self, sub_queries: List[str]) -> Dict[str, List[Document]]:
        """Run retrieval for all sub-queries in parallel."""
        tasks = [self.retrieve_for_subquery(q) for q in sub_queries]
        results = await asyncio.gather(*tasks)
        return {query: docs for query, docs in zip(sub_queries, results)}

class AnswerSynthesizer:
    def __init__(self, llm):
        self.llm = llm

    async def synthesize_answer(self, original_query: str, retrieval_results: Dict[str, List[Document]]) -> str:
        """Synthesize a final answer from the original query and the retrieval results."""
        synthesis_prompt = ChatPromptTemplate.from_template(
            "You are a professional question-answering assistant. Answer the original question using the retrieved information below.\n"
            "Original question: {query}\n\n"
            "Retrieved information:\n{context}\n\n"
            "Give a complete, accurate answer. If the information is insufficient, say so."
        )

        # Format the retrieval results into a context string
        context_str = ""
        for sub_q, docs in retrieval_results.items():
            context_str += f"Sub-question: {sub_q}\n"
            context_str += "Relevant documents:\n" + "\n".join([d.page_content for d in docs]) + "\n\n"

        chain = synthesis_prompt | self.llm | StrOutputParser()
        return await chain.ainvoke({"query": original_query, "context": context_str})

class QueryTransformationRAG:
    def __init__(self, retriever: BaseRetriever, llm):
        self.query_transformer = QueryTransformer(llm)
        self.parallel_retriever = ParallelRetriever(retriever)
        self.answer_synthesizer = AnswerSynthesizer(llm)

    async def answer_query(self, query: str) -> str:
        """The full query-transformation RAG pipeline."""
        # 1. Query decomposition
        sub_queries = await self.query_transformer.decompose_query(query)
        print(f"Decomposed sub-queries: {sub_queries}")

        # 2. Parallel retrieval
        retrieval_results = await self.parallel_retriever.parallel_retrieve(sub_queries)
        print(f"Retrieval results: {retrieval_results}")

        # 3. Answer synthesis
        final_answer = await self.answer_synthesizer.synthesize_answer(query, retrieval_results)
        return final_answer
        return final_answer

# Usage example
if __name__ == "__main__":
    # In practice, replace the retriever and LLM with real components
    from langchain_community.retrievers import BM25Retriever

    # Mock document store
    documents = [
        Document(page_content="LangChain has 109,000 stars on GitHub"),
        Document(page_content="mem0 has 33,500 stars on GitHub"),
        Document(page_content="LangChain is a popular framework for building LLM applications"),
        Document(page_content="mem0 is an innovative memory optimization library designed for AI and machine learning applications.")
    ]

    retriever = BM25Retriever.from_documents(documents)
    llm = ChatOpenAI(model="gpt-4.1-nano")

    # Create the RAG system
    rag_system = QueryTransformationRAG(retriever, llm)

    # Example query
    query = "Which has more GitHub stars, LangChain or mem0?"

    async def main():
        answer = await rag_system.answer_query(query)
        print("\nFinal answer:")
        print(answer)

    asyncio.run(main())  # in a notebook, use `await main()` instead

6.2 Index Optimization

1. Hybrid retrieval

Combine multiple retrieval methods for better results:

from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
# BM25 retriever: keyword matching
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 5
# Vector retriever: semantic similarity
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# Ensemble retriever
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.5, 0.5]
)
docs = ensemble_retriever.get_relevant_documents(query)
The complete example below reuses the QueryTransformer, ParallelRetriever, AnswerSynthesizer, and QueryTransformationRAG classes defined in 6.1, swapping in an ensemble retriever:

import asyncio
from typing import List
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_openai import ChatOpenAI
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
# QueryTransformer, ParallelRetriever, AnswerSynthesizer, and QueryTransformationRAG
# are identical to the definitions in 6.1 and are omitted here.

# A minimal placeholder vector retriever
class PlaceholderVectorRetriever(BaseRetriever):
    def _get_relevant_documents(self, query: str) -> List[Document]:
        # Return an empty list as a placeholder
        return []

# Usage example
if __name__ == "__main__":
    # Mock document store
    documents = [
        Document(page_content="LangChain has 109,000 stars on GitHub"),
        Document(page_content="mem0 has 33,500 stars on GitHub"),
        Document(page_content="LangChain is a popular framework for building LLM applications"),
        Document(page_content="mem0 is an innovative memory optimization library designed for AI and machine learning applications.")
    ]

    # Create the BM25 retriever
    bm25_retriever = BM25Retriever.from_documents(documents)
    bm25_retriever.k = 5

    # Create the placeholder vector retriever
    vector_retriever = PlaceholderVectorRetriever()

    # Create the ensemble retriever
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.5, 0.5]  # adjust weights as needed
    )

    llm = ChatOpenAI(model="gpt-4.1-nano")

    # Create the RAG system
    rag_system = QueryTransformationRAG(ensemble_retriever, llm)

    # Example query
    query = "Which has more GitHub stars, LangChain or mem0?"

    async def main():
        answer = await rag_system.answer_query(query)
        print("\nFinal answer:")
        print(answer)

    asyncio.run(main())  # in a notebook, use `await main()` instead

2. Hierarchical retrieval (Parent-Child Chunking)

Improve retrieval quality by preserving the document's hierarchical structure:

from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
# Create the parent-child retriever: small child chunks are indexed for search,
# but the larger parent chunks are what get returned
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=InMemoryStore(),  # required store for the parent documents
    parent_splitter=parent_splitter,
    child_splitter=child_splitter
)
parent_retriever.add_documents(docs)  # populate both stores
relevant_docs = parent_retriever.get_relevant_documents(query)

6.3 Post-Retrieval Processing

1. Re-ranking

The retrieved documents may not be the most relevant ones; re-ranking can improve the results:

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
# Use Cohere's re-ranking API
compressor = CohereRerank()
reranking_retriever = ContextualCompressionRetriever(
    base_retriever=vectorstore.as_retriever(search_kwargs={"k": 10}),
    base_compressor=compressor
)
docs = reranking_retriever.get_relevant_documents(query)

2. Retrieval-augmented prompt engineering

Guide how the model uses the retrieved information with better prompt templates:

from langchain.prompts import PromptTemplate
template = """
You are a professional customer-service representative. Use the retrieved document excerpts below to answer the customer's question.
Always be polite and professional. If the retrieved documents contain no relevant information, honestly say you don't know; do not make things up.
Documents:
{context}
Customer question: {question}
Answer in the following format:
1. Answer the question directly and concisely
2. If applicable, cite the relevant policy or rule from the documents
3. If the customer needs further help, point them to the department or person to contact
"""
PROMPT = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

7. Evaluating a RAG System

Once a RAG system is built, evaluating its performance is essential:

Question: Which has more GitHub stars, LangChain or mem0?

Reference answer: LangChain has significantly more stars than mem0

System answer: Based on the provided information, LangChain has 109,000 stars on GitHub while mem0 has 33,500. Therefore, LangChain has more stars on GitHub.

GRADE: CORRECT

The evaluation script below again reuses the classes from 6.1, and grades the system's answers with QAEvalChain:

import asyncio
from langchain_core.documents import Document
from langchain_community.retrievers import BM25Retriever
from langchain_openai import ChatOpenAI
from langchain.evaluation.qa import QAEvalChain

# QueryTransformer, ParallelRetriever, AnswerSynthesizer, and QueryTransformationRAG
# are identical to the definitions in 6.1 and are omitted here.

# Prepare the evaluation data
eval_data = [
    {"query": "Which has more GitHub stars, LangChain or mem0?", "answer": "LangChain has significantly more stars than mem0"},
]

# Initialize the RAG system
retriever = BM25Retriever.from_documents([
    Document(page_content="LangChain has 109,000 stars on GitHub"),
    Document(page_content="mem0 has 33,500 stars on GitHub"),
    Document(page_content="LangChain is a popular framework for building LLM applications"),
    Document(page_content="mem0 is an innovative memory optimization library designed for AI and machine learning applications.")
])
llm = ChatOpenAI(model="gpt-4.1-nano")
rag_system = QueryTransformationRAG(retriever, llm)

# Collect the RAG system's answers (asynchronously)
async def get_rag_predictions(eval_data):
    predictions = []
    for example in eval_data:
        result = await rag_system.answer_query(example["query"])
        predictions.append({"query": example["query"], "result": result})
    return predictions

# Main evaluation routine
async def main():
    # Get the predictions
    predictions = await get_rag_predictions(eval_data)

    # Create the evaluation chain
    eval_llm = ChatOpenAI(model="gpt-4.1-nano")
    eval_chain = QAEvalChain.from_llm(eval_llm)

    # Grade the results
    graded_outputs = eval_chain.evaluate(
        eval_data,
        predictions,
        question_key="query",
        prediction_key="result",
        answer_key="answer"
    )

    # Print the evaluation results
    for i, grade in enumerate(graded_outputs):
        print(f"Question: {eval_data[i]['query']}")
        print(f"Reference answer: {eval_data[i]['answer']}")
        print(f"System answer: {predictions[i]['result']}")
        # Access the 'results' key directly
        print(f" {grade['results']}")
        print("---")


# Run the evaluation
asyncio.run(main())  # in a notebook, use `await main()` instead

8. Sentence Embeddings and Visualization

  In practice, an important RAG task is understanding the semantics of a user's input so it can be matched to a suitable category or intent. In a customer-support scenario, for example, user questions may need to be classified into predefined categories for further handling. The following sentence-embedding workflow covers everything from data loading to visualization.

8.1 Data Loading and Preprocessing

  The first step is to load the sentence data from the intent-recognition database and clean and preprocess the sentences. This is the foundation of the whole pipeline: it ensures data quality and consistency so that the subsequent embedding generation and analysis are more accurate.

Why preprocess the data?

Sentences in the database may contain special characters, null values, formatting errors, and other problems. Left unhandled, these issues can degrade embedding quality and classification accuracy.

Data loading and preprocessing example

The following code loads the datasets and cleans the sentences:

import re
import pandas as pd

# Dataset file names (replace with your own datasets)
dataset_names = []
# Concatenate the datasets
def load_and_merge_datasets(dataset_names):
    dataframes = []
    for dataset_name in dataset_names:
        df = pd.read_csv(f"{dataset_name}_dataset.csv")
        dataframes.append(df)
    merged_df = pd.concat(dataframes, ignore_index=True)
    return merged_df
# Preprocess a sentence
def preprocess_sentence(sentence):
    if not isinstance(sentence, str):
        sentence = str(sentence) if pd.notna(sentence) else ""
    sentence = re.sub(r"\[.*?\]", "", sentence)  # strip bracketed annotations
    sentence = re.sub(r"[^a-zA-Zа-яА-Я0-9\s]", "", sentence)  # keep Latin/Cyrillic letters, digits, whitespace
    return sentence.strip()

8.2 Generating Sentence Embeddings

  The second step converts each sentence into a vector representation (an embedding) using a pretrained model. Embeddings capture a sentence's semantics, so sentences can be compared mathematically for similarity.

What is a sentence embedding?

A sentence embedding is a technique that maps text to a fixed-length vector. These vectors represent the sentence's meaning, turning language into a mathematical form machines can work with.

Generating embeddings with a pretrained model

The following code generates sentence embeddings with a sentence-transformers checkpoint loaded through the transformers library:

import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel

# Load the model and tokenizer
model_name = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name).to(device)

# Generate embeddings in batches
def generate_embeddings(sentences, batch_size=32):
    all_embeddings = []
    for i in range(0, len(sentences), batch_size):
        batch = sentences[i:i+batch_size]
        inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Mean-pool the token embeddings to get one vector per sentence
        embeddings = outputs.last_hidden_state.mean(dim=1).cpu().numpy()
        all_embeddings.append(embeddings)
    return np.vstack(all_embeddings)

8.3 Category Matching

  The third step assigns each sentence to a predefined category. We generate an embedding for every category, compute the similarity between each sentence and each category, and pick the best match.

Why category matching?

Category matching assigns a user's input to a concrete intent or topic. For example, "How do I top up game coins?" is assigned to a "billing" category.

Category matching example

The following code generates category embeddings and matches sentences against them:

# Generate category embeddings
def compute_category_embeddings(categories):
    category_embeddings = {}
    for category in categories:
        descriptive_text = f"Category {category}"
        category_embeddings[category] = generate_embeddings([descriptive_text])[0]
    return category_embeddings

# Assign each sentence to its closest category by cosine similarity
def assign_categories(embeddings, category_embeddings):
    assigned_categories = []
    for embedding in embeddings:
        distances = {category: np.dot(embedding, category_embeddings[category]) /
                     (np.linalg.norm(embedding) * np.linalg.norm(category_embeddings[category]))
                     for category in category_embeddings}
        closest_category = max(distances, key=distances.get)
        assigned_categories.append(closest_category)
    return assigned_categories

8.4 Dimensionality Reduction and Clustering

  To observe the relationships between sentence embeddings and categories more intuitively, we can reduce the embeddings to lower dimensions (with t-SNE or PCA) and run cluster analysis on the category embeddings.

Why reduce dimensions and cluster?

Sentence embeddings are usually high-dimensional vectors (for example, 512 dimensions). Dimensionality reduction maps this data into 2D or 3D space for visualization, and clustering helps reveal relationships or patterns among categories.

Dimensionality reduction and clustering example

The following code implements dimensionality reduction and clustering:

from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

# Dimensionality reduction
def reduce_dimensions(embeddings, method="tsne", n_components=2):
    if method == "tsne":
        reducer = TSNE(n_components=n_components, perplexity=30, learning_rate=200, random_state=42)
    elif method == "pca":
        reducer = PCA(n_components=n_components)
    else:
        raise ValueError("Unsupported reduction method. Use 'tsne' or 'pca'.")
    return reducer.fit_transform(embeddings)

# Clustering
def cluster_embeddings(reduced_embeddings, n_clusters=5):
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    return kmeans.fit_predict(reduced_embeddings)

8.5 Visualization

  Finally, we visualize the reduced embeddings to show the relationships between sentences and categories, or the clustering of the category points.

Visualizing sentence-category relationships (2D)

The following code plots the relationship between sentences and categories:

import numpy as np
import matplotlib.pyplot as plt

# Visualize the relationship between sentences and categories
def visualize_sentence_category_relationship_2d(sentence_embeddings, category_embeddings, assigned_categories, categories):
    all_embeddings = np.vstack([sentence_embeddings] + [category_embeddings[cat] for cat in categories])
    labels = assigned_categories + categories

    reduced_embeddings = reduce_dimensions(all_embeddings, method="tsne", n_components=2)
    sentence_points = reduced_embeddings[:len(sentence_embeddings)]
    category_points = reduced_embeddings[len(sentence_embeddings):]

    colors = plt.cm.tab20.colors

    plt.figure(figsize=(12, 8))
    for i, category in enumerate(categories):
        indices = [j for j, assigned_category in enumerate(assigned_categories) if assigned_category == category]
        plt.scatter(sentence_points[indices, 0], sentence_points[indices, 1],
                    c=[colors[i]], alpha=0.6, label=f"Category: {category}")
        plt.scatter(category_points[i, 0], category_points[i, 1],
                    c=[colors[i]], alpha=1.0, s=100, edgecolor="black")

    plt.title("Sentence Embeddings and Closest Categories (2D)")
    plt.legend(loc="upper right")
    plt.xlabel("Dimension 1")
    plt.ylabel("Dimension 2")
    plt.show()

# Simulated data generation:
# Create 3 text categories: sports, technology, politics
# Generate a 10-dimensional embedding for each category (the category center)
# Generate 20 noisy sentence embeddings around each center (60 sentences total)

import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

# Simplified dimensionality-reduction helper (always uses t-SNE, ignoring `method`)
def reduce_dimensions(embeddings, method="tsne", n_components=2):
    tsne = TSNE(n_components=n_components, random_state=42)
    return tsne.fit_transform(embeddings)

# Visualization function
def visualize_sentence_category_relationship_2d(sentence_embeddings, category_embeddings, assigned_categories, categories):
    all_embeddings = np.vstack([sentence_embeddings] + [category_embeddings[cat] for cat in categories])
    labels = assigned_categories + categories

    reduced_embeddings = reduce_dimensions(all_embeddings, method="tsne", n_components=2)
    sentence_points = reduced_embeddings[:len(sentence_embeddings)]
    category_points = reduced_embeddings[len(sentence_embeddings):]

    colors = plt.cm.tab20.colors

    plt.figure(figsize=(12, 8))
    for i, category in enumerate(categories):
        # Plot the sentence points belonging to this category
        indices = [j for j, assigned_category in enumerate(assigned_categories) if assigned_category == category]
        plt.scatter(sentence_points[indices, 0], sentence_points[indices, 1],
                    c=[colors[i]], alpha=0.6, label=f"Category: {category}")

        # Plot the category center
        plt.scatter(category_points[i, 0], category_points[i, 1],
                    c=[colors[i]], alpha=1.0, s=200, edgecolor="black", marker="X")

    plt.title("Sentence Embeddings and Category Centers (2D T-SNE Projection)")
    plt.legend(loc="best")
    plt.xlabel("Dimension 1")
    plt.ylabel("Dimension 2")
    plt.grid(alpha=0.2)
    plt.show()

# Generate simulated data
np.random.seed(42)

# Define 3 categories
categories = ["Sports", "Technology", "Politics"]

# Generate category embeddings (three 10-dimensional vectors)
category_embeddings = {
    "Sports": np.random.normal(1.0, 0.1, 10),
    "Technology": np.random.normal(2.0, 0.2, 10),
    "Politics": np.random.normal(0.5, 0.3, 10)
}
}

# Generate sentence embeddings (20 per category)
sentence_embeddings = []
assigned_categories = []

for category in categories:
    for _ in range(20):
        # Add random noise around the category center
        base = category_embeddings[category]
        noise = np.random.normal(0, 0.2, 10)
        sentence_embeddings.append(base + noise)
        assigned_categories.append(category)

sentence_embeddings = np.array(sentence_embeddings)

# Run the visualization
visualize_sentence_category_relationship_2d(
    sentence_embeddings,
    category_embeddings,
    assigned_categories,
    categories
)
(figure rag2D: 2D t-SNE projection of sentence embeddings and category centers)

Visualizing category clusters (3D)

The following code plots the clustering of the category embeddings:

from mpl_toolkits.mplot3d import Axes3D


# Visualize the clustering of the category points (3D)
def visualize_clusters_3d(category_points, cluster_labels, categories):
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111, projection='3d')

    colors = plt.cm.tab20.colors
    for i, category in enumerate(categories):
        ax.scatter(category_points[i, 0], category_points[i, 1], category_points[i, 2],
                   c=[colors[cluster_labels[i]]], marker='X', s=200, label=f"Category: {category}")

    ax.set_title("Category Embedding Clusters (3D)")
    ax.set_xlabel("Dimension 1")
    ax.set_ylabel("Dimension 2")
    ax.set_zlabel("Dimension 3")
    ax.legend(loc="upper right")
    plt.show()

# Data simulation:
# Create 10 text categories (sports, technology, politics, etc.)
# Group the categories into 3 semantically related clusters:
#     tech cluster (technology / science / education)
#     lifestyle cluster (health / food / travel)
#     general cluster (sports / entertainment / business / politics)
# Generate 3D coordinates for each category (with random noise to simulate a real distribution)

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Visualization function
def visualize_clusters_3d(category_points, cluster_labels, categories):
    fig = plt.figure(figsize=(12, 10))
    ax = fig.add_subplot(111, projection='3d')

    colors = plt.cm.tab10.colors  # use 10 distinct colors

    # Plot each category's point
    for i, category in enumerate(categories):
        ax.scatter(category_points[i, 0], category_points[i, 1], category_points[i, 2],
                c=[colors[cluster_labels[i]]], 
                marker='X', 
                s=200, 
                edgecolor='black',
                depthshade=True,
                label=f"{category} (Cluster {cluster_labels[i]})")

    # Add connecting lines between points in the same cluster
    for cluster_id in set(cluster_labels):
        cluster_indices = [i for i, label in enumerate(cluster_labels) if label == cluster_id]

        # Plot the cluster center
        cluster_center = np.mean(category_points[cluster_indices], axis=0)
        ax.scatter(cluster_center[0], cluster_center[1], cluster_center[2],
                c=[colors[cluster_id]], 
                marker='o', 
                s=300, 
                alpha=0.3)

        # Connect each point in the cluster to its center
        for i in cluster_indices:
            ax.plot([category_points[i, 0], cluster_center[0]],
                    [category_points[i, 1], cluster_center[1]],
                    [category_points[i, 2], cluster_center[2]],
                    c=colors[cluster_id], alpha=0.3, linestyle='--')

    ax.set_title("3D Visualization of Category Clusters", fontsize=16)
    ax.set_xlabel("Dimension 1", fontsize=12)
    ax.set_ylabel("Dimension 2", fontsize=12)
    ax.set_zlabel("Dimension 3", fontsize=12)

    # Position the legend so it does not cover the data
    ax.legend(loc="upper left", bbox_to_anchor=(0.05, 0.95))

    # Add a grid and lighten the 3D panes
    ax.xaxis.pane.fill = False
    ax.yaxis.pane.fill = False
    ax.zaxis.pane.fill = False
    ax.grid(True, linestyle='--', alpha=0.4)

    plt.tight_layout()
    plt.show()

# Generate simulated data
np.random.seed(42)

# Define the category names
categories = ["Sports", "Technology", "Politics", 
            "Health", "Entertainment", "Education",
            "Business", "Science", "Travel", "Food"]

# Generate category points (10 categories, 3 coordinates each)
category_points = np.zeros((10, 3))

# Create three distinct clusters
# Cluster 0: tech-related
category_points[0] = [1.2, 0.8, 2.5]  # Technology
category_points[1] = [1.5, 1.0, 2.3]  # Science
category_points[2] = [0.9, 0.7, 2.7]  # Education

# Cluster 1: lifestyle-related
category_points[3] = [3.0, 4.2, 1.0]  # Health
category_points[4] = [3.5, 4.5, 0.8]  # Food
category_points[5] = [2.8, 3.9, 1.2]  # Travel

# Cluster 2: everything else
category_points[6] = [0.5, 2.5, 0.5]  # Sports
category_points[7] = [0.3, 2.2, 0.7]  # Entertainment
category_points[8] = [0.6, 2.8, 0.3]  # Business
category_points[9] = [0.4, 2.0, 0.6]  # Politics

# Add random noise for a more natural distribution
category_points += np.random.normal(0, 0.15, category_points.shape)

# Define the cluster labels (3 clusters)
cluster_labels = [0, 0, 0,  # Technology, Science, Education
                1, 1, 1,  # Health, Food, Travel
                2, 2, 2, 2]  # Sports, Entertainment, Business, Politics

# Run the visualization
visualize_clusters_3d(category_points, cluster_labels, categories)
(figure rag3D: 3D visualization of the category clusters)

8.6 Summary

  The steps above form a complete pipeline from sentence-embedding generation through category matching to dimensionality reduction and visualization. It helps us understand the semantic relationships between sentences and categories, and it provides the technical foundation for downstream classification and intent-recognition tasks.

9. Common Problems and Solutions

| Problem | Likely cause | Solution |
| --- | --- | --- |
| Irrelevant retrieval results | Chunks too large or too small | Tune chunk_size; 500-1500 characters usually works well |
| Answers contain wrong information | Insufficient retrieved context, or model hallucination | Retrieve more documents; improve the prompt to curb hallucination |
| Slow responses | Retrieval or embedding computation is expensive | Batch requests, cache results, optimize the vector database |
| Cannot handle domain-specific questions | Insufficient domain knowledge | Add more domain documents; use a domain-finetuned model |
| Inconsistent answer formatting | Prompt template too vague | Improve the template and specify the output format explicitly |
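For the latency row in particular, caching embeddings is a common fix, so repeated texts are never re-embedded. A minimal sketch using LangChain's CacheBackedEmbeddings (the cache directory name is illustrative):

from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_openai import OpenAIEmbeddings

underlying = OpenAIEmbeddings(model="text-embedding-3-large")
store = LocalFileStore("./embedding_cache")  # illustrative cache directory
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    underlying, store, namespace=underlying.model  # namespace avoids cross-model collisions
)
# The first call computes and caches; identical texts later hit the cache
vectors = cached_embeddings.embed_documents(["What is Task Decomposition?"])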

10. Best Practices

  1. Document processing: ensure document quality; remove irrelevant content and normalize formatting
  2. Text chunking: adjust chunk size and overlap to suit the nature of the content
  3. Retrieval strategy: experiment with different k values and retrieval types (similarity, MMR, etc.); see the sketch after this list
  4. Living system: update the knowledge base regularly to keep information current
  5. Source citation: include sources in answers to improve transparency and trust
  6. Human in the loop: add human review for critical applications
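As an example of point 3, switching a LangChain vector-store retriever from plain similarity search to maximal marginal relevance (MMR) is a small change (a sketch assuming the `vector_store` built in Section 5):

# Default: pure similarity search, top-k
similarity_retriever = vector_store.as_retriever(search_kwargs={"k": 5})

# MMR: trades off relevance against diversity among the returned chunks
mmr_retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5},  # fetch 20 candidates, keep 5 diverse ones
)
docs = mmr_retriever.invoke("What is Task Decomposition?")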

11. Next Steps

Having completed this chapter, you understand the basic principles and implementation of RAG. Next, you can:

  • Explore more advanced vector databases (such as Pinecone or Weaviate)
  • Learn multi-query RAG techniques to improve recall
  • Try combining agents with RAG to build systems that search for information autonomously
  • Build domain-specific RAG systems, such as legal, medical, or financial advisors

May you find the power of knowledge in the world of RAG! 🚀