🚀 LightRAG Backend Runtime Architecture

How a distributed retrieval-augmented generation system is implemented

🐍 Python 3.11
⚡ FastAPI
🗄️ PostgreSQL + Neo4j
🔍 FAISS + HNSW
📦 Redis
🐳 Docker + K8s
📊 Prometheus
🔄 Nginx

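🏗️ System Architecture Overview

📊 Data Processing Pipeline

Document intake: API Gateway, file parsing service
Content extraction: NLP processing, entity recognition
Graph construction: relation extraction, vectorization
Index storage: graph database, vector index
Query response: retrieval engine, answer generation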

🔧 Core Startup Flow
# main.py - application entry point
from fastapi import FastAPI
from lightrag.core import LightRAGEngine
from lightrag.services import DocumentService, QueryService
from lightrag.database import init_databases

app = FastAPI(title="LightRAG API", version="2.0.0")

# Initialize core components
async def startup_event():
    # 1. Open database connections
    await init_databases()

    # 2. Start the LightRAG engine
    engine = LightRAGEngine(
        graph_db_url="neo4j://localhost:7687",
        vector_store_path="/data/faiss_index",
        redis_url="redis://localhost:6379"
    )
    await engine.initialize()

    # 3. Register services on the application state
    app.state.document_service = DocumentService(engine)
    app.state.query_service = QueryService(engine)

app.add_event_handler("startup", startup_event)
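🌐 API Gateway
Load balancing, request routing, authentication and authorization, rate limiting
Tech stack: Nginx, JWT, Rate Limiting
🧠 AI Processing Engine
Text understanding, entity extraction, relation recognition, embedding generation
Tech stack: Transformers, OpenAI API, spaCy
📊 Data Storage Layer
Graph database, vector index, relational database, cache
Tech stack: Neo4j, FAISS, PostgreSQL, Redis
🔍 Retrieval Engine
Dual-level retrieval, similarity search, graph traversal, result ranking
Tech stack: HNSW, Cypher, Async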
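
The gateway card above lists rate limiting as one of its responsibilities. In the stack described here that job would normally fall to Nginx; the sketch below only illustrates the idea with a token bucket written in Python. The class name `TokenBucket` and its parameters are hypothetical and do not come from the source.

```python
import time


class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock  # injectable so the limiter can be tested deterministically
        self.tokens = float(capacity)
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A gateway would typically keep one bucket per client key (for example per JWT subject) and answer with HTTP 429 when `allow()` returns `False`.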

⚙️ Microservice Architecture Design

📦 Core Service Modules
# lightrag/services/document_service.py
import asyncio

from lightrag.core import LightRAGEngine

class DocumentService:
    def __init__(self, engine: LightRAGEngine):
        self.engine = engine
        self.parser = DocumentParser()
        self.extractor = EntityExtractor()
        self.graph_builder = GraphBuilder()

    async def process_document(self, document_id: str, file_content: bytes, file_type: str):
        # document_id is generated by the upload route and passed in, so the
        # ID returned to the client matches the stored document.

        # 1. Parse the document into chunks
        chunks = await self.parser.parse(file_content, file_type)

        # 2. Process the chunks concurrently
        tasks = [self.process_chunk(chunk) for chunk in chunks]
        results = await asyncio.gather(*tasks)

        # 3. Build the knowledge graph
        graph_data = await self.graph_builder.build(results)

        # 4. Persist to the databases under the caller-supplied ID
        await self.engine.store_document(document_id, graph_data)

        return {"document_id": document_id, "status": "success"}

    async def process_chunk(self, chunk: str):
        # Entity extraction
        entities = await self.extractor.extract_entities(chunk)

        # Relation extraction
        relations = await self.extractor.extract_relations(chunk, entities)

        # Embedding
        embeddings = await self.engine.vectorize(chunk)

        return {
            "text": chunk,
            "entities": entities,
            "relations": relations,
            "embeddings": embeddings
        }
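
`process_document` above hands every chunk to `asyncio.gather` at once. For large documents that can flood the embedding or LLM backend, so one option is to cap the number of chunks in flight. The sketch below shows that pattern with a semaphore; `gather_bounded` and `fake_process_chunk` are hypothetical names, and the sleep stands in for real extraction work.

```python
import asyncio


async def gather_bounded(coros, limit: int):
    """Run coroutines concurrently, at most `limit` at a time, preserving input order."""
    semaphore = asyncio.Semaphore(limit)

    async def run(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))


async def _demo():
    active = peak = 0

    async def fake_process_chunk(chunk: str) -> str:
        # Stand-in for DocumentService.process_chunk; tracks peak concurrency.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return chunk.upper()

    results = await gather_bounded((fake_process_chunk(c) for c in "abcdef"), limit=2)
    return results, peak


results, peak = asyncio.run(_demo())
```

Results come back in input order, as with a plain `asyncio.gather`, so the graph builder sees the chunks in document order.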
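📄 Document Processing Service
Document parsing, chunking, entity extraction, and relation recognition
Throughput: 1,000 docs/min
Supported formats: PDF, DOCX, TXT, MD, HTML
🔍 Query Processing Service
Query parsing, dual-level retrieval, result fusion and ranking
Response time: < 500 ms
Concurrency: 1,000 QPS
🕸️ Graph Management Service
Knowledge graph construction, updates, queries, and graph algorithm execution
Nodes: 10M+
Relationships: 50M+
🧮 Vector Retrieval Service
High-dimensional vector storage, similarity search, index optimization
Vector dimension: 1536
Search accuracy: 99.5%
Cache Service
Query result caching, session management, prewarming of frequently accessed data
Hit rate: 85%
Cache size: 50 GB
📬 Message Queue
Asynchronous task processing, event-driven messaging, service decoupling
Throughput: 10K msg/s
Reliability: 99.99%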
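
The document processing service lists chunking among its duties, but the source does not say which strategy it uses. As a minimal sketch, assuming fixed-size character windows with overlap (a common baseline), it could look like the following. The function name and default sizes are illustrative only.

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character windows that overlap by `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reaches the end of the text
    return chunks
```

The overlap keeps an entity mention that straddles a boundary visible in at least one chunk, at the cost of some duplicated text to embed.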
🔄 Inter-Service Communication
# lightrag/core/message_bus.py
import json

from redis.asyncio import Redis  # async client, needed for the awaits below

class MessageBus:
    def __init__(self):
        self.redis_client = Redis(host="redis", port=6379)
        self.subscribers = {}

    async def publish(self, channel: str, message: dict):
        # Publish an event to the given channel
        await self.redis_client.publish(
            channel,
            json.dumps(message)
        )

    async def subscribe(self, channel: str, handler):
        # Subscribe to a channel and dispatch each message to the handler
        pubsub = self.redis_client.pubsub()
        await pubsub.subscribe(channel)

        async for message in pubsub.listen():
            if message['type'] == 'message':
                data = json.loads(message['data'])
                await handler(data)

# Example event (call from inside an async function)
await message_bus.publish("document.processed", {
    "document_id": "doc_123",
    "entities_count": 45,
    "relations_count": 67,
    "processing_time": 2.3
})
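
To show the publish/subscribe contract without a running Redis server, here is a hypothetical in-process stand-in. `InMemoryBus` is not part of the source; it mirrors only the channel-to-handler dispatch of the `MessageBus` above and could serve as a test double.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Hypothetical in-process stand-in for the Redis-backed MessageBus."""

    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, channel: str, handler) -> None:
        self.handlers[channel].append(handler)

    async def publish(self, channel: str, message: dict) -> int:
        # Deliver to every handler on the channel; return how many received it.
        handlers = self.handlers.get(channel, [])
        await asyncio.gather(*(h(message) for h in handlers))
        return len(handlers)


async def _demo():
    bus = InMemoryBus()
    seen = []

    async def on_processed(event: dict):
        seen.append(event["document_id"])

    bus.subscribe("document.processed", on_processed)
    delivered = await bus.publish("document.processed", {"document_id": "doc_123"})
    missed = await bus.publish("document.failed", {"document_id": "doc_456"})
    return seen, delivered, missed


seen, delivered, missed = asyncio.run(_demo())
```

As with Redis pub/sub, a message published to a channel with no subscriber is simply dropped, so events that must not be lost need a durable queue instead.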

💾 Data Storage Architecture

📊 PostgreSQL - Metadata
documents
id UUID
filename VARCHAR
file_size BIGINT
upload_time TIMESTAMP
processing_status ENUM
🕸️ Neo4j - Knowledge Graph
Entity node
name String
type String
description Text
RELATES_TO relationship
confidence Float
🧮 FAISS - Vector Index
Vector dimension: 1536
Index type: HNSW
M: 16
efConstruction: 200
Distance metric: Inner Product
Vector count: 10M+
⚡ Redis - Cache Layer
Query cache: Hash
Session data: String
Hot entities: Set
Message queue: List
Distributed lock: String
TTL: 3600s
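
The FAISS figures above (10M vectors, 1536 dimensions, M = 16) imply a sizeable memory footprint. The arithmetic below is a rough estimate only. It assumes float32 vectors stored uncompressed, and it assumes 2·M four-byte neighbour slots per vector on the HNSW base layer. Upper layers and allocator overhead are ignored, and both helper names are hypothetical.

```python
def flat_vector_bytes(n_vectors: int, dim: int, bytes_per_value: int = 4) -> int:
    """Raw storage for uncompressed float32 vectors."""
    return n_vectors * dim * bytes_per_value


def hnsw_base_link_bytes(n_vectors: int, m: int, bytes_per_id: int = 4) -> int:
    """Assumed base-layer graph storage: 2*M neighbour ids per vector."""
    return n_vectors * 2 * m * bytes_per_id


raw = flat_vector_bytes(10_000_000, 1536)    # 61_440_000_000 bytes, about 61.4 GB
links = hnsw_base_link_bytes(10_000_000, 16)  # 1_280_000_000 bytes, about 1.3 GB
```

Under these assumptions the raw vectors dominate the footprint. That may be one reason to shard the index or to consider a compressed index type, though the source does not state its motivation.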
🗄️ Database Access Layer
# lightrag/database/graph_repository.py
from neo4j import AsyncGraphDatabase

class GraphRepository:
    def __init__(self, neo4j_uri: str):
        # `async with driver.session()` requires the async driver
        self.driver = AsyncGraphDatabase.driver(neo4j_uri)

    async def create_entity(self, entity: Entity):
        async with self.driver.session() as session:
            query = """
            MERGE (e:Entity {name: $name, type: $type})
            SET e.description = $description,
                e.updated_at = datetime()
            RETURN e
            """
            result = await session.run(query,
                name=entity.name,
                type=entity.type,
                description=entity.description
            )
            return await result.single()

    async def create_relationship(self, rel: Relationship):
        async with self.driver.session() as session:
            query = """
            MATCH (a:Entity {name: $from_entity})
            MATCH (b:Entity {name: $to_entity})
            MERGE (a)-[r:RELATES_TO {type: $rel_type}]->(b)
            SET r.confidence = $confidence,
                r.created_at = datetime()
            RETURN r
            """
            await session.run(query,
                from_entity=rel.from_entity,
                to_entity=rel.to_entity,
                rel_type=rel.type,
                confidence=rel.confidence
            )

    async def find_related_entities(self, entity_name: str, hops: int = 2):
        # The hop bound is interpolated into the query text, so coerce it
        # to a positive int first to rule out Cypher injection.
        hops = int(hops)
        if hops < 1:
            raise ValueError("hops must be >= 1")
        async with self.driver.session() as session:
            query = f"""
            MATCH (start:Entity {{name: $entity_name}})
            MATCH path = (start)-[*1..{hops}]-(related:Entity)
            RETURN DISTINCT related, length(path) as distance
            ORDER BY distance, related.name
            LIMIT 100
            """
            result = await session.run(query, entity_name=entity_name)
            # Async results must be consumed with `async for`
            return [record["related"] async for record in result]
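
To make the behaviour of `find_related_entities` concrete, the following sketch runs a breadth-first search over a plain adjacency map. It is a simplification: it reports only the shortest distance to each entity, whereas the Cypher query can return the same entity at several path lengths. The function name and the toy graph are illustrative.

```python
from collections import deque


def related_within(graph: dict[str, set[str]], start: str, hops: int = 2) -> list[tuple[str, int]]:
    """Return (entity, distance) pairs within `hops`, ordered by distance then name."""
    distance = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if distance[node] == hops:
            continue  # do not expand beyond the hop limit
        for neighbour in graph.get(node, ()):
            if neighbour not in distance:
                distance[neighbour] = distance[node] + 1
                queue.append(neighbour)
    found = [(name, d) for name, d in distance.items() if name != start]
    return sorted(found, key=lambda pair: (pair[1], pair[0]))


# Undirected toy graph: A-B, A-C, B-D, D-E
toy_graph = {"A": {"B", "C"}, "B": {"A", "D"}, "C": {"A"}, "D": {"B", "E"}, "E": {"D"}}
```

Here `related_within(toy_graph, "A", 2)` yields B and C at distance 1 and D at distance 2. E is three hops away and is excluded.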
🧮 Vector Store Operations
# lightrag/database/vector_store.py
import json

import faiss
import numpy as np

class VectorStore:
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        # Inner-product metric, matching the index settings listed earlier;
        # without the third argument FAISS defaults to L2 distance.
        self.index = faiss.IndexHNSWFlat(dimension, 16, faiss.METRIC_INNER_PRODUCT)
        self.index.hnsw.efConstruction = 200
        self.index.hnsw.efSearch = 100
        self.id_to_text = {}

    def add_vectors(self, vectors: np.ndarray, texts: list):
        # Add the vectors to the index
        start_id = self.index.ntotal
        self.index.add(vectors.astype('float32'))

        # Record the ID-to-text mapping
        for i, text in enumerate(texts):
            self.id_to_text[start_id + i] = text

    def search(self, query_vector: np.ndarray, k: int = 10):
        # Run a similarity search; higher inner-product scores are better
        query_vector = query_vector.reshape(1, -1).astype('float32')
        scores, indices = self.index.search(query_vector, k)

        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)  # FAISS returns -1 when fewer than k hits exist
            if idx in self.id_to_text:
                results.append({
                    "text": self.id_to_text[idx],
                    "score": float(score),
                    "id": idx
                })

        return results

    def save_index(self, filepath: str):
        # Write the index to disk
        faiss.write_index(self.index, filepath)

        # Write the mapping (JSON turns the int keys into strings)
        with open(f"{filepath}.mapping", 'w') as f:
            json.dump(self.id_to_text, f)
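
An inner-product index ranks by cosine similarity only when the vectors are L2-normalized first. The pure-Python sketch below shows that step together with an exact brute-force top-k search. An exact baseline like this is a common way to measure the recall of an approximate index such as HNSW. It is not taken from the source and is far too slow for production volumes.

```python
import heapq
import math


def l2_normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v] if norm else list(v)


def top_k_inner_product(query: list[float], vectors: list[list[float]], k: int = 10):
    """Exact top-k by inner product; returns (score, index) pairs, best first."""
    scored = (
        (sum(q * x for q, x in zip(query, vec)), i)
        for i, vec in enumerate(vectors)
    )
    return heapq.nlargest(k, scored)


corpus = [l2_normalize(v) for v in ([1.0, 0.0], [0.0, 1.0], [3.0, 4.0])]
hits = top_k_inner_product(l2_normalize([2.0, 0.0]), corpus, k=2)
```

Because every vector has unit length, each score in `hits` is the cosine of the angle between the query and the stored vector.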

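🔌 RESTful API Design

📄 Document Management API

POST /api/v1/documents/upload
Upload and process a new document
GET /api/v1/documents/{document_id}
Get document details
GET /api/v1/documents/{document_id}/status
Check document processing status
POST /api/v1/documents/{document_id}/reprocess
Reprocess the specified document

🔍 Query and Retrieval API

POST /api/v1/query
Run an intelligent retrieval query
POST /api/v1/query/entities
Entity-level exact query
POST /api/v1/query/concepts
Concept-level semantic query

🕸️ Knowledge Graph API

GET /api/v1/graph/entities/{entity_name}
Get entity details
GET /api/v1/graph/entities/{entity_name}/relations
Get the entity's relationship network
POST /api/v1/graph/explore
Graph traversal and path discovery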
🔌 API Route Definitions
# lightrag/api/routes/documents.py
from datetime import datetime, timezone
from uuid import uuid4

from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
from lightrag.schemas import DocumentResponse, ProcessingStatus
from lightrag.services import DocumentService

# is_supported_format and get_document_service are helpers assumed to be
# defined elsewhere in the project.

router = APIRouter(prefix="/api/v1/documents", tags=["documents"])

@router.post("/upload", response_model=DocumentResponse)
async def upload_document(
    # Parameters without defaults must come before those with defaults
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    document_service: DocumentService = Depends(get_document_service)
):
    # 1. Validate the file format
    if not is_supported_format(file.filename):
        raise HTTPException(400, "Unsupported file format")

    # 2. Read the file contents and assign a document ID
    file_content = await file.read()
    document_id = str(uuid4())

    # 3. Process in the background
    background_tasks.add_task(
        document_service.process_document,
        document_id,
        file_content,
        file.content_type
    )

    return DocumentResponse(
        document_id=document_id,
        filename=file.filename,
        status="processing",
        upload_time=datetime.now(timezone.utc)
    )

@router.get("/{document_id}/status")
async def get_processing_status(
    document_id: str,
    document_service: DocumentService = Depends(get_document_service)
):
    status = await document_service.get_status(document_id)

    if not status:
        raise HTTPException(404, "Document not found")

    return status
🔍 Query API Implementation
# lightrag/api/routes/query.py
from fastapi import APIRouter, Depends
# QueryRequest and QueryResponse are assumed to live in lightrag.schemas
# alongside DocumentResponse; get_query_service is assumed to be defined elsewhere.
from lightrag.schemas import QueryRequest, QueryResponse
from lightrag.services import QueryService

router = APIRouter(prefix="/api/v1", tags=["query"])

@router.post("/query")
async def intelligent_query(
    request: QueryRequest,
    query_service: QueryService = Depends(get_query_service)
):
    # 1. Preprocess the query
    processed_query = await query_service.preprocess_query(request.query)

    # 2. Decide on a retrieval strategy
    strategy = await query_service.determine_strategy(processed_query)

    # 3. Run retrieval
    if strategy == "low_level":
        results = await query_service.low_level_retrieval(processed_query)
    elif strategy == "high_level":
        results = await query_service.high_level_retrieval(processed_query)
    else:
        # Hybrid retrieval: run both levels, then fuse the results
        low_results = await query_service.low_level_retrieval(processed_query)
        high_results = await query_service.high_level_retrieval(processed_query)
        results = await query_service.fuse_results(low_results, high_results)

    # 4. Generate the answer from the retrieved context
    response = await query_service.generate_response(
        query=request.query,
        retrieved_contexts=results,
        max_tokens=request.max_tokens or 1000
    )

    return QueryResponse(
        query=request.query,
        answer=response.answer,
        sources=response.sources,
        confidence=response.confidence,
        processing_time=response.processing_time
    )
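
The hybrid branch above relies on `fuse_results`, whose method the source does not specify. One widely used option is reciprocal rank fusion (RRF), sketched below as a stand-in. The function name, the constant `k = 60`, and the use of plain string IDs are assumptions made for the example.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several ranked ID lists; items ranked high in any list float to the top."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for deterministic output.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


low_level = ["a", "b", "c"]   # e.g. entity-level hits
high_level = ["b", "d", "a"]  # e.g. concept-level hits
fused = reciprocal_rank_fusion([low_level, high_level])
```

RRF uses only rank positions, so it needs no score calibration between the two retrieval levels. That makes it a convenient default when their scores are not comparable.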

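🚀 Containerized Deployment Architecture

🌐 Load Balancing Layer
Nginx Ingress
SSL termination
Rate limiting
Health checks
⚙️ Application Service Layer
API Gateway
Document processing service
Query service
Graph management service
Vector retrieval service
💾 Data Storage Layer
Neo4j cluster
PostgreSQL primary/replica
Redis cluster
FAISS shards
📊 Monitoring and Operations Layer
Prometheus
Grafana
ELK Stack
Jaeger distributed tracing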
🐳 Docker Compose Configuration
# docker-compose.yml
version: '3.8'

services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - api-gateway

  api-gateway:
    build: ./services/api-gateway
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://lightrag_user:${DB_PASSWORD}@postgres:5432/lightrag
      - REDIS_URL=redis://redis:6379
      - NEO4J_URI=bolt://neo4j:7687
    depends_on:
      - postgres
      - redis
      - neo4j

  document-processor:
    build: ./services/document-processor
    deploy:
      replicas: 3
    environment:
      - CELERY_BROKER=redis://redis:6379
      - MODEL_CACHE_DIR=/models
    volumes:
      - model-cache:/models
      - document-storage:/data/documents

  query-engine:
    build: ./services/query-engine
    deploy:
      replicas: 5
    environment:
      - FAISS_INDEX_PATH=/data/faiss
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - faiss-index:/data/faiss

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=lightrag
      - POSTGRES_USER=lightrag_user
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres-data:/var/lib/postgresql/data

  neo4j:
    image: neo4j:5.0-enterprise
    environment:
      - NEO4J_AUTH=neo4j/${NEO4J_PASSWORD}
      - NEO4J_ACCEPT_LICENSE_AGREEMENT=yes
      - NEO4J_PLUGINS=["apoc"]
    volumes:
      - neo4j-data:/data
      - neo4j-logs:/logs

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

volumes:
  postgres-data:
  neo4j-data:
  neo4j-logs:
  redis-data:
  faiss-index:
  model-cache:
  document-storage:
☸️ Kubernetes Deployment Configuration
# k8s/api-gateway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: lightrag-api-gateway
  labels:
    app: lightrag-api-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: lightrag-api-gateway
  template:
    metadata:
      labels:
        app: lightrag-api-gateway
    spec:
      containers:
      - name: api-gateway
        image: lightrag/api-gateway:v2.0.0
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: lightrag-secrets
              key: database-url
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
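
The probes above expect `/health` and `/ready` endpoints, which the source does not show. A readiness handler usually aggregates dependency checks. The sketch below covers only that aggregation step in plain asyncio, assuming each dependency exposes an async check. `check_ready` and the check names are hypothetical.

```python
import asyncio


async def check_ready(checks: dict, timeout: float = 1.0) -> tuple[bool, dict]:
    """Run each async dependency check with a timeout; ready only if all pass."""

    async def run(check) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout))
        except Exception:
            # Timeouts and connection errors both count as "not ready".
            return False

    names = list(checks)
    outcomes = await asyncio.gather(*(run(checks[name]) for name in names))
    return all(outcomes), dict(zip(names, outcomes))
```

A `/ready` route could return HTTP 200 when the first element is `True` and 503 otherwise. `/health` would typically skip dependency checks so that a database outage does not trigger pod restarts.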

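📊 Monitoring and Alerting

🔍 Real-Time System Metrics

Queries per second: 1,247
Average response time: 342 ms
Retrieval accuracy: 94.7%
System availability: 99.97%
Cache hit rate: 85.3%
Memory usage: 67%

📈 Performance Trend Chart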

📊 Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "lightrag_rules.yml"

scrape_configs:
  - job_name: 'lightrag-api'
    static_configs:
      - targets: ['api-gateway:8000']
    metrics_path: /metrics
    scrape_interval: 10s

  - job_name: 'postgres-exporter'
    static_configs:
      - targets: ['postgres-exporter:9187']

  - job_name: 'redis-exporter'
    static_configs:
      - targets: ['redis-exporter:9121']

  - job_name: 'neo4j-exporter'
    static_configs:
      - targets: ['neo4j-exporter:9308']

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']
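🚨 Alert Rule Configuration
# lightrag_rules.yml
groups:
- name: lightrag.rules
  rules:

  # API response time alert (mean latency from the request-duration histogram)
  - alert: HighResponseTime
    expr: sum(rate(lightrag_request_duration_seconds_sum[5m])) / sum(rate(lightrag_request_duration_seconds_count[5m])) > 1.0
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "API response time is too high"
      description: "Average response time is above 1s, current value: {{ $value }}s"

  # Error rate alert (sum() on both sides so the label sets match)
  - alert: HighErrorRate
    expr: sum(rate(lightrag_requests_total{status=~"5.."}[5m])) / sum(rate(lightrag_requests_total[5m])) > 0.05
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Service error rate is too high"
      description: "5xx error rate is above 5%, current value: {{ $value | humanizePercentage }}"

  # Database connection alert
  - alert: DatabaseConnectionHigh
    expr: pg_stat_activity_count > 80
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Too many database connections"
      description: "Active PostgreSQL connections: {{ $value }}"

  # Memory usage alert (node_exporter metric names; needs a node_exporter scrape job)
  - alert: HighMemoryUsage
    expr: (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) > 0.9
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Memory usage is too high"
      description: "Memory usage: {{ $value | humanizePercentage }}"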
📊 Business Metrics Collection
# lightrag/monitoring/metrics.py
import time

from fastapi import Request
from prometheus_client import Counter, Histogram, Gauge

# Metric definitions
REQUEST_COUNT = Counter(
    'lightrag_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'lightrag_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

RETRIEVAL_ACCURACY = Gauge(
    'lightrag_retrieval_accuracy',
    'Retrieval accuracy percentage'
)

ACTIVE_CONNECTIONS = Gauge(
    'lightrag_active_connections',
    'Number of active database connections'
)

CACHE_HIT_RATE = Gauge(
    'lightrag_cache_hit_rate',
    'Cache hit rate percentage'
)

class MetricsCollector:
    def __init__(self):
        self.start_time = time.time()
    
    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
        REQUEST_DURATION.labels(method=method, endpoint=endpoint).observe(duration)
    
    def update_retrieval_accuracy(self, accuracy: float):
        RETRIEVAL_ACCURACY.set(accuracy)
    
    def update_cache_metrics(self, hit_rate: float):
        CACHE_HIT_RATE.set(hit_rate)

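# Module-level collector shared by the middleware
metrics_collector = MetricsCollector()

# Middleware; register it with app.middleware("http")(metrics_middleware)
async def metrics_middleware(request: Request, call_next):
    start_time = time.perf_counter()

    response = await call_next(request)

    duration = time.perf_counter() - start_time
    metrics_collector.record_request(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code,
        duration=duration
    )

    return response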
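
`update_cache_metrics` expects a hit rate, but the source does not show where that number comes from. A minimal sketch, assuming the cache layer reports every lookup to a small tracker, is given below. `HitRateTracker` is a hypothetical helper that returns a percentage to match the gauge's description.

```python
class HitRateTracker:
    """Running cache hit rate, expressed as a percentage."""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    def record(self, hit: bool) -> None:
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return 100.0 * self.hits / total if total else 0.0


tracker = HitRateTracker()
for was_hit in [True] * 17 + [False] * 3:
    tracker.record(was_hit)
# tracker.hit_rate can now be passed to MetricsCollector.update_cache_metrics
```

An alternative that is often preferred with Prometheus is to export hit and miss counters and compute the ratio with `rate()` in a query, which avoids a lifetime average that hides recent changes.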