RAG Pipeline

Instrument a retrieval-augmented generation pipeline with RETRIEVER, RERANKER, and LLM spans.

A RAG pipeline ("retrieval-augmented generation") answers a question in three steps: retrieve relevant documents, optionally rerank them to keep the best few, then ask the LLM to generate an answer using those documents. This guide adds a span to each step so the dashboard shows you what was retrieved, what was kept, and exactly what the model was given — the usual places a RAG answer goes wrong.

New to spans? Read Your First Trace first.

Setup

import os
import json
import neatlogs

neatlogs.init(
    api_key=os.environ["NEATLOGS_API_KEY"],
    endpoint=os.environ.get("NEATLOGS_ENDPOINT", "https://ingest.neatlogs.com"),
    workflow_name="rag-api",
    instrumentations=["openai", "chromadb"],  # or your vector DB
)

# Import the LLM library AFTER init() so it's auto-instrumented.
from openai import OpenAI

The Pipeline

from neatlogs import SystemPromptTemplate, UserPromptTemplate

rag_template = SystemPromptTemplate([
    {"role": "system", "content": "Answer the question using only the provided context.\n\nContext:\n{{context}}"},
])
user_template = UserPromptTemplate([
    {"role": "user", "content": "{{question}}"},
])


def retrieve(query: str, top_k: int = 5) -> list:
    # If using a supported vector DB (chromadb, qdrant, etc.),
    # this span is created automatically. Only add the trace()
    # block for custom retrieval logic.
    with neatlogs.trace("retrieve", kind="RETRIEVER") as span:
        span.set_attribute("neatlogs.retrieval.query", query)
        span.set_attribute("neatlogs.retrieval.top_k", top_k)
        docs = my_vector_store.search(query, k=top_k)
        span.set_attribute("neatlogs.retrieval.documents", json.dumps(docs))
    return docs


def rerank(query: str, docs: list, top_n: int = 3) -> list:
    with neatlogs.trace("rerank", kind="RERANKER") as span:
        span.set_attribute("neatlogs.reranker.query", query)
        span.set_attribute("neatlogs.reranker.top_k", top_n)
        span.set_attribute("neatlogs.reranker.input_documents", json.dumps(docs))
        reranked = my_reranker.rerank(query, docs, top_n=top_n)
        span.set_attribute("neatlogs.reranker.output_documents", json.dumps(reranked))
    return reranked


@neatlogs.span(kind="CHAIN", name="rag_pipeline")
def rag_pipeline(question: str) -> str:
    docs = retrieve(question, top_k=5)
    reranked = rerank(question, docs, top_n=3)
    context = "\n\n".join(doc["content"] for doc in reranked)

    with neatlogs.trace("generate", kind="LLM",
                        system_prompt_template=rag_template,
                        user_prompt_template=user_template):
        system_msgs = rag_template.compile(context=context)
        user_msgs = user_template.compile(question=question)
        response = OpenAI().chat.completions.create(
            model="gpt-4o",
            messages=system_msgs + user_msgs,
        )
        return response.choices[0].message.content


answer = rag_pipeline("What is the return policy for electronics?")
print(answer)

neatlogs.flush()
neatlogs.shutdown()

What You'll See in the Dashboard

A single trace with a CHAIN span at the top, containing:

  • RETRIEVER span: query, top_k, retrieved documents
  • RERANKER span: input/output documents
  • LLM span: prompt template, variables, token counts, response

If your vector DB is in the supported list (e.g., ChromaDB via instrumentations=["chromadb"]), the RETRIEVER span is created automatically. Remove the manual trace() block in retrieve().

On this page

Ask Neatlogs AI

Answers from the docs

How can I help?

Ask anything about instrumenting, tracing, or the Neatlogs dashboard.