This guide shows you how to create a Retrieval Augmented Generation (RAG) agent using the Couchbase vector store and evaluate its performance with Arize AX. Agentic RAG combines RAG with the power of agents. Retrieval agents are useful when we want the model to decide whether to retrieve from an index; to implement one, we simply give an LLM access to a retriever tool. We'll go through the following steps:
Create an Agentic RAG QA chatbot with OpenAI, LangGraph, Couchbase, and Agent Catalog
Trace the agent’s function calls including retrieval and LLM calls using Arize AX
Create a dataset to benchmark performance
Evaluate performance using LLM as a judge
Experiment with different chunk sizes, chunk overlaps, and k (the number of documents retrieved) to see how these affect the performance of the Agentic RAG
Set API Keys
To follow along with this tutorial, you'll need to sign up for Arize AX and get your Space ID, API key, and Developer key. You can see the guide here. You will also need an OpenAI API key.
import os
from getpass import getpass

# Enter here or set as environment variables
SPACE_ID = globals().get("SPACE_ID") or getpass("🔑 Enter your Arize AX Space ID: ")
API_KEY = globals().get("API_KEY") or getpass("🔑 Enter your Arize AX API Key: ")
OPENAI_API_KEY = globals().get("OPENAI_API_KEY") or getpass(
    "🔑 Enter your OpenAI API key: "
)

os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
Set up Arize AX Tracing
from arize.otel import register

# Set up the tracer provider via our convenience function
tracer_provider = register(
    space_id=SPACE_ID,
    api_key=API_KEY,
    project_name="langgraph-agentic-rag",  # name this whatever you would like
)

# Import the automatic instrumentor from OpenInference
from openinference.instrumentation.langchain import LangChainInstrumentor

# Finish automatic instrumentation
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from langchain_couchbase.vectorstores import CouchbaseVectorStore
from langchain_huggingface import HuggingFaceEmbeddings

# Cluster settings
CB_CONN_STRING = getpass("Enter the connection string for the Couchbase cluster: ")
CB_USERNAME = getpass("Enter the username for the Couchbase cluster: ")
CB_PASSWORD = getpass("Enter the password for the Couchbase cluster: ")
BUCKET_NAME = ""  # enter your bucket name
SCOPE_NAME = ""  # enter your scope name
COLLECTION_NAME = ""  # enter your collection name
SEARCH_INDEX_NAME = ""  # enter your search index name

# Connect to the Couchbase cluster
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
options = ClusterOptions(auth)
options.apply_profile("wan_development")
cluster = Cluster(CB_CONN_STRING, options)
cluster.wait_until_ready(timedelta(seconds=5))

# Initialize the vector store
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L12-v2")
vector_store = CouchbaseVectorStore(
    cluster=cluster,
    bucket_name=BUCKET_NAME,
    scope_name=SCOPE_NAME,
    collection_name=COLLECTION_NAME,
    embedding=embeddings,
    index_name=SEARCH_INDEX_NAME,
)
Since we will test multiple runs, we create a convenience function that resets the vector store with different chunk sizes and overlaps. Document content will be sourced from three blog posts by Lilian Weng.
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter


# Define the reset_vector_store function so we can run experiments with different chunk sizes
def reset_vector_store(vector_store, chunk_size=1024, chunk_overlap=20):
    try:
        # Find (up to 1000) existing documents from the previous run and delete them
        results = vector_store.similarity_search(
            k=1000,
            query="",  # Use an empty query or a specific one if needed
            search_options={
                "query": {"field": "metadata.source", "match": "lilian_weng_blog"}
            },
        )
        if results:
            deleted_ids = []
            for result in results:
                deleted_ids.append(result.id)
            vector_store.delete(ids=deleted_ids)

        # Load documents from the source URLs
        urls = [
            "https://lilianweng.github.io/posts/2024-07-07-hallucination/",
            "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
            "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
        ]
        docs = [WebBaseLoader(url).load() for url in urls]
        docs_list = [item for sublist in docs for item in sublist]

        # Use RecursiveCharacterTextSplitter with the requested chunking parameters
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", " ", ""],  # Hierarchical separators
        )
        doc_splits = text_splitter.split_documents(docs_list)

        # Add source metadata so documents can be found and deleted on the next reset
        for doc in doc_splits:
            doc.metadata["source"] = "lilian_weng_blog"

        try:
            vector_store.add_documents(doc_splits)
        except ValueError as e:
            print(f"Failed to insert documents: {e}")

        return vector_store
    except ValueError as e:
        print(f"Search failed with error: {e}")


# Reset the vector store
reset_vector_store(vector_store)
Fetch our retriever tool from the Agent Catalog using the agentc provider. In the future, when more tools (and/or prompts) are required and the application grows more complex, the Agent Catalog SDK and CLI can be used to automatically fetch tools based on the use case (via semantic search) or by name. For instructions on how this tool was created, and for more of Agent Catalog's capabilities, please refer to the documentation here.
import os

import agentc
import agentc.langchain
from langchain_core.tools import tool

# For retrieval from the local catalog
provider = agentc.Provider(decorator=lambda t: tool(t.func))

# In case the tools were published to the Couchbase cluster beforehand:
# provider = agentc.Provider(
#     decorator=lambda t: tool(t.func),
#     secrets={
#         "CB_USERNAME": CB_USERNAME,
#         "CB_PASSWORD": CB_PASSWORD,
#         "CB_CONN_STRING": CB_CONN_STRING,
#     },
# )

# This is the tool that will be used to retrieve documents from the vector store
retriever_tool = provider.get_item(name="retriever_tool", item_type="tool")
tools = retriever_tool
print(retriever_tool)
We will define a graph of agents so that all the agents involved can communicate with each other.
Agents communicate through a state object that is passed to each node and modified with that node's output. Our state will be a list of messages, and each node in our graph will append to it.
from typing import Annotated, Sequence, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    # The add_messages function defines how an update should be processed.
    # Default is to replace; add_messages says "append".
    messages: Annotated[Sequence[BaseMessage], add_messages]
from typing import Annotated, Literal, Sequence, TypedDict

from langchain import hub
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import tools_condition

### Edges


def grade_documents(state) -> Literal["generate", "rewrite"]:
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (messages): The current state

    Returns:
        str: A decision for whether the documents are relevant or not
    """
    print("---CHECK RELEVANCE---")

    # Data model
    class grade(BaseModel):
        """Binary score for relevance check."""

        binary_score: str = Field(description="Relevance score 'yes' or 'no'")

    # LLM
    model = ChatOpenAI(temperature=0, model="gpt-4o", streaming=True)

    # LLM with tool and validation
    llm_with_tool = model.with_structured_output(grade)

    # Fetch a prompt called "grade_documents" from the Agent Catalog
    grade_documents_prompt = PromptTemplate(
        template=provider.get_item(
            name="grade_documents", item_type="prompt"
        ).prompt.render(),
        input_variables=["context", "question"],
    )
    print(grade_documents_prompt)

    # Chain
    chain = grade_documents_prompt | llm_with_tool

    messages = state["messages"]
    last_message = messages[-1]
    question = messages[0].content
    docs = last_message.content

    scored_result = chain.invoke({"question": question, "context": docs})
    score = scored_result.binary_score

    if score == "yes":
        print("---DECISION: DOCS RELEVANT---")
        return "generate"
    else:
        print("---DECISION: DOCS NOT RELEVANT---")
        print(score)
        return "rewrite"


### Nodes


def agent(state):
    """
    Invokes the agent model to generate a response based on the current state.
    Given the question, it will decide to retrieve using the retriever tool,
    or simply end.

    Args:
        state (messages): The current state

    Returns:
        dict: The updated state with the agent response appended to messages
    """
    print("---CALL AGENT---")
    messages = state["messages"]
    model = ChatOpenAI(temperature=0, streaming=True, model="gpt-4-turbo")
    model = model.bind_tools(tools)
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


def rewrite(state):
    """
    Transform the query to produce a better question.

    Args:
        state (messages): The current state

    Returns:
        dict: The updated state with the re-phrased question
    """
    print("---TRANSFORM QUERY---")
    messages = state["messages"]
    question = messages[0].content

    msg = [
        HumanMessage(
            content=f""" \n
    Look at the input and try to reason about the underlying semantic intent / meaning. \n
    Here is the initial question:
    \n ------- \n
    {question}
    \n ------- \n
    Formulate an improved question: """,
        )
    ]

    # Grader
    model = ChatOpenAI(temperature=0, model="gpt-4-0125-preview", streaming=True)
    response = model.invoke(msg)
    return {"messages": [response]}


def generate(state):
    """
    Generate an answer.

    Args:
        state (messages): The current state

    Returns:
        dict: The updated state with the generated answer
    """
    print("---GENERATE---")
    messages = state["messages"]
    question = messages[0].content
    last_message = messages[-1]
    docs = last_message.content

    # Prompt
    prompt = hub.pull("rlm/rag-prompt")

    # LLM
    llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0, streaming=True)

    # Post-processing
    def format_docs(docs):
        return "\n\n".join(doc.page_content for doc in docs)

    # Chain
    rag_chain = prompt | llm | StrOutputParser()

    # Run
    response = rag_chain.invoke({"context": docs, "question": question})
    return {"messages": [response]}


print("*" * 20 + "Prompt[rlm/rag-prompt]" + "*" * 20)
prompt = hub.pull("rlm/rag-prompt")  # Show what the prompt looks like
prompt.pretty_print()
Then call the agent with the tool output added to messages (state).
from langgraph.graph import END, START, StateGraph
from langgraph.prebuilt import ToolNode

# Define a new graph
workflow = StateGraph(AgentState)

# Define the nodes we will cycle between
workflow.add_node("agent", agent)  # agent
retrieve = ToolNode(retriever_tool)
workflow.add_node("retrieve", retrieve)  # retrieval
workflow.add_node("rewrite", rewrite)  # Re-writing the question
workflow.add_node(
    "generate", generate
)  # Generating a response after we know the documents are relevant

# Call agent node to decide to retrieve or not
workflow.add_edge(START, "agent")

# Decide whether to retrieve
workflow.add_conditional_edges(
    "agent",
    # Assess agent decision
    tools_condition,
    {
        # Translate the condition outputs to nodes in our graph
        "tools": "retrieve",
        END: END,
    },
)

# Edges taken after the `retrieve` node is called
workflow.add_conditional_edges(
    "retrieve",
    # Assess the graded documents
    grade_documents,
)
workflow.add_edge("generate", END)
workflow.add_edge("rewrite", "agent")

# Compile
graph = workflow.compile()
from IPython.display import Image, display

try:
    display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass
import pprint

inputs = {
    "messages": [
        (
            "user",
            "What does Lilian Weng say about the types of adversarial attacks on LLMs?",
        ),
    ]
}

for output in graph.stream(inputs):
    for key, value in output.items():
        pprint.pprint(f"Output from node '{key}':")
        pprint.pprint(value, indent=2, width=80, depth=None)
Once you've run a single query, you can see the trace in the Arize AX UI, with each step taken by the retriever, the embedding model, and the LLM. Click through the queries to better understand how the query engine is performing. Arize AX can be used to understand and troubleshoot your RAG app by surfacing each retrieval and LLM call along with its inputs and outputs.
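If a trace doesn't show up right away, note that spans are exported in batches. As a minimal sketch, you can flush any buffered spans manually using the standard OpenTelemetry force_flush API on the tracer provider returned by register above:

# Force-export any spans still buffered by the batch span processor so they
# appear in Arize AX immediately (handy in short-lived notebook sessions)
tracer_provider.force_flush()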
We will run our Agent against the dataset of questions we generate, and then evaluate the results.
import pandas as pd
from langchain import hub
from langchain_openai import ChatOpenAI

# Define a template for generating questions
GEN_TEMPLATE = """
You are an assistant that generates Q&A questions about the content below.

The questions should involve the content, specific facts and figures,
names, and elements of the story. Do not ask any questions where the answer is
not in the content.

Respond with one question per line. Do not include any numbering at the beginning
of each line. Do not include any category headings.

Generate 10 questions. Be sure there are no duplicate questions.

[START CONTENT]
{content}
[END CONTENT]
"""

# Load the content you want to generate questions about
content = """
Lilian Weng discusses various aspects of adversarial attacks on LLMs and prompt
engineering techniques. Make sure to use Lilian Weng's name in the questions.
"""

# Format the template with the content
formatted_template = GEN_TEMPLATE.format(content=content)

# Initialize the language model
model = ChatOpenAI(model="gpt-4o", max_tokens=1300)

# Generate questions using the language model
response = model.invoke(formatted_template)

# Extract the content from the response object
questions_content = response.content  # Directly access the content attribute

# Split the response into individual questions
questions = questions_content.strip().split("\n")

# Create a dataframe to store the questions
questions_df = pd.DataFrame(questions, columns=["input"])

# Display the first few questions
questions_df.head()
Run our Agent against the list of generated questions
def run_rag(questions_df, k_value=2):
    # Note: k_value is not used inside this function; retrieval depth is
    # configured on the retriever tool itself.
    response_df = questions_df.copy(deep=True)
    for index, row in response_df.iterrows():
        inputs = {
            "messages": [
                ("user", f"{row['input']}"),
            ]
        }
        # Stream the graph and capture the generated answer and retrieved context
        for output in graph.stream(inputs):
            for key, value in output.items():
                if key == "generate":
                    response_df.loc[index, "output"] = value["messages"][-1]
                if key == "retrieve":
                    response_df.loc[index, "reference"] = value["messages"][-1].content
    text_columns = ["input", "output", "reference"]
    response_df[text_columns] = response_df[text_columns].apply(lambda x: x.astype(str))
    return response_df


response_df = run_rag(questions_df, k_value=1)
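Before evaluating, it can help to spot-check a few rows of the run. A minimal sketch, assuming the input/output/reference columns populated by run_rag above:

# Spot-check a few question / answer / retrieved-context triples
response_df[["input", "output", "reference"]].head()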
Now that we have run a set of test cases, we can create evaluators to measure the performance of our run. This way, we don't have to manually inspect every single trace to see if the LLM is doing the right thing. First, we'll define the prompts for the evaluators. There are two evaluators we will use for this example:
Retrieval Relevance: This evaluator checks if the reference text selected by the retriever is relevant to the question.
QA Correctness: This evaluator checks if the answer correctly answers the question based on the reference text provided.
(For more information on these and other prebuilt evaluators, see here.) We will create an LLM as a judge using prebuilt prompt templates, taking the inputs, outputs, and retrieved context we recorded and giving them labels using the llm_classify function. This function uses LLMs to evaluate your LLM calls and gives them labels and explanations. You can read more detail here.
from phoenix.evals import (
    QA_PROMPT_RAILS_MAP,
    QA_PROMPT_TEMPLATE,
    RAG_RELEVANCY_PROMPT_RAILS_MAP,
    RAG_RELEVANCY_PROMPT_TEMPLATE,
    OpenAIModel,
    llm_classify,
)

# The rails are used to hold the output to specific values based on the template
RELEVANCE_RAILS = list(RAG_RELEVANCY_PROMPT_RAILS_MAP.values())
QA_RAILS = list(QA_PROMPT_RAILS_MAP.values())

relevance_eval_df = llm_classify(
    dataframe=response_df,
    template=RAG_RELEVANCY_PROMPT_TEMPLATE,
    model=OpenAIModel(model="gpt-4o"),
    rails=RELEVANCE_RAILS,
    provide_explanation=True,
    include_prompt=True,
    concurrency=4,
)

correctness_eval_df = llm_classify(
    dataframe=response_df,
    template=QA_PROMPT_TEMPLATE,
    model=OpenAIModel(model="gpt-4o"),
    rails=QA_RAILS,
    provide_explanation=True,
    include_prompt=True,
    concurrency=4,
)
Let's inspect the results of our evaluation!
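As a minimal sketch of one way to do this: llm_classify returns a dataframe with label and explanation columns (the latter when provide_explanation=True), and the label strings come from the rails maps imported above; the results_df name and the joined column names below are introduced here for illustration.

# Join evaluator outputs back onto the responses for side-by-side review
results_df = response_df.copy()
results_df["relevance_label"] = relevance_eval_df["label"].values
results_df["relevance_explanation"] = relevance_eval_df["explanation"].values
results_df["correctness_label"] = correctness_eval_df["label"].values
results_df["correctness_explanation"] = correctness_eval_df["explanation"].values

# Simple aggregate pass rates for this run
print("Retrieval relevance rate:", (results_df["relevance_label"] == "relevant").mean())
print("QA correctness rate:", (results_df["correctness_label"] == "correct").mean())

results_df.head()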
Experiment with different k-values and chunk sizes
Re-run the experiments with different k-values and chunk sizes, then log the results to Arize AX to see how performance changes. Let's set up our evaluators so we can compare runs, as sketched below.
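Here is a sketch of what such an experiment loop could look like, reusing reset_vector_store, run_rag, and the evaluator pieces defined above. The evaluate_run helper and the parameter grid are illustrative, not part of the original:

def evaluate_run(response_df):
    """Illustrative helper: score a run with both evaluators and return pass rates."""
    relevance = llm_classify(
        dataframe=response_df,
        template=RAG_RELEVANCY_PROMPT_TEMPLATE,
        model=OpenAIModel(model="gpt-4o"),
        rails=RELEVANCE_RAILS,
        provide_explanation=True,
        concurrency=4,
    )
    correctness = llm_classify(
        dataframe=response_df,
        template=QA_PROMPT_TEMPLATE,
        model=OpenAIModel(model="gpt-4o"),
        rails=QA_RAILS,
        provide_explanation=True,
        concurrency=4,
    )
    return {
        "relevance": (relevance["label"] == "relevant").mean(),
        "correctness": (correctness["label"] == "correct").mean(),
    }


# Illustrative parameter grid; adjust to your own time and token budget
for chunk_size, chunk_overlap, k in [(512, 20, 2), (1024, 20, 2), (1024, 100, 4)]:
    reset_vector_store(vector_store, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    run_df = run_rag(questions_df, k_value=k)
    scores = evaluate_run(run_df)
    print(f"chunk_size={chunk_size}, overlap={chunk_overlap}, k={k} -> {scores}")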