diff --git a/demo-app/src/main/kotlin/io/getstream/video/android/App.kt b/demo-app/src/main/kotlin/io/getstream/video/android/App.kt index aa0ea51938..58f5a473f5 100644 --- a/demo-app/src/main/kotlin/io/getstream/video/android/App.kt +++ b/demo-app/src/main/kotlin/io/getstream/video/android/App.kt @@ -20,6 +20,9 @@ import android.app.Application import android.content.Context import dagger.hilt.android.HiltAndroidApp import io.getstream.android.video.generated.models.CallEndedEvent +import io.getstream.android.video.generated.models.CustomVideoEvent +import io.getstream.video.android.DemoCallJoinInterceptor.Companion.CALLEE_READY_TO_JOIN_EVENT_TYPE +import io.getstream.video.android.DemoCallJoinInterceptor.Companion.CALLER_READY_TO_JOIN_EVENT_TYPE import io.getstream.video.android.core.StreamVideo import io.getstream.video.android.core.moderations.CallModerationConstants import io.getstream.video.android.data.model.PolicyViolationUiData @@ -39,9 +42,19 @@ import kotlinx.coroutines.runBlocking @HiltAndroidApp class App : Application() { + companion object { + lateinit var demoApp: App + } + + internal var policyViolationUiData: MutableStateFlow = + MutableStateFlow(null) + + public val callerReadyToJoinFlow = MutableStateFlow(null) + public val calleeReadyToJoinFlow = MutableStateFlow(null) + override fun onCreate() { super.onCreate() - + demoApp = this // We use the provided StreamUserDataStore in the demo app for user data storage. // This is a convenience class provided for storage but the SDK itself is not aware of // this instance and doesn't use it. You can use it to store the logged in user and then @@ -66,10 +79,31 @@ class App : Application() { } observePolicyViolation() + observeCallReadyToJoin() } - internal var policyViolationUiData: MutableStateFlow = - MutableStateFlow(null) + private fun observeCallReadyToJoin() { + CoroutineScope(Dispatchers.Default).launch { + StreamVideo.instanceState + .flatMapLatest { instance -> + instance?.state?.ringingCall ?: flowOf(null) + }.filterNotNull() + .collectLatest { call -> + call.events.collectLatest { event -> + if (event is CustomVideoEvent) { + when (event.custom["type"]) { + CALLER_READY_TO_JOIN_EVENT_TYPE -> { + callerReadyToJoinFlow.value = event + } + CALLEE_READY_TO_JOIN_EVENT_TYPE -> { + calleeReadyToJoinFlow.value = event + } + } + } + } + } + } + } private fun observePolicyViolation() { CoroutineScope(Dispatchers.Default).launch { diff --git a/demo-app/src/main/kotlin/io/getstream/video/android/CallActivity.kt b/demo-app/src/main/kotlin/io/getstream/video/android/CallActivity.kt index 9e8da489a6..bf4df4beff 100644 --- a/demo-app/src/main/kotlin/io/getstream/video/android/CallActivity.kt +++ b/demo-app/src/main/kotlin/io/getstream/video/android/CallActivity.kt @@ -35,6 +35,7 @@ import io.getstream.video.android.compose.ui.ComposeStreamCallActivity import io.getstream.video.android.compose.ui.StreamCallActivityComposeDelegate import io.getstream.video.android.core.Call import io.getstream.video.android.core.MemberState +import io.getstream.video.android.core.RingingState import io.getstream.video.android.core.StreamVideo import io.getstream.video.android.core.call.state.CallAction import io.getstream.video.android.datastore.delegate.StreamUserDataStore @@ -45,12 +46,29 @@ import io.getstream.video.android.ui.common.StreamCallActivityConfiguration import io.getstream.video.android.ui.common.util.StreamCallActivityDelicateApi import io.getstream.video.android.util.FullScreenCircleProgressBar import io.getstream.video.android.util.StreamVideoInitHelper +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap @OptIn(StreamCallActivityDelicateApi::class) class CallActivity : ComposeStreamCallActivity() { + companion object { + var USE_CALL_JOIN_INTERCEPTOR = false + } + override val uiDelegate: StreamActivityUiDelegate = StreamDemoUiDelegate() + var observeCallReadyToJoinJob: Job? = null + var observeRingingJob: Job? = null + private val previousRingingStates = ConcurrentHashMap.newKeySet() + override val callJoinInterceptor = DemoCallJoinInterceptor(previousRingingStates) /** * This code is required to pass the UI-tests (as it hardcodes the configuration) @@ -61,6 +79,27 @@ class CallActivity : ComposeStreamCallActivity() { .copy(closeScreenOnCallEnded = false, canSkipPermissionRationale = false) } + override fun onCreate(savedInstanceState: Bundle?) { + super.onCreate(savedInstanceState) + observeRingingState() + } + + private fun observeRingingState() { + previousRingingStates.clear() + observeRingingJob?.cancel() + observeRingingJob = CoroutineScope(Dispatchers.Default).launch { + StreamVideo.instanceState + .flatMapLatest { instance -> + instance?.state?.ringingCall ?: flowOf(null) + }.filterNotNull() + .collectLatest { call -> + call.state.ringingState.collectLatest { + previousRingingStates.add(it) + } + } + } + } + @StreamCallActivityDelicateApi override fun onPreCreate(savedInstanceState: Bundle?, persistentState: PersistableBundle?) { runBlocking { @@ -163,4 +202,11 @@ class CallActivity : ComposeStreamCallActivity() { } } } + + override fun finish() { + super.finish() + observeCallReadyToJoinJob?.cancel() + observeRingingJob?.cancel() + previousRingingStates.clear() + } } diff --git a/demo-app/src/main/kotlin/io/getstream/video/android/DemoCallJoinInterceptor.kt b/demo-app/src/main/kotlin/io/getstream/video/android/DemoCallJoinInterceptor.kt new file mode 100644 index 0000000000..3c1ab350ff --- /dev/null +++ b/demo-app/src/main/kotlin/io/getstream/video/android/DemoCallJoinInterceptor.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android + +import io.getstream.log.taggedLogger +import io.getstream.video.android.CallActivity.Companion.USE_CALL_JOIN_INTERCEPTOR +import io.getstream.video.android.core.Call +import io.getstream.video.android.core.CallJoinInterceptor +import io.getstream.video.android.core.RingingState +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first + +/** + * Do the following changes before testing this flow + * Set [io.getstream.video.android.core.INTERCEPTOR_TIMEOUT_MS] to 10_000L + * Set [io.getstream.video.android.core.PEER_CONNECTION_OBSERVER_TIMEOUT] to 10_000L + */ +class DemoCallJoinInterceptor( + private val previousRingingStates: Set, +) : CallJoinInterceptor { + private val logger by taggedLogger("DemoCallJoinInterceptor") + + companion object { + const val CALLER_READY_TO_JOIN_EVENT_TYPE = "caller_ready_join" + const val CALLEE_READY_TO_JOIN_EVENT_TYPE = "callee_ready_join" + } + + override suspend fun callReadyToJoin(call: Call) { + if (USE_CALL_JOIN_INTERCEPTOR) { + val isIncomingOrOutgoing = previousRingingStates.firstOrNull { + it is RingingState.Incoming || it is RingingState.Outgoing + } + + val isOutgoing = isIncomingOrOutgoing is RingingState.Outgoing + val isIncoming = isIncomingOrOutgoing is RingingState.Incoming + + val currentUserId = call.user.id + if (isIncoming) { + val result = call.sendCustomEvent( + mapOf( + "type" to CALLEE_READY_TO_JOIN_EVENT_TYPE, + "user_id" to currentUserId, + ), + ) + if (result.isSuccess) { + logger.d { "[callReadyToJoin] Successfully sent custom $CALLEE_READY_TO_JOIN_EVENT_TYPE event" } + App.demoApp.callerReadyToJoinFlow.filter { it != null && it.callCid == call.cid }.first() + } else { + logger.d { "[callReadyToJoin] Failed to send custom $CALLEE_READY_TO_JOIN_EVENT_TYPE event" } + } + logger.d { "[callReadyToJoin] callerReadyToJoinFlow finish" } + } else if (isOutgoing) { + val result = call.sendCustomEvent( + mapOf( + "type" to CALLER_READY_TO_JOIN_EVENT_TYPE, + "user_id" to currentUserId, + ), + ) + + if (result.isSuccess) { + logger.d { "[callReadyToJoin] Successfully sent custom $CALLER_READY_TO_JOIN_EVENT_TYPE event" } + App.demoApp.calleeReadyToJoinFlow.filter { it != null && it.callCid == call.cid }.first() + } else { + logger.d { "[callReadyToJoin] Failed to send custom $CALLER_READY_TO_JOIN_EVENT_TYPE event" } + } + logger.d { "[callReadyToJoin] calleeReadyToJoinFlow finish" } + } + } + } +} diff --git a/demo-app/src/main/kotlin/io/getstream/video/android/ui/DogfoodingNavHost.kt b/demo-app/src/main/kotlin/io/getstream/video/android/ui/DogfoodingNavHost.kt index 1289b28852..49f92f0816 100644 --- a/demo-app/src/main/kotlin/io/getstream/video/android/ui/DogfoodingNavHost.kt +++ b/demo-app/src/main/kotlin/io/getstream/video/android/ui/DogfoodingNavHost.kt @@ -91,7 +91,8 @@ fun AppNavHost( composable(AppScreens.DirectCallJoin.route) { val context = LocalContext.current DirectCallJoinScreen( - navigateToDirectCall = { cid, members, joinAndRing -> + navigateToDirectCall = { cid, members, joinAndRing, useCallJoinInterceptor -> + CallActivity.USE_CALL_JOIN_INTERCEPTOR = useCallJoinInterceptor context.startActivity( StreamCallActivity.callIntent( action = NotificationHandler.ACTION_OUTGOING_CALL, diff --git a/demo-app/src/main/kotlin/io/getstream/video/android/ui/outgoing/DirectCallJoinScreen.kt b/demo-app/src/main/kotlin/io/getstream/video/android/ui/outgoing/DirectCallJoinScreen.kt index 00e4e8bebc..27eb563b05 100644 --- a/demo-app/src/main/kotlin/io/getstream/video/android/ui/outgoing/DirectCallJoinScreen.kt +++ b/demo-app/src/main/kotlin/io/getstream/video/android/ui/outgoing/DirectCallJoinScreen.kt @@ -58,6 +58,7 @@ import androidx.compose.ui.unit.dp import androidx.compose.ui.unit.sp import androidx.hilt.navigation.compose.hiltViewModel import androidx.lifecycle.compose.collectAsStateWithLifecycle +import io.getstream.video.android.R import io.getstream.video.android.compose.theme.VideoTheme import io.getstream.video.android.compose.ui.components.avatar.UserAvatar import io.getstream.video.android.compose.ui.components.base.StreamButton @@ -69,7 +70,12 @@ import java.util.UUID @Composable fun DirectCallJoinScreen( viewModel: DirectCallJoinViewModel = hiltViewModel(), - navigateToDirectCall: (cid: StreamCallId, memberList: String, joinAndRing: Boolean) -> Unit, + navigateToDirectCall: ( + cid: StreamCallId, + memberList: String, + joinAndRing: Boolean, + useCallJoinInterceptor: Boolean, + ) -> Unit, ) { val uiState by viewModel.uiState.collectAsStateWithLifecycle() @@ -98,7 +104,7 @@ private fun Header(user: User?) { Column( modifier = Modifier .fillMaxWidth() - .padding(24.dp) // Outer padding + .padding(start = 24.dp, end = 24.dp, top = 24.dp) // Outer padding .padding(vertical = 12.dp), // Inner padding verticalArrangement = Arrangement.Center, ) { @@ -135,9 +141,15 @@ private fun Header(user: User?) { private fun Body( uiState: DirectCallUiState, toggleUserSelection: (Int) -> Unit, - onStartCallClick: (cid: StreamCallId, membersList: String, joinAndRing: Boolean) -> Unit, + onStartCallClick: ( + cid: StreamCallId, + membersList: String, + joinAndRing: Boolean, + useCallJoinInterceptor: Boolean, + ) -> Unit, ) { var callerJoinsFirst by rememberSaveable { mutableStateOf(true) } + var useCallJoinInterceptor by rememberSaveable { mutableStateOf(false) } Box( modifier = Modifier @@ -161,11 +173,10 @@ private fun Body( Row( verticalAlignment = CenterVertically, modifier = Modifier - .fillMaxWidth() - .padding(vertical = 10.dp), + .fillMaxWidth(), horizontalArrangement = Arrangement.SpaceBetween, ) { - Text("Join First", color = Color.White) + Text(stringResource(id = R.string.join_first), color = Color.White) Checkbox( callerJoinsFirst, modifier = Modifier.offset(x = 10.dp), @@ -179,6 +190,27 @@ private fun Body( }, ) } + Row( + verticalAlignment = CenterVertically, + modifier = Modifier + .fillMaxWidth() + .padding(vertical = 10.dp), + horizontalArrangement = Arrangement.SpaceBetween, + ) { + Text(stringResource(id = R.string.use_call_join_interceptor), color = Color.White) + Checkbox( + useCallJoinInterceptor, + modifier = Modifier.offset(x = 10.dp), + colors = CheckboxDefaults.colors( + uncheckedColor = Color.White, // Border color when unchecked + checkedColor = Color.White, // Fill color when checked + checkmarkColor = VideoTheme.colors.buttonBrandDefault, // Tick color + ), + onCheckedChange = { + useCallJoinInterceptor = !useCallJoinInterceptor + }, + ) + } UserList( entries = users, onUserClick = { clickedIndex -> toggleUserSelection(clickedIndex) }, @@ -204,11 +236,11 @@ private fun Body( onClick = { onStartCallClick( StreamCallId("audio_call", UUID.randomUUID().toString()), -// StreamCallId("default", UUID.randomUUID().toString()), users .filter { it.isSelected } .joinToString(separator = ",") { it.user.id ?: "" }, callerJoinsFirst, + useCallJoinInterceptor, ) }, ) @@ -230,6 +262,7 @@ private fun Body( .filter { it.isSelected } .joinToString(separator = ",") { it.user.id ?: "" }, callerJoinsFirst, + useCallJoinInterceptor, ) }, ) @@ -326,7 +359,7 @@ private fun HeaderPreview() { }, ), toggleUserSelection = {}, - ) { _, _, _ -> + ) { _, _, _, _ -> } } } diff --git a/demo-app/src/main/res/values/strings.xml b/demo-app/src/main/res/values/strings.xml index 1e43309b4a..91c04e705a 100644 --- a/demo-app/src/main/res/values/strings.xml +++ b/demo-app/src/main/res/values/strings.xml @@ -62,6 +62,8 @@ We have to end your call as we have detected policy violation OK Warning + Join First + Use Call Join Interceptor %s is typing diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index 1680fa52a1..1abcb7fad1 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -8593,10 +8593,10 @@ public final class io/getstream/video/android/core/Call { public final fun isPinnedParticipant (Ljava/lang/String;)Z public final fun isServerPin (Ljava/lang/String;)Z public final fun isVideoEnabled ()Z - public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; - public final fun joinAndRing (Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun joinAndRing$default (Lio/getstream/video/android/core/Call;Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/CallJoinInterceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/CallJoinInterceptor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public final fun joinAndRing (Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLio/getstream/video/android/core/CallJoinInterceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun joinAndRing$default (Lio/getstream/video/android/core/Call;Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLio/getstream/video/android/core/CallJoinInterceptor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun kickUser (Ljava/lang/String;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun kickUser$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun leave (Ljava/lang/String;)V @@ -8686,6 +8686,16 @@ public final class io/getstream/video/android/core/CallHealthMonitor { public final fun stopTimer ()V } +public final class io/getstream/video/android/core/CallJoinInterceptionException : java/lang/RuntimeException { + public fun (Ljava/lang/String;Ljava/lang/Throwable;)V + public synthetic fun (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun getReason ()Ljava/lang/String; +} + +public abstract interface class io/getstream/video/android/core/CallJoinInterceptor { + public abstract fun callReadyToJoin (Lio/getstream/video/android/core/Call;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public final class io/getstream/video/android/core/CallKt { public static final field sfuReconnectTimeoutMillis I } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/android/video/generated/models/CustomVideoEvent.kt b/stream-video-android-core/src/main/kotlin/io/getstream/android/video/generated/models/CustomVideoEvent.kt index de571929fe..c32c4c4514 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/android/video/generated/models/CustomVideoEvent.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/android/video/generated/models/CustomVideoEvent.kt @@ -63,5 +63,13 @@ data class CustomVideoEvent ( override fun getCallCID(): kotlin.String { return callCid - } + } + + override fun toString(): String = buildString { + append("CustomVideoEvent(") + append("callCid='").append(callCid).append("', ") + append("custom=").append(custom).append(", ") + append("type='").append(type).append("'") + append(")") + } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt index 8f56483f88..0be74b7767 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt @@ -16,10 +16,12 @@ package io.getstream.video.android.core +import androidx.lifecycle.AtomicReference import io.getstream.log.taggedLogger +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.first @@ -28,97 +30,155 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull -import org.webrtc.PeerConnection +import org.webrtc.PeerConnection.PeerConnectionState private const val PEER_CONNECTION_OBSERVER_TIMEOUT = 5_000L +private const val INTERCEPTOR_TIMEOUT_MS = 5_000L internal class ActiveStateGate( private val coroutineScope: CoroutineScope, private val previousRingingStates: Set, private val strategy: TransitionToRingingStateStrategy = TransitionToRingingStateStrategy.PUBLISHER_CONNECTED, - private val timeoutMs: Long = PEER_CONNECTION_OBSERVER_TIMEOUT, + private val peerConnectionObserverTimeoutMs: Long = PEER_CONNECTION_OBSERVER_TIMEOUT, + private val interceptorTimeoutMs: Long = INTERCEPTOR_TIMEOUT_MS, ) { private val logger by taggedLogger("ActiveStateGate") - private var peerConnectionObserverJob: Job? = null + + private var peerConnectionObserverJob: AtomicReference = AtomicReference() + private var interceptorJob: AtomicReference = AtomicReference() internal fun awaitAndTransition( currentRingingState: RingingState, call: Call, + interceptor: CallJoinInterceptor?, onReady: () -> Unit, ) { + if (currentRingingState is RingingState.Active) { + return + } logger.d { "[awaitAndTransition], ringingState: $currentRingingState" } - when (strategy) { - TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> { - onReady() - } - else -> { - val isIncomingOrOutgoing = - previousRingingStates.any { it is RingingState.Incoming || it is RingingState.Outgoing } - - if (isIncomingOrOutgoing && currentRingingState !is RingingState.Active) { - observePeerConnection( - call, - onReady, - strategy, - ) - } else if (!isIncomingOrOutgoing) { - onReady() - } - } + if (strategy == TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR) { + handleLegacyBehaviour(call, onReady, interceptor) + return } + + val isRingingCall = previousRingingStates.any { + it is RingingState.Incoming || it is RingingState.Outgoing + } + + launchGate(call, interceptor, waitForPublisherConnection = isRingingCall, onReady) } - private fun observePeerConnection(call: Call, onReady: () -> Unit, strategy: TransitionToRingingStateStrategy) { - if (peerConnectionObserverJob?.isActive == true) return + private fun handleLegacyBehaviour(call: Call, onReady: () -> Unit, interceptor: CallJoinInterceptor?) { + if (interceptorJob.get()?.isActive == true) return + + if (interceptor == null) { + onReady() + return + } - peerConnectionObserverJob = coroutineScope.launch { - val start = System.currentTimeMillis() + interceptorJob.set( + coroutineScope.launch { + val shouldProceed = invokeInterceptor(call, interceptor) + if (!isActive) return@launch - val result = withTimeoutOrNull(timeoutMs) { - call.session - .filterNotNull() - .flatMapLatest { session -> - - val publisherFlow = session.publisher - .filterNotNull() - .flatMapLatest { it.state } - - when (strategy) { - TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> { - emptyFlow() - .map { "none" to it } - } - TransitionToRingingStateStrategy.PUBLISHER_CONNECTED -> { - publisherFlow.filter { it == PeerConnection.PeerConnectionState.CONNECTED } - .map { "publisher" to it } - } - } - } - .first() - } + if (shouldProceed) onReady() + cancelInterceptorJob() + }, + ) + } - val duration = System.currentTimeMillis() - start + private fun launchGate( + call: Call, + interceptor: CallJoinInterceptor?, + waitForPublisherConnection: Boolean, + onReady: () -> Unit, + ) { + if (peerConnectionObserverJob.get()?.isActive == true) return - if (result != null) { - val (source, state) = result - logger.d { - "[observeConnection-$strategy] $source reached $state in ${duration}ms" + peerConnectionObserverJob.set( + coroutineScope.launch { + if (waitForPublisherConnection) { + awaitPeerConnection(call) + if (!isActive) return@launch } - } else { - logger.w { - "[observeConnection-$strategy] Timeout after ${duration}ms" - } - } - if (isActive) { - onReady() + + val shouldProceed = invokeInterceptor(call, interceptor) + if (!isActive) return@launch + + if (shouldProceed) onReady() cleanup() + }, + ) + } + + private suspend fun awaitPeerConnection(call: Call) { + val start = System.currentTimeMillis() + val result = + withTimeoutOrNull(peerConnectionObserverTimeoutMs) { buildConnectionFlow(call).first() } + logConnectionResult(result, System.currentTimeMillis() - start) + } + + private fun buildConnectionFlow(call: Call): Flow = + call.session + .filterNotNull() + .flatMapLatest { session -> + session.publisher + .filterNotNull() + .flatMapLatest { it.state } + .filter { it == PeerConnectionState.CONNECTED } + .map { } } + + private suspend fun invokeInterceptor( + call: Call, + interceptor: CallJoinInterceptor?, + ): Boolean { + val startTime = System.currentTimeMillis() + logger.d { "[invokeInterceptor] start at $startTime" } + if (interceptor == null) return true + return try { + withTimeoutOrNull(interceptorTimeoutMs) { + interceptor.callReadyToJoin(call) + } + logger.d { "[invokeInterceptor] finish at ${(System.currentTimeMillis() - startTime) / 1000}s " } + true + } catch (e: CancellationException) { + throw e + } catch (e: CallJoinInterceptionException) { + val message = "[CallJoinInterceptor] aborted with reason: ${e.reason}" + logger.e(e) { message } + call.leave(reason = message) + clearAllJobs() + false + } catch (e: Exception) { + logger.e(e) { "[CallJoinInterceptor] interceptor threw, proceeding" } + true + } + } + + private fun logConnectionResult(result: Unit?, duration: Long) { + if (result != null) { + logger.d { "[observeConnection] Connected in ${duration}ms" } + } else { + logger.w { "[observeConnection] Timeout after ${duration}ms" } } } fun cleanup() { - peerConnectionObserverJob?.cancel() - peerConnectionObserverJob = null + peerConnectionObserverJob.get()?.cancel() + peerConnectionObserverJob.set(null) + cancelInterceptorJob() + } + + fun clearAllJobs() { + peerConnectionObserverJob.set(null) + interceptorJob.set(null) + } + + fun cancelInterceptorJob() { + interceptorJob.get()?.cancel() + interceptorJob.set(null) } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index e1f2e74142..1891bdfb2b 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -544,6 +544,7 @@ public class Call( ring: Boolean = false, notify: Boolean = false, hintHighScaleLivestreamPublisher: Boolean? = null, + callJoinInterceptor: CallJoinInterceptor? = null, ): Result { logger.d { "[join] #ringing; #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" @@ -567,6 +568,8 @@ public class Call( // Ensure factory is created with the current audioBitrateProfile before joining ensureFactoryMatchesAudioProfile() + this.state.callJoinInterceptor = callJoinInterceptor + // the join flow should retry up to 3 times // if the error is not permanent // and fail immediately on permanent errors @@ -615,10 +618,15 @@ public class Call( members: List, createOptions: CreateCallOptions? = CreateCallOptions(members), video: Boolean = isVideoEnabled(), + callJoinInterceptor: CallJoinInterceptor? = null, ): Result { logger.d { "[joinAndRing] #ringing; #track; members: $members, video: $video" } state.toggleJoinAndRingProgress(true) - return join(ring = false, createOptions = createOptions).flatMap { rtcSession -> + return join( + ring = false, + createOptions = createOptions, + callJoinInterceptor = callJoinInterceptor, + ).flatMap { rtcSession -> logger.d { "[joinAndRing] Joined #ringing; #track; ring: $members" } ring(RingCallRequest(isVideoEnabled(), members)).map { logger.d { "[joinAndRing] Ringed #ringing; #track; ring: $members" } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallJoinInterceptor.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallJoinInterceptor.kt new file mode 100644 index 0000000000..3b87392330 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallJoinInterceptor.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core + +import kotlin.jvm.Throws + +/** + * Controls when a ringing call transitions to [RingingState.Active]. + * + * Implement this to insert custom logic (e.g. waiting for user confirmation) between + * the publisher peer connection becoming ready and the call going active. + * Has no effect on non-ringing joins (livestream, direct join). + */ +public interface CallJoinInterceptor { + + /** + * Called when the SDK is ready to transition to [RingingState.Active]. + * Suspend here to delay the transition; return to allow it to proceed. + * + * Throw [CallJoinInterceptionException] to abort the join — the SDK will leave + * the call cleanly + * + * The SDK enforces a 5-second maximum — the transition proceeds automatically on timeout. + */ + @Throws(CallJoinInterceptionException::class) + public suspend fun callReadyToJoin(call: Call) +} + +/** + * Thrown by a [CallJoinInterceptor] to abort the join. + * + * When raised inside [CallJoinInterceptor.callReadyToJoin], the SDK leaves the call + * cleanly using [reason] for tracing. + */ +public class CallJoinInterceptionException( + public val reason: String, + cause: Throwable? = null, +) : RuntimeException(reason, cause) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt index c9812c7ad6..3045747b1f 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt @@ -751,6 +751,9 @@ public class CallState( internal var incomingNotificationData = IncomingNotificationData(emptyMap()) private val ringingLogger by taggedLogger("RingingState") + @Volatile + internal var callJoinInterceptor: CallJoinInterceptor? = null + fun handleEvent(event: VideoEvent) { logger.d { "[handleEvent] ${event::class.java.name.split(".").last()}" } @@ -1381,9 +1384,14 @@ public class CallState( ringingLogger.d { "Update: $state" } if (state is RingingState.Active) { - activeStateGate.awaitAndTransition(ringingState.value, call) { + activeStateGate.awaitAndTransition( + ringingState.value, + call, + callJoinInterceptor, + ) { _ringingState.value = state activeStateGate.cleanup() + callJoinInterceptor = null } } else { _ringingState.value = state diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt index 9d8b0de5b3..8e8ddc2a04 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt @@ -18,27 +18,35 @@ package io.getstream.video.android.core import io.getstream.video.android.core.call.RtcSession import io.getstream.video.android.core.call.connection.Publisher -import io.getstream.video.android.core.call.connection.Subscriber +import io.mockk.coVerify import io.mockk.every +import io.mockk.just import io.mockk.mockk +import io.mockk.runs +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.awaitCancellation +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.TestScope -import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.advanceTimeBy import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.resetMain +import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.setMain import org.junit.After import org.junit.Before import org.webrtc.PeerConnection +import kotlin.coroutines.cancellation.CancellationException import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertTrue class ActiveStateGateTest { - private val testDispatcher = UnconfinedTestDispatcher() + private val testDispatcher = StandardTestDispatcher() @Before fun setUp() { @@ -50,287 +58,267 @@ class ActiveStateGateTest { Dispatchers.resetMain() } - // ── TestData now exposes firstRtpPacketArrived ──────────────────────────── - private data class TestData( val call: Call, val publisherState: MutableStateFlow, - val subscriberState: MutableStateFlow, - val firstRtpPacketArrived: MutableStateFlow, ) private fun fakeCall( publisherState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - subscriberState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - firstRtpPacketArrived: MutableStateFlow = MutableStateFlow(false), ): TestData { val publisher = mockk(relaxed = true) { every { state } returns publisherState } - val subscriber = mockk(relaxed = true) { - every { state } returns subscriberState - } val session = mockk { every { this@mockk.publisher } returns MutableStateFlow(publisher) - every { this@mockk.subscriber } returns MutableStateFlow(subscriber) } val call = mockk { every { this@mockk.session } returns MutableStateFlow(session) + every { leave(any()) } just runs } - return TestData(call, publisherState, subscriberState, firstRtpPacketArrived) + return TestData(call, publisherState) } - // ── 1. Non-ringing previous states ──────────────────────────────────────── - @Test - fun `transitions immediately when no previous incoming or outgoing state`() = - runTest(testDispatcher) { - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = emptySet(), - ) - val (call, _, _, _) = fakeCall() - val transitioned = mutableListOf() - - sut.awaitAndTransition( - currentRingingState = RingingState.Idle, - call = call, - onReady = { transitioned += Unit }, - ) - - assertTrue(transitioned.size == 1) + fun `should ignore when current state already active`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + ) + gate.awaitAndTransition( + currentRingingState = RingingState.Active, + call = data.call, + interceptor = null, + ) { + readyCalled = true } - - // ── 2. Already Active ───────────────────────────────────────────────────── + advanceUntilIdle() + assertFalse(readyCalled) + } @Test - fun `does NOT observe peer connection when already in Active state`() = - runTest(testDispatcher) { - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(RingingState.Incoming(false)), - ) - val (call, _, _, _) = fakeCall() - val transitioned = mutableListOf() - - sut.awaitAndTransition( - currentRingingState = RingingState.Active, - call = call, - onReady = { transitioned += Unit }, - ) - - assertTrue(transitioned.isEmpty()) + fun `should invoke immediately for non ringing call`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + ) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = null, + ) { + readyCalled = true } + advanceUntilIdle() + assertTrue(readyCalled) + } @Test - fun `transitions when both peers connect for Outgoing previous state`() = - runTest(testDispatcher) { - val pubState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) - - val outgoingRingingState = RingingState.Outgoing(false) - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(outgoingRingingState), - timeoutMs = 5_000L, - ) - val transitioned = mutableListOf() - - sut.awaitAndTransition( - currentRingingState = outgoingRingingState, - call = call, - onReady = { transitioned += Unit }, - ) - - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED - assertTrue(transitioned.size == 1) - } + fun `should wait for publisher connection before transition`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(RingingState.Incoming(false)), + ) - // ── 4. Timeout ──────────────────────────────────────────────────────────── + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = null, + ) { readyCalled = true } + + runCurrent() + assertFalse(readyCalled) + data.publisherState.value = PeerConnection.PeerConnectionState.CONNECTED + advanceUntilIdle() + assertTrue(readyCalled) + } @Test - fun `still calls onReady after timeout even if peers never connect`() = - runTest(testDispatcher) { - val (call, _, _, _) = fakeCall() - - val incomingRingingState = RingingState.Incoming(false) - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(incomingRingingState), - timeoutMs = 100L, - ) - val transitioned = mutableListOf() - - sut.awaitAndTransition( - currentRingingState = incomingRingingState, - call = call, - onReady = { transitioned += Unit }, - ) - - assertTrue(transitioned.isEmpty()) - advanceTimeBy(200L) - assertTrue(transitioned.size == 1) - } + fun `should proceed after peer connection timeout`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(RingingState.Incoming()), + peerConnectionObserverTimeoutMs = 100, + ) - // ── 5. Duplicate observer guard ─────────────────────────────────────────── + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = null, + ) { readyCalled = true } + + advanceTimeBy(101) + advanceUntilIdle() + assertTrue(readyCalled) + } @Test - fun `calling awaitAndTransition twice does not start a second observer`() = - runTest(testDispatcher) { - val pubState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) - - val incomingRingingState = RingingState.Incoming(false) - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(incomingRingingState), - timeoutMs = 5_000L, - ) - val transitioned = mutableListOf() - val action = { transitioned += Unit } - - sut.awaitAndTransition(incomingRingingState, call, action) - sut.awaitAndTransition(incomingRingingState, call, action) - - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED - - assertTrue(transitioned.size == 1) + fun `should invoke interceptor before transition`() = runTest { + val data = fakeCall() + val events = mutableListOf() + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + events += "interceptor" + } } - // ── 6. cleanup() ───────────────────────────────────────────────────────── + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + ) - @Test - fun `cleanup cancels the observer job and onReady is never called`() = - runTest(testDispatcher) { - val (call, pubState, subState, _) = fakeCall() - - val incomingRingingState = RingingState.Incoming(false) - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(incomingRingingState), - timeoutMs = 5_000L, - ) - val transitioned = mutableListOf() - - sut.awaitAndTransition( - currentRingingState = incomingRingingState, - call = call, - onReady = { transitioned += Unit }, - ) - - sut.cleanup() - - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED - - assertTrue(transitioned.isEmpty()) - } + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) { events += "ready" } + + advanceUntilIdle() + assertEquals( + listOf("interceptor", "ready"), + events, + ) + } @Test - fun `cleanup allows a new observer to be started afterwards`() = - runTest(testDispatcher) { - val pubState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) - - val incomingRingingState = RingingState.Incoming(false) - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(incomingRingingState), - timeoutMs = 5_000L, - ) - val transitioned = mutableListOf() - - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } - sut.cleanup() - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } - - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED - - assertTrue(transitioned.size == 1) + fun `should proceed when interceptor throws generic exception`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + error("boom") + } } - // ── 7. Null session / late-arriving session ─────────────────────────────── + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + ) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) { readyCalled = true } + + advanceUntilIdle() + assertTrue(readyCalled) + } @Test - fun `waits for non-null session before observing peer connections`() = - runTest(testDispatcher) { - val sessionFlow = MutableStateFlow(null) - val pubState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - - val publisher = mockk { every { state } returns pubState } - val subscriber = mockk(relaxed = true) { every { state } returns subState } - val session = mockk { - every { this@mockk.publisher } returns MutableStateFlow(publisher) - every { this@mockk.subscriber } returns MutableStateFlow(subscriber) + fun `should not proceed when interceptor rejects`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + throw CallJoinInterceptionException("blocked") } - val call = mockk { every { this@mockk.session } returns sessionFlow } + } - val incomingRingingState = RingingState.Incoming(false) - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(incomingRingingState), - timeoutMs = 5_000L, - ) - val transitioned = mutableListOf() + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + ) - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) { readyCalled = true } - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED - assertTrue(transitioned.isEmpty()) + advanceUntilIdle() + assertFalse(readyCalled) + coVerify { data.call.leave(any()) } + } - sessionFlow.value = session - assertTrue(transitioned.size == 1) + @Test + fun `should proceed when interceptor times out`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + delay(10_000) + } } + val interceptorTimeoutMs = 5_000L + val gate = ActiveStateGate( + coroutineScope = this, + interceptorTimeoutMs = interceptorTimeoutMs, + previousRingingStates = emptySet(), + ) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) { readyCalled = true } - // ── isActive guard tests ────────────────────────────────────────────────── + advanceTimeBy(interceptorTimeoutMs + 1) + advanceUntilIdle() + assertTrue(readyCalled) + } @Test - fun `onReady is NOT called if cleanup happens while waiting for peer connection`() = - runTest(StandardTestDispatcher()) { - val pubState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) - - val incomingRingingState = RingingState.Incoming(false) - val sut = ActiveStateGate( - coroutineScope = this, - previousRingingStates = setOf(incomingRingingState), - timeoutMs = 5_000L, - ) - val transitioned = mutableListOf() - - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } - - // Peers connect but cleanup cancels before the coroutine resumes - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED - sut.cleanup() // cancels before isActive check runs - - // Now let any pending coroutine work drain — onReady must not fire - advanceUntilIdle() - - assertTrue(transitioned.isEmpty()) + fun `should ignore duplicate gate launches`() = runTest(testDispatcher) { + val data = fakeCall() + var readyInvocationCount = 0 + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(RingingState.Incoming()), + ) + + repeat(2) { + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = null, + ) { readyInvocationCount++ } } + data.publisherState.value = PeerConnection.PeerConnectionState.CONNECTED + advanceUntilIdle() + assertEquals(1, readyInvocationCount) + } + + @Test + fun `cleanup should cancel ongoing gate`() = runTest(testDispatcher) { + val data = fakeCall() + var readyCalled = false + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(RingingState.Incoming()), + ) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = null, + ) { readyCalled = true } + + gate.cleanup() + data.publisherState.value = PeerConnection.PeerConnectionState.CONNECTED + + advanceUntilIdle() + assertFalse(readyCalled) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = null, + ) { readyCalled = true } + + advanceUntilIdle() + assertTrue(readyCalled) + } // ── Strategy tests ──────────────────────────────────────────────────────── @@ -338,48 +326,166 @@ class ActiveStateGateTest { strategy: TransitionToRingingStateStrategy, pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - firstRtpPacketArrived: MutableStateFlow = MutableStateFlow(false), block: suspend TestScope.( call: Call, sut: ActiveStateGate, transitioned: MutableList, pubState: MutableStateFlow, - subState: MutableStateFlow, - firstRtpPacketArrived: MutableStateFlow, ) -> Unit, ) = runTest(testDispatcher) { - val testData = fakeCall(pubState, subState, firstRtpPacketArrived) + val testData = fakeCall(pubState) val incoming = RingingState.Incoming(false) val sut = ActiveStateGate( coroutineScope = this, previousRingingStates = setOf(incoming), strategy = strategy, - timeoutMs = 5_000L, + peerConnectionObserverTimeoutMs = 5_000L, ) val transitioned = mutableListOf() - sut.awaitAndTransition(incoming, testData.call) { transitioned += Unit } - block(testData.call, sut, transitioned, pubState, subState, firstRtpPacketArrived) + sut.awaitAndTransition(incoming, testData.call, null) { + transitioned += Unit + } + block(testData.call, sut, transitioned, pubState) + } + + @Test + fun `legacy behaviour should invoke immediately when interceptor is null`() = runTest { + val data = fakeCall() + var readyCalled = false + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + strategy = TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR, + ) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = null, + ) { readyCalled = true } + + runCurrent() + assertTrue(readyCalled) + } + + @Test + fun `legacy behaviour should invoke interceptor before transition`() = runTest { + val data = fakeCall() + val events = mutableListOf() + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + events += "interceptor" + } + } + + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + strategy = TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR, + ) + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) { events += "ready" } + + advanceUntilIdle() + assertEquals( + listOf("interceptor", "ready"), + events, + ) } @Test - fun `LEGACY_BEHAVIOUR – transition still fires without timeout fallback`() = - runStrategyTest( - TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR, - ) { _, _, transitioned, _, _, _ -> - assertTrue(transitioned.size == 1) + fun `legacy behaviour should not proceed when interceptor rejects`() = runTest { + val data = fakeCall() + var readyCalled = false + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + throw CallJoinInterceptionException("blocked") + } + } + + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + strategy = TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR, + ) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) { readyCalled = true } + + advanceUntilIdle() + assertFalse(readyCalled) + coVerify { data.call.leave(any()) } + } + + @Test + fun `legacy behaviour should ignore duplicate interceptor launches`() = runTest { + val data = fakeCall() + var interceptorInvocationCount = 0 + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + interceptorInvocationCount++ + delay(1000) + } + } + + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + strategy = TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR, + ) + + repeat(2) { + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) {} } + runCurrent() + assertEquals(1, interceptorInvocationCount) + } + @Test - fun `PUBLISHER_CONNECTED – subscriber alone is not enough`() = - runStrategyTest( - TransitionToRingingStateStrategy.PUBLISHER_CONNECTED, - ) { _, _, transitioned, pubState, subState, _ -> - subState.value = PeerConnection.PeerConnectionState.CONNECTED - assertTrue(transitioned.isEmpty()) - - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - assertTrue(transitioned.size == 1) + fun `cleanup should cancel gate during interceptor execution`() = runTest { + val data = fakeCall() + var readyCalled = false + val interceptorStarted = CompletableDeferred() + val interceptorCancelled = CompletableDeferred() + + val interceptor = object : CallJoinInterceptor { + override suspend fun callReadyToJoin(call: Call) { + interceptorStarted.complete(Unit) + try { + awaitCancellation() + } catch (e: CancellationException) { + interceptorCancelled.complete(Unit) + throw e + } + } } + + val gate = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), + ) + + gate.awaitAndTransition( + currentRingingState = RingingState.Idle, + call = data.call, + interceptor = interceptor, + ) { readyCalled = true } + + interceptorStarted.await() + gate.cleanup() + interceptorCancelled.await() + runCurrent() + assertFalse(readyCalled) + } } diff --git a/stream-video-android-ui-core/api/stream-video-android-ui-core.api b/stream-video-android-ui-core/api/stream-video-android-ui-core.api index 53ac3478e4..35f2bde459 100644 --- a/stream-video-android-ui-core/api/stream-video-android-ui-core.api +++ b/stream-video-android-ui-core/api/stream-video-android-ui-core.api @@ -92,6 +92,7 @@ public abstract class io/getstream/video/android/ui/common/StreamCallActivity : public final fun enterPictureInPicture ()V public fun get (Lio/getstream/video/android/core/Call;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V protected fun getCallHandlerDelegate ()Lio/getstream/video/android/ui/common/IncomingCallHandlerDelegate; + protected fun getCallJoinInterceptor ()Lio/getstream/video/android/core/CallJoinInterceptor; protected fun getCallTransitionTime ()J protected final fun getConfig ()Lio/getstream/video/android/ui/common/StreamCallActivityConfiguration; public fun getConfiguration ()Lio/getstream/video/android/ui/common/StreamCallActivityConfiguration; diff --git a/stream-video-android-ui-core/src/main/kotlin/io/getstream/video/android/ui/common/StreamCallActivity.kt b/stream-video-android-ui-core/src/main/kotlin/io/getstream/video/android/ui/common/StreamCallActivity.kt index 2fbbe89b52..9d7c3f15e4 100644 --- a/stream-video-android-ui-core/src/main/kotlin/io/getstream/video/android/ui/common/StreamCallActivity.kt +++ b/stream-video-android-ui-core/src/main/kotlin/io/getstream/video/android/ui/common/StreamCallActivity.kt @@ -46,6 +46,7 @@ import io.getstream.result.flatMap import io.getstream.result.onErrorSuspend import io.getstream.result.onSuccessSuspend import io.getstream.video.android.core.Call +import io.getstream.video.android.core.CallJoinInterceptor import io.getstream.video.android.core.DeviceStatus import io.getstream.video.android.core.RealtimeConnection import io.getstream.video.android.core.StreamVideo @@ -342,6 +343,8 @@ public abstract class StreamCallActivity : ComponentActivity(), ActivityCallOper } } + protected open val callJoinInterceptor: CallJoinInterceptor? = null + // Platform restriction public override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) @@ -966,9 +969,12 @@ public abstract class StreamCallActivity : ComponentActivity(), ActivityCallOper val joinAndRing = intent.getBooleanExtra(EXTRA_JOIN_AND_RING, false) if (joinAndRing) { logger.d { "[joinAndRing] Join and ring call, ${call.cid}" } - availableCall.joinAndRing(call.state.members.value.map { it.user.id }) + availableCall.joinAndRing( + call.state.members.value.map { it.user.id }, + callJoinInterceptor = callJoinInterceptor, + ) } else { - availableCall.join() + availableCall.join(callJoinInterceptor = callJoinInterceptor) } } } @@ -1387,7 +1393,7 @@ public abstract class StreamCallActivity : ComponentActivity(), ActivityCallOper private suspend fun Call.acceptThenJoin() = withContext(Dispatchers.IO) { - accept().flatMap { join() } + accept().flatMap { join(callJoinInterceptor = callJoinInterceptor) } } public fun safeFinish() {