Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
Expand Down Expand Up @@ -43,6 +45,8 @@ object OneSignalDispatchers {
"$BASE_THREAD_NAME-IO" // Thread name prefix for I/O operations
private const val DEFAULT_THREAD_NAME_PREFIX =
"$BASE_THREAD_NAME-Default" // Thread name prefix for CPU operations
private const val SERIAL_IO_THREAD_NAME =
"$BASE_THREAD_NAME-SerialIO" // Single, named thread for order-sensitive work

private class OptimizedThreadFactory(
private val namePrefix: String,
Expand Down Expand Up @@ -80,6 +84,21 @@ object OneSignalDispatchers {
}
}

/** Single-thread executor for order-sensitive lifecycle work (focus / unfocus handlers). */
private val serialIOExecutor: ExecutorService by lazy {
try {
Executors.newSingleThreadExecutor(
OptimizedThreadFactory(
namePrefix = SERIAL_IO_THREAD_NAME,
priority = Thread.NORM_PRIORITY - 1,
),
)
} catch (e: Exception) {
Logging.error("OneSignalDispatchers: Failed to create SerialIO executor: ${e.message}")
throw e
}
}

private val defaultExecutor: ThreadPoolExecutor by lazy {
try {
ThreadPoolExecutor(
Expand Down Expand Up @@ -117,6 +136,17 @@ object OneSignalDispatchers {
}
}

val SerialIO: CoroutineDispatcher by lazy {
try {
serialIOExecutor.asCoroutineDispatcher()
} catch (e: Exception) {
// Fall back to a limitedParallelism(1) view of Dispatchers.IO so submissions stay serialized.
Logging.error("OneSignalDispatchers: Using fallback serialized Dispatchers.IO: ${e.message}")
@Suppress("OPT_IN_USAGE")
Dispatchers.IO.limitedParallelism(1)
}
}

private val IOScope: CoroutineScope by lazy {
CoroutineScope(SupervisorJob() + IO)
}
Expand All @@ -125,6 +155,10 @@ object OneSignalDispatchers {
CoroutineScope(SupervisorJob() + Default)
}

private val SerialIOScope: CoroutineScope by lazy {
CoroutineScope(SupervisorJob() + SerialIO)
}

fun launchOnIO(block: suspend () -> Unit): Job {
return IOScope.launch { block() }
}
Expand All @@ -133,57 +167,66 @@ object OneSignalDispatchers {
return DefaultScope.launch { block() }
}

/** Launches [block] on the single-thread serial IO dispatcher (FIFO across all callers). */
fun launchOnSerialIO(block: suspend () -> Unit): Job {
return SerialIOScope.launch { block() }
}

internal fun getPerformanceMetrics(): String {
return try {
val serialQueueSize =
(serialIOExecutor as? ThreadPoolExecutor)?.queue?.size?.toString() ?: "n/a"
val serialCompleted =
(serialIOExecutor as? ThreadPoolExecutor)?.completedTaskCount ?: 0L
"""
OneSignalDispatchers Performance Metrics:
- IO Pool: ${ioExecutor.activeCount}/${ioExecutor.corePoolSize} active/core threads
- IO Queue: ${ioExecutor.queue.size} pending tasks
- Default Pool: ${defaultExecutor.activeCount}/${defaultExecutor.corePoolSize} active/core threads
- Default Queue: ${defaultExecutor.queue.size} pending tasks
- Total completed tasks: ${ioExecutor.completedTaskCount + defaultExecutor.completedTaskCount}
- Memory usage: ~${(ioExecutor.activeCount + defaultExecutor.activeCount) * 1024}KB (thread stacks, ~1MB each)
- SerialIO Queue: $serialQueueSize pending tasks
- Total completed tasks: ${ioExecutor.completedTaskCount + defaultExecutor.completedTaskCount + serialCompleted}
- Memory usage: ~${(ioExecutor.activeCount + defaultExecutor.activeCount + 1) * 1024}KB (thread stacks, ~1MB each)
""".trimIndent()
} catch (e: Exception) {
"OneSignalDispatchers not initialized or using fallback dispatchers ${e.message}"
}
}

internal fun getStatus(): String {
val ioExecutorStatus =
try {
if (ioExecutor.isShutdown) "Shutdown" else "Active"
} catch (e: Exception) {
"ioExecutor Not initialized ${e.message ?: "Unknown error"}"
}

val defaultExecutorStatus =
try {
if (defaultExecutor.isShutdown) "Shutdown" else "Active"
} catch (e: Exception) {
"defaultExecutor Not initialized ${e.message ?: "Unknown error"}"
}

val ioScopeStatus =
try {
if (IOScope.isActive) "Active" else "Cancelled"
} catch (e: Exception) {
"IOScope Not initialized ${e.message ?: "Unknown error"}"
}

val defaultScopeStatus =
try {
if (DefaultScope.isActive) "Active" else "Cancelled"
} catch (e: Exception) {
"DefaultScope Not initialized ${e.message ?: "Unknown error"}"
}

return """
OneSignalDispatchers Status:
- IO Executor: $ioExecutorStatus
- Default Executor: $defaultExecutorStatus
- IO Scope: $ioScopeStatus
- Default Scope: $defaultScopeStatus
- IO Executor: ${executorStatus("ioExecutor") { ioExecutor.isShutdown }}
- Default Executor: ${executorStatus("defaultExecutor") { defaultExecutor.isShutdown }}
- SerialIO Executor: ${executorStatus("serialIOExecutor") { serialIOExecutor.isShutdown }}
- IO Scope: ${scopeStatus("IOScope") { IOScope.isActive }}
- Default Scope: ${scopeStatus("DefaultScope") { DefaultScope.isActive }}
- SerialIO Scope: ${scopeStatus("SerialIOScope") { SerialIOScope.isActive }}
""".trimIndent()
}

// internal so tests can exercise the failure branch (when `isShutdown()` itself throws,
// which happens when the lazy initializer threw and re-throws on every access).
internal fun executorStatus(
name: String,
isShutdown: () -> Boolean,
): String =
try {
if (isShutdown()) "Shutdown" else "Active"
} catch (e: Exception) {
"$name $NOT_INITIALIZED ${e.message ?: UNKNOWN_ERROR}"
}

internal fun scopeStatus(
name: String,
isActive: () -> Boolean,
): String =
try {
if (isActive()) "Active" else "Cancelled"
} catch (e: Exception) {
"$name $NOT_INITIALIZED ${e.message ?: UNKNOWN_ERROR}"
}

private const val NOT_INITIALIZED = "Not initialized"
private const val UNKNOWN_ERROR = "Unknown error"
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,38 @@ fun suspendifyOnDefault(block: suspend () -> Unit) {
suspendifyWithCompletion(useIO = false, block = block, onComplete = null)
}

/**
* Runs [block] on the single-thread serial IO dispatcher. Tasks from any thread execute
* one-at-a-time in submission order — the entry point for lifecycle handlers that need to
* preserve event ordering. Always routes off-main regardless of [ThreadingMode.useBackgroundThreading].
*
* Capture time-sensitive state (timestamps, "current" snapshots) on the caller's thread
* before invoking — the block itself may run later under load.
*/
fun suspendifyOnSerialIO(block: suspend () -> Unit) {
OneSignalDispatchers.launchOnSerialIO {
try {
block()
} catch (e: Exception) {
Logging.error("Exception in suspendifyOnSerialIO", e)
}
}
}

/**
* FF-gated rollout helper for lifecycle offload work. When [ThreadingMode.useBackgroundThreading]
* is on, dispatches [block] to the serial IO thread; when off, runs it inline so the control
* cohort retains the original behavior. The block is non-suspending so the FF-off branch doesn't
* need a [kotlinx.coroutines.runBlocking].
*/
fun runOnSerialIOIfBackgroundThreading(block: () -> Unit) {
if (ThreadingMode.useBackgroundThreading) {
suspendifyOnSerialIO { block() }
} else {
block()
}
}

/**
* Modern utility for executing suspending code with completion callback.
* Uses OneSignal's centralized thread management for better resource control.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import android.content.ComponentName
import android.content.Context
import android.content.pm.PackageManager
import androidx.core.content.ContextCompat
import com.onesignal.common.threading.runOnSerialIOIfBackgroundThreading
import com.onesignal.core.internal.application.IApplicationLifecycleHandler
import com.onesignal.core.internal.application.IApplicationService
import com.onesignal.core.internal.background.IBackgroundManager
Expand Down Expand Up @@ -70,12 +71,15 @@ internal class BackgroundManager(
_applicationService.addApplicationLifecycleHandler(this)
}

// JobScheduler.cancel/schedule are synchronous Binder calls; on some devices they block the
// main thread for seconds (SDK-4505). Offload to SerialIO so a rapid unfocus -> focus burst
// still runs cancel-then-schedule in submission order. FF gates the rollout.
override fun onFocus(firedOnSubscribe: Boolean) {
cancelSyncTask()
runOnSerialIOIfBackgroundThreading { cancelSyncTask() }
}

override fun onUnfocused() {
scheduleBackground()
runOnSerialIOIfBackgroundThreading { scheduleBackground() }
}

private fun scheduleBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.onesignal.common.modeling.ISingletonModelStoreChangeHandler
import com.onesignal.common.modeling.ModelChangeTags
import com.onesignal.common.modeling.ModelChangedArgs
import com.onesignal.common.threading.OneSignalDispatchers
import com.onesignal.common.threading.runOnSerialIOIfBackgroundThreading
import com.onesignal.core.internal.application.IApplicationLifecycleHandler
import com.onesignal.core.internal.application.IApplicationService
import com.onesignal.core.internal.backend.IFeatureFlagsBackendService
Expand Down Expand Up @@ -69,15 +70,22 @@ internal class FeatureFlagsRefreshService(
// Foreground-at-subscribe is handled by [IApplicationService.addApplicationLifecycleHandler] (fires onFocus).
}

// restartForegroundPolling calls launchOnIO; on cold start that can be the first
// OneSignalDispatchers consumer and pay the lazy-chain init cost on the main thread
// (SDK-4506). SerialIO also preserves order with the matching onUnfocused cancel.
override fun onFocus(firedOnSubscribe: Boolean) {
restartForegroundPolling()
runOnSerialIOIfBackgroundThreading { restartForegroundPolling() }
}

override fun onUnfocused() {
synchronized(this) {
pollJob?.cancel()
pollJob = null
pollingAppId = null
runOnSerialIOIfBackgroundThreading {
// Qualify `this` so we lock on the service instance (same monitor
// restartForegroundPolling acquires), not on the no-receiver lambda.
synchronized(this@FeatureFlagsRefreshService) {
pollJob?.cancel()
pollJob = null
pollingAppId = null
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.onesignal.session.internal.session.impl

import com.onesignal.common.events.EventProducer
import com.onesignal.common.threading.runOnSerialIOIfBackgroundThreading
import com.onesignal.core.internal.application.IApplicationLifecycleHandler
import com.onesignal.core.internal.application.IApplicationService
import com.onesignal.core.internal.background.IBackgroundService
Expand Down Expand Up @@ -103,6 +104,18 @@ internal class SessionService(
* the `onSessionStarted()` callback here, so fire it when they themselves subscribe.
*/
override fun onFocus(firedOnSubscribe: Boolean) {
// Capture focus time on the caller's thread so session timestamps reflect lifecycle
// arrival, not dispatcher latency (SDK-4506).
val focusTimeMs = _time.currentTimeMillis
runOnSerialIOIfBackgroundThreading {
handleOnFocus(firedOnSubscribe, focusTimeMs)
}
}

private fun handleOnFocus(
firedOnSubscribe: Boolean,
focusTimeMs: Long,
) {
Logging.log(LogLevel.DEBUG, "SessionService.onFocus() - fired from start: $firedOnSubscribe")

val session = this.session
Expand All @@ -121,27 +134,35 @@ internal class SessionService(
// As the old session was made inactive, we need to create a new session
shouldFireOnSubscribe = firedOnSubscribe
session.sessionId = UUID.randomUUID().toString()
session.startTime = _time.currentTimeMillis
session.startTime = focusTimeMs
session.focusTime = session.startTime
session.isValid = true
Logging.debug("SessionService: New session started at ${session.startTime}")
sessionLifeCycleNotifier.fire { it.onSessionStarted() }
} else {
// existing session: just remember the focus time so we can calculate the active time
// when onUnfocused is called.
session.focusTime = _time.currentTimeMillis
session.focusTime = focusTimeMs
sessionLifeCycleNotifier.fire { it.onSessionActive() }
}
}

override fun onUnfocused() {
// Capture on the caller's thread so activeDuration is unaffected by dispatcher latency.
val unfocusTimeMs = _time.currentTimeMillis
runOnSerialIOIfBackgroundThreading {
handleOnUnfocused(unfocusTimeMs)
}
}

private fun handleOnUnfocused(unfocusTimeMs: Long) {
val session = this.session
if (session == null) {
Logging.warn("SessionService.onUnfocused called before bootstrap; ignoring.")
return
}
// capture the amount of time the app was focused
val dt = _time.currentTimeMillis - session.focusTime
val dt = unfocusTimeMs - session.focusTime
session.activeDuration += dt
Logging.log(LogLevel.DEBUG, "SessionService.onUnfocused adding time $dt for total: ${session.activeDuration}")
}
Expand Down
Loading
Loading