System Architecture Overview
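📊 Data Processing Pipeline
Document intake (API Gateway) → File parsing service (content extraction) → NLP processing (entity recognition) → Graph construction (relation extraction) → Vectorization (index storage) → Graph database / Vector index → Query response (retrieval engine) → Result generation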
🔧 Core Startup Flow
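# main.py - system startup entry point
import os

from fastapi import FastAPI
from lightrag.core import LightRAGEngine
from lightrag.services import DocumentService, QueryService
from lightrag.database import init_databases

app = FastAPI(title="LightRAG API", version="2.0.0")


# Initialize the core components when the application starts
async def startup_event():
    # 1. Initialize database connections
    await init_databases()
    # 2. Start the LightRAG engine. Service addresses come from the environment
    #    variables used in docker-compose (NEO4J_URI, FAISS_INDEX_PATH, REDIS_URL),
    #    with local defaults for development
    engine = LightRAGEngine(
        graph_db_url=os.getenv("NEO4J_URI", "neo4j://localhost:7687"),
        vector_store_path=os.getenv("FAISS_INDEX_PATH", "/data/faiss_index"),
        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
    )
    await engine.initialize()
    # 3. Register the services on app.state so request handlers can reach them
    app.state.document_service = DocumentService(engine)
    app.state.query_service = QueryService(engine)


app.add_event_handler("startup", startup_event)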
API Gateway
Load balancing, request routing, authentication and authorization, rate limiting
Tech stack: Nginx, JWT, Rate Limiting
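The gateway's rate limiting is commonly implemented as a token bucket. The following is a minimal in-process sketch. It assumes one bucket per client key. The `TokenBucket` and `RateLimiter` names, the keying scheme and the parameters are illustrative only. In this architecture the limit would more likely be enforced in Nginx (its `limit_req` module) or in a store shared across replicas, such as Redis.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TokenBucket:
    """Holds up to `capacity` tokens and refills at `rate` tokens per second."""

    capacity: float
    rate: float
    tokens: float = field(init=False)
    last_refill: float = field(init=False)

    def __post_init__(self) -> None:
        self.tokens = self.capacity  # start full so a short burst is allowed
        self.last_refill = time.monotonic()

    def allow(self, now: Optional[float] = None) -> bool:
        """Spend one token if available; False means the request should get HTTP 429.

        `now` can be injected for testing; by default the monotonic clock is used.
        """
        now = time.monotonic() if now is None else now
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class RateLimiter:
    """One bucket per client key (hypothetical keying, e.g. API key or JWT subject)."""

    def __init__(self, capacity: float, rate: float) -> None:
        self.capacity = capacity
        self.rate = rate
        self.buckets: Dict[str, TokenBucket] = {}

    def allow(self, client_id: str, now: Optional[float] = None) -> bool:
        if client_id not in self.buckets:
            self.buckets[client_id] = TokenBucket(self.capacity, self.rate)
        return self.buckets[client_id].allow(now)
```

The capacity sets how large a burst a client may send. The rate sets the sustained requests per second.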
AI Processing Engine
Text understanding, entity extraction, relation identification, embedding generation
Tech stack: Transformers, OpenAI API, spaCy
Data Storage Layer
Graph database, vector index, relational database, cache layer
Tech stack: Neo4j, FAISS, PostgreSQL, Redis
Retrieval Engine
Dual-level retrieval, similarity search, graph traversal, result ranking
Tech stack: HNSW, Cypher, Async
Microservice Architecture Design
📦 Core Service Modules
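# lightrag/services/document_service.py
import asyncio

from lightrag.core import LightRAGEngine
# DocumentParser, EntityExtractor and GraphBuilder are project components defined elsewhere


class DocumentService:
    def __init__(self, engine: LightRAGEngine):
        self.engine = engine
        self.parser = DocumentParser()
        self.extractor = EntityExtractor()
        self.graph_builder = GraphBuilder()

    async def process_document(self, document_id: str, file_content: bytes, file_type: str):
        # document_id is generated by the upload route, so the ID returned to the
        # client is the one the stored data is keyed by
        # 1. Parse the document into text chunks
        chunks = await self.parser.parse(file_content, file_type)
        # 2. Process the chunks concurrently
        tasks = [self.process_chunk(chunk) for chunk in chunks]
        results = await asyncio.gather(*tasks)
        # 3. Build the knowledge graph from the per-chunk results
        graph_data = await self.graph_builder.build(results)
        # 4. Persist to the databases (store_document is assumed to accept the ID)
        await self.engine.store_document(document_id, graph_data)
        return {"document_id": document_id, "status": "success"}

    async def process_chunk(self, chunk: str):
        # Entity extraction
        entities = await self.extractor.extract_entities(chunk)
        # Relation extraction, using the entities found above
        relations = await self.extractor.extract_relations(chunk, entities)
        # Embedding
        embeddings = await self.engine.vectorize(chunk)
        return {
            "text": chunk,
            "entities": entities,
            "relations": relations,
            "embeddings": embeddings,
        }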
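`asyncio.gather` in `process_document` starts one task per chunk all at once. A large document can therefore launch hundreds of simultaneous extraction and embedding calls, which may hit external model APIs and their rate limits. A common remedy is to cap concurrency with `asyncio.Semaphore`. The helper below is a generic sketch under that assumption. `gather_bounded` and the default limit of 8 are illustrative and not part of LightRAG.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_bounded(
    func: Callable[[T], Awaitable[R]],
    items: Iterable[T],
    limit: int = 8,
) -> List[R]:
    """Run func(item) for every item with at most `limit` calls in flight.

    Results come back in input order, like asyncio.gather.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        async with semaphore:  # waits here while `limit` calls are already running
            return await func(item)

    return await asyncio.gather(*(run_one(item) for item in items))
```

Inside `process_document`, the two gather lines would become `results = await gather_bounded(self.process_chunk, chunks, limit=8)`.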
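Document Processing Service
Handles document parsing, chunking, entity extraction, and relation identification
Throughput: 1,000 docs/min
Supported formats: PDF, DOCX, TXT, MD, HTML
Query Processing Service
Query parsing, dual-level retrieval, result fusion and ranking
Response time: < 500 ms
Concurrency: 1,000 QPS
Graph Management Service
Knowledge-graph construction, updates, queries, and graph-algorithm execution
Nodes: 10M+
Relationships: 50M+
Vector Retrieval Service
High-dimensional vector storage, similarity search, index optimization
Vector dimension: 1536
Search accuracy: 99.5%
Cache Service
Query-result caching, session management, preloading of frequently accessed data
Hit rate: 85%
Cache size: 50 GB
Message Queue
Asynchronous task processing, event-driven messaging, service decoupling
Throughput: 10K msg/s
Reliability: 99.99%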
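The document processing service splits parsed text into chunks before it extracts anything. Below is a minimal sketch of fixed-size chunking with overlap. It assumes whitespace-separated words as the unit. `chunk_words` and its default sizes are illustrative; a production parser would more likely count tokens with the model's tokenizer. The overlap makes it less likely that a passage mentioning two related entities is cut in half at a chunk boundary.

```python
from typing import List


def chunk_words(text: str, chunk_size: int = 200, overlap: int = 20) -> List[str]:
    """Split text into windows of `chunk_size` words; consecutive windows share `overlap` words."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= overlap < chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reaches the end of the text
    return chunks
```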
🔄 Inter-Service Communication
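# lightrag/core/message_bus.py
import json

from redis.asyncio import Redis


class MessageBus:
    def __init__(self):
        # The asyncio client is required: calls on the synchronous redis.Redis cannot be awaited
        self.redis_client = Redis(host="redis", port=6379, decode_responses=True)
        self.subscribers = {}

    async def publish(self, channel: str, message: dict):
        # Publish an event to the given channel
        await self.redis_client.publish(channel, json.dumps(message))

    async def subscribe(self, channel: str, handler):
        # Subscribe to the channel and pass each message to the handler coroutine.
        # The loop runs until cancelled, so start it as a background task.
        pubsub = self.redis_client.pubsub()
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                await handler(data)


# Event example (runs inside a coroutine, where message_bus is a MessageBus instance)
await message_bus.publish("document.processed", {
    "document_id": "doc_123",
    "entities_count": 45,
    "relations_count": 67,
    "processing_time": 2.3,
})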
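Redis Pub/Sub is fire-and-forget. A message published while no subscriber is connected is dropped. A reliability target such as the message queue's 99.99% therefore needs a persistent mechanism, for example Redis Streams. For unit tests, the same `publish`/`subscribe` shape can be reproduced in-process. `InMemoryMessageBus` below is a hypothetical test double, not part of LightRAG. It assumes handlers are coroutines, as in `MessageBus`, and it mirrors Pub/Sub semantics by delivering only to subscribers that exist at publish time.

```python
import asyncio
import json
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List

Handler = Callable[[dict], Awaitable[None]]


class InMemoryMessageBus:
    """In-process stand-in for MessageBus with Pub/Sub-style (at-most-once) delivery."""

    def __init__(self) -> None:
        self.queues: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    async def publish(self, channel: str, message: dict) -> int:
        # Serialize as the Redis version does, so non-JSON payloads also fail in tests
        payload = json.dumps(message)
        for queue in self.queues[channel]:
            queue.put_nowait(payload)
        return len(self.queues[channel])  # like Redis PUBLISH: number of receivers

    async def subscribe(self, channel: str, handler: Handler) -> None:
        # Runs until cancelled, like the Redis listen() loop
        queue: asyncio.Queue = asyncio.Queue()
        self.queues[channel].append(queue)
        try:
            while True:
                data = json.loads(await queue.get())
                await handler(data)
        finally:
            self.queues[channel].remove(queue)  # unsubscribe on cancellation
```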
Data Storage Architecture
📊 PostgreSQL - Metadata
documents table:
id: UUID
filename: VARCHAR
file_size: BIGINT
upload_time: TIMESTAMP
processing_status: ENUM
🕸️ Neo4j - Knowledge Graph
Entity node:
name: String
type: String
description: String (free text)
RELATES_TO relationship:
type: String
confidence: Float
🧮 FAISS - Vector Index
Vector dimension: 1536
Index type: HNSW
M: 16
efConstruction: 200
Distance metric: Inner Product
Vector count: 10M+
⚡ Redis - Cache Layer
Query cache: Hash
Session data: String
Hot entities: Set
Message queue: List
Distributed lock: String
TTL: 3600s
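Query-cache entries expire after the 3600 s TTL. In Redis the expiry is attached per key, for example with `SET key value EX 3600` or `EXPIRE key 3600`. The sketch below reproduces those expiry semantics in-process. It also shows one way to derive a cache key from a normalized query. The key scheme (a `query:` prefix plus a SHA-256 of the mode and normalized text), the mode names and `TTLCache` itself are assumptions for illustration. The hit/miss counters correspond to the cache hit rate that the monitoring section tracks.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Optional, Tuple


def query_cache_key(query: str, mode: str) -> str:
    """Hypothetical key scheme: case and whitespace differences map to the same entry."""
    normalized = " ".join(query.lower().split())
    digest = hashlib.sha256(f"{mode}:{normalized}".encode("utf-8")).hexdigest()
    return f"query:{digest}"


class TTLCache:
    """In-process cache with Redis-style per-key expiry; expired keys are dropped lazily on read."""

    def __init__(self, ttl_seconds: float = 3600, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl_seconds
        self.clock = clock
        self.store: Dict[str, Tuple[float, str]] = {}
        self.hits = 0
        self.misses = 0

    def set(self, key: str, value: Any) -> None:
        # Store JSON, as a Redis value would be, so callers get a fresh copy on read
        self.store[key] = (self.clock() + self.ttl, json.dumps(value))

    def get(self, key: str) -> Optional[Any]:
        entry = self.store.get(key)
        if entry is None or entry[0] <= self.clock():
            self.store.pop(key, None)  # lazy expiry
            self.misses += 1
            return None
        self.hits += 1
        return json.loads(entry[1])

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0
```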
🗄️ Database Access Layer
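# lightrag/database/graph_repository.py
from typing import Optional, Tuple

from neo4j import AsyncGraphDatabase
# Entity and Relationship are the project's data models, defined elsewhere


class GraphRepository:
    def __init__(self, neo4j_uri: str, auth: Optional[Tuple[str, str]] = None):
        # The async driver is required for `async with driver.session()` and awaited queries
        self.driver = AsyncGraphDatabase.driver(neo4j_uri, auth=auth)

    async def create_entity(self, entity: Entity):
        async with self.driver.session() as session:
            query = """
            MERGE (e:Entity {name: $name, type: $type})
            SET e.description = $description,
                e.updated_at = datetime()
            RETURN e
            """
            result = await session.run(
                query,
                name=entity.name,
                type=entity.type,
                description=entity.description,
            )
            return await result.single()

    async def create_relationship(self, rel: Relationship):
        async with self.driver.session() as session:
            # ON CREATE SET keeps the original creation time when the relationship already exists
            query = """
            MATCH (a:Entity {name: $from_entity})
            MATCH (b:Entity {name: $to_entity})
            MERGE (a)-[r:RELATES_TO {type: $rel_type}]->(b)
            ON CREATE SET r.created_at = datetime()
            SET r.confidence = $confidence
            RETURN r
            """
            result = await session.run(
                query,
                from_entity=rel.from_entity,
                to_entity=rel.to_entity,
                rel_type=rel.type,
                confidence=rel.confidence,
            )
            await result.consume()

    async def find_related_entities(self, entity_name: str, hops: int = 2):
        # Cypher cannot parameterize variable-length bounds, so `hops` is formatted into
        # the query; validate it first (the upper bound of 5 is an arbitrary guard)
        hops = int(hops)
        if not 1 <= hops <= 5:
            raise ValueError("hops must be between 1 and 5")
        async with self.driver.session() as session:
            # min(length(path)) returns each entity once, at its shortest distance
            query = f"""
            MATCH (start:Entity {{name: $entity_name}})
            MATCH path = (start)-[*1..{hops}]-(related:Entity)
            WHERE related <> start
            RETURN related, min(length(path)) AS distance
            ORDER BY distance, related.name
            LIMIT 100
            """
            result = await session.run(query, entity_name=entity_name)
            return [record["related"] async for record in result]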
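`find_related_entities` is meant to return every entity within `hops` relationships of the start node, nearest first, with relationship direction ignored. On an in-memory adjacency list the same semantics is a breadth-first search. This stdlib sketch is handy for unit-testing traversal logic without a Neo4j instance. The function names and the graph representation are illustrative.

```python
from collections import deque
from typing import Dict, Iterable, List, Set, Tuple


def build_undirected(edges: Iterable[Tuple[str, str]]) -> Dict[str, Set[str]]:
    """Adjacency sets; direction is ignored, matching the undirected -[*]- pattern."""
    graph: Dict[str, Set[str]] = {}
    for a, b in edges:
        graph.setdefault(a, set()).add(b)
        graph.setdefault(b, set()).add(a)
    return graph


def related_entities(
    graph: Dict[str, Set[str]], start: str, hops: int = 2, limit: int = 100
) -> List[Tuple[str, int]]:
    """Entities within `hops` of `start` with their shortest distance, nearest first."""
    if start not in graph:
        return []
    distance = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if distance[node] == hops:
            continue  # do not expand beyond the hop limit
        for neighbor in graph[node]:
            if neighbor not in distance:  # BFS: first visit is the shortest distance
                distance[neighbor] = distance[node] + 1
                queue.append(neighbor)
    found = [(name, d) for name, d in distance.items() if name != start]
    found.sort(key=lambda item: (item[1], item[0]))  # ORDER BY distance, name
    return found[:limit]
```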
🧮 Vector Store Operations
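# lightrag/database/vector_store.py
import json

import faiss
import numpy as np


class VectorStore:
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        # HNSW graph with M=16 and inner-product scoring, as in the index spec above
        self.index = faiss.IndexHNSWFlat(dimension, 16, faiss.METRIC_INNER_PRODUCT)
        self.index.hnsw.efConstruction = 200  # build-time candidate list size
        self.index.hnsw.efSearch = 100  # query-time candidate list size (recall vs. speed)
        self.id_to_text = {}

    def add_vectors(self, vectors: np.ndarray, texts: list):
        # Add vectors to the index; FAISS assigns sequential IDs starting at ntotal
        if len(vectors) != len(texts):
            raise ValueError("vectors and texts must have the same length")
        vectors = np.array(vectors, dtype="float32")  # copy, so the caller's array is not modified
        faiss.normalize_L2(vectors)  # unit length, so inner product equals cosine similarity
        start_id = self.index.ntotal
        self.index.add(vectors)
        # Map FAISS IDs back to the source text
        for i, text in enumerate(texts):
            self.id_to_text[start_id + i] = text

    def search(self, query_vector: np.ndarray, k: int = 10):
        # Similarity search; with inner product, higher scores mean more similar
        query_vector = np.array(query_vector, dtype="float32").reshape(1, -1)
        faiss.normalize_L2(query_vector)
        scores, indices = self.index.search(query_vector, k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            if idx in self.id_to_text:  # FAISS pads missing results with -1
                results.append({
                    "text": self.id_to_text[idx],
                    "score": float(score),
                    "id": idx,
                })
        return results

    def save_index(self, filepath: str):
        # Write the index to disk
        faiss.write_index(self.index, filepath)
        # Save the ID-to-text mapping; JSON turns the int keys into strings,
        # so convert them back with int() when loading
        with open(f"{filepath}.mapping", "w", encoding="utf-8") as f:
            json.dump(self.id_to_text, f, ensure_ascii=False)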
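HNSW search is approximate. One common way to quantify a figure like the vector retrieval service's 99.5% search accuracy is recall@k: the fraction of the exact top-k neighbours that the approximate index also returns. Below is a minimal NumPy sketch. It assumes L2-normalized embeddings, so that inner product equals cosine similarity. `normalize`, `exact_top_k` and `recall_at_k` are illustrative helpers. In practice the exact baseline for a sample of queries could come from `faiss.IndexFlatIP`.

```python
import numpy as np


def normalize(vectors: np.ndarray) -> np.ndarray:
    """L2-normalize rows so inner product equals cosine similarity."""
    vectors = np.asarray(vectors, dtype=np.float32)
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors / np.maximum(norms, 1e-12)


def exact_top_k(database: np.ndarray, queries: np.ndarray, k: int) -> np.ndarray:
    """Brute-force inner-product search: ids of the k highest-scoring rows per query."""
    scores = queries @ database.T  # shape (n_queries, n_database)
    top = np.argpartition(-scores, kth=k - 1, axis=1)[:, :k]
    # argpartition leaves the top k unordered; sort them by descending score
    order = np.argsort(-np.take_along_axis(scores, top, axis=1), axis=1)
    return np.take_along_axis(top, order, axis=1)


def recall_at_k(approx_ids: np.ndarray, exact_ids: np.ndarray) -> float:
    """Fraction of the exact top-k ids that also appear in the approximate results."""
    k = exact_ids.shape[1]
    hits = sum(len(set(a.tolist()) & set(e.tolist())) for a, e in zip(approx_ids, exact_ids))
    return hits / (k * len(exact_ids))
```

With `VectorStore`, the approximate ids for each query would be the `"id"` fields returned by `search`, compared against `exact_top_k` over the same vectors.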
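RESTful API Design
📄 Document Management API
POST /api/v1/documents/upload - Upload and process a new document
GET /api/v1/documents/{document_id} - Get document details
GET /api/v1/documents/{document_id}/status - Get the document's processing status
POST /api/v1/documents/{document_id}/reprocess - Reprocess the specified document
🔍 Query and Retrieval API
POST /api/v1/query - Run an intelligent query
POST /api/v1/query/entities - Precise entity-level query
POST /api/v1/query/concepts - Concept-level semantic query
🕸️ Knowledge Graph API
GET /api/v1/graph/entities/{entity_name} - Get entity details
GET /api/v1/graph/entities/{entity_name}/relations - Get the entity's relationship network
POST /api/v1/graph/explore - Graph traversal and path discovery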
🔌 API Route Definitions
# lightrag/api/routes/documents.py
from datetime import datetime, timezone
from uuid import uuid4

from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
from lightrag.schemas import DocumentResponse, ProcessingStatus
# DocumentService, get_document_service and is_supported_format are defined elsewhere in the project

router = APIRouter(prefix="/api/v1/documents", tags=["documents"])


@router.post("/upload", response_model=DocumentResponse)
async def upload_document(
    background_tasks: BackgroundTasks,  # parameters without defaults must come first
    file: UploadFile = File(...),
    document_service: DocumentService = Depends(get_document_service),
):
    # 1. Validate the file format
    if not is_supported_format(file.filename):
        raise HTTPException(status_code=400, detail="Unsupported file format")
    # 2. Read the uploaded file
    file_content = await file.read()
    document_id = str(uuid4())
    # 3. Process in the background after the response is sent
    background_tasks.add_task(
        document_service.process_document,
        document_id,
        file_content,
        file.content_type,
    )
    return DocumentResponse(
        document_id=document_id,
        filename=file.filename,
        status="processing",
        upload_time=datetime.now(timezone.utc),
    )


@router.get("/{document_id}/status")
async def get_processing_status(
    document_id: str,
    document_service: DocumentService = Depends(get_document_service),
):
    status = await document_service.get_status(document_id)
    if not status:
        raise HTTPException(status_code=404, detail="Document not found")
    return status
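The upload route calls `is_supported_format`, which is not shown. Below is a minimal sketch. It assumes the check is by file extension against the formats listed for the document processing service (PDF, DOCX, TXT, MD, HTML). An extension check is only a first filter. A stricter version would also inspect the content type or the file's leading bytes.

```python
from pathlib import PurePath
from typing import Optional

# Formats listed for the document processing service
SUPPORTED_EXTENSIONS = frozenset({".pdf", ".docx", ".txt", ".md", ".html"})


def is_supported_format(filename: Optional[str]) -> bool:
    """Return True if the upload's extension is one the parser is expected to handle."""
    if not filename:
        return False  # an upload may arrive without a filename
    return PurePath(filename).suffix.lower() in SUPPORTED_EXTENSIONS
```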
🔍 Query API Implementation
# lightrag/api/routes/query.py
import asyncio

from fastapi import APIRouter, Depends
# QueryRequest, QueryResponse, QueryService and get_query_service are defined elsewhere in the project

router = APIRouter(prefix="/api/v1", tags=["query"])


@router.post("/query")
async def intelligent_query(
    request: QueryRequest,
    query_service: QueryService = Depends(get_query_service),
):
    # 1. Preprocess the query
    processed_query = await query_service.preprocess_query(request.query)
    # 2. Choose a retrieval strategy
    strategy = await query_service.determine_strategy(processed_query)
    # 3. Retrieve
    if strategy == "low_level":
        results = await query_service.low_level_retrieval(processed_query)
    elif strategy == "high_level":
        results = await query_service.high_level_retrieval(processed_query)
    else:
        # Hybrid retrieval: assuming the two levels are independent, run them concurrently
        low_results, high_results = await asyncio.gather(
            query_service.low_level_retrieval(processed_query),
            query_service.high_level_retrieval(processed_query),
        )
        results = await query_service.fuse_results(low_results, high_results)
    # 4. Generate the answer from the retrieved contexts
    response = await query_service.generate_response(
        query=request.query,
        retrieved_contexts=results,
        max_tokens=request.max_tokens or 1000,
    )
    return QueryResponse(
        query=request.query,
        answer=response.answer,
        sources=response.sources,
        confidence=response.confidence,
        processing_time=response.processing_time,
    )
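`fuse_results` is not shown. Low-level and high-level retrieval produce scores that are not directly comparable. A standard way to merge such ranked lists is Reciprocal Rank Fusion (RRF), which scores each item by the sum of 1 / (k + rank) over the lists it appears in. This is a swapped-in technique, not necessarily what LightRAG does. The sketch assumes each result is a dict with a unique `id`. It uses the commonly cited default k = 60.

```python
from typing import Dict, List, Sequence


def reciprocal_rank_fusion(
    *ranked_lists: Sequence[dict], k: int = 60, top_n: int = 10
) -> List[dict]:
    """Merge ranked result lists: score(d) = sum over lists of 1 / (k + rank of d)."""
    scores: Dict[str, float] = {}
    items: Dict[str, dict] = {}
    for results in ranked_lists:
        for rank, item in enumerate(results, start=1):  # ranks are 1-based
            key = item["id"]
            scores[key] = scores.get(key, 0.0) + 1.0 / (k + rank)
            items.setdefault(key, item)  # keep the first copy seen
    ordered = sorted(scores, key=lambda key: (-scores[key], key))  # ties broken by id
    return [{**items[key], "rrf_score": scores[key]} for key in ordered[:top_n]]
```

Because RRF uses only ranks, it needs no score normalization. An item that appears near the top of both lists outranks one that is first in only one list.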
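Containerized Deployment Architecture
🌐 Load Balancing Layer
Nginx Ingress, SSL termination, rate limiting, health checks
⚙️ Application Service Layer
API Gateway, document processing service, query service, graph management service, vector retrieval service
💾 Data Storage Layer
Neo4j cluster, PostgreSQL primary/replica, Redis cluster, FAISS shards
📊 Monitoring and Operations Layer
Prometheus, Grafana, ELK Stack, Jaeger distributed tracing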
🐳 Docker Compose Configuration
# docker-compose.yml
version: '3.8'

services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - api-gateway

  api-gateway:
    build: ./services/api-gateway
    ports:
      - "8000:8000"
    environment:
      # Credentials must match the postgres service below
      - DATABASE_URL=postgresql://lightrag_user:${DB_PASSWORD}@postgres:5432/lightrag
      - REDIS_URL=redis://redis:6379
      - NEO4J_URI=bolt://neo4j:7687
    depends_on:
      - postgres
      - redis
      - neo4j

  document-processor:
    build: ./services/document-processor
    deploy:
      replicas: 3
    environment:
      - CELERY_BROKER=redis://redis:6379
      - MODEL_CACHE_DIR=/models
    volumes:
      - model-cache:/models
      - document-storage:/data/documents
    depends_on:
      - redis

  query-engine:
    build: ./services/query-engine
    deploy:
      replicas: 5
    environment:
      - FAISS_INDEX_PATH=/data/faiss
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - faiss-index:/data/faiss

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=lightrag
      - POSTGRES_USER=lightrag_user
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres-data:/var/lib/postgresql/data

  neo4j:
    image: neo4j:5.0-enterprise
    environment:
      - NEO4J_AUTH=neo4j/${NEO4J_PASSWORD}
      # Enterprise images do not start unless the license is accepted
      - NEO4J_ACCEPT_LICENSE_AGREEMENT=yes
      - NEO4J_PLUGINS=["apoc"]
    volumes:
      - neo4j-data:/data
      - neo4j-logs:/logs

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

volumes:
  postgres-data:
  neo4j-data:
  neo4j-logs:
  redis-data:
  faiss-index:
  model-cache:
  document-storage:
☸️ Kubernetes Deployment Configuration
# k8s/api-gateway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: lightrag-api-gateway
  labels:
    app: lightrag-api-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: lightrag-api-gateway
  template:
    metadata:
      labels:
        app: lightrag-api-gateway
    spec:
      containers:
        - name: api-gateway
          image: lightrag/api-gateway:v2.0.0
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: lightrag-secrets
                  key: database-url
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
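Monitoring and Alerting System
🔍 Real-Time System Metrics
Queries per second: 1,247
Average response time: 342 ms
Retrieval accuracy: 94.7%
System availability: 99.97%
Cache hit rate: 85.3%
Memory usage: 67%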
📊 Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "lightrag_rules.yml"

scrape_configs:
  - job_name: 'lightrag-api'
    static_configs:
      - targets: ['api-gateway:8000']
    metrics_path: /metrics
    scrape_interval: 10s
  - job_name: 'postgres-exporter'
    static_configs:
      - targets: ['postgres-exporter:9187']
  - job_name: 'redis-exporter'
    static_configs:
      - targets: ['redis-exporter:9121']
  - job_name: 'neo4j-exporter'
    static_configs:
      - targets: ['neo4j-exporter:9308']

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']
🚨 Alerting Rules
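# lightrag_rules.yml
groups:
  - name: lightrag.rules
    rules:
      # API response time: mean latency over 5 minutes, from the histogram's
      # _sum and _count series exported by metrics.py
      - alert: HighResponseTime
        expr: sum(rate(lightrag_request_duration_seconds_sum[5m])) / sum(rate(lightrag_request_duration_seconds_count[5m])) > 1.0
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "API response time too high"
          description: "Mean response time is above 1s, current value: {{ $value }}s"
      # Error rate: aggregate both sides with sum() before dividing; otherwise
      # each 5xx series is divided by itself
      - alert: HighErrorRate
        expr: sum(rate(lightrag_requests_total{status=~"5.."}[5m])) / sum(rate(lightrag_requests_total[5m])) > 0.05
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service error rate too high"
          description: "5xx error rate is above 5%, current value: {{ $value | humanizePercentage }}"
      # Database connections (postgres_exporter)
      - alert: DatabaseConnectionHigh
        expr: sum(pg_stat_activity_count{state="active"}) > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Too many database connections"
          description: "Active PostgreSQL connections: {{ $value }}"
      # Memory usage (node_exporter metrics; requires a node-exporter scrape job)
      - alert: HighMemoryUsage
        expr: (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) > 0.9
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Memory usage too high"
          description: "Memory usage: {{ $value | humanizePercentage }}"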
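A ratio such as the `HighErrorRate` expression needs both sides aggregated, for example with `sum()`, before dividing. PromQL's default one-to-one vector matching pairs series only when their label sets are identical. An unaggregated `rate(errors) / rate(all)` therefore divides each 5xx series by itself and yields 1.0. The plain-Python sketch below models the two behaviours with dictionaries keyed by label sets. The helper names are hypothetical and the sample rates are illustrative.

```python
from typing import Dict, FrozenSet, Tuple

Labels = FrozenSet[Tuple[str, str]]


def labels(**pairs: str) -> Labels:
    """Build a hashable label set, e.g. labels(status="500")."""
    return frozenset(pairs.items())


def per_series_ratio(numerator: Dict[Labels, float], denominator: Dict[Labels, float]) -> Dict[Labels, float]:
    """Like PromQL `a / b` with default matching: only identical label sets pair up.

    Zero or missing denominators are skipped here for simplicity.
    """
    return {key: value / denominator[key] for key, value in numerator.items() if denominator.get(key)}


def aggregated_ratio(numerator: Dict[Labels, float], denominator: Dict[Labels, float]) -> float:
    """Like `sum(a) / sum(b)`: reduce each side to one number, then divide."""
    total = sum(denominator.values())
    return sum(numerator.values()) / total if total else float("nan")


# Illustrative per-second request rates by endpoint and status code
SAMPLE_RATES = {
    labels(endpoint="/api/v1/query", status="200"): 95.0,
    labels(endpoint="/api/v1/query", status="500"): 1.0,
    labels(endpoint="/api/v1/documents/upload", status="200"): 4.0,
}
SAMPLE_ERRORS = {key: rate for key, rate in SAMPLE_RATES.items() if dict(key)["status"].startswith("5")}
```

With these rates the per-series form reports 100% errors for the failing series. The aggregated form reports 1%, below the 5% threshold.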
📊 Business Metrics Collection
# lightrag/monitoring/metrics.py
import time

from fastapi import Request
from prometheus_client import Counter, Gauge, Histogram, make_asgi_app

# Metric definitions
REQUEST_COUNT = Counter(
    'lightrag_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
    'lightrag_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)
RETRIEVAL_ACCURACY = Gauge(
    'lightrag_retrieval_accuracy',
    'Retrieval accuracy percentage'
)
ACTIVE_CONNECTIONS = Gauge(
    'lightrag_active_connections',
    'Number of active database connections'
)
CACHE_HIT_RATE = Gauge(
    'lightrag_cache_hit_rate',
    'Cache hit rate percentage'
)


class MetricsCollector:
    def __init__(self):
        self.start_time = time.time()

    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
        REQUEST_DURATION.labels(method=method, endpoint=endpoint).observe(duration)

    def update_retrieval_accuracy(self, accuracy: float):
        RETRIEVAL_ACCURACY.set(accuracy)

    def update_cache_metrics(self, hit_rate: float):
        CACHE_HIT_RATE.set(hit_rate)


# Module-level collector used by the middleware below
metrics_collector = MetricsCollector()


# HTTP middleware; register in main.py with app.middleware("http")(metrics_middleware)
async def metrics_middleware(request: Request, call_next):
    start_time = time.perf_counter()  # monotonic clock for measuring durations
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    metrics_collector.record_request(
        method=request.method,
        # Raw paths that embed IDs create one label value per document; consider normalizing
        endpoint=request.url.path,
        status=response.status_code,
        duration=duration,
    )
    return response


# ASGI app serving the scrape endpoint for the 'lightrag-api' Prometheus job;
# mount in main.py with app.mount("/metrics", metrics_app)
metrics_app = make_asgi_app()