Skip to content

Can I answer a call if I don't have "Event Grid" service and a "Registered Phone Number"? #81

@ANYMS-A

Description

@ANYMS-A

Hi,
Recently I have been trying to build a voice AI agent with ACS. I followed the intro from this example: https://github.com/Azure-Samples/communication-services-python-quickstarts/tree/main/callautomation-azure-openai-voice

However, I saw in the README that this example needs a registered phone number. But I thought I could place the call using a created ACS user identity instead, right?
Besides, I don't have an Event Grid subscription yet. I have only deployed my app to an Azure App Service to verify whether it works.

But I only receive the following events:

INFO:     xxx:xxx:xxx:xxxx- "POST /api/callbacks/incomingCall HTTP/1.1" 200 OK
INFO:main:Received Event:-> Microsoft.Communication.CallDisconnected, Correlation Id:-> 447dd7fd-0b4b-4279-b609-08792e7295cd, CallConnectionId:-> 07004c80-34ed-4107-bfbe-c93d89e77abd
INFO:     xxx:xxx:xxx:xxxx - "POST /api/callbacks/incomingCall HTTP/1.1" 200 OK
INFO:main:Received Event:-> Microsoft.Communication.CreateCallFailed, Correlation Id:-> 447dd7fd-0b4b-4279-b609-08792e7295cd, CallConnectionId:-> 07004c80-34ed-4107-bfbe-c93d89e77abd

I will paste my code below:

import json
import logging
import os
import uuid
from urllib.parse import urlencode, urlparse, urlunparse

import uvicorn
from azure.communication.callautomation import (
    AudioFormat,
    MediaStreamingAudioChannelType,
    MediaStreamingContentType,
    MediaStreamingOptions,
    StreamingTransportType,
)
from azure.communication.callautomation.aio import (
    CallAutomationClient,
)
from dotenv import load_dotenv
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse


# Populate the process environment (python-dotenv) before the lookups below,
# so the order of these statements matters.
load_dotenv()

# Connection settings handed to CallAutomationClient further down.
# os.environ.get returns None when a variable is not set.
ACS_HOST = os.environ.get("ACS_HOST")
ACS_KEY = os.environ.get("ACS_KEY")
# Callback events URI to handle callback events.
# NOTE(review): if CALLBACK_URI_HOST is unset, the concatenation below raises
# TypeError (None + str) at import time -- confirm that fail-fast is intended.
CALLBACK_URI_HOST = os.environ.get("CALLBACK_URI_HOST")
CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks"

# In-memory store written by get_one_on_one_call_user; nothing in this file
# reads it back or removes entries.
ONE_ON_ONE_CALLER_MAP = {}

# Shared client and ASGI application used by every route handler in this file.
acs_client = CallAutomationClient(ACS_HOST, ACS_KEY)
app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@app.get("/api/getOneOnOneCallUser")
async def get_one_on_one_call_user():
    """Create a caller/accepter identity pair and issue a "voip" token for each.

    The pair is also remembered in ``ONE_ON_ONE_CALLER_MAP`` under a freshly
    generated string id.

    Returns:
        JSONResponse: status 200 with ``{"caller": {...}, "accepter": {...}}``,
        where each entry carries the identity id, the token string and the
        token expiry as text; status 500 with an ``error`` message when any
        step fails.
    """
    # NOTE(review): ``acs_client`` is created in this file as a
    # CallAutomationClient, while ``create_user`` / ``get_token`` are
    # CommunicationIdentityClient operations (azure-communication-identity).
    # Confirm which client is meant to be used here; with an async identity
    # client these calls would additionally need ``await``.
    try:
        participants = {}
        for role in ("caller", "accepter"):
            # Create an identity
            identity = acs_client.create_user()
            identity_id = identity.properties['id']
            logger.info("Created an %s identity with ID: %s", role, identity_id)

            # Issue an access token with the "voip" scope for the identity.
            token_result = acs_client.get_token(identity, ["voip"])
            # %s formatting copes with a non-string expiry value; the token
            # itself is a credential and is deliberately kept out of the log.
            logger.info(
                "Issued an %s access token with 'voip' scope that expires at %s",
                role,
                token_result.expires_on,
            )

            # Keep only JSON-friendly values: the SDK objects themselves
            # cannot be serialised by JSONResponse.
            participants[role] = {
                "identity": identity_id,
                "token": token_result.token,
                "expires_on": str(token_result.expires_on),
            }

        session_id = str(uuid.uuid4())
        ONE_ON_ONE_CALLER_MAP[session_id] = participants
        return JSONResponse(content=participants, status_code=200)
    except Exception:
        # Top-level boundary of the request: log with traceback, answer 500.
        logger.exception("Error creating users or tokens")
        return JSONResponse(content={"error": "Failed to create users or tokens"}, status_code=500)


@app.post('/api/callbacks/incomingCall')
async def incoming_call_callbacks(request: Request):
    """Handle events posted to the incoming-call webhook.

    Answers the Event Grid subscription-validation handshake, hands
    ``Microsoft.Communication.IncomingCall`` events to
    ``incoming_call_handler`` and merely logs every other event.

    Args:
        request: The incoming HTTP request; its JSON body is a list of events
            (a single event object is accepted as well).

    Returns:
        JSONResponse: the validation response, the result of
        ``incoming_call_handler``, or an empty 200 response.
    """
    global call_connection_id

    payload = await request.json()
    # Tolerate a single event object as well as the usual list of events.
    events = [payload] if isinstance(payload, dict) else payload

    for event in events:
        # Parsing callback events
        event_data = event.get('data') or {}
        # CloudEvents use "type"; the Event Grid schema uses "eventType".
        event_type = event.get('type') or event.get('eventType')

        # Not every event carries a callConnectionId (IncomingCall payloads
        # are not expected to), so read it leniently instead of indexing.
        connection_id = event_data.get("callConnectionId")
        if connection_id is not None:
            call_connection_id = connection_id

        logger.info(f"Received Event:-> {event_type}, Correlation Id:-> {event_data.get('correlationId')}, CallConnectionId:-> {connection_id}")

        if event_type == "Microsoft.EventGrid.SubscriptionValidationEvent":
            # Event Grid requires the validation code to be echoed back
            # before it starts delivering events to this endpoint.
            logger.info("Validating Event Grid subscription")
            return JSONResponse(
                content={"validationResponse": event_data.get("validationCode")},
                status_code=200,
            )

        if event_type == "Microsoft.Communication.IncomingCall":
            logger.info(f"Received IncomingCall event for connection id: {connection_id}")
            logger.info("Start answering call")
            return await incoming_call_handler(event)

    return JSONResponse(content={}, status_code=200)

@app.post('/api/callbacks/{contextId}')
async def callbacks(contextId: str, request: Request):
    """Log the callback events posted for one call context.

    Each event in the JSON body updates the module-level
    ``call_connection_id`` and is logged; for ``CallConnected`` the call
    properties are fetched to log the media streaming subscription.

    Args:
        contextId: Path segment of the callback URL (not used by the body).
        request: The incoming HTTP request whose JSON body is a list of events.

    Returns:
        JSONResponse: an empty 200 response.
    """
    global call_connection_id

    # Started and Stopped events are logged with the same three fields.
    streaming_update_events = (
        "Microsoft.Communication.MediaStreamingStarted",
        "Microsoft.Communication.MediaStreamingStopped",
    )

    received_events = await request.json()
    for item in received_events:
        # Parsing callback events
        payload = item['data']
        call_connection_id = payload["callConnectionId"]
        kind = item['type']
        logger.info(f"Received Event:-> {kind}, Correlation Id:-> {payload['correlationId']}, CallConnectionId:-> {call_connection_id}")

        if kind == "Microsoft.Communication.CallConnected":
            connection = acs_client.get_call_connection(call_connection_id)
            properties = await connection.get_call_properties()
            subscription = properties.media_streaming_subscription
            logger.info(f"MediaStreamingSubscription:--> {subscription}")
            logger.info(f"Received CallConnected event for connection id: {call_connection_id}")
            logger.info("CORRELATION ID:--> %s", payload["correlationId"])
            logger.info("CALL CONNECTION ID:--> %s", payload["callConnectionId"])
        elif kind in streaming_update_events:
            update = payload['mediaStreamingUpdate']
            logger.info(f"Media streaming content type:--> {update['contentType']}")
            logger.info(f"Media streaming status:--> {update['mediaStreamingStatus']}")
            logger.info(f"Media streaming status details:--> {update['mediaStreamingStatusDetails']}")
        elif kind == "Microsoft.Communication.MediaStreamingFailed":
            result = payload['resultInformation']
            logger.info(f"Code:->{result['code']}, Subcode:-> {result['subCode']}")
            logger.info(f"Message:->{result['message']}")
        elif kind == "Microsoft.Communication.CallDisconnected":
            logger.info(f"Received CallDisconnected event for connection id: {call_connection_id}")

    return JSONResponse(content={}, status_code=200)

# WebSocket.
@app.websocket('/ws')
async def ws(websocket: WebSocket):
    """Accept a WebSocket connection and log the audio frames it delivers.

    Every text frame is parsed as JSON. Frames whose ``kind`` is
    ``"AudioData"`` and whose ``audioData`` section is not marked silent have
    the type and length of their ``data`` payload logged. Frames that cannot
    be parsed or lack a payload are skipped so that one bad frame does not
    end the session.

    Args:
        websocket: The WebSocket connection handed over by FastAPI.
    """
    await websocket.accept()
    logger.info("Client connected to WebSocket")
    try:
        while True:
            # Receive data from the client
            raw_message = await websocket.receive_text()
            logger.info("Received data: %s", raw_message)

            try:
                message = json.loads(raw_message)
            except json.JSONDecodeError:
                logger.warning("Ignoring WebSocket frame that is not valid JSON")
                continue

            if not isinstance(message, dict) or message.get('kind') != "AudioData":
                continue

            audio_section = message.get("audioData") or {}
            # A missing "silent" flag is treated as silent, as before.
            if audio_section.get("silent", True):
                continue

            audio_data = audio_section.get("data")
            if audio_data is None:
                logger.warning("AudioData frame without a data payload")
                continue

            # Report the payload's own type; the parsed frame is always a dict.
            logger.info(
                "Received audio data, type %s, length: %d",
                type(audio_data),
                len(audio_data),
            )
    except WebSocketDisconnect:
        logger.info("WebSocket connection closed")
    except Exception:
        # Boundary of the connection: record the traceback and let it close.
        logger.exception("WebSocket error")


@app.get('/')
def home():
    """Return the fixed greeting served at the site root."""
    greeting = 'Hello ACS CallAutomation!'
    return greeting


async def incoming_call_handler(event: dict):
    """Answer an incoming call and request bidirectional audio streaming.

    Builds a per-call callback URL under ``CALLBACK_EVENTS_URI`` and a
    ``wss://<host>/ws`` media streaming URL on the same host, then answers
    the call with those settings.

    Args:
        event: The IncomingCall event as a plain dict. ``event["data"]`` must
            contain ``from.rawId`` and ``incomingCallContext``.

    Returns:
        JSONResponse: an empty 200 response once ``answer_call`` has returned.

    Raises:
        KeyError: if the event lacks one of the required fields.
    """
    event_data = event["data"]
    caller_id = event_data['from']['rawId']
    logger.info("incoming call handler caller id: %s", caller_id)

    # ``event`` is a dict, so the context has to be read by key; attribute
    # access (``event.data``) raises AttributeError before the call is answered.
    incoming_call_context = event_data['incomingCallContext']
    logger.info("incoming call context:%s", incoming_call_context)

    guid = uuid.uuid4()
    query_parameters = urlencode({"callerId": caller_id})
    callback_uri = f"{CALLBACK_EVENTS_URI}/{guid}?{query_parameters}"

    # Reuse the callback host for the media stream, switching to wss and /ws.
    parsed_url = urlparse(CALLBACK_EVENTS_URI)
    websocket_url = urlunparse(('wss', parsed_url.netloc, '/ws', '', '', ''))

    logger.info("callback url: %s", callback_uri)
    logger.info("websocket url: %s", websocket_url)

    media_streaming_options = MediaStreamingOptions(
        transport_url=websocket_url,
        transport_type=StreamingTransportType.WEBSOCKET,
        content_type=MediaStreamingContentType.AUDIO,
        audio_channel_type=MediaStreamingAudioChannelType.MIXED,
        start_media_streaming=True,
        enable_bidirectional=True,
        audio_format=AudioFormat.PCM24_K_MONO,
    )

    answer_call_result = await acs_client.answer_call(
        incoming_call_context=incoming_call_context,
        operation_context="incomingCall",
        callback_url=callback_uri,
        media_streaming=media_streaming_options,
    )
    logger.info("Answered call for connection id: %s", answer_call_result.call_connection_id)
    return JSONResponse(content={}, status_code=200)

if __name__ == "__main__":
    # Started as a script: serve the app with uvicorn on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions