Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
115 changes: 115 additions & 0 deletions callautomation-openai-voice/azureOpenAIService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import asyncio
import json
from rtclient import (
RTLowLevelClient,
SessionUpdateMessage,
ServerVAD,
SessionUpdateParams,
InputAudioBufferAppendMessage,
InputAudioTranscription,
)
from azure.core.credentials import AzureKeyCredential
# Websocket back to ACS for the currently connected call; set by init_websocket.
active_websocket = None

# System prompt sent to the realtime model when the session is configured.
answer_prompt_system_template = "You are an AI assistant that helps people find information."

# Azure OpenAI connection settings. Fix: AZURE_OPENAI_SERVICE_ENDPOINT was
# referenced in start_conversation() but never defined, causing a NameError
# on the first call; it is declared here alongside the other settings.
AZURE_OPENAI_SERVICE_ENDPOINT = "<AZURE_OPENAI_SERVICE_ENDPOINT>"
OPENAI_SERVICE_KEY = "<OPENAI_SERVICE_KEY>"
AZURE_OPENAI_DEPLOYMENT_MODEL_NAME = "gpt-4o-realtime-preview-2024-12-17"

async def start_conversation():
    """Open a realtime session with Azure OpenAI and start consuming events.

    Stores the connection in the module-global ``client`` so sibling
    coroutines (``send_audio_to_external_ai``) can reuse it.
    """
    # NOTE(review): AZURE_OPENAI_SERVICE_ENDPOINT must be assigned at module
    # scope before this coroutine runs — it is referenced here but not
    # defined in the visible code of this module. TODO confirm configuration.
    global client
    client = RTLowLevelClient(url=AZURE_OPENAI_SERVICE_ENDPOINT, key_credential=AzureKeyCredential(OPENAI_SERVICE_KEY), azure_deployment=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME)
    await client.connect()
    # Configure the session: server-side voice activity detection, PCM16
    # audio in both directions, Whisper transcription of caller speech.
    await client.send(
        SessionUpdateMessage(
            session=SessionUpdateParams(
                instructions=answer_prompt_system_template,
                turn_detection=ServerVAD(type="server_vad"),
                voice= 'shimmer',
                input_audio_format='pcm16',
                output_audio_format='pcm16',
                input_audio_transcription=InputAudioTranscription(model="whisper-1")
            )
        )
    )

    # Fire-and-forget consumer of server events for the session's lifetime.
    asyncio.create_task(receive_messages(client))

async def send_audio_to_external_ai(audioData: str):
    """Append one audio chunk from the caller to the realtime input buffer.

    Requires ``start_conversation`` to have been awaited first so the
    module-global ``client`` exists and is connected.
    """
    await client.send(message=InputAudioBufferAppendMessage(type="input_audio_buffer.append", audio=audioData, _is_azure=True))

async def receive_messages(client: RTLowLevelClient):
    """Consume server events from the realtime session until it closes.

    Logs lifecycle and transcription events, interrupts outbound playback
    when the caller starts speaking (barge-in), and forwards synthesized
    audio deltas back toward ACS.
    """
    while not client.closed:
        message = await client.recv()
        if message is None:
            continue
        kind = message.type
        if kind == "session.created":
            print("Session Created Message")
            print(f" Session Id: {message.session.id}")
        elif kind == "error":
            print(f" Error: {message.error}")
        elif kind == "input_audio_buffer.cleared":
            print("Input Audio Buffer Cleared Message")
        elif kind == "input_audio_buffer.speech_started":
            print(f"Voice activity detection started at {message.audio_start_ms} [ms]")
            # Barge-in: the caller began talking, so cancel any audio the
            # assistant is currently playing back to them.
            await stop_audio()
        elif kind == "conversation.item.input_audio_transcription.completed":
            print(f" User:-- {message.transcript}")
        elif kind == "conversation.item.input_audio_transcription.failed":
            print(f" Error: {message.error}")
        elif kind == "response.done":
            print("Response Done Message")
            print(f" Response Id: {message.response.id}")
            if message.response.status_details:
                print(f" Status Details: {message.response.status_details.model_dump_json()}")
        elif kind == "response.audio_transcript.done":
            print(f" AI:-- {message.transcript}")
        elif kind == "response.audio.delta":
            # Synthesized audio chunk — relay it to the caller via ACS.
            await receive_audio_for_outbound(message.delta)
        # All other event types (e.g. speech_stopped) are intentionally ignored.

async def init_websocket(socket):
    """Remember the ACS media-streaming websocket so outbound audio can be sent."""
    global active_websocket
    active_websocket = socket

async def receive_audio_for_outbound(data):
    """Wrap an audio delta from Azure OpenAI in the ACS streaming envelope
    and relay it to the caller over the active websocket.

    :param data: audio chunk from the realtime API's ``response.audio.delta``.
    """
    try:
        # Fix: the original rebound the ``data`` parameter to the envelope
        # dict, shadowing the raw chunk; use a distinct name for clarity.
        payload = {
            "Kind": "AudioData",
            "AudioData": {
                "Data": data
            },
            "StopAudio": None
        }

        # Serialize the server streaming data
        serialized_data = json.dumps(payload)
        await send_message(serialized_data)

    except Exception as e:
        print(e)

async def stop_audio():
    """Tell ACS to cancel any audio currently being played to the caller."""
    payload = {
        "Kind": "StopAudio",
        "AudioData": None,
        "StopAudio": {}
    }
    await send_message(json.dumps(payload))

async def send_message(message: str):
    """Send a serialized streaming payload over the active ACS websocket.

    Failures are logged rather than raised so one bad frame does not tear
    down the whole media pipeline.
    """
    global active_websocket
    try:
        # Fix: guard explicitly so a missing websocket yields a clear log
        # line instead of an opaque AttributeError from ``None.send``.
        if active_websocket is None:
            print("Failed to send message: no active websocket connection")
            return
        await active_websocket.send(message)
    except Exception as e:
        print(f"Failed to send message: {e}")

134 changes: 134 additions & 0 deletions callautomation-openai-voice/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from quart import Quart, Response, request, json, redirect, websocket
from azure.eventgrid import EventGridEvent, SystemEventNames
from urllib.parse import urlencode, urljoin, urlparse, urlunparse
from logging import INFO
from azure.communication.callautomation import (
MediaStreamingOptions,
AudioFormat,
MediaStreamingTransportType,
MediaStreamingContentType,
MediaStreamingAudioChannelType,
)
from azure.communication.callautomation.aio import (
CallAutomationClient
)
import uuid
from azure.core.messaging import CloudEvent

from azureOpenAIService import init_websocket, start_conversation
from mediaStreamingHandler import process_websocket_message_async
from threading import Thread

# Your ACS resource connection string
ACS_CONNECTION_STRING = "ACS_CONNECTION_STRING"

# Callback events URI to handle callback events.
CALLBACK_URI_HOST = "CALLBACK_URI_HOST"
CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks"

# Async Call Automation client and Quart app shared by all handlers below.
acs_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING)
app = Quart(__name__)

@app.route("/api/incomingCall", methods=['POST'])
async def incoming_call_handler():
    """Handle Event Grid deliveries: subscription validation and incoming calls.

    For an incoming call, answers it with bidirectional media streaming
    pointed at this app's ``/ws`` websocket endpoint.
    """
    app.logger.info("incoming event data")
    for event_dict in await request.json:
        event = EventGridEvent.from_dict(event_dict)
        app.logger.info("incoming event data --> %s", event.data)
        if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName:
            # Event Grid subscription handshake: echo the validation code back.
            app.logger.info("Validating subscription")
            validation_code = event.data['validationCode']
            validation_response = {'validationResponse': validation_code}
            return Response(response=json.dumps(validation_response), status=200)
        elif event.event_type == "Microsoft.Communication.IncomingCall":
            app.logger.info("Incoming call received: data=%s", event.data)
            # Caller id shape differs for PSTN numbers vs. other ACS identities.
            if event.data['from']['kind'] == "phoneNumber":
                caller_id = event.data['from']["phoneNumber"]["value"]
            else:
                caller_id = event.data['from']['rawId']
            app.logger.info("incoming call handler caller id: %s", caller_id)
            incoming_call_context = event.data['incomingCallContext']
            guid = uuid.uuid4()
            query_parameters = urlencode({"callerId": caller_id})
            callback_uri = f"{CALLBACK_EVENTS_URI}/{guid}?{query_parameters}"

            # Derive the media websocket URL from the callback host (wss scheme).
            parsed_url = urlparse(CALLBACK_EVENTS_URI)
            websocket_url = urlunparse(('wss', parsed_url.netloc, '/ws', '', '', ''))

            app.logger.info("callback url: %s", callback_uri)
            app.logger.info("websocket url: %s", websocket_url)

            media_streaming_options = MediaStreamingOptions(
                transport_url=websocket_url,
                transport_type=MediaStreamingTransportType.WEBSOCKET,
                content_type=MediaStreamingContentType.AUDIO,
                audio_channel_type=MediaStreamingAudioChannelType.MIXED,
                start_media_streaming=True,
                enable_bidirectional=True,
                audio_format=AudioFormat.PCM24_K_MONO)

            answer_call_result = await acs_client.answer_call(
                incoming_call_context=incoming_call_context,
                operation_context="incomingCall",
                callback_url=callback_uri,
                media_streaming=media_streaming_options)
            app.logger.info("Answered call for connection id: %s",
                            answer_call_result.call_connection_id)
            return Response(status=200)
    # Fix: previously the handler fell off the loop and implicitly returned
    # None (a server error) when the batch held no recognized event.
    return Response(status=200)

@app.route('/api/callbacks/<contextId>', methods=['POST'])
async def callbacks(contextId):
    """Log Call Automation mid-call events delivered to the callback URI.

    Always returns 200 so the service does not retry delivery.
    """
    for event in await request.json:
        # Parsing callback events
        # NOTE(review): storing the id in a module-level global assumes a
        # single active call at a time — confirm before handling concurrency.
        global call_connection_id
        event_data = event['data']
        call_connection_id = event_data["callConnectionId"]
        app.logger.info(f"Received Event:-> {event['type']}, Correlation Id:-> {event_data['correlationId']}, CallConnectionId:-> {call_connection_id}")
        if event['type'] == "Microsoft.Communication.CallConnected":
            # Fetch the live call properties to inspect the streaming subscription.
            call_connection_properties = await acs_client.get_call_connection(call_connection_id).get_call_properties()
            media_streaming_subscription = call_connection_properties.media_streaming_subscription
            app.logger.info(f"MediaStreamingSubscription:--> {media_streaming_subscription}")
            app.logger.info(f"Received CallConnected event for connection id: {call_connection_id}")
            app.logger.info("CORRELATION ID:--> %s", event_data["correlationId"])
            app.logger.info("CALL CONNECTION ID:--> %s", event_data["callConnectionId"])
        elif event['type'] == "Microsoft.Communication.MediaStreamingStarted":
            app.logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
            app.logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
            app.logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
        elif event['type'] == "Microsoft.Communication.MediaStreamingStopped":
            app.logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
            app.logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
            app.logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
        elif event['type'] == "Microsoft.Communication.MediaStreamingFailed":
            app.logger.info(f"Code:->{event_data['resultInformation']['code']}, Subcode:-> {event_data['resultInformation']['subCode']}")
            app.logger.info(f"Message:->{event_data['resultInformation']['message']}")
        elif event['type'] == "Microsoft.Communication.CallDisconnected":
            # Nothing to clean up in this sample.
            pass
    return Response(status=200)

# WebSocket.
@app.websocket('/ws')
async def ws():
    """Websocket endpoint: bridges ACS media streaming to the AI service."""
    print("Client connected to WebSocket")
    await init_websocket(websocket)
    await start_conversation()
    # Pump inbound frames from ACS into the AI service until the socket drops.
    while True:
        try:
            frame = await websocket.receive()
            await process_websocket_message_async(frame)
        except Exception as e:
            print(f"WebSocket connection closed: {e}")
            break

@app.route('/')
def home():
    # Simple liveness page confirming the app is up and reachable.
    return 'Hello ACS CallAutomation!'

if __name__ == '__main__':
    app.logger.setLevel(INFO)
    # Port 8080 matches the dev tunnel port documented in the readme.
    app.run(port=8080)



12 changes: 12 additions & 0 deletions callautomation-openai-voice/mediaStreamingHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import json
from azureOpenAIService import send_audio_to_external_ai

async def process_websocket_message_async(stream_data):
    """Decode one ACS media-streaming frame and forward audio to the AI service.

    Frames that are not ``AudioData`` (or fail to parse) are ignored after
    logging, so a bad frame never tears down the websocket loop.
    """
    try:
        payload = json.loads(stream_data)
        if payload['kind'] == "AudioData":
            await send_audio_to_external_ai(payload["audioData"]["data"])
    except Exception as e:
        print(f'Error processing WebSocket message: {e}')
58 changes: 58 additions & 0 deletions callautomation-openai-voice/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
|page_type| languages |products
|---|-----------------------------------------|---|
|sample| <table><tr><td>Python</td></tr></table> |<table><tr><td>azure</td><td>azure-communication-services</td></tr></table>|

# Call Automation - Quick Start Sample

This is a sample application demonstrated during Microsoft Ignite 2024. It highlights an integration of Azure Communication Services with OpenAI Service to enable intelligent conversational agents.

## Prerequisites

- An Azure account with an active subscription. [Create an account for free](https://azure.microsoft.com/free/?WT.mc_id=A261C142F).
- A deployed Communication Services resource. [Create a Communication Services resource](https://docs.microsoft.com/azure/communication-services/quickstarts/create-communication-resource).
- A [phone number](https://learn.microsoft.com/en-us/azure/communication-services/quickstarts/telephony/get-phone-number) in your Azure Communication Services resource that can get inbound calls. NB: phone numbers are not available in free subscriptions.
- [Python](https://www.python.org/downloads/) 3.10 or above (the sample uses structural pattern matching, introduced in 3.10).
- An OpenAI Resource and Deployed Model. See [instructions](https://platform.openai.com/docs/guides/realtime).

## Before running the sample for the first time

1. Open an instance of PowerShell, Windows Terminal, Command Prompt or equivalent and navigate to the directory that you would like to clone the sample to.
2. git clone `https://github.com/Azure-Samples/communication-services-python-quickstarts.git`.
3. Navigate to `callautomation-openai-voice` folder and open `main.py` file.

### Setup the Python environment

Create and activate python virtual environment and install required packages using following command
```
pip install -r requirements.txt
pip install ./aoai-whl/rtclient-0.5.1-py3-none-any.whl
```

### Setup and host your Azure DevTunnel

[Azure DevTunnels](https://learn.microsoft.com/en-us/azure/developer/dev-tunnels/overview) is an Azure service that enables you to share local web services hosted on the internet. Use the commands below to connect your local development environment to the public internet. This creates a tunnel with a persistent endpoint URL that allows anonymous access. We will then use this endpoint to notify your application of calling events from the ACS Call Automation service.

```bash
devtunnel create --allow-anonymous
devtunnel port create -p 8080
devtunnel host
```

### Configuring application

Open `main.py` file to configure the following settings

1. `ACS_CONNECTION_STRING`: Azure Communication Service resource's connection string.
2. `CALLBACK_URI_HOST`: Base url of the app. (For local development use dev tunnel url)

Open `azureOpenAIService.py` file to configure the following settings

1. `AZURE_OPENAI_SERVICE_ENDPOINT`: Azure OpenAI service endpoint
2. `OPENAI_SERVICE_KEY`: Azure OpenAI service key

## Run app locally

1. Navigate to `callautomation-openai-voice` folder and run `main.py` in debug mode or use command `python ./main.py` to run it from PowerShell, Command Prompt or Unix Terminal
2. Browser should pop up with the below page. If not navigate it to `http://localhost:8080/`or your dev tunnel url.
3. Register an EventGrid Webhook for the IncomingCall(`https://<devtunnelurl>/api/incomingCall`) event that points to your devtunnel URI. Instructions [here](https://learn.microsoft.com/en-us/azure/communication-services/concepts/call-automation/incoming-call-notification).

Once that's completed you should have a running application. The best way to test this is to place a call to your ACS phone number and talk to your intelligent agent.
4 changes: 4 additions & 0 deletions callautomation-openai-voice/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Quart>=0.19.6
azure-eventgrid==4.11.0
aiohttp>= 3.11.9
azure-communication-callautomation==1.4.0b1