Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,15 @@ internal class OperationRepo(
if (externalId != null) {
_jwtTokenStore.invalidateJwt(externalId)
Logging.warn("Operation execution failed with 401 Unauthorized, JWT invalidated for user: $externalId. Operations re-queued.")
// Unblock any enqueueAndWait callers so loginSuspend doesn't hang.
ops.forEach { it.waiter?.wake(false) }
// Re-queue with waiter = null: the operation is preserved for retry
// (once a new JWT is provided via updateUserJwt), but the original
// waiter is detached since it was already woken above.
synchronized(queue) {
ops.reversed().forEach { queue.add(0, it) }
ops.reversed().forEach {
queue.add(0, OperationQueueItem(it.operation, waiter = null, bucket = it.bucket, retries = it.retries))
}
}
dispatchJwtInvalidatedToApp(externalId)
} else {
Expand Down Expand Up @@ -331,9 +338,15 @@ internal class OperationRepo(
Logging.error("Operation execution failed with eventual retry, pausing the operation repo: $operations")
// keep the failed operation and pause the operation repo from executing
paused = true
// add back all operations to the front of the queue to be re-executed.
// Unblock any enqueueAndWait callers so loginSuspend doesn't hang.
ops.forEach { it.waiter?.wake(false) }
// Re-queue with waiter = null: the operation is preserved for retry
// on next cold start, but the original waiter is detached since it
// was already woken above.
synchronized(queue) {
ops.reversed().forEach { queue.add(0, it) }
ops.reversed().forEach {
queue.add(0, OperationQueueItem(it.operation, waiter = null, bucket = it.bucket, retries = it.retries))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,14 +384,20 @@ internal class OneSignalImp(

if (isBackgroundThreadingEnabled) {
waitForInit(operationName = "login")
suspendifyOnIO { loginHelper.login(externalId, jwtBearerToken) }
} else {
if (!isInitialized) {
throw IllegalStateException("Must call 'initWithContext' before 'login'")
}
}

val context = loginHelper.switchUser(externalId, jwtBearerToken) ?: return

if (isBackgroundThreadingEnabled) {
suspendifyOnIO { loginHelper.enqueueLogin(context) }
} else {
Thread {
runBlocking(runtimeIoDispatcher) {
loginHelper.login(externalId, jwtBearerToken)
loginHelper.enqueueLogin(context)
}
}.start()
}
Expand Down Expand Up @@ -695,7 +701,8 @@ internal class OneSignalImp(
throw IllegalStateException("'initWithContext failed' before 'login'")
}

loginHelper.login(externalId, jwtBearerToken)
val context = loginHelper.switchUser(externalId, jwtBearerToken) ?: return@withContext
loginHelper.enqueueLogin(context)
}

override suspend fun logoutSuspend() =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,30 @@ class LoginHelper(
private val jwtTokenStore: JwtTokenStore,
private val lock: Any,
) {
suspend fun login(
internal data class LoginEnqueueContext(
val appId: String,
val newIdentityOneSignalId: String,
val externalId: String,
val existingOneSignalId: String?,
)

/**
* Synchronously switches local user models under the login/logout lock.
* Returns context needed for [enqueueLogin], or null if the user was
* already logged in with [externalId] (no switch needed).
*/
internal fun switchUser(
externalId: String,
jwtBearerToken: String? = null,
) {
var currentIdentityExternalId: String? = null
var currentIdentityOneSignalId: String? = null
var newIdentityOneSignalId: String = ""

): LoginEnqueueContext? {
synchronized(lock) {
currentIdentityExternalId = identityModelStore.model.externalId
currentIdentityOneSignalId = identityModelStore.model.onesignalId
val currentExternalId = identityModelStore.model.externalId
val currentOneSignalId = identityModelStore.model.onesignalId

if (currentIdentityExternalId == externalId) {
if (currentExternalId == externalId) {
jwtTokenStore.putJwt(externalId, jwtBearerToken)
operationRepo.forceExecuteOperations()
return
return null
}

jwtTokenStore.putJwt(externalId, jwtBearerToken)
Expand All @@ -39,23 +47,30 @@ class LoginHelper(
identityModel.externalId = externalId
}

newIdentityOneSignalId = identityModelStore.model.onesignalId
}
val newOneSignalId = identityModelStore.model.onesignalId

val existingOneSignalId =
if (configModel.useIdentityVerification == true) {
null
} else {
if (currentIdentityExternalId == null) currentIdentityOneSignalId else null
}
val existingOneSignalId =
if (configModel.useIdentityVerification == true) {
null
} else {
if (currentExternalId == null) currentOneSignalId else null
}

return LoginEnqueueContext(configModel.appId, newOneSignalId, externalId, existingOneSignalId)
}
}

/**
* Enqueues the [LoginUserOperation] and suspends until it completes.
*/
internal suspend fun enqueueLogin(context: LoginEnqueueContext) {
val result =
operationRepo.enqueueAndWait(
LoginUserOperation(
configModel.appId,
newIdentityOneSignalId,
externalId,
existingOneSignalId,
context.appId,
context.newIdentityOneSignalId,
context.externalId,
context.existingOneSignalId,
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,8 +954,9 @@ class OperationRepoTests : FunSpec({
operationRepo.start()
val response = operationRepo.enqueueAndWait(operation)

// Then
response shouldBe true
// Then – waiter is woken with false immediately on FAIL_UNAUTHORIZED
// (operation is re-queued with waiter=null for retry when a new JWT is provided)
response shouldBe false
verify { jwtTokenStore.invalidateJwt("test-user") }
handlerCalledWith shouldBe "test-user"
}
Expand Down Expand Up @@ -1011,8 +1012,11 @@ class OperationRepoTests : FunSpec({
operationRepo.start()
val response = operationRepo.enqueueAndWait(operation)

response shouldBe true
// Waiter is woken with false immediately; operation re-queued with waiter=null
response shouldBe false
verify { jwtTokenStore.invalidateJwt("test-user") }
// The re-queued op (waiter=null) retries asynchronously; wait for it to complete
delay(3000)
coVerify(exactly = 2) { executor.execute(any()) }
}

Expand Down
Loading
Loading