【AI】RAG技术介绍与实操

本文介绍了AI领域RAG增强检索生成技术的介绍和实操记录
从零开始构建本地RAG系统:基于Ollama的DeepSeek-R1与Nomic-Embedd实战
大语言模型的出现改变了我们与信息交互的方式,但这些模型的知识局限于训练数据的截止日期,无法覆盖实时信息和企业私有文档。检索增强生成(Retrieval-Augmented Generation, RAG)通过为模型配备外部知识库,巧妙地解决了这一难题。本文将带领你从零开始,使用Ollama平台拉取DeepSeek-R1推理模型和Nomic-Embed-Text向量化模型,构建一个完整的本地RAG系统。
一、RAG的出现:解决大模型的知识困境
1.1 大模型的固有局限
大型语言模型虽然在语言理解和生成方面表现惊艳,但它们存在两个根本性缺陷:
知识时效性问题:模型的训练数据有严格的截止日期,对于新近发生的事件或最新研究成果一无所知。GPT-4的知识截止于2023年,当你询问2024年的新闻时,它要么无法回答,要么提供过时信息。
私有数据缺失问题:公开训练数据无法覆盖企业的内部文档、产品手册或个人知识库。如果你想让模型回答关于公司内部流程的问题,仅靠通用模型是无能为力的。
更严重的是,当模型面对超出其知识范围的问题时,它不会坦诚地说”不知道”,而是会产生”幻觉”(Hallucination)——自信地编造出看似合理但完全错误的信息。这在企业应用中是不可接受的。
1.2 RAG的核心思想:开卷考试
RAG的核心理念很简单:在回答问题前,先让模型”查阅资料”。就像开卷考试允许学生翻阅教材,RAG系统在执行生成任务前,会从一个外部知识库中检索出最相关的信息,然后将这些信息作为上下文提供给模型。
这种方法带来的优势是显著的:
- 事实准确性提升:模型的回答基于检索到的真实信息,而非凭空编造
- 知识可更新:只需更新知识库,无需重新训练模型
- 来源可追溯:系统可以提供答案的信息来源,增强可信度
- 领域适应性:通过更换知识库,同一个模型可以适配不同专业领域
RAG最早由Facebook AI Research在2020年提出,他们使用维基百科作为外部知识库,通过Dense Passage Retrieval(DPR)技术检索相关文本片段,然后输入给BART生成模型。自那时起,RAG迅速普及,成为AI应用开发的核心范式。
二、RAG向量化流程详解
RAG系统的核心在于将非结构化文本转换为机器可计算的向量表示。这一过程涉及文档加载、文本切分、向量化生成和向量存储四个关键步骤。
2.1 文档加载与切分
原始知识库通常是各种格式的文档——TXT、PDF、Markdown或网页。构建RAG的第一步是加载这些文档并将其切分成适合检索的文本块。
为什么需要切分? 大模型有上下文长度限制,直接将整本手册或整篇文章作为上下文既不现实也不高效。切分的目标是:每个文本块应包含相对完整的语义单元。
切分策略有多种选择:
按换行符切分是最简单直接的方式,适用于行结构清晰的文档:
def split_content(content):
chunks = []
lines = content.splitlines()
for line in lines:
if line.strip(): # 忽略空行
chunks.append(line)
return chunks
递归字符文本切分更智能,它会尝试按段落、句子、单词的顺序逐步切分,尽可能保持语义完整性。实践中,chunk_size通常设为500-1000个字符,chunk_overlap设为50-100个字符,以保留上下文连贯性。
2.2 向量化:文本的数学表达
切分后的文本块需要转换为计算机可以理解和计算的形式——向量(Vector),也称为嵌入(Embedding)。
向量的本质可以这样理解:想象我们要给每段文字拍一张特殊的”照片”,这张照片由几百个数字组成(如768个维度)。这张”照片”就是该段文字的”语义身份证”——语义相近的文字,它们的”照片”在数字空间中的距离也会很近。
这个转换过程由嵌入模型(Embedding Model) 完成。嵌入模型经过海量文本训练,学会了将文字映射到高维语义空间。在本文的实践中,我们将使用Ollama拉取的nomic-embed-text模型,它生成的向量维度为768维。
向量化的核心原则是:语义相似,向量相近。这意味着:
- “苹果是一种水果”和”香蕉富含钾元素”的向量距离较近
- “苹果是一种水果”和”Python是一种编程语言”的向量距离较远
2.3 向量存储
生成向量后,我们需要一个专门的基础设施来存储和检索这些向量。虽然传统数据库(如MySQL)也能存储向量,但它们无法进行高效的相似性搜索。
向量数据库(Vector Database) 专为存储和查询高维向量而设计,内置了高效的近似最近邻(ANN)搜索算法,可以在毫秒级内从数百万向量中找出最相似的几个。
常见的向量数据库包括FAISS、ChromaDB、Milvus等。在本文的实践中,考虑到我们追求理解原理而非生产部署,将使用FAISS或甚至直接使用NumPy数组存储向量——这足以让我们看清RAG的本质。
三、RAG推理检索流程
当用户提出问题后,RAG系统进入在线推理阶段。这一阶段包括查询向量化、相似度检索、结果重排和提示词构建四个环节。
3.1 查询向量化与相似度检索
用户输入的问题同样需要转换为向量——必须使用与知识库向量化完全相同的嵌入模型。这样才能确保查询向量和文档向量位于同一个语义空间,距离计算才有意义。
接下来,系统计算查询向量与知识库中所有文档向量的相似度。最常用的相似度指标是余弦相似度(Cosine Similarity):
def similarity(e1, e2):
# 计算余弦相似度
dot_product = np.dot(e1, e2) # 点乘
norm_e1 = np.linalg.norm(e1) # 向量1的范数
norm_e2 = np.linalg.norm(e2) # 向量2的范数
cosine_sim = dot_product / (norm_e1 * norm_e2) # 余弦相似度
return cosine_sim
余弦相似度的取值范围是[-1, 1],值越大表示两个向量的夹角越小,语义越相近。
计算出所有相似度后,系统按分数从高到低排序,取前K个(通常K=3~5)最相似的文本块作为检索结果。
3.2 结果重排(可选优化)
基础的向量检索存在一个潜在问题:排名靠前的文档不一定是最有用的。检索阶段使用的是双编码器(Bi-Encoder) 架构,它将查询和文档分别编码,速度快但精度有限。
为了提高质量,可以引入重排(Reranking) 阶段。重排使用交叉编码器(Cross-Encoder) 架构,它将查询和文档拼接后一起输入模型,计算相关性得分,精度更高但速度慢。因此,典型的策略是:向量检索先快速召回Top 100,重排模型再从中精筛Top 5。
在本文的简化实现中,我们将跳过重排步骤,直接使用向量检索结果。
3.3 提示词构建与生成
检索到相关文档后,需要将它们与用户问题组装成一个结构化的提示词(Prompt),然后发送给大语言模型。
最简单的提示词模板如下:
prompt_template = """
基于以下知识回答问题:
知识:
1: %s
2: %s
3: %s
4: %s
5: %s
问题:%s
请基于上述知识给出准确、详细的回答。如果知识中不包含相关信息,请明确说明。
"""
这个模板的设计原则是:
- 明确信息来源:告知模型知识是从哪里来的
- 限定回答范围:要求模型”基于知识”回答,减少幻觉
- 允许不知道:为模型提供”不知道”的出口,避免编造
将组装好的提示词发送给大语言模型(本文使用DeepSeek-R1),模型生成的回答就是最终输出。
四、输出整理与优化
RAG系统的输出并非终点,还需要进行整理和可能的优化。
4.1 引用来源
高质量的RAG系统应该在回答中标注信息来源。这类似于学术论文的脚注,用户可以核实每个事实的真实性。在实践中,可以在返回检索结果时保留文档的元数据(如文件名、段落位置),然后在生成回答时要求模型引用来源编号。
4.2 查询重写(高级优化)
用户提问往往含糊不清或包含隐含意图。例如,用户问”告诉我NVIDIA模型的最新更新”,可能暗中对特定功能感兴趣,但这种偏好没有被明确表达。
查询重写(Query Rewriting) 技术可以在检索前优化用户查询,弥合用户提问方式与知识库信息结构之间的语义差距。常用方法包括:
- Q2E(Query2Expand):生成同义词和相关短语,扩展查询
- Q2D(Query2Doc):根据查询构建伪文档,匹配文档风格
- CoT(思维链)查询重写:让模型逐步推理,分解查询意图
研究表明,使用Llama 3.3 Nemotron Super 49B进行CoT查询重写后,检索准确率@10从43.1%提升至63.8%。
五、串联运行:完整Python脚本实现
现在,让我们将所有环节串联起来,编写一个完整的RAG系统Python脚本。本实现将最小化第三方框架依赖,仅使用Ollama和NumPy,确保你能够看清每个步骤的本质。
5.1 环境准备
首先,确保已安装Ollama并拉取所需模型:
# 安装Ollama(请访问ollama.com下载对应系统版本)
# 拉取模型
ollama pull deepseek-r1:8b
ollama pull nomic-embed-text
# 安装Python依赖
pip install ollama numpy
环境问题
系统python 3.9缺失
这个错误是因为Python 3.9不支持|操作符用于类型联合(type union)。在Python 3.10及以上版本中,|才被支持用于类型提示。你的代码运行在Python 3.9环境,所以报错。
需要安装python 3.10.X 使用pyenv工具,切换用户层级的python环境,
Rust编译器缺失:安装tiktoken时报错can’t find Rust compiler
SWIG工具缺失:报错command ‘swig’ failed: No such file or directory
FAISS编译失败:报错command ‘/opt/homebrew/bin/swig’ failed with exit code 1,FAISS安装失败
PyPy兼容性问题
发现你在使用PyPy而不是CPython,导致安装faiss和chromadb均报错。
重置步骤:
# 1. 确保使用 CPython
pyenv install 3.10.15
cd /Users/mac/Dev/Desktop/RagDemo
pyenv local 3.10.15
# 2. 创建干净的环境
python -m venv venv
source venv/bin/activate
# 3. 升级基础工具
pip install --upgrade pip setuptools wheel
# 4. 安装依赖(按顺序)
pip install chromadb
pip install llama-index-core
pip install llama-index-embeddings-ollama
pip install llama-index-llms-ollama
pip install llama-index-vector-stores-chroma
# 5. 验证安装
pip list | grep -E "chroma|llama"
5.2 完整代码实现
"""
基于LlamaIndex的多文件RAG系统
处理文件夹下所有文件,支持多种文件格式
显示DeepSeek-R1的完整思考过程
"""
import os
from typing import List, Generator, Optional
import warnings
warnings.filterwarnings('ignore')
# LlamaIndex核心组件
from llama_index.core import (
SimpleDirectoryReader,
VectorStoreIndex,
Settings,
Document
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.storage import StorageContext
# Ollama集成
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama
# 向量存储
from llama_index.vector_stores.faiss import FaissVectorStore
import faiss
# 文件监控(可选)
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MultiFileRAGSystem:
"""
多文件RAG系统
支持文件夹下所有文件,自动处理格式转换
"""
SUPPORTED_EXTENSIONS = [
'.txt', '.pdf', '.docx', '.pptx', '.xlsx',
'.md', '.csv', '.epub', '.html', '.htm',
'.json', '.xml', '.ipynb'
]
def __init__(
self,
docs_dir: str,
embed_model_name: str = "nomic-embed-text",
llm_model_name: str = "deepseek-r1:8b",
ollama_base_url: str = "http://localhost:11434",
chunk_size: int = 512,
chunk_overlap: int = 50,
similarity_top_k: int = 5,
persist_dir: Optional[str] = "./storage"
):
"""
初始化多文件RAG系统
Args:
docs_dir: 文档目录路径
embed_model_name: 嵌入模型名称
llm_model_name: 大语言模型名称
ollama_base_url: Ollama服务地址
chunk_size: 文本块大小
chunk_overlap: 文本块重叠大小
similarity_top_k: 检索返回的相似文档数量
persist_dir: 索引持久化目录(可选)
"""
self.docs_dir = docs_dir
self.persist_dir = persist_dir
self.similarity_top_k = similarity_top_k
self.thinking_buffer = ""
self.answer_buffer = ""
print("=" * 80)
print("多文件RAG系统初始化")
print("=" * 80)
print(f"📁 文档目录: {docs_dir}")
print(f"📊 支持的文件格式: {', '.join(self.SUPPORTED_EXTENSIONS[:5])}等")
# 检查目录是否存在
if not os.path.exists(docs_dir):
raise ValueError(f"文档目录不存在: {docs_dir}")
# 1. 配置嵌入模型
print(f"\n🔧 加载嵌入模型: {embed_model_name}")
self.embed_model = OllamaEmbedding(
model_name=embed_model_name,
base_url=ollama_base_url,
ollama_additional_kwargs={"mirostat": 0}
)
# 2. 配置大语言模型
print(f"🔧 加载语言模型: {llm_model_name}")
self.llm = Ollama(
model=llm_model_name,
base_url=ollama_base_url,
temperature=0.7,
request_timeout=120.0,
additional_kwargs={
"num_predict": 2048,
"top_k": 40,
"top_p": 0.9
}
)
# 3. 设置全局配置
Settings.embed_model = self.embed_model
Settings.llm = self.llm
Settings.chunk_size = chunk_size
Settings.chunk_overlap = chunk_overlap
# 4. 初始化文本切分器
self.node_parser = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
secondary_chunking_regex="[^,.;:]+[,.;:]?"
)
# 5. 索引对象
self.index = None
self.query_engine = None
# 6. 文件统计信息
self.file_stats = {}
print("✅ 初始化完成")
def scan_documents(self) -> dict:
"""
扫描文档目录,统计文件信息
Returns:
文件统计信息
"""
print(f"\n📋 扫描文档目录: {self.docs_dir}")
stats = {
"total_files": 0,
"supported_files": 0,
"unsupported_files": 0,
"file_types": {},
"total_size_mb": 0
}
for root, dirs, files in os.walk(self.docs_dir):
for file in files:
file_path = os.path.join(root, file)
file_ext = os.path.splitext(file)[1].lower()
file_size = os.path.getsize(file_path) / (1024 * 1024) # MB
stats["total_files"] += 1
stats["total_size_mb"] += file_size
if file_ext in self.SUPPORTED_EXTENSIONS:
stats["supported_files"] += 1
stats["file_types"][file_ext] = stats["file_types"].get(file_ext, 0) + 1
print(f" ✅ {file} ({file_size:.2f} MB) - {file_ext}")
else:
stats["unsupported_files"] += 1
print(f" ⚠️ {file} ({file_size:.2f} MB) - 不支持格式: {file_ext}")
print(f"\n📊 统计结果:")
print(f" 总文件数: {stats['total_files']}")
print(f" 支持文件: {stats['supported_files']}")
print(f" 不支持文件: {stats['unsupported_files']}")
print(f" 总大小: {stats['total_size_mb']:.2f} MB")
print(f" 文件类型分布: {stats['file_types']}")
self.file_stats = stats
return stats
def load_documents(self, recursive: bool = True) -> List[Document]:
"""
加载文档目录下的所有文件
Args:
recursive: 是否递归加载子目录
Returns:
文档列表
"""
print(f"\n📄 加载文档...")
# 首先扫描文件
self.scan_documents()
# 使用SimpleDirectoryReader加载所有文件
reader = SimpleDirectoryReader(
input_dir=self.docs_dir,
recursive=recursive,
exclude_hidden=True, # 排除隐藏文件
required_exts=self.SUPPORTED_EXTENSIONS # 只加载支持的文件
)
documents = reader.load_data()
print(f"\n✅ 成功加载 {len(documents)} 个文档")
# 按文件类型显示统计
doc_by_type = {}
for doc in documents:
file_name = doc.metadata.get('file_name', 'unknown')
file_ext = os.path.splitext(file_name)[1].lower()
doc_by_type[file_ext] = doc_by_type.get(file_ext, 0) + 1
print("📊 文档类型分布:")
for ext, count in doc_by_type.items():
print(f" {ext}: {count} 个文档")
# 显示每个文档的信息
for i, doc in enumerate(documents[:5]): # 只显示前5个
file_name = doc.metadata.get('file_name', 'unknown')
file_size = doc.metadata.get('file_size', 0) / 1024 # KB
preview = doc.text[:100].replace('\n', ' ') + "..." if len(doc.text) > 100 else doc.text
print(f"\n 文档 {i+1}: {file_name}")
print(f" 大小: {file_size:.1f} KB")
print(f" 预览: {preview}")
if len(documents) > 5:
print(f"\n ... 还有 {len(documents) - 5} 个文档未显示")
return documents
def build_index(self, documents: List[Document], force_rebuild: bool = False):
"""
构建向量索引
Args:
documents: 文档列表
force_rebuild: 是否强制重建(忽略已有索引)
"""
# 检查是否有持久化索引
if self.persist_dir and os.path.exists(self.persist_dir) and not force_rebuild:
try:
print(f"\n🔍 发现已有索引,尝试加载...")
self.load_index()
return
except Exception as e:
print(f"⚠️ 加载索引失败: {e}")
print("将重新构建索引...")
print(f"\n🔨 构建向量索引...")
print(f"切分文档为文本块 (size={Settings.chunk_size}, overlap={Settings.chunk_overlap})")
# 构建索引(显示进度)
self.index = VectorStoreIndex.from_documents(
documents,
embed_model=self.embed_model,
node_parser=self.node_parser,
show_progress=True
)
print(f"✅ 索引构建完成")
# 获取节点统计
nodes = self.index.docstore.docs.values()
print(f" 生成文本块数量: {len(nodes)}")
# 计算平均文本块长度
avg_length = sum(len(node.text) for node in nodes) / len(nodes)
print(f" 平均文本块长度: {avg_length:.1f} 字符")
# 保存索引
if self.persist_dir:
self.save_index()
def setup_query_engine(self):
"""
配置查询引擎
"""
if not self.index:
raise ValueError("请先构建索引")
print(f"\n⚙️ 配置查询引擎...")
# 1. 创建检索器
retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=self.similarity_top_k,
embed_model=self.embed_model
)
# 2. 创建响应合成器(启用流式输出)
response_synthesizer = CompactAndRefine(
llm=self.llm,
streaming=True,
verbose=True
)
# 3. 创建后处理器
postprocessor = SimilarityPostprocessor(similarity_cutoff=0.6) # 降低阈值以包含更多结果
# 4. 组合为查询引擎
self.query_engine = RetrieverQueryEngine(
retriever=retriever,
response_synthesizer=response_synthesizer,
node_postprocessors=[postprocessor]
)
print(f"✅ 查询引擎配置完成")
print(f" 检索数量: top_{self.similarity_top_k}")
print(f" 相似度阈值: 0.6")
def query_with_thinking(self, query: str) -> Generator[dict, None, None]:
"""
执行查询并流式返回思考过程
Args:
query: 用户查询
Yields:
包含类型和内容的字典
"""
if not self.query_engine:
raise ValueError("请先配置查询引擎")
print(f"\n🔍 执行查询: '{query}'")
# 1. 执行检索
nodes = self.query_engine.retriever.retrieve(query)
# 组织检索结果
retrieval_results = []
for node in nodes:
file_name = node.metadata.get('file_name', '未知文件')
retrieval_results.append({
"text": node.text,
"score": node.score,
"file_name": file_name,
"node_id": node.node_id
})
# 显示检索结果
yield {
"type": "retrieval",
"content": {
"query": query,
"results": retrieval_results
}
}
# 2. 构建提示词(带文件来源信息)
context_parts = []
for i, node in enumerate(nodes):
file_name = node.metadata.get('file_name', '未知文件')
context_parts.append(f"[来自文件: {file_name}]\n{node.text}")
context_text = "\n\n---\n\n".join(context_parts)
prompt = f"""
基于以下从多个文件中检索到的知识回答问题。每个知识片段都标注了来源文件。
{context_text}
用户问题:{query}
请严格基于上述知识回答,不要编造信息。如果知识中不包含相关信息,请明确说明。
在回答中可以提及信息来源文件(如"根据XXX文件的描述")。
"""
yield {"type": "prompt", "content": prompt[:300] + "..." if len(prompt) > 300 else prompt}
# 3. 执行查询(流式输出)
yield {"type": "start_generation", "content": "开始生成回答..."}
streaming_response = self.query_engine.query(query)
# 重置缓冲区
self.thinking_buffer = ""
self.answer_buffer = ""
in_thinking = True
# 流式处理响应
for text_chunk in streaming_response.response_gen:
# 检测思考/回答边界
if "思考:" in text_chunk or "分析:" in text_chunk:
in_thinking = True
self.thinking_buffer += text_chunk
yield {"type": "thinking_chunk", "content": text_chunk}
elif "答案:" in text_chunk or "回答:" in text_chunk:
if in_thinking:
yield {"type": "thinking_end", "content": ""}
in_thinking = False
self.answer_buffer += text_chunk
yield {"type": "answer_chunk", "content": text_chunk}
elif in_thinking:
self.thinking_buffer += text_chunk
yield {"type": "thinking_chunk", "content": text_chunk}
else:
self.answer_buffer += text_chunk
yield {"type": "answer_chunk", "content": text_chunk}
# 4. 返回完整结果(带来源信息)
source_nodes = [
{
"text": node.text,
"score": node.score,
"file_name": node.metadata.get('file_name', '未知文件'),
"metadata": node.metadata
}
for node in streaming_response.source_nodes
]
yield {
"type": "complete",
"content": {
"thinking": self.thinking_buffer,
"answer": streaming_response.response,
"source_nodes": source_nodes
}
}
def save_index(self):
"""保存索引到磁盘"""
if not self.index or not self.persist_dir:
return
os.makedirs(self.persist_dir, exist_ok=True)
self.index.storage_context.persist(persist_dir=self.persist_dir)
print(f"✅ 索引已保存到 {self.persist_dir}")
def load_index(self):
"""从磁盘加载索引"""
from llama_index.core import load_index_from_storage
if not os.path.exists(self.persist_dir):
raise ValueError(f"持久化目录不存在: {self.persist_dir}")
storage_context = StorageContext.from_defaults(persist_dir=self.persist_dir)
self.index = load_index_from_storage(storage_context)
print(f"✅ 索引已从 {self.persist_dir} 加载")
class DocumentWatcher(FileSystemEventHandler):
"""文档目录监控器,当文件变化时自动重建索引"""
def __init__(self, rag_system: MultiFileRAGSystem):
self.rag_system = rag_system
self.last_rebuild = time.time()
self.rebuild_cooldown = 60 # 60秒内不重复重建
def on_modified(self, event):
if not event.is_directory:
self._handle_change(event.src_path)
def on_created(self, event):
if not event.is_directory:
self._handle_change(event.src_path)
def on_deleted(self, event):
if not event.is_directory:
self._handle_change(event.src_path)
def _handle_change(self, file_path):
"""处理文件变化"""
current_time = time.time()
if current_time - self.last_rebuild > self.rebuild_cooldown:
print(f"\n🔄 检测到文件变化: {file_path}")
print("正在重建索引...")
self.rag_system.build_index(
self.rag_system.load_documents(),
force_rebuild=True
)
self.last_rebuild = current_time
def interactive_mode():
"""交互式问答模式"""
# 配置
docs_dir = "./blog_docs" # 存放所有知识库文件的目录
# 初始化RAG系统
rag = MultiFileRAGSystem(
docs_dir=docs_dir,
embed_model_name="nomic-embed-text",
llm_model_name="deepseek-r1:8b",
chunk_size=512,
chunk_overlap=50,
similarity_top_k=5,
persist_dir="./storage" # 启用持久化
)
# 加载文档并构建索引
documents = rag.load_documents(recursive=True)
rag.build_index(documents)
rag.setup_query_engine()
# 交互式问答
print("\n" + "=" * 80)
print("多文件RAG系统已就绪!输入问题开始问答(输入 'exit' 退出)")
print("=" * 80)
while True:
query = input("\n📝 【用户】: ").strip()
if query.lower() in ['exit', 'quit', '退出']:
print("再见!")
break
if not query:
continue
print("\n🔄 【系统】正在处理...")
print("-" * 80)
thinking_mode = True
sources_shown = False
for event in rag.query_with_thinking(query):
if event["type"] == "retrieval":
print("\n📚 【检索结果】")
for i, result in enumerate(event["content"]["results"]):
preview = result["text"][:60] + "..." if len(result["text"]) > 60 else result["text"]
print(f" [{i+1}] 相关度: {result['score']:.4f} | 来源: {result['file_name']}")
print(f" 内容: {preview}")
elif event["type"] == "prompt":
print(f"\n📝 【提示词预览】\n {event['content']}")
elif event["type"] == "start_generation":
print("\n🧠 【模型思考中】")
elif event["type"] == "thinking_chunk":
print(event["content"], end='', flush=True)
elif event["type"] == "thinking_end":
print("\n\n💡 【思考完成,开始回答】\n")
thinking_mode = False
elif event["type"] == "answer_chunk":
print(event["content"], end='', flush=True)
elif event["type"] == "complete" and not sources_shown:
print("\n\n📊 【信息来源】")
sources = {}
for node in event["content"]["source_nodes"]:
file_name = node["file_name"]
if file_name not in sources:
sources[file_name] = []
sources[file_name].append({
"score": node["score"],
"preview": node["text"][:50] + "..."
})
for file_name, snippets in sources.items():
print(f"\n 📄 {file_name}:")
for i, snippet in enumerate(snippets):
print(f" [{i+1}] 相关度: {snippet['score']:.4f}")
print(f" {snippet['preview']}")
sources_shown = True
print("\n" + "=" * 80)
def batch_mode():
"""批量处理模式:处理多个问题"""
# 配置
docs_dir = "./blog_docs"
rag = MultiFileRAGSystem(
docs_dir=docs_dir,
persist_dir="./storage"
)
# 加载文档并构建索引
documents = rag.load_documents()
rag.build_index(documents)
rag.setup_query_engine()
# 从文件读取问题列表
questions_file = "questions.txt"
if os.path.exists(questions_file):
with open(questions_file, 'r', encoding='utf-8') as f:
questions = [q.strip() for q in f.readlines() if q.strip()]
else:
# 示例问题
questions = [
"苹果有什么营养价值?",
"香蕉适合什么时候吃?",
"如何保持健康饮食?"
]
print("\n" + "=" * 80)
print("批量处理模式")
print("=" * 80)
results = []
for i, question in enumerate(questions, 1):
print(f"\n📌 问题 {i}/{len(questions)}: {question}")
print("-" * 40)
response = rag.query_engine.query(question)
print(f"回答: {response}")
results.append({
"question": question,
"answer": str(response),
"sources": [
{
"file": node.metadata.get('file_name', '未知'),
"score": node.score,
"text": node.text[:200]
}
for node in response.source_nodes
]
})
print()
# 保存结果
import json
with open("batch_results.json", "w", encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print("✅ 结果已保存到 batch_results.json")
def watch_mode():
"""监控模式:监听文件变化并自动更新索引"""
# 配置
docs_dir = "./blog_docs"
rag = MultiFileRAGSystem(
docs_dir=docs_dir,
persist_dir="./storage"
)
# 初始构建
documents = rag.load_documents()
rag.build_index(documents)
rag.setup_query_engine()
# 启动文件监控
event_handler = DocumentWatcher(rag)
observer = Observer()
observer.schedule(event_handler, docs_dir, recursive=True)
observer.start()
print("\n" + "=" * 80)
print("监控模式已启动 - 文件变化时将自动重建索引")
print("输入问题开始问答(输入 'exit' 退出)")
print("=" * 80)
try:
while True:
query = input("\n📝 【用户】: ").strip()
if query.lower() in ['exit', 'quit', '退出']:
break
if not query:
continue
response = rag.query_engine.query(query)
print(f"\n💬 【回答】: {response}")
# 显示来源
print("\n📚 【来源】:")
for node in response.source_nodes[:3]:
file_name = node.metadata.get('file_name', '未知')
print(f" - {file_name} (相关度: {node.score:.4f})")
finally:
observer.stop()
observer.join()
def main():
"""主函数:选择运行模式"""
print("多文件RAG系统 - 选择运行模式:")
print("1. 交互式问答")
print("2. 批量处理")
print("3. 监控模式(自动更新)")
choice = input("\n请输入选择 (1/2/3): ").strip()
if choice == "1":
interactive_mode()
elif choice == "2":
batch_mode()
elif choice == "3":
watch_mode()
else:
print("无效选择,运行默认交互式模式")
interactive_mode()
if __name__ == "__main__":
main()
代码解析:
系统启动时,首先通过MultiFileRAGSystem类的构造函数完成基础配置。初始化过程包括设置文档目录路径、配置Ollama服务连接的嵌入模型(默认使用nomic-embed-text)和大语言模型(默认使用deepseek-r1:1.5b),以及定义文本处理参数如块大小512字符、块重叠50字符和检索返回数量top_k=5。同时,系统会检查文档目录是否存在,配置全局Settings对象,初始化文本切分器,并准备持久化存储目录。
系统通过scan_documents方法递归扫描指定目录下的所有文件,统计总文件数、支持的文件格式(包括txt、pdf、docx、pptx、xlsx、md、csv等十多种格式)、文件大小分布等信息,并识别不支持的文件格式。随后,load_documents方法调用SimpleDirectoryReader加载所有支持格式的文档,将每个文件转换为Document对象,保留文件名、文件大小等元数据信息。
在索引构建阶段,系统使用SentenceSplitter将加载的文档切分为固定大小的文本块(节点),每个节点包含文本内容和来源元数据。通过OllamaEmbedding将文本块转换为向量表示,并存储到Faiss向量数据库中构建VectorStoreIndex。如果启用了持久化功能,索引会被保存到磁盘,方便后续快速加载而无需重复构建。构建完成后,系统会统计生成的文本块数量和平均长度。
setup_query_engine方法负责配置完整的查询流水线。首先创建VectorIndexRetriever检索器,设置相似度检索的top_k参数。然后创建CompactAndRefine响应合成器,启用流式输出和详细模式。接着配置SimilarityPostprocessor后处理器,设置相似度阈值为0.6以过滤低质量结果。最后将这些组件组合成RetrieverQueryEngine,形成完整的查询处理链路。
当用户提交查询时,系统执行query_with_thinking方法,该方法设计为生成器函数以支持流式输出。查询处理分为多个阶段:
首先是检索阶段,系统将用户问题向量化后到索引中检索最相似的top_k个文本块,返回每个块的文本内容、相似度分数和来源文件信息,这些结果以"retrieval"类型的事件流式输出。
其次是提示词构建阶段,系统将检索到的文本块按照来源文件组织,构建包含完整上下文信息的提示词,明确标注每个知识片段的来源文件,要求LLM严格基于提供的知识回答,并可提及信息来源。构建的提示词以"prompt"类型事件输出。
然后是流式生成阶段,系统调用查询引擎执行查询,获取流式响应。在生成过程中,系统实时分析输出的文本流,智能区分"思考过程"和"最终答案"两部分。通过检测"思考:"、"分析:"等关键词标记思考过程的开始,检测"答案:"、"回答:"等关键词标记思考与回答的边界。思考过程的每个文本块以"thinking_chunk"类型输出,回答部分以"answer_chunk"类型输出,实现类似DeepSeek-R1的思考过程展示效果。
最后是结果整合阶段,生成完成后,系统收集完整的思考过程和最终答案,同时整理所有引用的源节点信息,包括每个节点的文本预览、相似度分数和来源文件,以"complete"类型事件返回完整结果。
三种运行模式详解
交互式模式是系统的主要使用方式,初始化RAG系统后进入问答循环。用户输入问题后,系统实时展示检索到的相关文本块及其相似度分数,然后逐字显示模型的思考过程,接着显示最终生成的答案,最后列出所有信息来源文件及具体引用的文本片段,实现完全透明的问答过程。
批量处理模式适用于需要处理多个预定义问题的场景。系统从questions.txt文件读取问题列表,或使用内置示例问题,依次处理每个问题。每个问题的回答结果和来源信息被收集并保存为结构化的JSON文件,便于后续分析和处理。
监控模式通过watchdog库实现文件系统的实时监控。当检测到文档目录中的文件被修改、创建或删除时,系统自动触发索引重建,确保知识库的实时性。同时,用户仍可进行问答交互,系统会使用最新的索引提供服务。为防止频繁重建,设置了60秒的重建冷却时间。
运行效果对比
直接在命令行中运行提问
>>> 安卓设备开机流程有哪些
Thinking...
好的,用户之前问过关于“helolo”的意思,我解释了一下,然后他又问了“你好”。看起来他是在确认是否能直接使用我的回答。现在他
询问“安卓设备开机流程有哪些”,这显然是在寻求指导。
我觉得可能有些疑问或者对安卓设备操作感兴趣。所以,我决定详细介绍一下安卓设备的开机步骤,包括启动程序、设置软件和完成一
些基本的用户操作。
可能他有新手,或者是想确认一些基本信息来更好地使用手机或电脑,比如设置、任务管理等。我的回答要尽量简单易懂,避免太专业
的术语,让他能够轻松理解。
另外,我还用了感叹号加强语气,显得比较热情和亲切,希望他能对我的建议有所认可。这样不仅帮助他解决了问题,还增加了与用户
的互动感。
...done thinking.
好的!安卓设备开机流程主要分为以下几步:
---
### **第一步:找到并启动操作系统**
1. 打开手机或电脑。
2. 在桌面上找到“应用商店”(在Windows系统中是“Settings”,在Mac系统中是“Programmer”)。
3. 点击“应用商店”后,找到并打开“Android”或“安卓”应用商店(根据你使用的设备类型而定)。
---
### **第二步:启动目标设备**
1. 打开“设置”(如“ Settings”在Windows系统中,或“System Settings”在Mac系统中)。
2. 在设置窗口中找到并打开你想操作的安卓设备的操作系统版本。
3. 点击“确定”即可开始运行目标设备。
---
### **第三步:进入设置**
1. 打开“设置”,然后选择要启动的设备。
2. 设备在设备界面上会显示为一个图标或图标,点击完成即可连接到目标设备。
---
### **第四步:运行设备**
1. 确认输入并点击“确定”按钮(Windows系统)或“下一步”(Mac系统)。
2. 运行目标设备后,你可以看到设备的设置界面,可以通过右键点击设备进行更多操作。
---
### **注意事项**
- 如果你的手机或电脑没有连接到互联网,可能需要关闭网络选项以确保设备能够正确启动。
- 检查并安装必要的软件,如系统更新、应用商店和第三方工具等。
希望这对你有帮助!如果你还有其他问题,随时告诉我哦! 😊
>>> Send a message (/? for help)
RAG运行效果
📝 【用户】: 安卓设备开机流程有哪些
🔄 【系统】正在处理...
--------------------------------------------------------------------------------
🔍 执行查询: '安卓设备开机流程有哪些'
📚 【检索结果】
[1] 相关度: 0.6225 | 来源: 2022-12-12-【Android进阶】Android设备开机流程.md
内容: 通常是芯片内部固化的一段非常小的启动代码(BootROM)。这段代码是芯片制造商预先写入的,不可修改
## 执行芯片内...
[2] 相关度: 0.6050 | 来源: 2022-12-14-【Android进阶】Android热门原理流程总结.md
内容: ## 内存泄漏常见场景
见性能优化篇:
[【Android性能优化】内存](./2025-1-4-【Android性能优...
[3] 相关度: 0.6010 | 来源: 2022-12-12-【Android进阶】Android设备开机流程.md
内容: ..
## 桌面环境启动阶段
System Server启动完成后,AMS会启动Launcher应用。Launcher...
[4] 相关度: 0.5841 | 来源: 2022-12-14-【Android进阶】Android热门依赖库知识点总结.md
内容: * ViewModel 暴露 LiveData
* View(Activity/Fragment) 观察 LiveDat...
[5] 相关度: 0.5775 | 来源: 2022-12-14-【Android进阶】Android热门依赖库知识点总结.md
内容: * 当 Activity 可能被销毁时(如配置变更或后台回收),系统会调用 onSaveInstanceState(Bu...
📝 【提示词预览】
基于以下从多个文件中检索到的知识回答问题。每个知识片段都标注了来源文件。
[来自文件: 2022-12-12-【Android进阶】Android设备开机流程.md]
通常是芯片内部固化的一段非常小的启动代码(BootROM)。这段代码是芯片制造商预先写入的,不可修改
## 执行芯片内部BootROM代码
BootROM代码执行基本的硬件初始化,验证并加载下一阶段的引导程序(通常是从特定存储区域),实现安全验证(如验证Bootloader签名)
## 安全启动(Secure Boot)验证
现代Android设备都支持安全启动机制,BootROM会验证Bootloade...
🧠 【模型思考中】
Android设备开机流程主要包括以下几个部分:
1. **系统服务和桌面组件初始化完成**:设备显示完整的桌面环境,用户可以开始与设备交互。
2. **系统启动稍作了解即可**:除了SystemUI和Launcher外,我们更多关注的是应用层的启动流程。在Android进阶的文章中详细描述了这些内容。
3. **系统启动流程**:
- 系统启动流程包括冷启动流程、Handler消息机制以及Activity Window初始化等部分。
- 冷启动流程用于快速启动设备,使其处于待机状态。
- Handler消息机制负责处理用户请求和应用信息的同步。
- Activity Window初始化为每个应用提供一个独立的工作界面。
4. **设备进入 Bootloader模式**:在系统运行时,设备会按特定键(如Volume+Power)进入Bootloader模式。在启动完成后,Boot ROM代码会被加载并执行,确保设备安全运行。
这些内容详细涵盖了Android设备开机流程的核心步骤和过程,从系统初始化到用户界面的设置。
📊 【信息来源】
📄 2022-12-12-【Android进阶】Android设备开机流程.md:
[1] 相关度: 0.6225
通常是芯片内部固化的一段非常小的启动代码(BootROM)。这段代码是芯片制造商预先写入的,不可修改...
[2] 相关度: 0.6010
..
## 桌面环境启动阶段
System Server启动完成后,AMS会启动Launcher应...
📄 2022-12-14-【Android进阶】Android热门原理流程总结.md:
[1] 相关度: 0.6050
## 内存泄漏常见场景
见性能优化篇:
[【Android性能优化】内存](./2025-1-4-【...
================================================================================
传统模型回答(左):
完全依赖模型预训练知识,回答内容泛化且存在明显错误
将安卓设备开机流程混淆为"打开应用商店"、"启动目标设备"等错误操作
出现了"在Mac系统中是Programmer"等荒谬描述
整体回答缺乏技术深度,更像是通用设备使用指南
RAG增强回答(右):
严格基于检索到的技术文档回答问题
准确描述了BootROM、安全启动、System Server、Launcher等专业概念
引用了具体的Markdown文档作为知识来源(如2022-12-12-【Android进阶】Android设备开机流程.md)
回答内容专业、准确,符合Android开发者的技术认知
结语
本文从RAG的出现原因出发,详细讲解了向量化流程、推理检索流程和输出整理优化,并最终给出了一个完整、可运行的本地RAG系统实现。通过这个实战项目,你可以清晰地看到:RAG的本质就是检索 + 上下文增强 + 生成这三步曲。
RAG系统的本质是”开卷考试”:当面对”安卓设备开机流程”这样的专业问题时,它不像传统模型那样凭记忆”闭卷作答”(然后编造错误答案),而是先查阅技术文档资料,再基于资料给出准确回答。这种模式既保留了大语言模型的强大生成能力,又通过外部知识库弥补了其知识局限性和幻觉问题,是构建专业领域AI助手的理想方案。
当你运行这个系统,输入”健康的水果推荐”,即使知识库中从未出现”推荐”二字,系统也能通过语义相似度找到苹果、香蕉、橙子的描述——这就是向量检索的魅力。当你看到大模型基于检索到的知识给出准确回答,而不是凭空编造时,你就理解了RAG为什么能成为AI应用开发的基石。
从简单的流水线到自主的Agentic RAG,这项技术仍在快速演进。但无论多复杂的系统,其核心都是本文揭示的这些基本原理。希望这篇文章能帮助你打下坚实的基础,在RAG的世界里走得更远。