# ---------------------------------------------------------------------------
# RAG Document Pipeline — AI Agent (8 nodes · 6 edges)
# Exported as: ex-rag-pipeline.osop.yaml
# ---------------------------------------------------------------------------
# RAG (Retrieval-Augmented Generation) Pipeline
# Ingest documents, chunk, embed, store, then query and generate answers.
#
# "osop_version" stays quoted so YAML does not coerce it to the float 2.0.
---
osop_version: "2.0"
id: rag-pipeline
name: RAG Document Pipeline

# Node definitions. Each node declares a type (api / cli / agent / db), a
# type-specific "runtime" config, and named data channels under
# "inputs" / "outputs" that the edges below wire together.
nodes:
  # Ingestion entry point: REST endpoint receiving raw document uploads.
  - id: upload_docs
    type: api
    purpose: Accept document uploads via REST endpoint
    runtime:
      endpoint: /api/v1/documents
      method: POST
      url: https://docs-service.internal
    outputs:
      - raw_documents

  # Splits uploads into overlapping chunks (--size 512, --overlap 128; units
  # are whatever chunk.py counts in — presumably tokens, TODO confirm).
  # ${raw_documents} interpolates the upstream output channel into the command.
  - id: chunk_documents
    type: cli
    purpose: Split documents into overlapping chunks for embedding
    runtime:
      command: "python chunk.py --overlap 128 --size 512 --input ${raw_documents}"
    inputs:
      - raw_documents
    outputs:
      - chunks
    explain: "Parallel chunking across documents for throughput."
  # Embeds each chunk with OpenAI's text-embedding-3-large model. The only
  # node declaring an explicit retry policy: up to 3 retries with a 2 s
  # backoff. NOTE(review): whether timeout_sec (90) bounds each attempt or
  # the whole step is not visible here — confirm against the osop runtime.
  - id: embed_chunks
    type: agent
    purpose: Generate vector embeddings for each chunk
    runtime:
      provider: openai
      model: text-embedding-3-large
    inputs:
      - chunks
    outputs:
      - embeddings
    retry_policy:
      max_retries: 3
      backoff_sec: 2
    timeout_sec: 90

  # Persists the chunk embeddings into pgvector for similarity search.
  - id: store_vectors
    type: db
    purpose: Store embeddings in vector database for similarity search
    runtime:
      engine: pgvector
      # NOTE(review): DSN carries no credentials — presumably injected from
      # the environment at runtime; verify before deploying.
      connection: postgresql://vec:5432/embeddings
    inputs:
      - embeddings
    outputs:
      - store_confirmation

  # Query-side entry point: REST endpoint on the same service as upload_docs,
  # exposing the search route.
  - id: query_input
    type: api
    purpose: Receive user query via search endpoint
    runtime:
      endpoint: /api/v1/search
      method: POST
      url: https://docs-service.internal
    outputs:
      - user_query

  # Looks up the most relevant chunks in the same pgvector store that
  # store_vectors writes to (identical engine/connection).
  - id: retrieve_context
    type: db
    purpose: Retrieve top-k relevant chunks using cosine similarity
    runtime:
      engine: pgvector
      connection: postgresql://vec:5432/embeddings
    inputs:
      - user_query
    outputs:
      - retrieved_chunks

  # Query-side LLM step: produces an answer grounded in the retrieved chunks
  # plus the original query.
  - id: generate_answer
    type: agent
    purpose: Generate grounded answer from retrieved context and user query
    runtime:
      provider: anthropic
      model: claude-sonnet-4-20250514
    inputs:
      - user_query
      - retrieved_chunks
    outputs:
      - answer
    # Consistency with the other agent node (embed_chunks): retry transient
    # LLM-provider failures before failing the step.
    retry_policy:
      max_retries: 3
      backoff_sec: 2
    timeout_sec: 30

  # Scores the generated answer for faithfulness and relevance (RAGAS-style
  # metrics) as the final pipeline step.
  - id: evaluate_answer
    type: cli
    purpose: Score answer for faithfulness and relevance using RAGAS metrics
    runtime:
      # NOTE(review): unlike chunk_documents, this command does not
      # interpolate its declared inputs (${answer}, ${retrieved_chunks}) —
      # confirm evaluate.py receives them another way (env/stdin), or add
      # the corresponding flags.
      command: "python evaluate.py --metrics faithfulness,relevance"
    inputs:
      - answer
      - retrieved_chunks
    outputs:
      - eval_scores

# Directed edges wiring node outputs to downstream inputs. The six edges form
# two disconnected subgraphs: ingestion (upload -> chunk -> embed -> store)
# and query (search -> retrieve -> generate -> evaluate), coupled only through
# the shared pgvector store. "mode: parallel" fans the downstream node out per
# item (see the explain below); sequential presumably runs it once per batch —
# confirm against the osop runtime semantics.
edges:
  # Ingestion pipeline
  - from: upload_docs
    to: chunk_documents
    mode: sequential

  - from: chunk_documents
    to: embed_chunks
    mode: parallel
    explain: "Each chunk is embedded independently in parallel."

  - from: embed_chunks
    to: store_vectors
    mode: sequential

  # Query pipeline
  - from: query_input
    to: retrieve_context
    mode: sequential

  - from: retrieve_context
    to: generate_answer
    mode: sequential

  - from: generate_answer
    to: evaluate_answer
    mode: sequential