【AI】RAG技术介绍与实操

【AI】RAG技术介绍与实操

本文介绍了AI领域RAG增强检索生成技术的介绍和实操记录

从零开始构建本地RAG系统:基于Ollama的DeepSeek-R1与Nomic-Embedd实战

大语言模型的出现改变了我们与信息交互的方式,但这些模型的知识局限于训练数据的截止日期,无法覆盖实时信息和企业私有文档。检索增强生成(Retrieval-Augmented Generation, RAG)通过为模型配备外部知识库,巧妙地解决了这一难题。本文将带领你从零开始,使用Ollama平台拉取DeepSeek-R1推理模型和Nomic-Embed-Text向量化模型,构建一个完整的本地RAG系统。

一、RAG的出现:解决大模型的知识困境

1.1 大模型的固有局限

大型语言模型虽然在语言理解和生成方面表现惊艳,但它们存在两个根本性缺陷:

知识时效性问题:模型的训练数据有严格的截止日期,对于新近发生的事件或最新研究成果一无所知。GPT-4的知识截止于2023年,当你询问2024年的新闻时,它要么无法回答,要么提供过时信息。

私有数据缺失问题:公开训练数据无法覆盖企业的内部文档、产品手册或个人知识库。如果你想让模型回答关于公司内部流程的问题,仅靠通用模型是无能为力的。

更严重的是,当模型面对超出其知识范围的问题时,它不会坦诚地说”不知道”,而是会产生”幻觉”(Hallucination)——自信地编造出看似合理但完全错误的信息。这在企业应用中是不可接受的。

1.2 RAG的核心思想:开卷考试

RAG的核心理念很简单:在回答问题前,先让模型”查阅资料”。就像开卷考试允许学生翻阅教材,RAG系统在执行生成任务前,会从一个外部知识库中检索出最相关的信息,然后将这些信息作为上下文提供给模型。

这种方法带来的优势是显著的:

  • 事实准确性提升:模型的回答基于检索到的真实信息,而非凭空编造
  • 知识可更新:只需更新知识库,无需重新训练模型
  • 来源可追溯:系统可以提供答案的信息来源,增强可信度
  • 领域适应性:通过更换知识库,同一个模型可以适配不同专业领域

RAG最早由Facebook AI Research在2020年提出,他们使用维基百科作为外部知识库,通过Dense Passage Retrieval(DPR)技术检索相关文本片段,然后输入给BART生成模型。自那时起,RAG迅速普及,成为AI应用开发的核心范式。

二、RAG向量化流程详解

RAG系统的核心在于将非结构化文本转换为机器可计算的向量表示。这一过程涉及文档加载、文本切分、向量化生成和向量存储四个关键步骤。

2.1 文档加载与切分

原始知识库通常是各种格式的文档——TXT、PDF、Markdown或网页。构建RAG的第一步是加载这些文档并将其切分成适合检索的文本块。

为什么需要切分? 大模型有上下文长度限制,直接将整本手册或整篇文章作为上下文既不现实也不高效。切分的目标是:每个文本块应包含相对完整的语义单元

切分策略有多种选择:

按换行符切分是最简单直接的方式,适用于行结构清晰的文档:

def split_content(content):
    chunks = []
    lines = content.splitlines()
    for line in lines:
        if line.strip():  # 忽略空行
            chunks.append(line)
    return chunks

递归字符文本切分更智能,它会尝试按段落、句子、单词的顺序逐步切分,尽可能保持语义完整性。实践中,chunk_size通常设为500-1000个字符,chunk_overlap设为50-100个字符,以保留上下文连贯性。

2.2 向量化:文本的数学表达

切分后的文本块需要转换为计算机可以理解和计算的形式——向量(Vector),也称为嵌入(Embedding)

向量的本质可以这样理解:想象我们要给每段文字拍一张特殊的”照片”,这张照片由几百个数字组成(如768个维度)。这张”照片”就是该段文字的”语义身份证”——语义相近的文字,它们的”照片”在数字空间中的距离也会很近。

这个转换过程由嵌入模型(Embedding Model) 完成。嵌入模型经过海量文本训练,学会了将文字映射到高维语义空间。在本文的实践中,我们将使用Ollama拉取的nomic-embed-text模型,它生成的向量维度为768维。

向量化的核心原则是:语义相似,向量相近。这意味着:

  • “苹果是一种水果”和”香蕉富含钾元素”的向量距离较近
  • “苹果是一种水果”和”Python是一种编程语言”的向量距离较远

2.3 向量存储

生成向量后,我们需要一个专门的基础设施来存储和检索这些向量。虽然传统数据库(如MySQL)也能存储向量,但它们无法进行高效的相似性搜索。

向量数据库(Vector Database) 专为存储和查询高维向量而设计,内置了高效的近似最近邻(ANN)搜索算法,可以在毫秒级内从数百万向量中找出最相似的几个。

常见的向量数据库包括FAISS、ChromaDB、Milvus等。在本文的实践中,考虑到我们追求理解原理而非生产部署,将使用FAISS或甚至直接使用NumPy数组存储向量——这足以让我们看清RAG的本质。

三、RAG推理检索流程

当用户提出问题后,RAG系统进入在线推理阶段。这一阶段包括查询向量化、相似度检索、结果重排和提示词构建四个环节。

3.1 查询向量化与相似度检索

用户输入的问题同样需要转换为向量——必须使用与知识库向量化完全相同的嵌入模型。这样才能确保查询向量和文档向量位于同一个语义空间,距离计算才有意义。

接下来,系统计算查询向量与知识库中所有文档向量的相似度。最常用的相似度指标是余弦相似度(Cosine Similarity)

def similarity(e1, e2):
    # 计算余弦相似度
    dot_product = np.dot(e1, e2)                     # 点乘
    norm_e1 = np.linalg.norm(e1)                     # 向量1的范数
    norm_e2 = np.linalg.norm(e2)                     # 向量2的范数
    cosine_sim = dot_product / (norm_e1 * norm_e2)   # 余弦相似度
    return cosine_sim

余弦相似度的取值范围是[-1, 1],值越大表示两个向量的夹角越小,语义越相近。

计算出所有相似度后,系统按分数从高到低排序,取前K个(通常K=3~5)最相似的文本块作为检索结果。

3.2 结果重排(可选优化)

基础的向量检索存在一个潜在问题:排名靠前的文档不一定是最有用的。检索阶段使用的是双编码器(Bi-Encoder) 架构,它将查询和文档分别编码,速度快但精度有限。

为了提高质量,可以引入重排(Reranking) 阶段。重排使用交叉编码器(Cross-Encoder) 架构,它将查询和文档拼接后一起输入模型,计算相关性得分,精度更高但速度慢。因此,典型的策略是:向量检索先快速召回Top 100,重排模型再从中精筛Top 5。

在本文的简化实现中,我们将跳过重排步骤,直接使用向量检索结果。

3.3 提示词构建与生成

检索到相关文档后,需要将它们与用户问题组装成一个结构化的提示词(Prompt),然后发送给大语言模型。

最简单的提示词模板如下:

prompt_template = """
基于以下知识回答问题:

知识:
1: %s
2: %s
3: %s
4: %s
5: %s

问题:%s

请基于上述知识给出准确、详细的回答。如果知识中不包含相关信息,请明确说明。
"""

这个模板的设计原则是:

  • 明确信息来源:告知模型知识是从哪里来的
  • 限定回答范围:要求模型”基于知识”回答,减少幻觉
  • 允许不知道:为模型提供”不知道”的出口,避免编造

将组装好的提示词发送给大语言模型(本文使用DeepSeek-R1),模型生成的回答就是最终输出。

四、输出整理与优化

RAG系统的输出并非终点,还需要进行整理和可能的优化。

4.1 引用来源

高质量的RAG系统应该在回答中标注信息来源。这类似于学术论文的脚注,用户可以核实每个事实的真实性。在实践中,可以在返回检索结果时保留文档的元数据(如文件名、段落位置),然后在生成回答时要求模型引用来源编号。

4.2 查询重写(高级优化)

用户提问往往含糊不清或包含隐含意图。例如,用户问”告诉我NVIDIA模型的最新更新”,可能暗中对特定功能感兴趣,但这种偏好没有被明确表达。

查询重写(Query Rewriting) 技术可以在检索前优化用户查询,弥合用户提问方式与知识库信息结构之间的语义差距。常用方法包括:

  • Q2E(Query2Expand):生成同义词和相关短语,扩展查询
  • Q2D(Query2Doc):根据查询构建伪文档,匹配文档风格
  • CoT(思维链)查询重写:让模型逐步推理,分解查询意图

研究表明,使用Llama 3.3 Nemotron Super 49B进行CoT查询重写后,检索准确率@10从43.1%提升至63.8%。

五、串联运行:完整Python脚本实现

现在,让我们将所有环节串联起来,编写一个完整的RAG系统Python脚本。本实现将最小化第三方框架依赖,仅使用Ollama和NumPy,确保你能够看清每个步骤的本质。

5.1 环境准备

首先,确保已安装Ollama并拉取所需模型:

# 安装Ollama(请访问ollama.com下载对应系统版本)

# 拉取模型
ollama pull deepseek-r1:8b
ollama pull nomic-embed-text

# 安装Python依赖
pip install ollama numpy

环境问题

系统python 3.9缺失

这个错误是因为Python 3.9不支持|操作符用于类型联合(type union)。在Python 3.10及以上版本中,|才被支持用于类型提示。你的代码运行在Python 3.9环境,所以报错。

需要安装python 3.10.X 使用pyenv工具,切换用户层级的python环境,

Rust编译器缺失:安装tiktoken时报错can’t find Rust compiler

SWIG工具缺失:报错command ‘swig’ failed: No such file or directory

FAISS编译失败:报错command ‘/opt/homebrew/bin/swig’ failed with exit code 1,FAISS安装失败

PyPy兼容性问题

发现你在使用PyPy而不是CPython,导致安装faiss和chromadb均报错。

重置步骤:

# 1. 确保使用 CPython
pyenv install 3.10.15
cd /Users/mac/Dev/Desktop/RagDemo
pyenv local 3.10.15

# 2. 创建干净的环境
python -m venv venv
source venv/bin/activate

# 3. 升级基础工具
pip install --upgrade pip setuptools wheel

# 4. 安装依赖(按顺序)
pip install chromadb
pip install llama-index-core
pip install llama-index-embeddings-ollama
pip install llama-index-llms-ollama
pip install llama-index-vector-stores-chroma

# 5. 验证安装
pip list | grep -E "chroma|llama"

5.2 完整代码实现

"""
基于LlamaIndex的多文件RAG系统
处理文件夹下所有文件,支持多种文件格式
显示DeepSeek-R1的完整思考过程
"""

import os
from typing import List, Generator, Optional
import warnings
warnings.filterwarnings('ignore')

# LlamaIndex核心组件
from llama_index.core import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    Settings,
    Document
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.storage import StorageContext

# Ollama集成
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama

# 向量存储
from llama_index.vector_stores.faiss import FaissVectorStore
import faiss

# 文件监控(可选)
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class MultiFileRAGSystem:
    """
    多文件RAG系统
    支持文件夹下所有文件,自动处理格式转换
    """

    SUPPORTED_EXTENSIONS = [
        '.txt', '.pdf', '.docx', '.pptx', '.xlsx',
        '.md', '.csv', '.epub', '.html', '.htm',
        '.json', '.xml', '.ipynb'
    ]

    def __init__(
        self,
        docs_dir: str,
        embed_model_name: str = "nomic-embed-text",
        llm_model_name: str = "deepseek-r1:8b",
        ollama_base_url: str = "http://localhost:11434",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        similarity_top_k: int = 5,
        persist_dir: Optional[str] = "./storage"
    ):
        """
        初始化多文件RAG系统

        Args:
            docs_dir: 文档目录路径
            embed_model_name: 嵌入模型名称
            llm_model_name: 大语言模型名称
            ollama_base_url: Ollama服务地址
            chunk_size: 文本块大小
            chunk_overlap: 文本块重叠大小
            similarity_top_k: 检索返回的相似文档数量
            persist_dir: 索引持久化目录(可选)
        """
        self.docs_dir = docs_dir
        self.persist_dir = persist_dir
        self.similarity_top_k = similarity_top_k
        self.thinking_buffer = ""
        self.answer_buffer = ""

        print("=" * 80)
        print("多文件RAG系统初始化")
        print("=" * 80)
        print(f"📁 文档目录: {docs_dir}")
        print(f"📊 支持的文件格式: {', '.join(self.SUPPORTED_EXTENSIONS[:5])}等")

        # 检查目录是否存在
        if not os.path.exists(docs_dir):
            raise ValueError(f"文档目录不存在: {docs_dir}")

        # 1. 配置嵌入模型
        print(f"\n🔧 加载嵌入模型: {embed_model_name}")
        self.embed_model = OllamaEmbedding(
            model_name=embed_model_name,
            base_url=ollama_base_url,
            ollama_additional_kwargs={"mirostat": 0}
        )

        # 2. 配置大语言模型
        print(f"🔧 加载语言模型: {llm_model_name}")
        self.llm = Ollama(
            model=llm_model_name,
            base_url=ollama_base_url,
            temperature=0.7,
            request_timeout=120.0,
            additional_kwargs={
                "num_predict": 2048,
                "top_k": 40,
                "top_p": 0.9
            }
        )

        # 3. 设置全局配置
        Settings.embed_model = self.embed_model
        Settings.llm = self.llm
        Settings.chunk_size = chunk_size
        Settings.chunk_overlap = chunk_overlap

        # 4. 初始化文本切分器
        self.node_parser = SentenceSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separator=" ",
            paragraph_separator="\n\n",
            secondary_chunking_regex="[^,.;:]+[,.;:]?"
        )

        # 5. 索引对象
        self.index = None
        self.query_engine = None

        # 6. 文件统计信息
        self.file_stats = {}

        print("✅ 初始化完成")

    def scan_documents(self) -> dict:
        """
        扫描文档目录,统计文件信息

        Returns:
            文件统计信息
        """
        print(f"\n📋 扫描文档目录: {self.docs_dir}")

        stats = {
            "total_files": 0,
            "supported_files": 0,
            "unsupported_files": 0,
            "file_types": {},
            "total_size_mb": 0
        }

        for root, dirs, files in os.walk(self.docs_dir):
            for file in files:
                file_path = os.path.join(root, file)
                file_ext = os.path.splitext(file)[1].lower()
                file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB

                stats["total_files"] += 1
                stats["total_size_mb"] += file_size

                if file_ext in self.SUPPORTED_EXTENSIONS:
                    stats["supported_files"] += 1
                    stats["file_types"][file_ext] = stats["file_types"].get(file_ext, 0) + 1
                    print(f"  ✅ {file} ({file_size:.2f} MB) - {file_ext}")
                else:
                    stats["unsupported_files"] += 1
                    print(f"  ⚠️ {file} ({file_size:.2f} MB) - 不支持格式: {file_ext}")

        print(f"\n📊 统计结果:")
        print(f"  总文件数: {stats['total_files']}")
        print(f"  支持文件: {stats['supported_files']}")
        print(f"  不支持文件: {stats['unsupported_files']}")
        print(f"  总大小: {stats['total_size_mb']:.2f} MB")
        print(f"  文件类型分布: {stats['file_types']}")

        self.file_stats = stats
        return stats

    def load_documents(self, recursive: bool = True) -> List[Document]:
        """
        加载文档目录下的所有文件

        Args:
            recursive: 是否递归加载子目录

        Returns:
            文档列表
        """
        print(f"\n📄 加载文档...")

        # 首先扫描文件
        self.scan_documents()

        # 使用SimpleDirectoryReader加载所有文件
        reader = SimpleDirectoryReader(
            input_dir=self.docs_dir,
            recursive=recursive,
            exclude_hidden=True,  # 排除隐藏文件
            required_exts=self.SUPPORTED_EXTENSIONS  # 只加载支持的文件
        )

        documents = reader.load_data()

        print(f"\n✅ 成功加载 {len(documents)} 个文档")

        # 按文件类型显示统计
        doc_by_type = {}
        for doc in documents:
            file_name = doc.metadata.get('file_name', 'unknown')
            file_ext = os.path.splitext(file_name)[1].lower()
            doc_by_type[file_ext] = doc_by_type.get(file_ext, 0) + 1

        print("📊 文档类型分布:")
        for ext, count in doc_by_type.items():
            print(f"  {ext}: {count} 个文档")

        # 显示每个文档的信息
        for i, doc in enumerate(documents[:5]):  # 只显示前5个
            file_name = doc.metadata.get('file_name', 'unknown')
            file_size = doc.metadata.get('file_size', 0) / 1024  # KB
            preview = doc.text[:100].replace('\n', ' ') + "..." if len(doc.text) > 100 else doc.text
            print(f"\n  文档 {i+1}: {file_name}")
            print(f"    大小: {file_size:.1f} KB")
            print(f"    预览: {preview}")

        if len(documents) > 5:
            print(f"\n  ... 还有 {len(documents) - 5} 个文档未显示")

        return documents

    def build_index(self, documents: List[Document], force_rebuild: bool = False):
        """
        构建向量索引

        Args:
            documents: 文档列表
            force_rebuild: 是否强制重建(忽略已有索引)
        """
        # 检查是否有持久化索引
        if self.persist_dir and os.path.exists(self.persist_dir) and not force_rebuild:
            try:
                print(f"\n🔍 发现已有索引,尝试加载...")
                self.load_index()
                return
            except Exception as e:
                print(f"⚠️ 加载索引失败: {e}")
                print("将重新构建索引...")

        print(f"\n🔨 构建向量索引...")
        print(f"切分文档为文本块 (size={Settings.chunk_size}, overlap={Settings.chunk_overlap})")

        # 构建索引(显示进度)
        self.index = VectorStoreIndex.from_documents(
            documents,
            embed_model=self.embed_model,
            node_parser=self.node_parser,
            show_progress=True
        )

        print(f"✅ 索引构建完成")

        # 获取节点统计
        nodes = self.index.docstore.docs.values()
        print(f"  生成文本块数量: {len(nodes)}")

        # 计算平均文本块长度
        avg_length = sum(len(node.text) for node in nodes) / len(nodes)
        print(f"  平均文本块长度: {avg_length:.1f} 字符")

        # 保存索引
        if self.persist_dir:
            self.save_index()

    def setup_query_engine(self):
        """
        配置查询引擎
        """
        if not self.index:
            raise ValueError("请先构建索引")

        print(f"\n⚙️ 配置查询引擎...")

        # 1. 创建检索器
        retriever = VectorIndexRetriever(
            index=self.index,
            similarity_top_k=self.similarity_top_k,
            embed_model=self.embed_model
        )

        # 2. 创建响应合成器(启用流式输出)
        response_synthesizer = CompactAndRefine(
            llm=self.llm,
            streaming=True,
            verbose=True
        )

        # 3. 创建后处理器
        postprocessor = SimilarityPostprocessor(similarity_cutoff=0.6)  # 降低阈值以包含更多结果

        # 4. 组合为查询引擎
        self.query_engine = RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=response_synthesizer,
            node_postprocessors=[postprocessor]
        )

        print(f"✅ 查询引擎配置完成")
        print(f"  检索数量: top_{self.similarity_top_k}")
        print(f"  相似度阈值: 0.6")

    def query_with_thinking(self, query: str) -> Generator[dict, None, None]:
        """
        执行查询并流式返回思考过程

        Args:
            query: 用户查询

        Yields:
            包含类型和内容的字典
        """
        if not self.query_engine:
            raise ValueError("请先配置查询引擎")

        print(f"\n🔍 执行查询: '{query}'")

        # 1. 执行检索
        nodes = self.query_engine.retriever.retrieve(query)

        # 组织检索结果
        retrieval_results = []
        for node in nodes:
            file_name = node.metadata.get('file_name', '未知文件')
            retrieval_results.append({
                "text": node.text,
                "score": node.score,
                "file_name": file_name,
                "node_id": node.node_id
            })

        # 显示检索结果
        yield {
            "type": "retrieval",
            "content": {
                "query": query,
                "results": retrieval_results
            }
        }

        # 2. 构建提示词(带文件来源信息)
        context_parts = []
        for i, node in enumerate(nodes):
            file_name = node.metadata.get('file_name', '未知文件')
            context_parts.append(f"[来自文件: {file_name}]\n{node.text}")

        context_text = "\n\n---\n\n".join(context_parts)
        prompt = f"""
基于以下从多个文件中检索到的知识回答问题。每个知识片段都标注了来源文件。

{context_text}

用户问题:{query}

请严格基于上述知识回答,不要编造信息。如果知识中不包含相关信息,请明确说明。
在回答中可以提及信息来源文件(如"根据XXX文件的描述")。
"""

        yield {"type": "prompt", "content": prompt[:300] + "..." if len(prompt) > 300 else prompt}

        # 3. 执行查询(流式输出)
        yield {"type": "start_generation", "content": "开始生成回答..."}

        streaming_response = self.query_engine.query(query)

        # 重置缓冲区
        self.thinking_buffer = ""
        self.answer_buffer = ""
        in_thinking = True

        # 流式处理响应
        for text_chunk in streaming_response.response_gen:
            # 检测思考/回答边界
            if "思考:" in text_chunk or "分析:" in text_chunk:
                in_thinking = True
                self.thinking_buffer += text_chunk
                yield {"type": "thinking_chunk", "content": text_chunk}
            elif "答案:" in text_chunk or "回答:" in text_chunk:
                if in_thinking:
                    yield {"type": "thinking_end", "content": ""}
                in_thinking = False
                self.answer_buffer += text_chunk
                yield {"type": "answer_chunk", "content": text_chunk}
            elif in_thinking:
                self.thinking_buffer += text_chunk
                yield {"type": "thinking_chunk", "content": text_chunk}
            else:
                self.answer_buffer += text_chunk
                yield {"type": "answer_chunk", "content": text_chunk}

        # 4. 返回完整结果(带来源信息)
        source_nodes = [
            {
                "text": node.text,
                "score": node.score,
                "file_name": node.metadata.get('file_name', '未知文件'),
                "metadata": node.metadata
            }
            for node in streaming_response.source_nodes
        ]

        yield {
            "type": "complete",
            "content": {
                "thinking": self.thinking_buffer,
                "answer": streaming_response.response,
                "source_nodes": source_nodes
            }
        }

    def save_index(self):
        """保存索引到磁盘"""
        if not self.index or not self.persist_dir:
            return

        os.makedirs(self.persist_dir, exist_ok=True)
        self.index.storage_context.persist(persist_dir=self.persist_dir)
        print(f"✅ 索引已保存到 {self.persist_dir}")

    def load_index(self):
        """从磁盘加载索引"""
        from llama_index.core import load_index_from_storage

        if not os.path.exists(self.persist_dir):
            raise ValueError(f"持久化目录不存在: {self.persist_dir}")

        storage_context = StorageContext.from_defaults(persist_dir=self.persist_dir)
        self.index = load_index_from_storage(storage_context)
        print(f"✅ 索引已从 {self.persist_dir} 加载")


class DocumentWatcher(FileSystemEventHandler):
    """文档目录监控器,当文件变化时自动重建索引"""

    def __init__(self, rag_system: MultiFileRAGSystem):
        self.rag_system = rag_system
        self.last_rebuild = time.time()
        self.rebuild_cooldown = 60  # 60秒内不重复重建

    def on_modified(self, event):
        if not event.is_directory:
            self._handle_change(event.src_path)

    def on_created(self, event):
        if not event.is_directory:
            self._handle_change(event.src_path)

    def on_deleted(self, event):
        if not event.is_directory:
            self._handle_change(event.src_path)

    def _handle_change(self, file_path):
        """处理文件变化"""
        current_time = time.time()
        if current_time - self.last_rebuild > self.rebuild_cooldown:
            print(f"\n🔄 检测到文件变化: {file_path}")
            print("正在重建索引...")
            self.rag_system.build_index(
                self.rag_system.load_documents(),
                force_rebuild=True
            )
            self.last_rebuild = current_time


def interactive_mode():
    """交互式问答模式"""

    # 配置
    docs_dir = "./blog_docs"  # 存放所有知识库文件的目录

    # 初始化RAG系统
    rag = MultiFileRAGSystem(
        docs_dir=docs_dir,
        embed_model_name="nomic-embed-text",
        llm_model_name="deepseek-r1:8b",
        chunk_size=512,
        chunk_overlap=50,
        similarity_top_k=5,
        persist_dir="./storage"  # 启用持久化
    )

    # 加载文档并构建索引
    documents = rag.load_documents(recursive=True)
    rag.build_index(documents)
    rag.setup_query_engine()

    # 交互式问答
    print("\n" + "=" * 80)
    print("多文件RAG系统已就绪!输入问题开始问答(输入 'exit' 退出)")
    print("=" * 80)

    while True:
        query = input("\n📝 【用户】: ").strip()
        if query.lower() in ['exit', 'quit', '退出']:
            print("再见!")
            break

        if not query:
            continue

        print("\n🔄 【系统】正在处理...")
        print("-" * 80)

        thinking_mode = True
        sources_shown = False

        for event in rag.query_with_thinking(query):
            if event["type"] == "retrieval":
                print("\n📚 【检索结果】")
                for i, result in enumerate(event["content"]["results"]):
                    preview = result["text"][:60] + "..." if len(result["text"]) > 60 else result["text"]
                    print(f"  [{i+1}] 相关度: {result['score']:.4f} | 来源: {result['file_name']}")
                    print(f"      内容: {preview}")

            elif event["type"] == "prompt":
                print(f"\n📝 【提示词预览】\n  {event['content']}")

            elif event["type"] == "start_generation":
                print("\n🧠 【模型思考中】")

            elif event["type"] == "thinking_chunk":
                print(event["content"], end='', flush=True)

            elif event["type"] == "thinking_end":
                print("\n\n💡 【思考完成,开始回答】\n")
                thinking_mode = False

            elif event["type"] == "answer_chunk":
                print(event["content"], end='', flush=True)

            elif event["type"] == "complete" and not sources_shown:
                print("\n\n📊 【信息来源】")
                sources = {}
                for node in event["content"]["source_nodes"]:
                    file_name = node["file_name"]
                    if file_name not in sources:
                        sources[file_name] = []
                    sources[file_name].append({
                        "score": node["score"],
                        "preview": node["text"][:50] + "..."
                    })

                for file_name, snippets in sources.items():
                    print(f"\n  📄 {file_name}:")
                    for i, snippet in enumerate(snippets):
                        print(f"    [{i+1}] 相关度: {snippet['score']:.4f}")
                        print(f"        {snippet['preview']}")

                sources_shown = True

        print("\n" + "=" * 80)


def batch_mode():
    """批量处理模式:处理多个问题"""

    # 配置
    docs_dir = "./blog_docs"

    rag = MultiFileRAGSystem(
        docs_dir=docs_dir,
        persist_dir="./storage"
    )

    # 加载文档并构建索引
    documents = rag.load_documents()
    rag.build_index(documents)
    rag.setup_query_engine()

    # 从文件读取问题列表
    questions_file = "questions.txt"
    if os.path.exists(questions_file):
        with open(questions_file, 'r', encoding='utf-8') as f:
            questions = [q.strip() for q in f.readlines() if q.strip()]
    else:
        # 示例问题
        questions = [
            "苹果有什么营养价值?",
            "香蕉适合什么时候吃?",
            "如何保持健康饮食?"
        ]

    print("\n" + "=" * 80)
    print("批量处理模式")
    print("=" * 80)

    results = []
    for i, question in enumerate(questions, 1):
        print(f"\n📌 问题 {i}/{len(questions)}: {question}")
        print("-" * 40)

        response = rag.query_engine.query(question)
        print(f"回答: {response}")

        results.append({
            "question": question,
            "answer": str(response),
            "sources": [
                {
                    "file": node.metadata.get('file_name', '未知'),
                    "score": node.score,
                    "text": node.text[:200]
                }
                for node in response.source_nodes
            ]
        })

        print()

    # 保存结果
    import json
    with open("batch_results.json", "w", encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print("✅ 结果已保存到 batch_results.json")


def watch_mode():
    """监控模式:监听文件变化并自动更新索引"""

    # 配置
    docs_dir = "./blog_docs"

    rag = MultiFileRAGSystem(
        docs_dir=docs_dir,
        persist_dir="./storage"
    )

    # 初始构建
    documents = rag.load_documents()
    rag.build_index(documents)
    rag.setup_query_engine()

    # 启动文件监控
    event_handler = DocumentWatcher(rag)
    observer = Observer()
    observer.schedule(event_handler, docs_dir, recursive=True)
    observer.start()

    print("\n" + "=" * 80)
    print("监控模式已启动 - 文件变化时将自动重建索引")
    print("输入问题开始问答(输入 'exit' 退出)")
    print("=" * 80)

    try:
        while True:
            query = input("\n📝 【用户】: ").strip()
            if query.lower() in ['exit', 'quit', '退出']:
                break

            if not query:
                continue

            response = rag.query_engine.query(query)
            print(f"\n💬 【回答】: {response}")

            # 显示来源
            print("\n📚 【来源】:")
            for node in response.source_nodes[:3]:
                file_name = node.metadata.get('file_name', '未知')
                print(f"  - {file_name} (相关度: {node.score:.4f})")

    finally:
        observer.stop()
        observer.join()


def main():
    """主函数:选择运行模式"""

    print("多文件RAG系统 - 选择运行模式:")
    print("1. 交互式问答")
    print("2. 批量处理")
    print("3. 监控模式(自动更新)")

    choice = input("\n请输入选择 (1/2/3): ").strip()

    if choice == "1":
        interactive_mode()
    elif choice == "2":
        batch_mode()
    elif choice == "3":
        watch_mode()
    else:
        print("无效选择,运行默认交互式模式")
        interactive_mode()


if __name__ == "__main__":
    main()

代码解析:

系统启动时,首先通过MultiFileRAGSystem类的构造函数完成基础配置。初始化过程包括设置文档目录路径、配置Ollama服务连接的嵌入模型(默认使用nomic-embed-text)和大语言模型(默认使用deepseek-r1:1.5b),以及定义文本处理参数如块大小512字符、块重叠50字符和检索返回数量top_k=5。同时,系统会检查文档目录是否存在,配置全局Settings对象,初始化文本切分器,并准备持久化存储目录。

系统通过scan_documents方法递归扫描指定目录下的所有文件,统计总文件数、支持的文件格式(包括txt、pdf、docx、pptx、xlsx、md、csv等十多种格式)、文件大小分布等信息,并识别不支持的文件格式。随后,load_documents方法调用SimpleDirectoryReader加载所有支持格式的文档,将每个文件转换为Document对象,保留文件名、文件大小等元数据信息。

在索引构建阶段,系统使用SentenceSplitter将加载的文档切分为固定大小的文本块(节点),每个节点包含文本内容和来源元数据。通过OllamaEmbedding将文本块转换为向量表示,并存储到Faiss向量数据库中构建VectorStoreIndex。如果启用了持久化功能,索引会被保存到磁盘,方便后续快速加载而无需重复构建。构建完成后,系统会统计生成的文本块数量和平均长度。

setup_query_engine方法负责配置完整的查询流水线。首先创建VectorIndexRetriever检索器,设置相似度检索的top_k参数。然后创建CompactAndRefine响应合成器,启用流式输出和详细模式。接着配置SimilarityPostprocessor后处理器,设置相似度阈值为0.6以过滤低质量结果。最后将这些组件组合成RetrieverQueryEngine,形成完整的查询处理链路。

当用户提交查询时,系统执行query_with_thinking方法,该方法设计为生成器函数以支持流式输出。查询处理分为多个阶段:

首先是检索阶段,系统将用户问题向量化后到索引中检索最相似的top_k个文本块,返回每个块的文本内容、相似度分数和来源文件信息,这些结果以"retrieval"类型的事件流式输出。

其次是提示词构建阶段,系统将检索到的文本块按照来源文件组织,构建包含完整上下文信息的提示词,明确标注每个知识片段的来源文件,要求LLM严格基于提供的知识回答,并可提及信息来源。构建的提示词以"prompt"类型事件输出。

然后是流式生成阶段,系统调用查询引擎执行查询,获取流式响应。在生成过程中,系统实时分析输出的文本流,智能区分"思考过程"和"最终答案"两部分。通过检测"思考:"、"分析:"等关键词标记思考过程的开始,检测"答案:"、"回答:"等关键词标记思考与回答的边界。思考过程的每个文本块以"thinking_chunk"类型输出,回答部分以"answer_chunk"类型输出,实现类似DeepSeek-R1的思考过程展示效果。

最后是结果整合阶段,生成完成后,系统收集完整的思考过程和最终答案,同时整理所有引用的源节点信息,包括每个节点的文本预览、相似度分数和来源文件,以"complete"类型事件返回完整结果。

三种运行模式详解
交互式模式是系统的主要使用方式,初始化RAG系统后进入问答循环。用户输入问题后,系统实时展示检索到的相关文本块及其相似度分数,然后逐字显示模型的思考过程,接着显示最终生成的答案,最后列出所有信息来源文件及具体引用的文本片段,实现完全透明的问答过程。

批量处理模式适用于需要处理多个预定义问题的场景。系统从questions.txt文件读取问题列表,或使用内置示例问题,依次处理每个问题。每个问题的回答结果和来源信息被收集并保存为结构化的JSON文件,便于后续分析和处理。

监控模式通过watchdog库实现文件系统的实时监控。当检测到文档目录中的文件被修改、创建或删除时,系统自动触发索引重建,确保知识库的实时性。同时,用户仍可进行问答交互,系统会使用最新的索引提供服务。为防止频繁重建,设置了60秒的重建冷却时间。

运行效果对比

直接在命令行中运行提问

>>> 安卓设备开机流程有哪些
Thinking...
好的,用户之前问过关于“helolo”的意思,我解释了一下,然后他又问了“你好”。看起来他是在确认是否能直接使用我的回答。现在他
询问“安卓设备开机流程有哪些”,这显然是在寻求指导。

我觉得可能有些疑问或者对安卓设备操作感兴趣。所以,我决定详细介绍一下安卓设备的开机步骤,包括启动程序、设置软件和完成一
些基本的用户操作。

可能他有新手,或者是想确认一些基本信息来更好地使用手机或电脑,比如设置、任务管理等。我的回答要尽量简单易懂,避免太专业
的术语,让他能够轻松理解。

另外,我还用了感叹号加强语气,显得比较热情和亲切,希望他能对我的建议有所认可。这样不仅帮助他解决了问题,还增加了与用户
的互动感。
...done thinking.

好的!安卓设备开机流程主要分为以下几步:

---

### **第一步:找到并启动操作系统**
1. 打开手机或电脑。
2. 在桌面上找到“应用商店”(在Windows系统中是“Settings”,在Mac系统中是“Programmer”)。
3. 点击“应用商店”后,找到并打开“Android”或“安卓”应用商店(根据你使用的设备类型而定)。

---

### **第二步:启动目标设备**
1. 打开“设置”(如“ Settings”在Windows系统中,或“System Settings”在Mac系统中)。
2. 在设置窗口中找到并打开你想操作的安卓设备的操作系统版本。
3. 点击“确定”即可开始运行目标设备。

---

### **第三步:进入设置**
1. 打开“设置”,然后选择要启动的设备。
2. 设备在设备界面上会显示为一个图标或图标,点击完成即可连接到目标设备。

---

### **第四步:运行设备**
1. 确认输入并点击“确定”按钮(Windows系统)或“下一步”(Mac系统)。
2. 运行目标设备后,你可以看到设备的设置界面,可以通过右键点击设备进行更多操作。

---

### **注意事项**
- 如果你的手机或电脑没有连接到互联网,可能需要关闭网络选项以确保设备能够正确启动。
- 检查并安装必要的软件,如系统更新、应用商店和第三方工具等。

希望这对你有帮助!如果你还有其他问题,随时告诉我哦! 😊

>>> Send a message (/? for help)

RAG运行效果

📝 【用户】: 安卓设备开机流程有哪些

🔄 【系统】正在处理...
--------------------------------------------------------------------------------

🔍 执行查询: '安卓设备开机流程有哪些'

📚 【检索结果】
  [1] 相关度: 0.6225 | 来源: 2022-12-12-【Android进阶】Android设备开机流程.md
      内容: 通常是芯片内部固化的一段非常小的启动代码(BootROM)。这段代码是芯片制造商预先写入的,不可修改

## 执行芯片内...
  [2] 相关度: 0.6050 | 来源: 2022-12-14-【Android进阶】Android热门原理流程总结.md
      内容: ## 内存泄漏常见场景
见性能优化篇:
[【Android性能优化】内存](./2025-1-4-【Android性能优...
  [3] 相关度: 0.6010 | 来源: 2022-12-12-【Android进阶】Android设备开机流程.md
      内容: ..

## 桌面环境启动阶段
System Server启动完成后,AMS会启动Launcher应用。Launcher...
  [4] 相关度: 0.5841 | 来源: 2022-12-14-【Android进阶】Android热门依赖库知识点总结.md
      内容: * ViewModel 暴露 LiveData
* View(Activity/Fragment) 观察 LiveDat...
  [5] 相关度: 0.5775 | 来源: 2022-12-14-【Android进阶】Android热门依赖库知识点总结.md
      内容: * 当 Activity 可能被销毁时(如配置变更或后台回收),系统会调用 onSaveInstanceState(Bu...

📝 【提示词预览】
  
    基于以下从多个文件中检索到的知识回答问题。每个知识片段都标注了来源文件。

    [来自文件: 2022-12-12-【Android进阶】Android设备开机流程.md]
通常是芯片内部固化的一段非常小的启动代码(BootROM)。这段代码是芯片制造商预先写入的,不可修改

## 执行芯片内部BootROM代码
BootROM代码执行基本的硬件初始化,验证并加载下一阶段的引导程序(通常是从特定存储区域),实现安全验证(如验证Bootloader签名)

## 安全启动(Secure Boot)验证
现代Android设备都支持安全启动机制,BootROM会验证Bootloade...

🧠 【模型思考中】
Android设备开机流程主要包括以下几个部分:

1. **系统服务和桌面组件初始化完成**:设备显示完整的桌面环境,用户可以开始与设备交互。

2. **系统启动稍作了解即可**:除了SystemUI和Launcher外,我们更多关注的是应用层的启动流程。在Android进阶的文章中详细描述了这些内容。

3. **系统启动流程**:
   - 系统启动流程包括冷启动流程、Handler消息机制以及Activity Window初始化等部分。
   - 冷启动流程用于快速启动设备,使其处于待机状态。
   - Handler消息机制负责处理用户请求和应用信息的同步。
   - Activity Window初始化为每个应用提供一个独立的工作界面。

4. **设备进入 Bootloader模式**:在系统运行时,设备会按特定键(如Volume+Power)进入Bootloader模式。在启动完成后,Boot ROM代码会被加载并执行,确保设备安全运行。

这些内容详细涵盖了Android设备开机流程的核心步骤和过程,从系统初始化到用户界面的设置。

📊 【信息来源】

  📄 2022-12-12-【Android进阶】Android设备开机流程.md:
    [1] 相关度: 0.6225
        通常是芯片内部固化的一段非常小的启动代码(BootROM)。这段代码是芯片制造商预先写入的,不可修改...
    [2] 相关度: 0.6010
        ..

## 桌面环境启动阶段
System Server启动完成后,AMS会启动Launcher应...

  📄 2022-12-14-【Android进阶】Android热门原理流程总结.md:
    [1] 相关度: 0.6050
        ## 内存泄漏常见场景
见性能优化篇:
[【Android性能优化】内存](./2025-1-4-【...

================================================================================

传统模型回答(左):

完全依赖模型预训练知识,回答内容泛化且存在明显错误

将安卓设备开机流程混淆为"打开应用商店"、"启动目标设备"等错误操作

出现了"在Mac系统中是Programmer"等荒谬描述

整体回答缺乏技术深度,更像是通用设备使用指南

RAG增强回答(右):

严格基于检索到的技术文档回答问题

准确描述了BootROM、安全启动、System Server、Launcher等专业概念

引用了具体的Markdown文档作为知识来源(如2022-12-12-【Android进阶】Android设备开机流程.md)

回答内容专业、准确,符合Android开发者的技术认知

结语

本文从RAG的出现原因出发,详细讲解了向量化流程、推理检索流程和输出整理优化,并最终给出了一个完整、可运行的本地RAG系统实现。通过这个实战项目,你可以清晰地看到:RAG的本质就是检索 + 上下文增强 + 生成这三步曲。

RAG系统的本质是”开卷考试”:当面对”安卓设备开机流程”这样的专业问题时,它不像传统模型那样凭记忆”闭卷作答”(然后编造错误答案),而是先查阅技术文档资料,再基于资料给出准确回答。这种模式既保留了大语言模型的强大生成能力,又通过外部知识库弥补了其知识局限性和幻觉问题,是构建专业领域AI助手的理想方案。

当你运行这个系统,输入”健康的水果推荐”,即使知识库中从未出现”推荐”二字,系统也能通过语义相似度找到苹果、香蕉、橙子的描述——这就是向量检索的魅力。当你看到大模型基于检索到的知识给出准确回答,而不是凭空编造时,你就理解了RAG为什么能成为AI应用开发的基石。

从简单的流水线到自主的Agentic RAG,这项技术仍在快速演进。但无论多复杂的系统,其核心都是本文揭示的这些基本原理。希望这篇文章能帮助你打下坚实的基础,在RAG的世界里走得更远。