Integration quickstart

Integrating Agora's real-time audio communication capabilities with OpenAI's language models enables dynamic, conversational AI experiences. This guide shows you how to set up a Python project that combines Agora's server-side Voice SDK with OpenAI's API to create an interactive, voice-driven assistant.

Understand the tech

The RealtimeKitAgent class manages the integration by connecting to an Agora channel for real-time audio streaming and to OpenAI's API for processing audio input and generating AI-driven responses. Audio frames captured from the Agora channel are streamed to OpenAI's API, where the AI processes the input. The API responses, which include transcribed text and synthesized voice output, are then delivered back to the Agora channel.

The code sets up tools that can be executed locally or passed through the API. This allows the AI to perform specific tasks, such as retrieving data from external sources. The agent processes various message types from OpenAI, such as audio responses, transcription updates, and error messages, and sends them to users through the Agora audio channel, facilitating continuous interaction.
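As a rough illustration of locally executed tools, the sketch below maps tool names to Python functions and dispatches a call from a JSON argument string. `ToolRegistry` and `get_weather` are hypothetical names used only for this example; they are not the demo's `ToolContext` API, whose actual interface may differ.

```python
import json
from typing import Any, Callable, Dict


class ToolRegistry:
    """Hypothetical stand-in for a tool context; not the demo's real ToolContext."""

    def __init__(self) -> None:
        self._tools: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str, func: Callable[..., Any]) -> None:
        # Map the name the model would reference to a local function
        self._tools[name] = func

    def call(self, name: str, arguments_json: str) -> str:
        # Arguments are assumed to arrive as a JSON object encoded in a string
        if name not in self._tools:
            return json.dumps({"error": f"unknown tool: {name}"})
        kwargs = json.loads(arguments_json)
        result = self._tools[name](**kwargs)
        return json.dumps({"result": result})


def get_weather(city: str) -> str:
    # Placeholder: a real tool would query an external weather service
    return f"No live data for {city} in this sketch"


registry = ToolRegistry()
registry.register("get_weather", get_weather)
```

Returning a JSON string in both the success and error cases keeps the reply easy to forward to the model, whichever transport is used.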

The following figure illustrates the integration topology:

Prerequisites

Set up the project

This guide walks you through the core elements of the Agora Conversational AI Demo integrating Agora's Python SDK with OpenAI's Realtime API:

  1. Download the Agora Conversational AI Demo code.

  2. The project is structured as follows:


    /realtime_agent
    ├── __init__.py
    ├── agent.py
    ├── agora
    │   ├── __init__.py
    │   ├── requirements.txt
    │   ├── rtc.py
    │   └── token_builder
    │       ├── AccessToken2.py
    │       ├── Packer.py
    │       ├── RtcTokenBuilder2.py
    │       └── realtimekit_token_builder.py
    ├── parse_args.py
    └── realtimeapi
        ├── __init__.py
        ├── call_tool.py
        ├── client.py
        ├── messages.py
        ├── mic_to_websocket.py
        ├── push_to_talk.py
        ├── send_audio_to_websocket.py
        └── util.py

    Note

    This project uses the OpenAI realtimeapi-examples package. Download the project and unzip it into your realtime-agent folder.

    Overview of key files:

    • agent.py: The primary script responsible for executing the RealtimeKitAgent. It integrates Agora's functionality from the rtc.py module and OpenAI's capabilities from the realtimeapi package.
    • rtc.py: Contains an implementation of the server-side Agora Python Voice SDK.
    • parse_args.py: Handles command-line argument parsing for the application.
    • realtimeapi/: Contains the classes and methods that interact with OpenAI's Realtime API.
  3. Create the .env file by copying .env.example in the root of the repo:


    cp .env.example .env

  4. Fill in the values for the environment variables:


    # Agora RTC app ID
    AGORA_APP_ID=
    AGORA_APP_CERT=

    # OpenAI API key for authentication
    OPENAI_API_KEY=

  5. Create a virtual environment and activate it:


    python3 -m venv venv && source venv/bin/activate

  6. Install the required dependencies:


    pip install -r requirements.txt

  7. Run the demo server:


    python -m realtime_agent.agent --channel_name=<channel_name> --uid=<agent_uid>
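Before running the demo, it can help to confirm that the variables from step 4 are actually set. The following is a minimal sketch, not part of the demo project: `missing_env_vars` is a hypothetical helper, and treating AGORA_APP_CERT as mandatory is an assumption (the demo's entry point only checks AGORA_APP_ID). It reads the process environment, so values in .env must be loaded or exported first.

```python
import os
from typing import List, Mapping, Sequence

# Assumption: all three variables from .env.example are treated as required
REQUIRED_VARS = ("AGORA_APP_ID", "AGORA_APP_CERT", "OPENAI_API_KEY")


def missing_env_vars(
    env: Mapping[str, str], required: Sequence[str] = REQUIRED_VARS
) -> List[str]:
    # Unset and empty values are both reported as missing
    return [name for name in required if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_env_vars(os.environ)
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required environment variables are set")
```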

Implementation

The RealtimeKitAgent class integrates Agora's audio communication capabilities with OpenAI's AI services. This class manages audio streams, handles communication with the OpenAI API, and processes AI-generated responses, providing a seamless conversational AI experience.

Connect to Agora and OpenAI

The setup_and_run_agent method sets up the RealtimeKitAgent by connecting to an Agora channel using the provided RtcEngine and initializing a session with the OpenAI Realtime API client. It sends configuration messages to set up the session and define conversation parameters, such as the system message and output audio format, before starting the agent's operations. The method uses asynchronous execution to handle both listening for the session start and sending conversation configuration updates concurrently. It ensures that the connection is properly managed and cleaned up after use, even in cases of exceptions, early exits, or shutdowns.

Note

UIDs in the Python SDK are set using a string value. Agora recommends using only numerical values for UID strings to ensure compatibility with all Agora products and extensions.


@classmethod
async def setup_and_run_agent(
    cls,
    *,
    engine: RtcEngine,
    options: RtcOptions,
    inference_config: InferenceConfig,
    tools: ToolContext | None,
) -> None:
    # Create and connect to an Agora channel
    channel = engine.create_channel(options)
    await channel.connect()

    try:
        # Initialize the OpenAI Realtime API client
        async with RealtimeApiClient(
            base_uri="wss://api.openai.com",
            api_key=os.getenv("OPENAI_API_KEY"),
            verbose=False,
        ) as client:
            # Update the session configuration
            await client.send_message(
                messages.SessionUpdate(
                    session=messages.SessionUpdateParams(
                        turn_detection=inference_config.turn_detection,
                        tools=tools.model_description() if tools else None,
                        tool_choice="auto",
                        instructions=inference_config.system_message,
                    )
                )
            )

            # Concurrently wait for the session to start and update the conversation config
            [start_session_message, _] = await asyncio.gather(
                *[
                    anext(client.listen()),
                    client.send_message(
                        messages.UpdateConversationConfig(
                            system_message=inference_config.system_message,
                            output_audio_format=messages.AudioFormats.PCM16,
                            voice=inference_config.voice,
                            tools=tools.model_description() if tools else None,
                            transcribe_input=False,
                        )
                    ),
                ]
            )
            logger.info(
                f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}"
            )

            # Create and run the RealtimeKitAgent
            agent = cls(
                client=client,
                tools=tools,
                channel=channel,
            )
            await agent.run()

    finally:
        # Ensure the Agora engine is destroyed, even if an exception occurs
        engine.destroy()

Initialize the RealtimeKitAgent

The RealtimeKitAgent class constructor accepts an OpenAI RealtimeApiClient, an optional ToolContext for function registration, and an Agora channel for managing audio communication. This setup initializes the agent to process audio streams, register tools (if provided), and interact with the AI model.


def __init__(
    self,
    *,
    client: RealtimeApiClient,
    tools: ToolContext | None,
    channel: Channel,
) -> None:
    self.client = client  # OpenAI Realtime API client
    self.tools = tools  # Optional tool context for function registration
    self._client_tool_futures = {}  # For managing asynchronous tool calls
    self.channel = channel  # Agora channel for audio communication
    self.subscribe_user = None  # Will store the user ID we're subscribing to

Launch the agent

The run method orchestrates the main operations of the RealtimeKitAgent. It waits for a remote user to join, subscribes to that user's audio, starts background tasks for audio input, audio output, and model messages, logs any unhandled exceptions from those tasks, and keeps running until the channel disconnects.


async def run(self) -> None:
    try:
        # Helper function to log unhandled exceptions in tasks
        def log_exception(t: asyncio.Task[Any]) -> None:
            if not t.cancelled() and t.exception():
                logger.error(
                    "unhandled exception",
                    exc_info=t.exception(),
                )

        logger.info("Waiting for remote user to join")
        # Wait for a remote user to join the channel
        self.subscribe_user = await wait_for_remote_user(self.channel)
        logger.info(f"Subscribing to user {self.subscribe_user}")
        # Subscribe to the audio of the joined user
        await self.channel.subscribe_audio(self.subscribe_user)

        # Handle user leaving the channel
        async def on_user_left(agora_rtc_conn: RTCConnection, user_id: int, reason: int):
            logger.info(f"User left: {user_id}")
            if self.subscribe_user == user_id:
                self.subscribe_user = None
                logger.info("Subscribed user left, disconnecting")
                await self.channel.disconnect()

        self.channel.on("user_left", on_user_left)

        # Set up a future to track when the agent should disconnect
        disconnected_future = asyncio.Future[None]()

        # Handle connection state changes
        def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason):
            logger.info(f"Connection state changed: {conn_info.state}")
            if conn_info.state == 1:  # Disconnected state
                if not disconnected_future.done():
                    disconnected_future.set_result(None)

        self.channel.on("connection_state_changed", callback)

        # Start tasks for streaming audio and processing messages
        asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback(
            log_exception
        )
        asyncio.create_task(
            self._stream_audio_queue_to_audio_output()
        ).add_done_callback(log_exception)
        asyncio.create_task(self._process_model_messages()).add_done_callback(
            log_exception
        )

        # Wait until the agent is disconnected
        await disconnected_future
        logger.info("Agent finished running")
    except asyncio.CancelledError:
        logger.info("Agent cancelled")
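The two asyncio patterns that run relies on, done-callbacks that surface task failures and a future that signals disconnection, can be tried without any SDK. This is a simplified sketch with hypothetical worker coroutines; it collects errors in a list where the demo logs them.

```python
import asyncio
from typing import List


async def run_with_logged_failures() -> List[str]:
    errors: List[str] = []

    def log_exception(task: asyncio.Task) -> None:
        # Record failures instead of letting them vanish with the task
        if not task.cancelled() and task.exception():
            errors.append(repr(task.exception()))

    async def failing_worker() -> None:
        raise RuntimeError("stream failed")

    async def healthy_worker(done: asyncio.Future) -> None:
        await asyncio.sleep(0)
        # Stand-in for the connection-state callback resolving the future
        if not done.done():
            done.set_result(None)

    disconnected = asyncio.get_running_loop().create_future()
    tasks = [
        asyncio.create_task(failing_worker()),
        asyncio.create_task(healthy_worker(disconnected)),
    ]
    for task in tasks:
        task.add_done_callback(log_exception)

    # Wait for the "disconnect" signal rather than for the tasks themselves
    await disconnected
    await asyncio.sleep(0)  # give pending done-callbacks a chance to run
    return errors
```

Because the background tasks are never awaited directly, the done-callback is the only place their exceptions become visible, which is why each task gets one.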

Stream input audio to the AI model

The _stream_input_audio_to_model method captures audio frames from the Agora channel and sends them to the OpenAI API client for real-time processing by the AI model.


async def _stream_input_audio_to_model(self) -> None:
    # Wait until we have a subscribed user
    while self.subscribe_user is None:
        await asyncio.sleep(0.1)
    # Get the audio frame stream for the subscribed user
    audio_frames = self.channel.get_audio_frames(self.subscribe_user)
    async for audio_frame in audio_frames:
        try:
            # Send the audio frame to the OpenAI model via the API client
            await self.client.send_audio_data(audio_frame.data)
        except Exception as e:
            logger.error(f"Error sending audio data to model: {e}")
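Catching the exception inside the loop means a single failed send does not end the stream. The sketch below demonstrates that behavior with a hypothetical frame generator and a hypothetical `FlakyClient` that rejects one frame; neither is part of the Agora or OpenAI libraries.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def fake_audio_frames(count: int) -> AsyncIterator[bytes]:
    # Hypothetical frame source standing in for the channel's audio stream
    for i in range(count):
        yield bytes([i]) * 4


class FlakyClient:
    """Hypothetical client that rejects one frame to exercise the error path."""

    def __init__(self, fail_on: int) -> None:
        self.fail_on = fail_on
        self.calls = 0
        self.sent: List[bytes] = []

    async def send_audio_data(self, data: bytes) -> None:
        self.calls += 1
        if self.calls == self.fail_on:
            raise ConnectionError("temporary send failure")
        self.sent.append(data)


async def stream_frames(client: FlakyClient, frames: AsyncIterator[bytes]) -> int:
    dropped = 0
    async for frame in frames:
        try:
            await client.send_audio_data(frame)
        except Exception:
            # A failed frame is counted and skipped; the loop keeps streaming
            dropped += 1
    return dropped


async def demo_stream() -> Tuple[int, List[bytes]]:
    client = FlakyClient(fail_on=2)
    dropped = await stream_frames(client, fake_audio_frames(3))
    return dropped, client.sent
```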

Stream audio from the AI model to the user

The _stream_audio_queue_to_audio_output method handles the playback of processed audio data from the AI model. It retrieves audio frames from a queue and sends them to the Agora channel, allowing users to hear AI-generated responses in real time.


async def _stream_audio_queue_to_audio_output(self) -> None:
    while True:
        # Get the next audio frame from the queue (contains audio data from the model)
        frame = await self.audio_queue.get()
        # Send the frame to the Agora channel for playback to the user
        await self.channel.push_audio_frame(frame)
        await asyncio.sleep(0)  # Allow other tasks to run
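The queue decouples the task that receives model audio from the task that plays it. A minimal producer/consumer sketch of that hand-off follows; the list `sink` stands in for the Agora channel, and the loop is bounded by a hypothetical `expected` count only so that the example terminates.

```python
import asyncio
from typing import List


async def play_queue(queue: asyncio.Queue, sink: List[bytes], expected: int) -> None:
    # Consume frames in arrival order; stop after `expected` frames
    for _ in range(expected):
        frame = await queue.get()
        sink.append(frame)  # stand-in for pushing the frame to the channel
        await asyncio.sleep(0)  # yield so producers can enqueue more audio


async def demo_queue() -> List[bytes]:
    queue: asyncio.Queue = asyncio.Queue()
    played: List[bytes] = []
    consumer = asyncio.create_task(play_queue(queue, played, expected=3))
    for chunk in (b"a", b"b", b"c"):
        await queue.put(chunk)
    await consumer
    return played
```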

Process model messages

The _process_model_messages method listens for messages from the OpenAI API client and processes them based on their type. It handles a variety of message types, including audio deltas, transcriptions, and errors.


async def _process_model_messages(self) -> None:
    # Listen for incoming messages from the OpenAI API client
    async for message in self.client.listen():
        # Process each type of message received from the client
        match message:
            case messages.ResponseAudioDelta():
                # Process incoming audio data from the model
                await self.audio_queue.put(base64.b64decode(message.delta))

            case messages.ResponseAudioTranscriptDelta():
                # Handle incoming transcription updates
                logger.info(f"Received text message {message=}")
                await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id))

            case messages.ResponseAudioTranscriptDone():
                # Handle completed transcriptions
                logger.info(f"Text message done: {message=}")
                await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id))

            case messages.InputAudioBufferSpeechStarted():
                # Handle the start of speech in the input audio
                pass
            case messages.InputAudioBufferSpeechStopped():
                # Handle the end of speech in the input audio
                pass
            case messages.InputAudioBufferCommitted():
                # Handle when an input audio buffer is committed
                pass
            case messages.ItemCreated():
                # Handle when a new item is created in the conversation
                pass
            case messages.ResponseCreated():
                # Handle when a new response is created
                pass
            case messages.ResponseOutputItemAdded():
                # Handle when a new output item is added to the response
                pass
            case messages.ResponseContenPartAdded():
                # Handle when a new content part is added to the response
                pass
            case messages.ResponseAudioDone():
                # Handle when the audio response is complete
                pass
            case messages.ResponseContentPartDone():
                # Handle when a content part of the response is complete
                pass
            case messages.ResponseOutputItemDone():
                # Handle when an output item in the response is complete
                pass

            case _:
                # Log any unhandled or unknown message types
                logger.warning(f"Unhandled message {message=}")

Main entry point

The main entry point of the application sets up the Agora RTC engine, configures the options, and launches the RealtimeKitAgent.


if __name__ == "__main__":
    # Load environment variables from .env file
    load_dotenv()

    # Parse command line arguments
    options = parse_args_realtimekit()
    logger.info(f"channel_id: {options['channel_name']}, uid: {options['uid']}")

    # Ensure the Agora App ID is set
    if not os.environ.get("AGORA_APP_ID"):
        raise ValueError("Need to set environment variable AGORA_APP_ID")

    # Run the RealtimeKitAgent
    asyncio.run(
        RealtimeKitAgent.setup_and_run_agent(
            # Initialize the RtcEngine with Agora credentials
            engine=RtcEngine(appid=os.environ.get("AGORA_APP_ID"), appcert=os.environ.get("AGORA_APP_CERT")),
            # Configure RTC options
            options=RtcOptions(
                channel_name=options['channel_name'],
                uid=options['uid'],
                sample_rate=SAMPLE_RATE,
                channels=CHANNELS
            ),
            # Configure inference settings
            inference_config=InferenceConfig(
                # Set up the AI assistant's behavior
                system_message="""\
You are a helpful assistant. If asked about the weather make sure to use the provided tool to get that information. \
If you are asked a question that requires a tool, say something like "working on that" and don't provide a concrete response \
until you have received the response to the tool call.\
""",
                voice=messages.Voices.Alloy,
                # Configure voice activity detection
                turn_detection=messages.ServerVAD(
                    threshold=0.5,
                    prefix_padding_ms=500,
                    suffix_padding_ms=200,
                ),
            ),
            # tools is a required keyword argument; None here, or pass a ToolContext to enable tools
            tools=None,
        )
    )
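The entry point reads `options['channel_name']` and `options['uid']` from the parsed command line. The demo's parse_args.py is not reproduced on this page, so the following is only a guess at its shape: `parse_args_sketch` is a hypothetical function, and the default UID of "0" is an assumption made for illustration.

```python
import argparse
from typing import Dict, List, Optional


def parse_args_sketch(argv: Optional[List[str]] = None) -> Dict[str, str]:
    parser = argparse.ArgumentParser(description="Run the realtime agent (sketch)")
    parser.add_argument("--channel_name", required=True, help="Agora channel to join")
    # The UID stays a string, in line with the note on string UIDs
    parser.add_argument("--uid", default="0", help="Agent UID (assumed default)")
    args = parser.parse_args(argv)
    return {"channel_name": args.channel_name, "uid": args.uid}
```

For example, `parse_args_sketch(["--channel_name=demo", "--uid=123"])` returns a dictionary with both values as strings.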

Test the code

  1. Update the values for AGORA_APP_ID, AGORA_APP_CERT, and OPENAI_API_KEY in the project's .env file.

    This step ensures that the necessary credentials for Agora and OpenAI are correctly configured in your project.

  2. Execute the following command to run the demo:


    python -m realtime_agent.agent --channel_name=your_channel_name --uid=your_user_id

    This command runs the agent module, initializing the Agora channel and the OpenAI API connection. Replace your_channel_name with the desired channel name and your_user_id with a unique user ID.
