Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ class PathManager @Inject constructor(
// -----------------------------

suspend fun getPath(exclude: Snode? = null): Path {
directory.refreshPoolIfStaleAsync()

val current = _paths.value
if (current.size >= targetPathCount && current.any { exclude == null || !it.contains(exclude) }) {
return selectPath(current, exclude)
Expand Down
254 changes: 187 additions & 67 deletions app/src/main/java/org/session/libsession/network/snode/SnodeDirectory.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.session.libsession.network.snode

import android.os.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand Down Expand Up @@ -27,18 +28,28 @@ class SnodeDirectory @Inject constructor(

companion object {
private const val MINIMUM_SNODE_POOL_COUNT = 12
private const val MINIMUM_SNODE_REFRESH_COUNT = 3
private const val SEED_NODE_PORT = 4443

private const val POOL_REFRESH_INTERVAL_MS = 2 * 60 * 60 * 1000L // 2h

private const val KEY_IP = "public_ip"
private const val KEY_PORT = "storage_port"
private const val KEY_X25519 = "pubkey_x25519"
private const val KEY_ED25519 = "pubkey_ed25519"
private const val KEY_VERSION = "storage_server_version"
}

//todo ONION we need to add the "refresh every 2h plus intersection" rules
/**
* Single mutex for any operation that can persist/replace the pool (bootstrap OR refresh).
* This prevents refresh/bootstrap races overwriting each other.
*/
private val poolWriteMutex = Mutex()

// Refresh state (non-blocking trigger + real exclusion inside mutex)
@Volatile private var snodePoolRefreshing = false

private val poolMutex = Mutex()
@Volatile private var lastRefreshElapsedMs: Long = 0L

private val seedNodePool: Set<String> = when (prefs.getEnvironment()) {
Environment.DEV_NET -> setOf("http://sesh-net.local:1280")
Expand Down Expand Up @@ -73,8 +84,9 @@ class SnodeDirectory @Inject constructor(

fun getSnodePool(): Set<Snode> = storage.getSnodePool()

fun updateSnodePool(newPool: Set<Snode>) {
private fun persistSnodePool(newPool: Set<Snode>) {
storage.setSnodePool(newPool)
lastRefreshElapsedMs = SystemClock.elapsedRealtime()
}

/**
Expand All @@ -90,78 +102,85 @@ class SnodeDirectory @Inject constructor(
suspend fun ensurePoolPopulated(
minCount: Int = MINIMUM_SNODE_POOL_COUNT
): Set<Snode> {
// 1. Fast path: Optimistic check (no lock)
val current = getSnodePool()

if (current.size >= minCount) {
// ensure we set the refresh timestamp in case we are starting the app
// with already cached snodes
if (lastRefreshElapsedMs == 0L) {
lastRefreshElapsedMs = SystemClock.elapsedRealtime()
}
return current
}

// 2. Slow path: Acquire lock
return poolMutex.withLock {
// 3. Double-check: Did someone populate it while we were waiting?
return poolWriteMutex.withLock {
val freshCurrent = getSnodePool()
if (freshCurrent.size >= minCount) {
return@withLock freshCurrent
}
if (freshCurrent.size >= minCount) return@withLock freshCurrent

val target = seedNodePool.random()
Log.d("SnodeDirectory", "Populating snode pool using seed node: $target")

val url = "$target/json_rpc"
val responseBytes = HTTP.execute(
HTTP.Verb.POST,
url = url,
parameters = getRandomSnodeParams,
useSeedNodeConnection = true
)

val json = runCatching {
JsonUtil.fromJson(responseBytes, Map::class.java)
}.getOrNull() ?: buildMap<String, Any> {
this["result"] = responseBytes.toString(Charsets.UTF_8)
}
val seeded = fetchSnodePoolFromSeed()
if (seeded.isEmpty()) throw IllegalStateException("Seed node returned empty snode pool")

@Suppress("UNCHECKED_CAST")
val intermediate = json["result"] as? Map<*, *>
?: throw IllegalStateException("Failed to update snode pool, 'result' was null.")
.also { Log.d("SnodeDirectory", "Failed to update snode pool, intermediate was null.") }

@Suppress("UNCHECKED_CAST")
val rawSnodes = intermediate["service_node_states"] as? List<*>
?: throw IllegalStateException("Failed to update snode pool, 'service_node_states' was null.")
.also { Log.d("SnodeDirectory", "Failed to update snode pool, rawSnodes was null.") }

val newPool = rawSnodes.asSequence()
.mapNotNull { it as? Map<*, *> }
.mapNotNull { raw ->
createSnode(
address = raw[KEY_IP] as? String,
port = raw[KEY_PORT] as? Int,
ed25519Key = raw[KEY_ED25519] as? String,
x25519Key = raw[KEY_X25519] as? String,
version = (raw[KEY_VERSION] as? List<*>)
?.filterIsInstance<Int>()
?.let(Snode::Version)
).also {
if (it == null) {
Log.d(
"SnodeDirectory",
"Failed to parse snode from: ${raw.prettifiedDescription()}."
)
}
}
}
.toSet()
Log.d("SnodeDirectory", "Persisting snode pool with ${seeded.size} snodes (seed bootstrap).")
persistSnodePool(seeded)
seeded
}
}

if (newPool.isEmpty()) {
throw IllegalStateException("Seed node returned empty snode pool")
}
private suspend fun fetchSnodePoolFromSeed(): Set<Snode> {
val target = seedNodePool.random()
Log.d("SnodeDirectory", "Fetching snode pool using seed node: $target")
return fetchSnodePool(target, fromSeed = true)
}

Log.d("SnodeDirectory", "Persisting snode pool with ${newPool.size} snodes.")
updateSnodePool(newPool)
private suspend fun fetchSnodePoolFromSnode(snode: Snode): Set<Snode> {
val target = "${snode.address}:${snode.port}"
Log.d("SnodeDirectory", "Fetching snode pool using snode: $target")
return fetchSnodePool(target, fromSeed = false)
}

newPool
private suspend fun fetchSnodePool(target: String, fromSeed: Boolean): Set<Snode> {
val url = "$target/json_rpc"
val responseBytes = HTTP.execute(
HTTP.Verb.POST,
url = url,
parameters = getRandomSnodeParams,
useSeedNodeConnection = fromSeed
)

val json = runCatching {
JsonUtil.fromJson(responseBytes, Map::class.java)
}.getOrNull() ?: buildMap<String, Any> {
this["result"] = responseBytes.toString(Charsets.UTF_8)
}

@Suppress("UNCHECKED_CAST")
val intermediate = json["result"] as? Map<*, *>
?: throw IllegalStateException("Failed to update snode pool, 'result' was null.")
.also { Log.d("SnodeDirectory", "Failed to update snode pool, intermediate was null.") }

@Suppress("UNCHECKED_CAST")
val rawSnodes = intermediate["service_node_states"] as? List<*>
?: throw IllegalStateException("Failed to update snode pool, 'service_node_states' was null.")
.also { Log.d("SnodeDirectory", "Failed to update snode pool, rawSnodes was null.") }

return rawSnodes.asSequence()
.mapNotNull { it as? Map<*, *> }
.mapNotNull { raw ->
createSnode(
address = raw[KEY_IP] as? String,
port = raw[KEY_PORT] as? Int,
ed25519Key = raw[KEY_ED25519] as? String,
x25519Key = raw[KEY_X25519] as? String,
version = (raw[KEY_VERSION] as? List<*>)
?.filterIsInstance<Int>()
?.let(Snode::Version)
).also {
if (it == null) {
Log.d("SnodeDirectory", "Failed to parse snode from: ${raw.prettifiedDescription()}.")
}
}
}
.toSet()
}

/**
Expand Down Expand Up @@ -220,7 +239,8 @@ class SnodeDirectory @Inject constructor(
val current = getSnodePool()
val hit = current.firstOrNull { it.publicKeySet?.ed25519Key == ed25519Key } ?: return
Log.w("SnodeDirectory", "Dropping snode from pool (ed25519=$ed25519Key): $hit")
updateSnodePool(current - hit)
storage.setSnodePool(current - hit)
// NOTE: do NOT touch lastRefreshElapsedMs here; dropping isn’t a “refresh”.
}

fun updateForkInfo(newForkInfo: ForkInfo) {
Expand All @@ -233,8 +253,108 @@ class SnodeDirectory @Inject constructor(
}
}

fun getSnodeByKey(ed25519Key: String?): Snode?{
if(ed25519Key == null) return null
fun getSnodeByKey(ed25519Key: String?): Snode? {
if (ed25519Key == null) return null
return getSnodePool().firstOrNull { it.publicKeySet?.ed25519Key == ed25519Key }
}
}

// snode pool refresh logic

/**
* Non-blocking trigger.
*
* IMPORTANT: does nothing until we have successfully seeded at least once
* (lastRefreshElapsedMs != 0L).
*/
fun refreshPoolIfStaleAsync() {
// Don’t refresh until we’ve successfully seeded at least once
if (lastRefreshElapsedMs == 0L) return

val now = SystemClock.elapsedRealtime()
if (snodePoolRefreshing) return
if (now - lastRefreshElapsedMs < POOL_REFRESH_INTERVAL_MS) return

scope.launch { refreshPoolFromSnodes() }
}

private suspend fun refreshPoolFromSnodes() {
poolWriteMutex.withLock {
// Re-check staleness INSIDE the lock to avoid “double refresh” races
if (lastRefreshElapsedMs == 0L) return // still not seeded
val now = SystemClock.elapsedRealtime()
if (now - lastRefreshElapsedMs < POOL_REFRESH_INTERVAL_MS) return

if (snodePoolRefreshing) return
snodePoolRefreshing = true

try {
val current = getSnodePool()

// If pool has less than 3 snodes, refresh from seed
if (current.size < MINIMUM_SNODE_REFRESH_COUNT) {
val seeded = fetchSnodePoolFromSeed()
if (seeded.isNotEmpty()) {
Log.d("SnodeDirectory", "Refreshing pool from seed (pool too small). New size=${seeded.size}")
persistSnodePool(seeded)
}
return
}

// Otherwise fetch from 3 random snodes (no special filtering requested)
val results = mutableListOf<Set<Snode>>()
val attempts = current.shuffled().iterator()

while (results.size < MINIMUM_SNODE_REFRESH_COUNT && attempts.hasNext()) {
val snode = attempts.next()
val fetched = runCatching { fetchSnodePoolFromSnode(snode) }.getOrNull()
if (!fetched.isNullOrEmpty()) results += fetched
}

if (results.size < MINIMUM_SNODE_REFRESH_COUNT) {
// Could not fetch 3 pools reliably, fallback to seed
val seeded = fetchSnodePoolFromSeed()
if (seeded.isNotEmpty()) {
Log.d("SnodeDirectory", "Refreshing pool from seed (3-snode fetch failed). New size=${seeded.size}")
persistSnodePool(seeded)
}
return
}

val intersected = intersectByEd25519(results)

// If intersection is empty, fallback to seed
if (intersected.isEmpty()) {
val seeded = fetchSnodePoolFromSeed()
if (seeded.isNotEmpty()) {
Log.d("SnodeDirectory", "Intersection empty; refreshing pool from seed instead. New size=${seeded.size}")
persistSnodePool(seeded)
}
return
}

Log.d("SnodeDirectory", "Refreshing pool via 3-node intersection. New size=${intersected.size}")
persistSnodePool(intersected)

} finally {
snodePoolRefreshing = false
}
}
}

/**
* Get the intersection of snodes from the various snode pool results
*/
private fun intersectByEd25519(pools: List<Set<Snode>>): Set<Snode> {
if (pools.isEmpty()) return emptySet()

val candidates = pools.first()
val otherPoolKeys = pools.drop(1).map { pool ->
pool.mapNotNull { it.publicKeySet?.ed25519Key }.toSet()
}

return candidates.filter { snode ->
val key = snode.publicKeySet?.ed25519Key ?: return@filter false
otherPoolKeys.all { it.contains(key) }
}.toSet()
}
}
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ assertjCoreVersion = "3.27.6"
biometricVersion = "1.1.0"
cameraCamera2Version = "1.5.2"
cardviewVersion = "1.0.0"
composeBomVersion = "2025.12.00"
composeBomVersion = "2026.01.00"
conscryptAndroidVersion = "2.5.3"
conscryptJavaVersion = "2.5.2"
constraintlayoutVersion = "2.2.1"
Expand Down Expand Up @@ -69,7 +69,7 @@ zxingVersion = "3.5.4"
huaweiPushVersion = "6.13.0.300"
googlePlayReviewVersion = "2.0.2"
coilVersion = "3.3.0"
billingVersion = "8.1.0"
billingVersion = "8.3.0"

[libraries]
accompanist-permissions = { module = "com.google.accompanist:accompanist-permissions", version.ref = "accompanistPermissionsVersion" }
Expand Down