Integrating Agora's real-time audio communication capabilities with OpenAI's language models enables dynamic, conversational AI experiences. This guide shows you how to set up a Python project that combines Agora's server-side Voice SDK with OpenAI's API to create an interactive, voice-driven assistant.
The RealtimeKitAgent class manages the integration by connecting to an Agora channel for real-time audio streaming and to OpenAI's API for processing audio input and generating AI-driven responses. Audio frames captured from the Agora channel are streamed to OpenAI's API, where the AI processes the input. The API responses, which include transcribed text and synthesized voice output, are then delivered back to the Agora channel.
The code sets up tools that can be executed locally or passed through the API. This allows the AI to perform specific tasks, such as retrieving data from external sources. The agent processes various message types from OpenAI, such as audio responses, transcription updates, and error messages, and sends them to users through the Agora audio channel, facilitating continuous interaction.
The following figure illustrates the integration topology:
This guide walks you through the core elements of the Agora Conversational AI Demo integrating Agora's Python SDK with OpenAI's Realtime API:
- Download the Agora Conversational AI Demo code.
- The project is structured as follows:
│   ├── requirements.txt
│   ├── AccessToken2.py
│   ├── RtcTokenBuilder2.py
│   └── realtimekit_token_builder.py
├── mic_to_websocket.py
├── send_audio_to_websocket.py
This project uses the OpenAI realtimeapi-examples package. Download the project and unzip it into your realtime-agent folder.
Overview of key files:
agent.py: The primary script responsible for executing the RealtimeKitAgent. It integrates Agora's functionality from the rtc.py module and OpenAI's capabilities from the realtimeapi package.
rtc.py: Contains an implementation of the server-side Agora Python Voice SDK.
parse_args.py: Handles command-line argument parsing for the application.
realtimeapi/: Contains the classes and methods that interact with OpenAI's Realtime API.
- Create the .env file by copying the .env.example in the root of the repo.
- Fill in the values for the environment variables:
    # OpenAI API key for authentication
- Create a virtual environment and activate it:
    python3 -m venv venv && source venv/bin/activate
- Install the required dependencies:
    pip install -r requirements.txt
- Run the demo server:
    python -m realtime_agent.agent --channel_name=<channel_name> --uid=<agent_uid>
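Before starting the server, it can help to confirm that the environment is complete. The following is a minimal sketch, not part of the demo code: the helper name `missing_env_vars` is hypothetical, and treating all three variables named in this guide as required is an assumption.

```python
import os
from typing import Mapping

# Variable names taken from this guide; treating all three as required is an assumption.
REQUIRED_VARS = ("AGORA_APP_ID", "AGORA_APP_CERT", "OPENAI_API_KEY")


def missing_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_env_vars()` after the `.env` file has been loaded returns an empty list when everything is configured, so a startup script could refuse to run when the list is non-empty.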
The RealtimeKitAgent class integrates Agora's audio communication capabilities with OpenAI's AI services. This class manages audio streams, handles communication with the OpenAI API, and processes AI-generated responses, providing a seamless conversational AI experience.
The setup_and_run_agent method sets up the RealtimeKitAgent by connecting to an Agora channel using the provided RtcEngine and initializing a session with the OpenAI Realtime API client. Before starting the agent's operations, it sends configuration messages that set up the session and define conversation parameters, such as the system message and output audio format. The method waits for the session-start message and sends the conversation configuration update concurrently. It also ensures that the connection is cleaned up after use, even in cases of exceptions, early exits, or shutdowns.
UIDs in the Python SDK are set using a string value. Agora recommends using only numerical values for UID strings to ensure compatibility with all Agora products and extensions.
async def setup_and_run_agent(
    engine: RtcEngine,
    options,
    inference_config: InferenceConfig,
    tools: ToolContext | None,
):
    # Create and connect to an Agora channel
    channel = engine.create_channel(options)
    await channel.connect()
    # Initialize the OpenAI Realtime API client
    async with RealtimeApiClient(
        base_uri="wss://api.openai.com",
        api_key=os.getenv("OPENAI_API_KEY"),
    ) as client:
        # Update the session configuration
        await client.send_message(
            messages.SessionUpdate(
                session=messages.SessionUpdateParams(
                    turn_detection=inference_config.turn_detection,
                    tools=tools.model_description() if tools else None,
                    instructions=inference_config.system_message,
                )
            )
        )
        # Concurrently wait for the session to start and update the conversation config
        [start_session_message, _] = await asyncio.gather(
            anext(client.listen()),
            client.send_message(
                messages.UpdateConversationConfig(
                    system_message=inference_config.system_message,
                    output_audio_format=messages.AudioFormats.PCM16,
                    voice=inference_config.voice,
                    tools=tools.model_description() if tools else None,
                    transcribe_input=False,
                )
            ),
        )
        logger.info(
            f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}"
        )
        # Create and run the RealtimeKitAgent
        # Ensure the Agora engine is destroyed, even if an exception occurs
The RealtimeKitAgent class constructor accepts an OpenAI RealtimeApiClient, an optional ToolContext for function registration, and an Agora channel for managing audio communication. This setup initializes the agent to process audio streams, register tools (if provided), and interact with the AI model.
def __init__(
    self,
    client: RealtimeApiClient,
    tools: ToolContext | None,
    channel,
) -> None:
    self.client = client  # OpenAI Realtime API client
    self.tools = tools  # Optional tool context for function registration
    self._client_tool_futures = {}  # For managing asynchronous tool calls
    self.channel = channel  # Agora channel for audio communication
    self.subscribe_user = None  # Will store the user ID we're subscribing to
The run method orchestrates the main operations of the RealtimeKitAgent. It waits for a remote user, subscribes to that user's audio, starts the tasks that handle audio input, audio output, and model messages, and logs any unhandled exception raised by those tasks.
async def run(self) -> None:
    try:
        # Helper function to log unhandled exceptions in tasks
        def log_exception(t: asyncio.Task[Any]) -> None:
            if not t.cancelled() and t.exception():
                logger.error(
                    "unhandled exception",
                    exc_info=t.exception(),
                )

        logger.info("Waiting for remote user to join")
        # Wait for a remote user to join the channel
        self.subscribe_user = await wait_for_remote_user(self.channel)
        logger.info(f"Subscribing to user {self.subscribe_user}")
        # Subscribe to the audio of the joined user
        await self.channel.subscribe_audio(self.subscribe_user)

        # Handle user leaving the channel
        async def on_user_left(agora_rtc_conn: RTCConnection, user_id: int, reason: int):
            logger.info(f"User left: {user_id}")
            if self.subscribe_user == user_id:
                self.subscribe_user = None
                logger.info("Subscribed user left, disconnecting")
                await self.channel.disconnect()

        self.channel.on("user_left", on_user_left)

        # Set up a future to track when the agent should disconnect
        disconnected_future = asyncio.Future[None]()

        # Handle connection state changes
        def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason):
            logger.info(f"Connection state changed: {conn_info.state}")
            if conn_info.state == 1:  # Disconnected state
                if not disconnected_future.done():
                    disconnected_future.set_result(None)

        self.channel.on("connection_state_changed", callback)

        # Start tasks for streaming audio and processing messages
        asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback(
            log_exception
        )
        asyncio.create_task(
            self._stream_audio_queue_to_audio_output()
        ).add_done_callback(log_exception)
        asyncio.create_task(self._process_model_messages()).add_done_callback(
            log_exception
        )

        # Wait until the agent is disconnected
        await disconnected_future
        logger.info("Agent finished running")
    except asyncio.CancelledError:
        logger.info("Agent cancelled")
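The two coordination patterns used by run, a done-callback that logs task failures and a future that resolves on disconnect, can be tried without any Agora or OpenAI dependency. This is a minimal sketch under assumptions: `run_until_disconnected`, the timer that simulates the disconnect event, and the logger name are all hypothetical; only the state value 1 comes from the excerpt above.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("agent_sketch")
DISCONNECTED = 1  # state value used in the excerpt above


def log_exception(task: asyncio.Task) -> None:
    """Log an exception raised by a finished background task."""
    if not task.cancelled() and task.exception():
        logger.error("unhandled exception", exc_info=task.exception())


async def run_until_disconnected(
    worker: Callable[[], Awaitable[None]], disconnect_after: float
) -> bool:
    """Run worker in the background until a simulated disconnect arrives."""
    loop = asyncio.get_running_loop()
    disconnected = loop.create_future()

    def on_state_changed(state: int) -> None:
        # Resolve the future once; later state changes are ignored.
        if state == DISCONNECTED and not disconnected.done():
            disconnected.set_result(None)

    task = asyncio.create_task(worker())
    task.add_done_callback(log_exception)
    # Simulate the SDK reporting a disconnect after a short delay.
    loop.call_later(disconnect_after, on_state_changed, DISCONNECTED)

    await disconnected
    task.cancel()  # No effect if the worker has already finished.
    await asyncio.gather(task, return_exceptions=True)
    return disconnected.done()
```

Because the callback retrieves the exception, a failing worker is reported through the logger instead of surfacing later as an unretrieved task error.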
The _stream_input_audio_to_model method captures audio frames from the Agora channel and sends them to the OpenAI API client for real-time processing by the AI model.
async def _stream_input_audio_to_model(self) -> None:
    # Wait until we have a subscribed user
    while self.subscribe_user is None:
        await asyncio.sleep(0.1)
    # Get the audio frame stream for the subscribed user
    audio_frames = self.channel.get_audio_frames(self.subscribe_user)
    async for audio_frame in audio_frames:
        try:
            # Send the audio frame to the OpenAI model via the API client
            await self.client.send_audio_data(audio_frame.data)
        except Exception as e:
            logger.error(f"Error sending audio data to model: {e}")
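The forwarding loop can be exercised with a fake frame source. In this sketch, `fake_audio_frames`, `forward_frames`, and `demo_forward` are hypothetical names; the generator only stands in for the channel's audio stream, and the `send` callable stands in for the API client.

```python
import asyncio
import logging
from typing import AsyncIterator, Awaitable, Callable

logger = logging.getLogger("input_audio_sketch")


async def fake_audio_frames(count: int) -> AsyncIterator[bytes]:
    """Hypothetical frame source standing in for the channel's audio stream."""
    for index in range(count):
        yield bytes([index]) * 4
        await asyncio.sleep(0)


async def forward_frames(
    frames: AsyncIterator[bytes], send: Callable[[bytes], Awaitable[None]]
) -> int:
    """Forward every frame to send and return how many were delivered."""
    forwarded = 0
    async for frame in frames:
        try:
            await send(frame)
            forwarded += 1
        except Exception as exc:
            # Log and continue so one bad frame does not stop the stream.
            logger.error(f"Error sending audio data to model: {exc}")
    return forwarded


async def demo_forward() -> list[bytes]:
    received: list[bytes] = []

    async def send(frame: bytes) -> None:
        received.append(frame)

    await forward_frames(fake_audio_frames(3), send)
    return received
```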
The _stream_audio_queue_to_audio_output method handles the playback of processed audio data from the AI model. It retrieves audio frames from a queue and sends them to the Agora channel, allowing users to hear AI-generated responses in real time.
async def _stream_audio_queue_to_audio_output(self) -> None:
    while True:
        # Get the next audio frame from the queue (contains audio data from the model)
        frame = await self.audio_queue.get()
        # Send the frame to the Agora channel for playback to the user
        await self.channel.push_audio_frame(frame)
        await asyncio.sleep(0)  # Allow other tasks to run
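The queue-to-output pattern is a standard asyncio producer/consumer. The sketch below assumes a hypothetical `FakeChannel` that records frames instead of playing them, and it adds `task_done()` and `join()` so that the example can stop cleanly; the demo's own loop runs until the agent shuts down.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for the Agora channel; records pushed frames."""

    def __init__(self) -> None:
        self.pushed: list[bytes] = []

    async def push_audio_frame(self, frame: bytes) -> None:
        self.pushed.append(frame)


async def stream_queue_to_output(queue: asyncio.Queue, channel: FakeChannel) -> None:
    """Drain the queue forever, pushing each frame to the channel."""
    while True:
        frame = await queue.get()
        await channel.push_audio_frame(frame)
        queue.task_done()
        await asyncio.sleep(0)  # Yield so other tasks can run


async def demo_playback() -> list[bytes]:
    queue: asyncio.Queue = asyncio.Queue()
    channel = FakeChannel()
    consumer = asyncio.create_task(stream_queue_to_output(queue, channel))
    for frame in (b"\x00\x01", b"\x02\x03", b"\x04\x05"):
        await queue.put(frame)
    await queue.join()  # Wait until every queued frame has been pushed
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return channel.pushed
```

Frames leave the queue in the order they were added, which keeps the synthesized speech in sequence.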
The _process_model_messages method listens for messages from the OpenAI API client and processes them based on their type. It handles a variety of message types, including audio deltas, transcriptions, and errors.
async def _process_model_messages(self) -> None:
    # Listen for incoming messages from the OpenAI API client
    async for message in self.client.listen():
        # Process each type of message received from the client
        match message:
            case messages.ResponseAudioDelta():
                # Process incoming audio data from the model
                await self.audio_queue.put(base64.b64decode(message.delta))
            case messages.ResponseAudioTranscriptDelta():
                # Handle incoming transcription updates
                logger.info(f"Received text message {message=}")
                await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id))
            case messages.ResponseAudioTranscriptDone():
                # Handle completed transcriptions
                logger.info(f"Text message done: {message=}")
                await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id))
            case messages.InputAudioBufferSpeechStarted():
                # Handle the start of speech in the input audio
                pass
            case messages.InputAudioBufferSpeechStopped():
                # Handle the end of speech in the input audio
                pass
            case messages.InputAudioBufferCommitted():
                # Handle when an input audio buffer is committed
                pass
            case messages.ItemCreated():
                # Handle when a new item is created in the conversation
                pass
            case messages.ResponseCreated():
                # Handle when a new response is created
                pass
            case messages.ResponseOutputItemAdded():
                # Handle when a new output item is added to the response
                pass
            case messages.ResponseContenPartAdded():
                # Handle when a new content part is added to the response
                pass
            case messages.ResponseAudioDone():
                # Handle when the audio response is complete
                pass
            case messages.ResponseContentPartDone():
                # Handle when a content part of the response is complete
                pass
            case messages.ResponseOutputItemDone():
                # Handle when an output item in the response is complete
                pass
            case _:
                # Log any unhandled or unknown message types
                logger.warning(f"Unhandled message {message=}")
The main entry point of the application sets up the Agora RTC engine, configures the options, and launches the RealtimeKitAgent.
if __name__ == "__main__":
    # Load environment variables from .env file
    # Parse command line arguments
    options = parse_args_realtimekit()
    logger.info(f"app_id: channel_id: {options['channel_name']}, uid: {options['uid']}")
    # Ensure the Agora App ID is set
    if not os.environ.get("AGORA_APP_ID"):
        raise ValueError("Need to set environment variable AGORA_APP_ID")
    # Run the RealtimeKitAgent
    RealtimeKitAgent.entry_point(
        # Initialize the RtcEngine with Agora credentials
        engine=RtcEngine(appid=os.environ.get("AGORA_APP_ID"), appcert=os.environ.get("AGORA_APP_CERT")),
        # Configure RTC options
        channel_name=options['channel_name'],
        sample_rate=SAMPLE_RATE,
        # Configure inference settings
        inference_config=InferenceConfig(
            # Set up the AI assistant's behavior
            system_message="""\
You are a helpful assistant. If asked about the weather make sure to use the provided tool to get that information. \
If you are asked a question that requires a tool, say something like "working on that" and dont provide a concrete response \
until you have received the response to the tool call.\
""",
            voice=messages.Voices.Alloy,
            # Configure voice activity detection
            turn_detection=messages.ServerVAD(
                prefix_padding_ms=500,
                suffix_padding_ms=200,
            ),
        ),
    )
- Update the values for AGORA_APP_ID, AGORA_APP_CERT, and OPENAI_API_KEY in the project's .env file.
  This step ensures that the necessary credentials for Agora and OpenAI are correctly configured in your project.
- Execute the following command to run the demo:
    python3 agent.py --channel_name=your_channel_name --uid=your_user_id
  This command launches the agent.py script, initializing the Agora channel and the OpenAI API connection. Replace your_channel_name with the desired channel name and your_user_id with a unique user ID.