diff --git a/callautomation-openai-voice/aoai-whl/rtclient-0.5.1-py3-none-any.whl b/callautomation-openai-voice/aoai-whl/rtclient-0.5.1-py3-none-any.whl new file mode 100644 index 0000000..8dfcd95 Binary files /dev/null and b/callautomation-openai-voice/aoai-whl/rtclient-0.5.1-py3-none-any.whl differ diff --git a/callautomation-openai-voice/azureOpenAIService.py b/callautomation-openai-voice/azureOpenAIService.py new file mode 100644 index 0000000..610f5e9 --- /dev/null +++ b/callautomation-openai-voice/azureOpenAIService.py @@ -0,0 +1,115 @@ +import asyncio +import json +from rtclient import ( + RTLowLevelClient, + SessionUpdateMessage, + ServerVAD, + SessionUpdateParams, + InputAudioBufferAppendMessage, + InputAudioTranscription, + ) +from azure.core.credentials import AzureKeyCredential +active_websocket = None +answer_prompt_system_template = "You are an AI assistant that helps people find information." +OPENAI_SERVICE_KEY = "" +AZURE_OPENAI_DEPLOYMENT_MODEL_NAME = "gpt-4o-realtime-preview-2024-12-17" + +async def start_conversation(): + global client + client = RTLowLevelClient(url=AZURE_OPENAI_SERVICE_ENDPOINT, key_credential=AzureKeyCredential(OPENAI_SERVICE_KEY), azure_deployment=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME) + await client.connect() + await client.send( + SessionUpdateMessage( + session=SessionUpdateParams( + instructions=answer_prompt_system_template, + turn_detection=ServerVAD(type="server_vad"), + voice= 'shimmer', + input_audio_format='pcm16', + output_audio_format='pcm16', + input_audio_transcription=InputAudioTranscription(model="whisper-1") + ) + ) + ) + + asyncio.create_task(receive_messages(client)) + +async def send_audio_to_external_ai(audioData: str): + await client.send(message=InputAudioBufferAppendMessage(type="input_audio_buffer.append", audio=audioData, _is_azure=True)) + +async def receive_messages(client: RTLowLevelClient): + while not client.closed: + message = await client.recv() + if message is None: + continue + match 
message.type: + case "session.created": + print("Session Created Message") + print(f" Session Id: {message.session.id}") + pass + case "error": + print(f" Error: {message.error}") + pass + case "input_audio_buffer.cleared": + print("Input Audio Buffer Cleared Message") + pass + case "input_audio_buffer.speech_started": + print(f"Voice activity detection started at {message.audio_start_ms} [ms]") + await stop_audio() + pass + case "input_audio_buffer.speech_stopped": + pass + case "conversation.item.input_audio_transcription.completed": + print(f" User:-- {message.transcript}") + case "conversation.item.input_audio_transcription.failed": + print(f" Error: {message.error}") + case "response.done": + print("Response Done Message") + print(f" Response Id: {message.response.id}") + if message.response.status_details: + print(f" Status Details: {message.response.status_details.model_dump_json()}") + case "response.audio_transcript.done": + print(f" AI:-- {message.transcript}") + case "response.audio.delta": + await receive_audio_for_outbound(message.delta) + pass + case _: + pass + +async def init_websocket(socket): + global active_websocket + active_websocket = socket + +async def receive_audio_for_outbound(data): + try: + data = { + "Kind": "AudioData", + "AudioData": { + "Data": data + }, + "StopAudio": None + } + + # Serialize the server streaming data + serialized_data = json.dumps(data) + await send_message(serialized_data) + + except Exception as e: + print(e) + +async def stop_audio(): + stop_audio_data = { + "Kind": "StopAudio", + "AudioData": None, + "StopAudio": {} + } + + json_data = json.dumps(stop_audio_data) + await send_message(json_data) + +async def send_message(message: str): + global active_websocket + try: + await active_websocket.send(message) + except Exception as e: + print(f"Failed to send message: {e}") + diff --git a/callautomation-openai-voice/main.py b/callautomation-openai-voice/main.py new file mode 100644 index 0000000..3e2ca4c --- 
from quart import Quart, Response, request, json, redirect, websocket
from azure.eventgrid import EventGridEvent, SystemEventNames
from urllib.parse import urlencode, urljoin, urlparse, urlunparse
from logging import INFO
from azure.communication.callautomation import (
    MediaStreamingOptions,
    AudioFormat,
    MediaStreamingTransportType,
    MediaStreamingContentType,
    MediaStreamingAudioChannelType,
)
from azure.communication.callautomation.aio import (
    CallAutomationClient
)
import uuid
from azure.core.messaging import CloudEvent

from azureOpenAIService import init_websocket, start_conversation
from mediaStreamingHandler import process_websocket_message_async
from threading import Thread

# Your ACS resource connection string
ACS_CONNECTION_STRING = "ACS_CONNECTION_STRING"

# Callback events URI to handle callback events.
CALLBACK_URI_HOST = "CALLBACK_URI_HOST"
CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks"

acs_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING)
app = Quart(__name__)


@app.route("/api/incomingCall", methods=['POST'])
async def incoming_call_handler():
    """Handle Event Grid traffic: the subscription-validation handshake and
    Microsoft.Communication.IncomingCall events. Incoming calls are answered
    with bidirectional PCM media streaming pointed at this app's /ws socket."""
    app.logger.info("incoming event data")
    for event_dict in await request.json:
        event = EventGridEvent.from_dict(event_dict)
        app.logger.info("incoming event data --> %s", event.data)
        if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName:
            # Event Grid handshake: echo the validation code back.
            app.logger.info("Validating subscription")
            validation_code = event.data['validationCode']
            validation_response = {'validationResponse': validation_code}
            return Response(response=json.dumps(validation_response), status=200)
        elif event.event_type == "Microsoft.Communication.IncomingCall":
            app.logger.info("Incoming call received: data=%s", event.data)
            if event.data['from']['kind'] == "phoneNumber":
                caller_id = event.data['from']["phoneNumber"]["value"]
            else:
                caller_id = event.data['from']['rawId']
            app.logger.info("incoming call handler caller id: %s", caller_id)
            incoming_call_context = event.data['incomingCallContext']
            guid = uuid.uuid4()
            query_parameters = urlencode({"callerId": caller_id})
            callback_uri = f"{CALLBACK_EVENTS_URI}/{guid}?{query_parameters}"

            # Media streaming connects back over wss on the same host at /ws.
            parsed_url = urlparse(CALLBACK_EVENTS_URI)
            websocket_url = urlunparse(('wss', parsed_url.netloc, '/ws', '', '', ''))

            app.logger.info("callback url: %s", callback_uri)
            app.logger.info("websocket url: %s", websocket_url)

            media_streaming_options = MediaStreamingOptions(
                transport_url=websocket_url,
                transport_type=MediaStreamingTransportType.WEBSOCKET,
                content_type=MediaStreamingContentType.AUDIO,
                audio_channel_type=MediaStreamingAudioChannelType.MIXED,
                start_media_streaming=True,
                enable_bidirectional=True,
                audio_format=AudioFormat.PCM24_K_MONO)

            answer_call_result = await acs_client.answer_call(
                incoming_call_context=incoming_call_context,
                operation_context="incomingCall",
                callback_url=callback_uri,
                media_streaming=media_streaming_options)
            app.logger.info("Answered call for connection id: %s",
                            answer_call_result.call_connection_id)
        return Response(status=200)


# BUG FIX: the rule was '/api/callbacks/' with no <contextId> variable, so
# Quart could never bind the handler's required contextId parameter and every
# callback request failed (the placeholder was likely lost in transcription).
@app.route('/api/callbacks/<contextId>', methods=['POST'])
async def callbacks(contextId):
    """Log mid-call events (CallConnected, MediaStreaming*) that Call
    Automation delivers to the per-call callback URI."""
    for event in await request.json:
        # Parsing callback events
        global call_connection_id
        event_data = event['data']
        call_connection_id = event_data["callConnectionId"]
        app.logger.info(f"Received Event:-> {event['type']}, Correlation Id:-> {event_data['correlationId']}, CallConnectionId:-> {call_connection_id}")
        if event['type'] == "Microsoft.Communication.CallConnected":
            call_connection_properties = await acs_client.get_call_connection(call_connection_id).get_call_properties()
            media_streaming_subscription = call_connection_properties.media_streaming_subscription
            app.logger.info(f"MediaStreamingSubscription:--> {media_streaming_subscription}")
            app.logger.info(f"Received CallConnected event for connection id: {call_connection_id}")
            app.logger.info("CORRELATION ID:--> %s", event_data["correlationId"])
            app.logger.info("CALL CONNECTION ID:--> %s", event_data["callConnectionId"])
        elif event['type'] == "Microsoft.Communication.MediaStreamingStarted":
            app.logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
            app.logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
            app.logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
        elif event['type'] == "Microsoft.Communication.MediaStreamingStopped":
            app.logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
            app.logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
            app.logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
        elif event['type'] == "Microsoft.Communication.MediaStreamingFailed":
            app.logger.info(f"Code:->{event_data['resultInformation']['code']}, Subcode:-> {event_data['resultInformation']['subCode']}")
            app.logger.info(f"Message:->{event_data['resultInformation']['message']}")
        elif event['type'] == "Microsoft.Communication.CallDisconnected":
            pass
    return Response(status=200)


# WebSocket.
@app.websocket('/ws')
async def ws():
    """ACS media-streaming endpoint: register the socket for outbound audio,
    open the realtime conversation, then relay inbound frames until the
    client disconnects."""
    print("Client connected to WebSocket")
    await init_websocket(websocket)
    await start_conversation()
    while True:
        try:
            # Receive data from the client
            data = await websocket.receive()
            await process_websocket_message_async(data)
        except Exception as e:
            print(f"WebSocket connection closed: {e}")
            break


@app.route('/')
def home():
    """Liveness/landing page."""
    return 'Hello ACS CallAutomation!'


if __name__ == '__main__':
    app.logger.setLevel(INFO)
    app.run(port=8080)
import json

from azureOpenAIService import send_audio_to_external_ai


async def process_websocket_message_async(stream_data):
    """Parse one inbound ACS media-streaming frame and forward its audio.

    Frames arrive as JSON text; only frames of kind "AudioData" are relayed
    to the realtime service -- every other kind is ignored. Malformed frames
    are logged and dropped so one bad packet never kills the receive loop.
    """
    try:
        frame = json.loads(stream_data)
        if frame['kind'] == "AudioData":
            await send_audio_to_external_ai(frame["audioData"]["data"])
    except Exception as e:
        print(f'Error processing WebSocket message: {e}')
Python
|
azure, azure-communication-services
| + +# Call Automation - Quick Start Sample + +This is a sample application demonstrated during Microsoft Ignite 2024. It highlights an integration of Azure Communication Services with OpenAI Service to enable intelligent conversational agents. + +## Prerequisites + +- An Azure account with an active subscription. [Create an account for free](https://azure.microsoft.com/free/?WT.mc_id=A261C142F). +- A deployed Communication Services resource. [Create a Communication Services resource](https://docs.microsoft.com/azure/communication-services/quickstarts/create-communication-resource). +- A [phone number](https://learn.microsoft.com/en-us/azure/communication-services/quickstarts/telephony/get-phone-number) in your Azure Communication Services resource that can get inbound calls. NB: phone numbers are not available in free subscriptions. +- [Python](https://www.python.org/downloads/) 3.7 or above. +- An OpenAI Resource and Deployed Model. See [instructions](https://platform.openai.com/docs/guides/realtime). + +## Before running the sample for the first time + +1. Open an instance of PowerShell, Windows Terminal, Command Prompt or equivalent and navigate to the directory that you would like to clone the sample to. +2. git clone `https://github.com/Azure-Samples/communication-services-python-quickstarts.git`. +3. Navigate to `callautomation-openai-voice` folder and open `main.py` file. + +### Setup the Python environment + +Create and activate python virtual environment and install required packages using following command +``` +pip install -r requirements.txt +pip install -r ./aoai-whl/rtclient-0.5.1-py3-none-any.whl +``` + +### Setup and host your Azure DevTunnel + +[Azure DevTunnels](https://learn.microsoft.com/en-us/azure/developer/dev-tunnels/overview) is an Azure service that enables you to share local web services hosted on the internet. Use the commands below to connect your local development environment to the public internet. 
This creates a tunnel with a persistent endpoint URL and which allows anonymous access. We will then use this endpoint to notify your application of calling events from the ACS Call Automation service. + +```bash +devtunnel create --allow-anonymous +devtunnel port create -p 8080 +devtunnel host +``` + +### Configuring application + +Open `main.py` file to configure the following settings + +1. `ACS_CONNECTION_STRING`: Azure Communication Service resource's connection string. +2. `CALLBACK_URI_HOST`: Base url of the app. (For local development use dev tunnel url) + +Open `azureOpenAIService.py` file to configure the following settings + +1. `OPENAI_SERVICE_KEY`: Azure Open AI service key + +## Run app locally + +1. Navigate to `callautomation-azure-openai-voice` folder and run `main.py` in debug mode or use command `python ./main.py` to run it from PowerShell, Command Prompt or Unix Terminal +2. Browser should pop up with the below page. If not navigate it to `http://localhost:8080/`or your dev tunnel url. +3. Register an EventGrid Webhook for the IncomingCall(`https:///api/incomingCall`) event that points to your devtunnel URI. Instructions [here](https://learn.microsoft.com/en-us/azure/communication-services/concepts/call-automation/incoming-call-notification). + +Once that's completed you should have a running application. The best way to test this is to place a call to your ACS phone number and talk to your intelligent agent. diff --git a/callautomation-openai-voice/requirements.txt b/callautomation-openai-voice/requirements.txt new file mode 100644 index 0000000..ba72a08 --- /dev/null +++ b/callautomation-openai-voice/requirements.txt @@ -0,0 +1,4 @@ +Quart>=0.19.6 +azure-eventgrid==4.11.0 +aiohttp>= 3.11.9 +azure-communication-callautomation==1.4.0b1 \ No newline at end of file