This guide demonstrates how to instrument your audio application to send events and traces to Arize AX. Instrumentation involves capturing key events from the OpenAI Realtime API's WebSocket and converting them into spans that provide meaningful insight into your system's behavior.
We have identified the following events from the OpenAI Realtime API's WebSocket as the most valuable for LLM observability. Many other events exist, but the majority of useful information can be captured by listening for the events below (a minimal dispatcher sketch follows the list):
Session Events
session.created: Indicates the creation of a new session.
session.updated: Denotes updates to the session’s parameters or state.
Audio Input Events
input_audio_buffer.speech_started: Signals the start of speech input.
input_audio_buffer.speech_stopped: Indicates the end of speech input.
input_audio_buffer.committed: Confirms that the audio input buffer has been committed for processing.
Conversation Events
conversation.item.created: Represents the creation of a new conversation item, such as a user message.
Response Events
response.audio_transcript.delta: Provides incremental transcripts of the audio response.
response.audio_transcript.done: Indicates the completion of the audio transcript.
response.done: Marks the completion of the response generation.
response.audio.delta: Delivers incremental chunks of the output audio bytes.
Error Events
error: Conveys any errors encountered during processing.
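As a reference for the sections that follow, here is one way to route these WebSocket events to handlers before any tracing logic is added. This is a minimal sketch: the handler wiring and function names are illustrative, and the print statements stand in for the span-creation code shown below.

import json
from typing import Callable, Dict

def handle_realtime_event(message: str, handlers: Dict[str, Callable[[dict], None]]) -> None:
    """Parse a raw WebSocket message from the Realtime API and route it by event type."""
    event = json.loads(message)
    handler = handlers.get(event.get("type", ""))
    if handler is not None:
        handler(event)

# Example wiring: map each event type of interest to your own handler functions.
handlers = {
    "session.created": lambda e: print("session created:", e["session"]["id"]),
    "input_audio_buffer.speech_started": lambda e: print("speech started"),
    "response.audio_transcript.done": lambda e: print("transcript:", e.get("transcript", "")),
    "response.done": lambda e: print("response complete"),
    "error": lambda e: print("error:", e.get("error")),
}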
Session Creation: When receiving a session.created event, start a parent span to represent the session lifecycle.
Python
if event.get("type") == "session.created":
    with tracer.start_as_current_span("session.lifecycle") as parent_span:
        parent_span.set_attribute("session.id", event["session"]["id"])
        log_event("Session Created", f"Session ID: {event['session']['id']}")
Response Handling: Log output audio transcripts and set response attributes.
Python
if event.get("type") == "response.audio_transcript.done":
    transcript = event.get("transcript", "")
    with tracer.start_as_current_span("Audio Output") as span:
        span.set_attribute("output.audio.transcript", transcript)
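The response.done event also summarizes the completed response, including token usage. The sketch below records that usage on a span; the usage field names (input_tokens, output_tokens, total_tokens) are taken from the Realtime API payload and the llm.token_count.* attribute names follow common LLM tracing conventions, so verify both against your events and your Arize AX setup.

if event.get("type") == "response.done":
    usage = event.get("response", {}).get("usage") or {}
    with tracer.start_as_current_span("Response Completed") as span:
        # Usage field names assumed from the Realtime API response.done payload.
        span.set_attribute("llm.token_count.prompt", usage.get("input_tokens", 0))
        span.set_attribute("llm.token_count.completion", usage.get("output_tokens", 0))
        span.set_attribute("llm.token_count.total", usage.get("total_tokens", 0))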
Tool Calls and Nested Spans: For response.function_call_arguments.done, create nested spans to track tool invocations.
When processing tool calls, you may need to extract attributes and metadata about the tools and set them in spans for observability. Below is an example implementation for processing tools within a session update event. This is just one example and can be adapted for your specific use case.
Python
import json


def process_tools(session_update_event, _span):
    """
    Process tools in the session update event and set their attributes.
    """
    tools = session_update_event["session"].get("tools", [])
    for i, tool in enumerate(tools):
        tool_name = tool.get("name", f"tool_{i}")
        tool_type = tool.get("type", "unknown")
        tool_description = tool.get("description", "")
        tool_parameters = tool.get("parameters", {})

        # Create a JSON schema-like attribute for the tool
        tool_json_schema = json.dumps(tool_parameters)

        # Set tool attributes in the span
        _span.set_attribute(f"llm.tools.{i}.tool.name", tool_name)
        _span.set_attribute(f"llm.tools.{i}.tool.type", tool_type)
        _span.set_attribute(f"llm.tools.{i}.tool.description", tool_description)
        _span.set_attribute(f"llm.tools.{i}.tool.json_schema", tool_json_schema)

        # Log the tool processing
        log_event(
            "Tool Processed",
            f"Processed tool {tool_name}: Type={tool_type}, Description={tool_description}",
        )
Java

public void processTools(Map<String, Object> sessionUpdateEvent, Span parentSpan) {
    // The tools list is nested under the "session" object, mirroring the Python example above.
    Map<String, Object> session = (Map<String, Object>) sessionUpdateEvent.get("session");
    List<Map<String, Object>> tools =
        session != null ? (List<Map<String, Object>>) session.get("tools") : null;
    if (tools != null) {
        for (int i = 0; i < tools.size(); i++) {
            Map<String, Object> tool = tools.get(i);
            String toolName = tool.getOrDefault("name", "tool_" + i).toString();
            String toolType = tool.getOrDefault("type", "unknown").toString();
            String toolDescription = tool.getOrDefault("description", "").toString();
            String toolJsonSchema = new Gson().toJson(tool.get("parameters"));

            parentSpan.setAttribute("llm.tools." + i + ".tool.name", toolName);
            parentSpan.setAttribute("llm.tools." + i + ".tool.type", toolType);
            parentSpan.setAttribute("llm.tools." + i + ".tool.description", toolDescription);
            parentSpan.setAttribute("llm.tools." + i + ".tool.json_schema", toolJsonSchema);

            logEvent("Tool Processed",
                "Processed tool " + toolName + ": Type=" + toolType + ", Description=" + toolDescription);
        }
    }
}
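For the response.function_call_arguments.done event itself, a child span can be opened under whichever span is currently active so each tool invocation is traced separately. A minimal sketch, assuming the event carries call_id and arguments fields as in the Realtime API payload; the attribute names are illustrative and should be adapted to your conventions.

if event.get("type") == "response.function_call_arguments.done":
    with tracer.start_as_current_span("Tool Call") as tool_span:
        # call_id identifies the invocation; arguments is the JSON string produced by the model.
        tool_span.set_attribute("tool_call.id", event.get("call_id", ""))
        tool_span.set_attribute("tool_call.function.arguments", event.get("arguments", "{}"))
        log_event("Tool Call", f"Function call arguments received for {event.get('call_id', '')}")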
Adding URLs: Add input and output audio URLs to the span whenever they become available.
When working with URLs, you may need to save audio files or other data to a storage service such as Google Cloud Storage (GCS). Below is an example implementation for GCS. This is just one example; you may need to adjust the code for your specific storage solution. See our integrations page for more information on granting access to your files with other providers.
Python
import os
import time

from google.cloud import storage


def upload_to_gcs(file_path, bucket_name, destination_blob_name, make_public=False):
    """Uploads a file to Google Cloud Storage."""
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(file_path)
        if make_public:
            blob.make_public()
            return blob.public_url
        return destination_blob_name
    except Exception as e:
        raise RuntimeError(f"Failed to upload {file_path} to GCS: {e}")


def process_audio_and_upload(pcm16_audio, span):
    """Processes audio, saves it as WAV, uploads it to GCS, and cleans up."""
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    file_name = f"audio_{timestamp}.wav"
    file_path = file_name
    bucket_name = "jz999"
    try:
        # save_audio_to_wav (defined elsewhere) writes the PCM16 audio bytes to a WAV file.
        save_audio_to_wav(pcm16_audio, file_path)
        gcs_url = upload_to_gcs(file_path, bucket_name, f"sallyann/audio/{file_name}")
        span.set_attribute("input.audio.url", gcs_url)
        return gcs_url
    finally:
        if os.path.exists(file_path):
            os.remove(file_path)
TS/JS
import { Storage } from '@google-cloud/storage';
import { Span, trace } from '@opentelemetry/api';
import * as fs from 'fs';
import * as path from 'path';

/**
 * Uploads a file to Google Cloud Storage and returns the URL.
 * @param filePath - The local path to the file to upload.
 * @param bucketName - The GCS bucket name.
 * @param destinationBlobName - The destination path in the GCS bucket.
 * @param makePublic - Whether to make the file public.
 */
async function uploadToGcs(
  filePath: string,
  bucketName: string,
  destinationBlobName: string,
  makePublic: boolean = false
): Promise<string> {
  const storage = new Storage();
  const bucket = storage.bucket(bucketName);
  const blob = bucket.file(destinationBlobName);
  try {
    // Upload the file to the specified bucket
    await bucket.upload(filePath, {
      destination: destinationBlobName,
    });
    if (makePublic) {
      // Make the file public if requested
      await blob.makePublic();
      return blob.publicUrl();
    } else {
      return `gs://${bucketName}/${destinationBlobName}`;
    }
  } catch (error) {
    throw new Error(`Failed to upload ${filePath} to GCS: ${error.message}`);
  }
}

/**
 * Processes PCM16 audio data, converts it to WAV, uploads it to GCS, and sets the URL in the span.
 * @param pcm16Audio - The audio data in PCM16 format.
 * @param span - The current tracing span.
 */
async function processAudioAndUpload(pcm16Audio: Buffer, span: Span): Promise<string> {
  const timestamp = new Date().toISOString().replace(/[-:.]/g, '_');
  const fileName = `audio_${timestamp}.wav`;
  const filePath = path.join(__dirname, fileName);
  const bucketName = 'jz999';
  const destinationBlobName = `sallyann/audio/${fileName}`;
  try {
    // Save audio as a WAV file locally
    await saveAudioToWav(pcm16Audio, filePath);
    // Upload the file to GCS
    const gcsUrl = await uploadToGcs(filePath, bucketName, destinationBlobName, true);
    // Set the GCS URL as a span attribute
    span.setAttribute('input.audio.url', gcsUrl);
    return gcsUrl;
  } finally {
    // Clean up the local file after upload
    if (fs.existsSync(filePath)) {
      fs.unlinkSync(filePath);
    }
  }
}

/**
 * Converts PCM16 audio data into a WAV file and saves it locally.
 * @param pcm16Audio - The audio data in PCM16 format.
 * @param outputPath - The path to save the WAV file.
 */
async function saveAudioToWav(pcm16Audio: Buffer, outputPath: string): Promise<void> {
  // Implement WAV file conversion logic here
  // For demonstration, we assume the audio buffer is directly saved as a WAV
  fs.writeFileSync(outputPath, pcm16Audio);
}
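The examples above attach the input audio URL. Output audio can be handled the same way: accumulate the response.audio.delta chunks, write them to a WAV file, upload it, and set output.audio.url on the span. A minimal sketch reusing the helpers from the Python example (save_audio_to_wav and upload_to_gcs); the bucket and object names are placeholders.

import base64

output_audio_chunks = []

def handle_output_audio(event, span):
    """Accumulate output audio deltas and upload the full clip once the response finishes."""
    if event.get("type") == "response.audio.delta":
        # The delta field carries a base64-encoded chunk of the output audio.
        output_audio_chunks.append(base64.b64decode(event.get("delta", "")))
    elif event.get("type") == "response.done":
        pcm16_audio = b"".join(output_audio_chunks)
        output_audio_chunks.clear()
        file_path = "output_audio.wav"
        save_audio_to_wav(pcm16_audio, file_path)  # same helper as in the input example
        gcs_url = upload_to_gcs(file_path, "YOUR_BUCKET", "audio/output_audio.wav")
        span.set_attribute("output.audio.url", gcs_url)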
While this guide provides a framework for instrumentation, tailor the implementation to fit your application’s architecture. Ensure that your instrumentation captures the specified key events to provide comprehensive observability into your application’s interactions with the OpenAI Realtime API.
Currently, voice evaluations are supported exclusively with OpenAI models. Support for additional models is planned and will be available soon.
This guide provides instructions on how to evaluate voice applications using OpenAI models within the Phoenix framework. The example notebook linked below demonstrates the process of configuring and running evaluations.
Phoenix Installation: Make sure the phoenix package is installed in your Python environment.
OpenAI API Key: Obtain an API key for the OpenAI model you plan to use.
Audio Data: Prepare the audio data required for evaluation. This can be in the form of raw audio bytes, base64-encoded strings, or URLs pointing to audio files. If you have existing data in Arize AX, you can use our export client to retrieve it.
Python Environment: Ensure you are using Python version 3.7 or higher. A quick check covering these prerequisites is sketched below.
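A quick way to confirm these prerequisites before running an evaluation; the package and environment-variable names below are the usual ones (phoenix.evals installed via pip, OPENAI_API_KEY for the API key), but adjust them to your environment.

import os
import sys

# Python 3.7+ is required.
assert sys.version_info >= (3, 7), "Python 3.7 or higher is required"

# The Phoenix evals module must be importable.
import phoenix.evals  # noqa: F401

# The OpenAI API key must be available to the evaluation model.
assert os.environ.get("OPENAI_API_KEY"), "Set the OPENAI_API_KEY environment variable"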
Templates are used to configure prompts sent to the OpenAI model, ensuring that the task is clearly defined and the model's responses are constrained to valid outputs. Templates consist of rails (the set of valid responses) and a sequence of prompt parts that define the type and content of the input or instructions.

In addition to custom templates, we offer an out-of-the-box template for emotion detection. This template streamlines setup, allowing you to start classifying audio with minimal configuration.

Below is an example template for tone classification.
from phoenix.evals.templates import (
    ClassificationTemplate,
    PromptPartContentType,
    PromptPartTemplate,
)

# Define valid classification labels (rails)
TONE_EMOTION_RAILS = ["positive", "neutral", "negative"]

# Create the classification template
template = ClassificationTemplate(
    rails=TONE_EMOTION_RAILS,  # Specify the valid output labels
    template=[
        # Prompt part 1: Task description
        PromptPartTemplate(
            content_type=PromptPartContentType.TEXT,
            template="""
            You are a helpful AI bot that checks for the tone of the audio.
            Analyze the audio file and determine the tone (e.g., positive, neutral, negative).
            Your evaluation should provide a multiclass label from the following options:
            ['positive', 'neutral', 'negative'].

            Here is the audio:
            """,
        ),
        # Prompt part 2: Insert the audio data
        PromptPartTemplate(
            content_type=PromptPartContentType.AUDIO,
            template="{audio}",  # Placeholder for the audio content
        ),
        # Prompt part 3: Define the response format
        PromptPartTemplate(
            content_type=PromptPartContentType.TEXT,
            template="""
            Your response must be a string, either positive, neutral, or negative,
            and should not contain any text or characters aside from that.
            """,
        ),
    ],
)
How It Works
Prompt Parts
Part 1: Provides task instructions and specifies valid response labels.
Part 2: Dynamically inserts the audio data for analysis using the placeholder. You’ll want to ensure that the prompt variable you choose corresponds to the column that holds your base64-encoded audio data.
Part 3: Ensures the model outputs a response in the desired format (a single-word string: positive, neutral, or negative).
Rails
Rails define the set of valid outputs for the classification task: ["positive", "neutral", "negative"].
Any response outside this set can be flagged as invalid.
This modular approach ensures flexibility, allowing you to reuse and adapt templates for different use cases or models.
If you are evaluating text (e.g., a transcript) instead of audio, you can use a plain string prompt template; the multi-part structure and AUDIO content type above are only needed for audio inputs.
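A text-only variant of the tone template could look like the following sketch; the {transcript} variable is an assumption and must match a column in your data.

# A plain string template is enough when the input is text such as a transcript.
TEXT_TONE_TEMPLATE = """
You are a helpful AI bot that checks the tone of a conversation transcript.
Classify the tone as positive, neutral, or negative.

Here is the transcript:
{transcript}

Your response must be a single word: positive, neutral, or negative.
"""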
Using a data processor with Phoenix enables parallel processing of your audio data, improving efficiency and scalability. A data processor is responsible for transforming raw audio data into base64-encoded strings, which can then be utilized by your models.

Processor Requirements

To ensure compatibility with Phoenix, your data processor must meet the following criteria:
Consistent Input and Output Types
The input and output of the processor must maintain the same type.
For example: if you are processing a DataFrame, the input is a single row as a pandas Series, and the output is the same Series updated with the encoded audio.
Audio Link Processing
The processor must fetch audio from a provided link (either from cloud storage or local storage) and produce a base64-encoded string.
Column Assignment Consistency
The encoded string must be assigned to the same column referenced in your prompt template.
For example, if you are using the EMOTION_AUDIO_TEMPLATE, the base64-encoded audio string should be assigned to the "audio" column.
Example: Fetching and Encoding Audio from Google Cloud Storage

Below is an example data processor that demonstrates how to fetch audio from Google Cloud Storage, encode it as a base64 string, and assign it to the appropriate column in the DataFrame:
import asyncio
import base64

import aiohttp
import pandas as pd


async def async_fetch_gcloud_data(row: pd.Series) -> pd.Series:
    """
    Fetches data from a Google Cloud Storage URL and returns the content as a base64-encoded string.
    """
    token = None
    try:
        # Execute the gcloud command to fetch the access token
        output = await asyncio.create_subprocess_exec(
            "gcloud", "auth", "print-access-token",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await output.communicate()

        if output.returncode != 0:
            raise RuntimeError(f"Error executing gcloud command: {stderr.decode('UTF-8').strip()}")

        token = stdout.decode("UTF-8").strip()

        # Ensure the token is not empty or None
        if not token:
            raise ValueError("Failed to retrieve a valid access token. Token is empty.")
    except Exception as e:
        # Catch any other exceptions and re-raise them with additional context
        raise RuntimeError(f"An unexpected error occurred: {str(e)}")

    # Set the token in the header
    gcloud_header = {"Authorization": f"Bearer {token}"}

    # Must ensure that the URL begins with storage.googleapis..., rather than storage.cloud.google...
    url = row["attributes.input.audio.url"]
    G_API_HOST = "https://storage.googleapis.com/"
    not_googleapis = url.startswith("https://storage.cloud.google.com/") or url.startswith("gs://")
    g_api_url = (
        url.replace("https://storage.cloud.google.com/", G_API_HOST)
        if url and not_googleapis
        else url
    )

    # Get a response back, present the status
    async with aiohttp.ClientSession() as session:
        async with session.get(g_api_url, headers=gcloud_header) as response:
            response.raise_for_status()
            content = await response.read()
            encoded_string = base64.b64encode(content).decode("utf-8")
            row["audio"] = encoded_string

    return row
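If your audio files live on local disk rather than in GCS, the same contract can be satisfied with a much smaller processor. A minimal sketch; the column names ("attributes.input.audio.url" for the file path and "audio" for the encoded output) are assumptions that must match your DataFrame and prompt template.

import base64

import pandas as pd


async def encode_local_audio(row: pd.Series) -> pd.Series:
    """Reads the audio file referenced by the row and stores it as a base64-encoded string."""
    # The file-path column name is an assumption; point it at whichever column holds your paths.
    with open(row["attributes.input.audio.url"], "rb") as f:
        row["audio"] = base64.b64encode(f.read()).decode("utf-8")
    return row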
If your audio data is already in base64 format as an encoded string, you can skip this step.
To run an evaluation, use the llm_classify function. This function accepts a DataFrame, a list of audio URLs, or raw audio bytes as input. In the example below, data is exported directly from Arize AX to perform the evaluation.
from datetime import datetime

import pandas as pd
from phoenix.evals.classify import llm_classify

# Example DataFrame exported from Arize AX.
# client is an Arize export client configured elsewhere in your code.
df = client.export_model_to_df(
    space_id='SPACE_ID',
    model_id='PROJECT_NAME',
    environment=Environments.TRACING,
    start_time=datetime.fromisoformat('2024-12-23T07:00:00.000+00:00'),
    end_time=datetime.fromisoformat('2024-12-31T06:59:59.999+00:00'),
)

# Run the evaluation.
# model, EMOTION_PROMPT_TEMPLATE, and EMOTION_AUDIO_RAILS are defined as in the sections above.
results = llm_classify(
    model=model,
    data=df,
    data_processor=async_fetch_gcloud_data,
    template=EMOTION_PROMPT_TEMPLATE,
    rails=EMOTION_AUDIO_RAILS,
    provide_explanation=True,
)
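llm_classify returns a DataFrame of evaluation results, one row per input, typically with a label column and (when provide_explanation=True) an explanation column; confirm the exact column names against your Phoenix version. A quick way to inspect the output:

# Inspect the distribution of predicted labels and spot-check the explanations.
print(results["label"].value_counts())
print(results[["label", "explanation"]].head())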