Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
Expand Down Expand Up @@ -43,6 +45,8 @@ object OneSignalDispatchers {
"$BASE_THREAD_NAME-IO" // Thread name prefix for I/O operations
private const val DEFAULT_THREAD_NAME_PREFIX =
"$BASE_THREAD_NAME-Default" // Thread name prefix for CPU operations
private const val SERIAL_IO_THREAD_NAME =
"$BASE_THREAD_NAME-SerialIO" // Single, named thread for order-sensitive work

private class OptimizedThreadFactory(
private val namePrefix: String,
Expand Down Expand Up @@ -80,6 +84,21 @@ object OneSignalDispatchers {
}
}

/** Single-thread executor for order-sensitive lifecycle work (focus / unfocus handlers). */
private val serialIOExecutor: ExecutorService by lazy {
try {
Executors.newSingleThreadExecutor(
OptimizedThreadFactory(
namePrefix = SERIAL_IO_THREAD_NAME,
priority = Thread.NORM_PRIORITY - 1,
),
)
} catch (e: Exception) {
Logging.error("OneSignalDispatchers: Failed to create SerialIO executor: ${e.message}")
throw e
}
}

private val defaultExecutor: ThreadPoolExecutor by lazy {
try {
ThreadPoolExecutor(
Expand Down Expand Up @@ -117,6 +136,17 @@ object OneSignalDispatchers {
}
}

val SerialIO: CoroutineDispatcher by lazy {
try {
serialIOExecutor.asCoroutineDispatcher()
} catch (e: Exception) {
// Fall back to a limitedParallelism(1) view of Dispatchers.IO so submissions stay serialized.
Logging.error("OneSignalDispatchers: Using fallback serialized Dispatchers.IO: ${e.message}")
@Suppress("OPT_IN_USAGE")
Dispatchers.IO.limitedParallelism(1)
}
}

private val IOScope: CoroutineScope by lazy {
CoroutineScope(SupervisorJob() + IO)
}
Expand All @@ -125,6 +155,10 @@ object OneSignalDispatchers {
CoroutineScope(SupervisorJob() + Default)
}

private val SerialIOScope: CoroutineScope by lazy {
CoroutineScope(SupervisorJob() + SerialIO)
}

fun launchOnIO(block: suspend () -> Unit): Job {
return IOScope.launch { block() }
}
Expand All @@ -133,57 +167,66 @@ object OneSignalDispatchers {
return DefaultScope.launch { block() }
}

/** Launches [block] on the single-thread serial IO dispatcher (FIFO across all callers). */
fun launchOnSerialIO(block: suspend () -> Unit): Job {
return SerialIOScope.launch { block() }
}

internal fun getPerformanceMetrics(): String {
return try {
val serialQueueSize =
(serialIOExecutor as? ThreadPoolExecutor)?.queue?.size?.toString() ?: "n/a"
val serialCompleted =
(serialIOExecutor as? ThreadPoolExecutor)?.completedTaskCount ?: 0L
"""
OneSignalDispatchers Performance Metrics:
- IO Pool: ${ioExecutor.activeCount}/${ioExecutor.corePoolSize} active/core threads
- IO Queue: ${ioExecutor.queue.size} pending tasks
- Default Pool: ${defaultExecutor.activeCount}/${defaultExecutor.corePoolSize} active/core threads
- Default Queue: ${defaultExecutor.queue.size} pending tasks
- Total completed tasks: ${ioExecutor.completedTaskCount + defaultExecutor.completedTaskCount}
- Memory usage: ~${(ioExecutor.activeCount + defaultExecutor.activeCount) * 1024}KB (thread stacks, ~1MB each)
- SerialIO Queue: $serialQueueSize pending tasks
- Total completed tasks: ${ioExecutor.completedTaskCount + defaultExecutor.completedTaskCount + serialCompleted}
- Memory usage: ~${(ioExecutor.activeCount + defaultExecutor.activeCount + 1) * 1024}KB (thread stacks, ~1MB each)
""".trimIndent()
} catch (e: Exception) {
"OneSignalDispatchers not initialized or using fallback dispatchers ${e.message}"
}
}

internal fun getStatus(): String {
val ioExecutorStatus =
try {
if (ioExecutor.isShutdown) "Shutdown" else "Active"
} catch (e: Exception) {
"ioExecutor Not initialized ${e.message ?: "Unknown error"}"
}

val defaultExecutorStatus =
try {
if (defaultExecutor.isShutdown) "Shutdown" else "Active"
} catch (e: Exception) {
"defaultExecutor Not initialized ${e.message ?: "Unknown error"}"
}

val ioScopeStatus =
try {
if (IOScope.isActive) "Active" else "Cancelled"
} catch (e: Exception) {
"IOScope Not initialized ${e.message ?: "Unknown error"}"
}

val defaultScopeStatus =
try {
if (DefaultScope.isActive) "Active" else "Cancelled"
} catch (e: Exception) {
"DefaultScope Not initialized ${e.message ?: "Unknown error"}"
}

return """
OneSignalDispatchers Status:
- IO Executor: $ioExecutorStatus
- Default Executor: $defaultExecutorStatus
- IO Scope: $ioScopeStatus
- Default Scope: $defaultScopeStatus
- IO Executor: ${executorStatus("ioExecutor") { ioExecutor.isShutdown }}
- Default Executor: ${executorStatus("defaultExecutor") { defaultExecutor.isShutdown }}
- SerialIO Executor: ${executorStatus("serialIOExecutor") { serialIOExecutor.isShutdown }}
- IO Scope: ${scopeStatus("IOScope") { IOScope.isActive }}
- Default Scope: ${scopeStatus("DefaultScope") { DefaultScope.isActive }}
- SerialIO Scope: ${scopeStatus("SerialIOScope") { SerialIOScope.isActive }}
""".trimIndent()
}

// internal so tests can exercise the failure branch (when `isShutdown()` itself throws,
// which happens when the lazy initializer threw and re-throws on every access).
internal fun executorStatus(
name: String,
isShutdown: () -> Boolean,
): String =
try {
if (isShutdown()) "Shutdown" else "Active"
} catch (e: Exception) {
"$name $NOT_INITIALIZED ${e.message ?: UNKNOWN_ERROR}"
}

internal fun scopeStatus(
name: String,
isActive: () -> Boolean,
): String =
try {
if (isActive()) "Active" else "Cancelled"
} catch (e: Exception) {
"$name $NOT_INITIALIZED ${e.message ?: UNKNOWN_ERROR}"
}

private const val NOT_INITIALIZED = "Not initialized"
private const val UNKNOWN_ERROR = "Unknown error"
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,38 @@ fun suspendifyOnDefault(block: suspend () -> Unit) {
suspendifyWithCompletion(useIO = false, block = block, onComplete = null)
}

/**
* Runs [block] on the single-thread serial IO dispatcher. Tasks from any thread execute
* one-at-a-time in submission order — the entry point for lifecycle handlers that need to
* preserve event ordering. Always routes off-main regardless of [ThreadingMode.useBackgroundThreading].
*
* Capture time-sensitive state (timestamps, "current" snapshots) on the caller's thread
* before invoking — the block itself may run later under load.
*/
fun suspendifyOnSerialIO(block: suspend () -> Unit) {
OneSignalDispatchers.launchOnSerialIO {
try {
block()
} catch (e: Exception) {
Logging.error("Exception in suspendifyOnSerialIO", e)
}
}
}

/**
* FF-gated rollout helper for lifecycle offload work. When [ThreadingMode.useBackgroundThreading]
* is on, dispatches [block] to the serial IO thread; when off, runs it inline so the control
* cohort retains the original behavior. The block is non-suspending so the FF-off branch doesn't
* need a [kotlinx.coroutines.runBlocking].
*/
fun runOnSerialIOIfBackgroundThreading(block: () -> Unit) {
if (ThreadingMode.useBackgroundThreading) {
suspendifyOnSerialIO { block() }
} else {
block()
}
}

/**
* Modern utility for executing suspending code with completion callback.
* Uses OneSignal's centralized thread management for better resource control.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,118 @@ class OneSignalDispatchersTests : FunSpec({
status shouldContain "OneSignalDispatchers Status:"
status shouldContain "IO Executor: Active"
status shouldContain "Default Executor: Active"
status shouldContain "SerialIO Executor: Active"
status shouldContain "IO Scope: Active"
status shouldContain "Default Scope: Active"
status shouldContain "SerialIO Scope: Active"
}

test("getPerformanceMetrics should include SerialIO queue and total completed task counters") {
// Trigger lazy init of the SerialIO executor before asking for metrics so its queue
// line resolves to a concrete value instead of the "n/a" fallback path.
OneSignalDispatchers.SerialIO shouldNotBe null

val metrics = OneSignalDispatchers.getPerformanceMetrics()

metrics shouldContain "OneSignalDispatchers Performance Metrics:"
metrics shouldContain "SerialIO Queue:"
metrics shouldContain "Total completed tasks:"
}

test("SerialIO dispatcher executes work on a background thread") {
val callerThreadId = Thread.currentThread().id
var serialThreadId: Long? = null

runBlocking {
withContext(OneSignalDispatchers.SerialIO) {
serialThreadId = Thread.currentThread().id
}
}

serialThreadId shouldNotBe null
serialThreadId shouldNotBe callerThreadId
}

test("launchOnSerialIO runs tasks on a single thread in submission order") {
// SerialIO's contract: submission order on the caller thread == execution order on
// the worker thread. We submit N tasks with a small sleep so they queue up, then
// assert the recorded order matches submission order and that all observations
// came from one thread.
val taskCount = 5
val observedOrder = mutableListOf<Int>()
val observedThreads = mutableSetOf<Long>()
val latch = CountDownLatch(taskCount)

repeat(taskCount) { i ->
OneSignalDispatchers.launchOnSerialIO {
Thread.sleep(5)
synchronized(observedOrder) {
observedOrder.add(i)
observedThreads.add(Thread.currentThread().id)
}
latch.countDown()
}
}

latch.await()
observedOrder shouldBe (0 until taskCount).toList()
observedThreads.size shouldBe 1
}

test("executorStatus returns 'Active' / 'Shutdown' on the happy path and the Not initialized message when the underlying check throws") {
// Happy paths (Shutdown / Active) are exercised indirectly via getStatus(); this
// case pins down the defensive catch branch, which fires when the underlying
// executor's lazy initializer is in a failed state (e.g. JVM-level
// Executors.newSingleThreadExecutor refused to construct) and every isShutdown
// access re-throws. Without this, the catch is unreachable from unit tests because
// ThreadPoolExecutor.isShutdown does not normally throw.
OneSignalDispatchers.executorStatus("ioExecutor") { false } shouldBe "Active"
OneSignalDispatchers.executorStatus("ioExecutor") { true } shouldBe "Shutdown"
OneSignalDispatchers.executorStatus("ioExecutor") {
throw RuntimeException("init failure")
} shouldBe "ioExecutor Not initialized init failure"
OneSignalDispatchers.executorStatus("ioExecutor") {
throw RuntimeException()
} shouldBe "ioExecutor Not initialized Unknown error"
}

test("scopeStatus returns 'Active' / 'Cancelled' on the happy path and the Not initialized message when the underlying check throws") {
OneSignalDispatchers.scopeStatus("IOScope") { true } shouldBe "Active"
OneSignalDispatchers.scopeStatus("IOScope") { false } shouldBe "Cancelled"
OneSignalDispatchers.scopeStatus("IOScope") {
throw RuntimeException("cancelled supervisor")
} shouldBe "IOScope Not initialized cancelled supervisor"
OneSignalDispatchers.scopeStatus("IOScope") {
throw RuntimeException()
} shouldBe "IOScope Not initialized Unknown error"
}

test("exceptions in a SerialIO task do not stop subsequent tasks from running") {
// Mirrors the parallel "exceptions in one task should not affect others" case for
// launchOnIO. A thrown exception in one serial task must not poison the dispatcher
// for the rest of the queue.
val latch = CountDownLatch(3)
val successCount = AtomicInteger(0)
val errorCount = AtomicInteger(0)

repeat(3) { i ->
OneSignalDispatchers.launchOnSerialIO {
try {
if (i == 1) {
throw RuntimeException("Test error")
}
successCount.incrementAndGet()
} catch (e: Exception) {
errorCount.incrementAndGet()
} finally {
latch.countDown()
}
}
}

latch.await()
successCount.get() shouldBe 2
errorCount.get() shouldBe 1
}

test("dispatchers should handle concurrent operations") {
Expand Down
Loading
Loading