diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bc2575b6a..6f974fcee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -128,6 +128,7 @@ jobs: if: needs.check-for-pr.outputs.skip != 'true' outputs: configurations: ${{ steps.compute.outputs.configurations }} + run_fuzz: ${{ steps.compute.outputs.run_fuzz }} steps: - name: Debounce label events if: github.event.action == 'labeled' @@ -155,6 +156,13 @@ jobs: if echo "$labels" | grep -Fq "test:tsan"; then configs="$configs"',"tsan"' fi + if echo "$labels" | grep -Fq "test:fuzz"; then + echo "run_fuzz=true" >> "$GITHUB_OUTPUT" + else + echo "run_fuzz=false" >> "$GITHUB_OUTPUT" + fi + else + echo "run_fuzz=false" >> "$GITHUB_OUTPUT" fi configs="$configs]" @@ -194,3 +202,37 @@ jobs: body-file: test-summary.md comment-id: ci-test-results + fuzz: + needs: [check-for-pr, compute-configurations] + if: needs.check-for-pr.outputs.skip != 'true' && needs.compute-configurations.outputs.run_fuzz == 'true' + runs-on: ubuntu-latest + continue-on-error: true + timeout-minutes: 30 + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - name: Cache Gradle Wrapper Binaries + uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: ~/.gradle/wrapper/dists + key: gradle-wrapper-${{ runner.os }}-${{ hashFiles('gradle/wrapper/gradle-wrapper.properties') }} + restore-keys: | + gradle-wrapper-${{ runner.os }}- + - name: Cache Gradle User Home + uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: ~/.gradle/caches + key: gradle-caches-${{ runner.os }}-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: | + gradle-caches-${{ runner.os }}- + - name: Setup OS + run: | + sudo apt-get update + sudo apt-get install -y clang + - name: Fuzz + run: ./gradlew :ddprof-lib:fuzz:fuzz -Pfuzz-duration=120 --no-daemon + - name: Upload crash artifacts + if: failure() + uses: 
actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: fuzz-crashes + path: ddprof-lib/fuzz/build/fuzz-crashes/ diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 600c94a5b..eb8351368 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -15,6 +15,38 @@ jobs: uses: ./.github/workflows/test_workflow.yml with: configuration: '["asan"]' # Ignoring tsan for now '["asan", "tsan"]' + fuzz: + runs-on: ubuntu-latest + continue-on-error: true + timeout-minutes: 30 + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - name: Cache Gradle Wrapper Binaries + uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: ~/.gradle/wrapper/dists + key: gradle-wrapper-${{ runner.os }}-${{ hashFiles('gradle/wrapper/gradle-wrapper.properties') }} + restore-keys: | + gradle-wrapper-${{ runner.os }}- + - name: Cache Gradle User Home + uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: ~/.gradle/caches + key: gradle-caches-${{ runner.os }}-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: | + gradle-caches-${{ runner.os }}- + - name: Setup OS + run: | + sudo apt-get update + sudo apt-get install -y clang + - name: Fuzz + run: ./gradlew :ddprof-lib:fuzz:fuzz -Pfuzz-duration=120 --no-daemon + - name: Upload crash artifacts + if: failure() + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: fuzz-crashes + path: ddprof-lib/fuzz/build/fuzz-crashes/ report-failures: runs-on: ubuntu-latest needs: run-test diff --git a/AGENTS.md b/AGENTS.md index 8c23d6959..b0a6635fa 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -384,6 +384,29 @@ arm64 has a weakly-ordered memory model (unlike x86 TSO). 
Incorrect ordering cau - **Architecture Support**: x64, arm64 with architecture-specific stack walking - **Debug Symbol Handling**: Split debug information for production deployments +#### musl/aarch64/JDK11 — `start_routine_wrapper_spec` minimal-frame invariant + +`start_routine_wrapper_spec` (`libraryPatcher_linux.cpp`) has a known "precarious stack guard +corruption" on musl/aarch64/JDK11 (see the comment at the function definition). The root cause +is that musl places the stack canary close to the frame boundary, so any substantial stack +allocation inside `start_routine_wrapper_spec` corrupts it. + +**Rule:** Any code placed inside `start_routine_wrapper_spec` that allocates substantial stack +objects MUST be extracted into a separate `__attribute__((noinline))` helper so those objects +live in the helper's own frame, not in `start_routine_wrapper_spec`'s frame. + +Existing helpers follow this pattern: +- `delete_routine_info` — isolates `SignalBlocker` (`sigset_t`, 128 bytes on musl) +- `init_tls_and_register` — same reason (isolates `SignalBlocker`) +- `run_with_musl_cleanup` — isolates `struct __ptcb` from `pthread_cleanup_push` (24 bytes) + +**Trigger:** `pthread_cleanup_push` is a macro that declares `struct __ptcb __cb` on the +caller's stack. If called directly inside `start_routine_wrapper_spec`, it re-triggers the +corruption. Always wrap it in a `noinline` helper. + +This affects only the `#ifdef __aarch64__` / `#ifndef __GLIBC__` code path. Other platforms +and libc combinations do not have this constraint. 
+ ## Development Guidelines ### Code Organization Principles diff --git a/build-logic/conventions/src/main/kotlin/com/datadoghq/native/fuzz/FuzzTargetsPlugin.kt b/build-logic/conventions/src/main/kotlin/com/datadoghq/native/fuzz/FuzzTargetsPlugin.kt index b6c5a0b56..248131342 100644 --- a/build-logic/conventions/src/main/kotlin/com/datadoghq/native/fuzz/FuzzTargetsPlugin.kt +++ b/build-logic/conventions/src/main/kotlin/com/datadoghq/native/fuzz/FuzzTargetsPlugin.kt @@ -86,7 +86,7 @@ class FuzzTargetsPlugin : Plugin<Project> { val includeFiles = buildIncludePaths(project, extension, homebrewLLVM) // Build compiler/linker args - val compilerArgs = buildFuzzCompilerArgs() + val compilerArgs = buildFuzzCompilerArgs(project) val linkerArgs = buildFuzzLinkerArgs(homebrewLLVM, clangResourceDir, project.logger) val fuzzSourceDir = extension.fuzzSourceDir.get().asFile @@ -194,7 +194,8 @@ class FuzzTargetsPlugin : Plugin<Project> { return includes } - private fun buildFuzzCompilerArgs(): List<String> { + private fun buildFuzzCompilerArgs(project: Project): List<String> { + val version = project.version.toString() val args = mutableListOf( "-O1", "-g", @@ -202,7 +203,8 @@ class FuzzTargetsPlugin : Plugin<Project> { "-fsanitize=fuzzer,address,undefined", "-fvisibility=hidden", "-std=c++17", - "-DFUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION" + "-DFUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION", + "-DPROFILER_VERSION=\"$version\"" ) if (PlatformUtils.currentPlatform == Platform.LINUX && PlatformUtils.isMusl()) { args.add("-D__musl__") diff --git a/ddprof-lib/src/main/cpp/callTraceStorage.cpp b/ddprof-lib/src/main/cpp/callTraceStorage.cpp index 00f5ea352..bf8af010a 100644 --- a/ddprof-lib/src/main/cpp/callTraceStorage.cpp +++ b/ddprof-lib/src/main/cpp/callTraceStorage.cpp @@ -6,240 +6,16 @@ #include "callTraceStorage.h" #include "counters.h" +#include "log.h" #include "os.h" #include "common.h" #include "thread.h" #include "vmEntry.h" // For BCI_ERROR constant #include "arch.h" // For LP64_ONLY macro and COMMA macro #include 
"guards.h" // For table swap critical sections -#include "primeProbing.h" #include "thread.h" #include #include -#include - -// RefCountGuard static members -RefCountSlot RefCountGuard::refcount_slots[RefCountGuard::MAX_THREADS]; -int RefCountGuard::slot_owners[RefCountGuard::MAX_THREADS]; - - -// RefCountGuard implementation -int RefCountGuard::getThreadRefCountSlot() { - // Signal-safe collision resolution: use OS::threadId() with semi-random prime step probing - ProfiledThread* thrd = ProfiledThread::currentSignalSafe(); - int tid = thrd != nullptr ? thrd->tid() : OS::threadId(); - - // Semi-random prime step probing to eliminate secondary clustering - HashProbe probe(static_cast(tid), MAX_THREADS); - - int slot = probe.slot(); - for (int i = 0; i < MAX_PROBE_DISTANCE; i++) { - // Try to claim this slot atomically - int expected = 0; // Empty slot (no thread ID) - if (__atomic_compare_exchange_n(&slot_owners[slot], &expected, tid, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) { - // Successfully claimed the slot - return slot; - } - - // Check if we already own this slot (for reentrant calls) - if (__atomic_load_n(&slot_owners[slot], __ATOMIC_ACQUIRE) == tid) { - return slot; - } - - // Move to next slot using probe - if (probe.hasNext()) { - slot = probe.next(); - } - } - - // All probing attempts failed - return -1 to indicate failure - return -1; -} - -RefCountGuard::RefCountGuard(CallTraceHashTable* resource) : _active(true), _my_slot(-1) { - // Get thread refcount slot using signal-safe collision resolution - _my_slot = getThreadRefCountSlot(); - - if (_my_slot == -1) { - // Slot allocation failed - refcount guard is inactive - _active = false; - return; - } - - // CRITICAL ORDERING: Store pointer FIRST, then increment count - // This ensures the pointer-first protocol for race-free operation - // - // Why this ordering is safe: - // Between step 1 and 2, if scanner runs: - // - Scanner loads count=0 (not yet incremented) - // - Scanner sees slot as inactive, 
skips it - // - Safe: we haven't "activated" protection yet - // - // After step 2, slot is fully active and protects the resource - __atomic_store_n(&refcount_slots[_my_slot].active_table, resource, __ATOMIC_RELEASE); - __atomic_fetch_add(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); -} - -RefCountGuard::~RefCountGuard() { - if (_active && _my_slot >= 0) { - // CRITICAL ORDERING: Decrement count FIRST, then clear pointer - // This ensures safe deactivation - // - // Why this ordering is safe: - // After step 1, count=0 so scanner will skip this slot - // Step 2 clears the pointer (cleanup) - // No window where scanner thinks slot protects a table it doesn't - __atomic_fetch_sub(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); - __atomic_store_n(&refcount_slots[_my_slot].active_table, nullptr, __ATOMIC_RELEASE); - - // Release slot ownership - __atomic_store_n(&slot_owners[_my_slot], 0, __ATOMIC_RELEASE); - } -} - -RefCountGuard::RefCountGuard(RefCountGuard&& other) noexcept : _active(other._active), _my_slot(other._my_slot) { - other._active = false; -} - -RefCountGuard& RefCountGuard::operator=(RefCountGuard&& other) noexcept { - if (this != &other) { - // Clean up current state with same ordering as destructor - if (_active && _my_slot >= 0) { - __atomic_fetch_sub(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); - __atomic_store_n(&refcount_slots[_my_slot].active_table, nullptr, __ATOMIC_RELEASE); - __atomic_store_n(&slot_owners[_my_slot], 0, __ATOMIC_RELEASE); - } - - // Move from other - _active = other._active; - _my_slot = other._my_slot; - - // Clear other - other._active = false; - } - return *this; -} - -void RefCountGuard::waitForRefCountToClear(CallTraceHashTable* table_to_delete) { - // Check refcount slots for the table we want to delete - // - // POINTER-FIRST PROTOCOL GUARANTEES: - // - Constructor stores pointer then increments count - // - Destructor decrements count then clears pointer - // - Scanner checks count first (if 0, 
slot is inactive) - // - // TRACE DROP WINDOW (intentional design): - // - Scanner can complete on FIRST iteration if all slots have count=0 - // - Guards in construction (pointer stored, count still 0) are treated as inactive - // - Revalidation check in put() detects this race and drops the trace - // - This trades a narrow trace-drop window (~10-100ns) for protocol simplicity - // - USE-AFTER-FREE IS IMPOSSIBLE: Revalidation prevents table access after deletion - - // PHASE 1: Fast path - spin with pause for short waits (common case) - // Expected: refcounts clear within 1-20µs as put() operations complete - const int SPIN_ITERATIONS = 100; - for (int spin = 0; spin < SPIN_ITERATIONS; ++spin) { - bool all_clear = true; - - // Scan all slots (no bitmap optimization, but simpler logic) - for (int i = 0; i < MAX_THREADS; ++i) { - // CRITICAL: Check count FIRST (pointer-first protocol) - uint32_t count = __atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE); - if (count == 0) { - continue; // Slot inactive, skip it - } - - // Count > 0, so slot is active - check which table it protects - CallTraceHashTable* table = __atomic_load_n(&refcount_slots[i].active_table, __ATOMIC_ACQUIRE); - if (table == table_to_delete) { - all_clear = false; - break; - } - } - - if (all_clear) { - return; // Fast path success - refcounts cleared quickly - } - spinPause(); // CPU pause instruction, ~10-50 cycles - } - - // PHASE 2: Slow path - async-signal-safe sleep for blocked thread case - const int MAX_WAIT_ITERATIONS = 5000; - struct timespec sleep_time = {0, 100000}; // 100 microseconds - - for (int wait_count = 0; wait_count < MAX_WAIT_ITERATIONS; ++wait_count) { - bool all_clear = true; - - for (int i = 0; i < MAX_THREADS; ++i) { - uint32_t count = __atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE); - if (count == 0) { - continue; - } - - CallTraceHashTable* table = __atomic_load_n(&refcount_slots[i].active_table, __ATOMIC_ACQUIRE); - if (table == table_to_delete) { 
- all_clear = false; - break; - } - } - - if (all_clear) { - return; // Slow path success - } - - // nanosleep is POSIX async-signal-safe and does not call malloc - nanosleep(&sleep_time, nullptr); - } - - // If we reach here, some refcounts didn't clear in time - // This shouldn't happen in normal operation but we log it for debugging -} - -void RefCountGuard::waitForAllRefCountsToClear() { - // PHASE 1: Fast path - spin with pause for short waits - const int SPIN_ITERATIONS = 100; - for (int spin = 0; spin < SPIN_ITERATIONS; ++spin) { - bool any_refcounts = false; - - for (int i = 0; i < MAX_THREADS; ++i) { - uint32_t count = __atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE); - if (count > 0) { - any_refcounts = true; - break; - } - } - - if (!any_refcounts) { - return; // Fast path success - } - spinPause(); - } - - // PHASE 2: Slow path - async-signal-safe sleep - const int MAX_WAIT_ITERATIONS = 5000; - struct timespec sleep_time = {0, 100000}; // 100 microseconds - - for (int wait_count = 0; wait_count < MAX_WAIT_ITERATIONS; ++wait_count) { - bool any_refcounts = false; - - for (int i = 0; i < MAX_THREADS; ++i) { - uint32_t count = __atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE); - if (count > 0) { - any_refcounts = true; - break; - } - } - - if (!any_refcounts) { - return; // Slow path success - } - - nanosleep(&sleep_time, nullptr); - } - - // If we reach here, some refcounts didn't clear in time -} - static const u64 OVERFLOW_TRACE_ID = 0x7fffffffffffffffULL; // Max 64-bit signed value @@ -266,15 +42,15 @@ CallTraceStorage::CallTraceStorage() : _active_storage(nullptr), _standby_storag _preserve_set_buffer.rehash(static_cast(1024 / 0.75f)); // Initialize triple-buffered storage - auto active_table = std::make_unique(); - active_table->setInstanceId(getNextInstanceId()); - active_table->setParentStorage(this); - __atomic_store_n(&_active_storage, active_table.release(), __ATOMIC_RELEASE); - - auto standby_table = std::make_unique(); - 
standby_table->setParentStorage(this); - standby_table->setInstanceId(getNextInstanceId()); - __atomic_store_n(&_standby_storage, standby_table.release(), __ATOMIC_RELEASE); + auto active_ptr = std::make_unique(); + active_ptr->setInstanceId(getNextInstanceId()); + active_ptr->setParentStorage(this); + __atomic_store_n(&_active_storage, active_ptr.release(), __ATOMIC_RELEASE); + + auto standby_ptr = std::make_unique(); + standby_ptr->setParentStorage(this); + standby_ptr->setInstanceId(getNextInstanceId()); + __atomic_store_n(&_standby_storage, standby_ptr.release(), __ATOMIC_RELEASE); auto scratch_table = std::make_unique(); scratch_table->setParentStorage(this); diff --git a/ddprof-lib/src/main/cpp/callTraceStorage.h b/ddprof-lib/src/main/cpp/callTraceStorage.h index 77bbcce8b..4735e18f2 100644 --- a/ddprof-lib/src/main/cpp/callTraceStorage.h +++ b/ddprof-lib/src/main/cpp/callTraceStorage.h @@ -8,6 +8,7 @@ #define _CALLTRACESTORAGE_H #include "callTraceHashTable.h" +#include "refCountGuard.h" #include "spinLock.h" #include "os.h" #include @@ -28,82 +29,6 @@ class CallTraceHashTable; // Using reference parameter avoids malloc() for vector creation and copying typedef std::function&)> LivenessChecker; -/** - * Cache-aligned reference counting slot for thread-local reference counting. - * Each slot occupies a full cache line (64 bytes) to eliminate false sharing. - * - * CORRECTNESS: The pointer-first protocol ensures race-free operation: - * - Constructor: Store pointer first, then increment count - * - Destructor: Decrement count first, then clear pointer - * - Scanner: Check count first (if 0, slot is inactive) - * - * This ordering ensures no window where scanner incorrectly believes a slot - * is inactive when it should be protecting a table. 
- */ -struct alignas(DEFAULT_CACHE_LINE_SIZE) RefCountSlot { - volatile uint32_t count; // Reference count (0 = inactive) - char _padding1[4]; // Alignment padding for pointer - CallTraceHashTable* active_table; // Which table is being referenced (8 bytes on 64-bit) - char padding[DEFAULT_CACHE_LINE_SIZE - 16]; // Remaining padding (64 - 16 = 48 bytes) - - RefCountSlot() : count(0), _padding1{}, active_table(nullptr), padding{} { - static_assert(sizeof(RefCountSlot) == DEFAULT_CACHE_LINE_SIZE, - "RefCountSlot must be exactly one cache line"); - } -}; - -/** - * RAII guard for thread-local reference counting. - * - * This class provides lock-free memory reclamation for CallTraceHashTable instances. - * Uses the pointer-first protocol to avoid race conditions during slot activation/deactivation. - * - * Performance characteristics: - * - Hot path: ~44-94 cycles - * - Thread-local cache line access (zero contention) - * - No bitmap operations required - * - * Correctness: - * - Pointer stored BEFORE count increment (activation) - * - Count decremented BEFORE pointer cleared (deactivation) - * - Scanner checks count first, ensuring consistent view - */ -class RefCountGuard { -public: - static constexpr int MAX_THREADS = 8192; - static constexpr int MAX_PROBE_DISTANCE = 32; // Maximum probing attempts - - static RefCountSlot refcount_slots[MAX_THREADS]; - static int slot_owners[MAX_THREADS]; // Thread ID ownership verification - -private: - bool _active; - int _my_slot; // This instance's assigned slot - - // Signal-safe slot assignment using thread ID hash with prime probing - static int getThreadRefCountSlot(); - -public: - RefCountGuard(CallTraceHashTable* resource); - ~RefCountGuard(); - - // Non-copyable, movable for efficiency - RefCountGuard(const RefCountGuard&) = delete; - RefCountGuard& operator=(const RefCountGuard&) = delete; - - RefCountGuard(RefCountGuard&& other) noexcept; - RefCountGuard& operator=(RefCountGuard&& other) noexcept; - - // Check if 
refcount guard is active (slot allocation succeeded) - bool isActive() const { return _active; } - - // Wait for reference counts pointing to specific table to clear - static void waitForRefCountToClear(CallTraceHashTable* table_to_delete); - - // Wait for ALL reference counts to clear - static void waitForAllRefCountsToClear(); -}; - class CallTraceStorage { public: // Reserved trace ID for dropped samples due to contention diff --git a/ddprof-lib/src/main/cpp/dictionary.h b/ddprof-lib/src/main/cpp/dictionary.h index e5df18f82..50824b2aa 100644 --- a/ddprof-lib/src/main/cpp/dictionary.h +++ b/ddprof-lib/src/main/cpp/dictionary.h @@ -84,6 +84,9 @@ class Dictionary { unsigned int bounded_lookup(const char *key, size_t length, int size_limit); void collect(std::map &map); + + int counterId() const { return _id; } + int size() const { return _size; } }; #endif // _DICTIONARY_H diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 50e495188..32e3f0866 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -56,7 +56,7 @@ SharedLineNumberTable::~SharedLineNumberTable() { void Lookup::fillNativeMethodInfo(MethodInfo *mi, const char *name, const char *lib_name) { - mi->_class = _classes->lookup(""); + mi->_class = _classes->lookupDuringDump("", 0); // TODO return the library name once we figured out how to cooperate with the // backend // if (lib_name == NULL) { @@ -105,7 +105,7 @@ void Lookup::fillNativeMethodInfo(MethodInfo *mi, const char *name, void Lookup::fillRemoteFrameInfo(MethodInfo *mi, const RemoteFrameInfo *rfi) { // Store build-id in the class name field - mi->_class = _classes->lookup(rfi->build_id); + mi->_class = _classes->lookupDuringDump(rfi->build_id, strlen(rfi->build_id)); // Store PC offset in hex format in the signature field char offset_hex[32]; @@ -228,8 +228,9 @@ void Lookup::fillJavaMethodInfo(MethodInfo *mi, jmethodID method, // constants... 
if (has_prefix(class_name, "Ljdk/internal/reflect/GeneratedConstructorAccessor")) { - class_name_id = _classes->lookup( - "jdk/internal/reflect/GeneratedConstructorAccessor"); + class_name_id = _classes->lookupDuringDump( + "jdk/internal/reflect/GeneratedConstructorAccessor", + strlen("jdk/internal/reflect/GeneratedConstructorAccessor")); method_name_id = _symbols.lookup("Object " "jdk.internal.reflect.GeneratedConstructorAccessor." @@ -238,7 +239,8 @@ void Lookup::fillJavaMethodInfo(MethodInfo *mi, jmethodID method, } else if (has_prefix(class_name, "Lsun/reflect/GeneratedConstructorAccessor")) { class_name_id = - _classes->lookup("sun/reflect/GeneratedConstructorAccessor"); + _classes->lookupDuringDump("sun/reflect/GeneratedConstructorAccessor", + strlen("sun/reflect/GeneratedConstructorAccessor")); method_name_id = _symbols.lookup( "Object " "sun.reflect.GeneratedConstructorAccessor.newInstance(Object[])"); @@ -246,7 +248,8 @@ void Lookup::fillJavaMethodInfo(MethodInfo *mi, jmethodID method, } else if (has_prefix(class_name, "Ljdk/internal/reflect/GeneratedMethodAccessor")) { class_name_id = - _classes->lookup("jdk/internal/reflect.GeneratedMethodAccessor"); + _classes->lookupDuringDump("jdk/internal/reflect/GeneratedMethodAccessor", + strlen("jdk/internal/reflect/GeneratedMethodAccessor")); method_name_id = _symbols.lookup("Object " "jdk.internal.reflect.GeneratedMethodAccessor." 
@@ -254,7 +257,8 @@ void Lookup::fillJavaMethodInfo(MethodInfo *mi, jmethodID method, method_sig_id = _symbols.lookup(method_sig); } else if (has_prefix(class_name, "Lsun/reflect/GeneratedMethodAccessor")) { - class_name_id = _classes->lookup("sun/reflect/GeneratedMethodAccessor"); + class_name_id = _classes->lookupDuringDump("sun/reflect/GeneratedMethodAccessor", + strlen("sun/reflect/GeneratedMethodAccessor")); method_name_id = _symbols.lookup( "Object sun.reflect.GeneratedMethodAccessor.invoke(Object, " "Object[])"); @@ -265,27 +269,30 @@ void Lookup::fillJavaMethodInfo(MethodInfo *mi, jmethodID method, // we want to normalise to java/lang/invoke/LambdaForm$MH, // java/lang/invoke/LambdaForm$DMH, java/lang/invoke/LambdaForm$BMH, if (has_prefix(class_name + lambdaFormPrefixLength, "MH")) { - class_name_id = _classes->lookup("java/lang/invoke/LambdaForm$MH"); + class_name_id = _classes->lookupDuringDump("java/lang/invoke/LambdaForm$MH", + strlen("java/lang/invoke/LambdaForm$MH")); } else if (has_prefix(class_name + lambdaFormPrefixLength, "BMH")) { - class_name_id = _classes->lookup("java/lang/invoke/LambdaForm$BMH"); + class_name_id = _classes->lookupDuringDump("java/lang/invoke/LambdaForm$BMH", + strlen("java/lang/invoke/LambdaForm$BMH")); } else if (has_prefix(class_name + lambdaFormPrefixLength, "DMH")) { - class_name_id = _classes->lookup("java/lang/invoke/LambdaForm$DMH"); + class_name_id = _classes->lookupDuringDump("java/lang/invoke/LambdaForm$DMH", + strlen("java/lang/invoke/LambdaForm$DMH")); } else { // don't recognise the suffix, so don't normalise class_name_id = - _classes->lookup(class_name + 1, strlen(class_name) - 2); + _classes->lookupDuringDump(class_name + 1, strlen(class_name) - 2); } method_name_id = _symbols.lookup(method_name); method_sig_id = _symbols.lookup(method_sig); } else { class_name_id = - _classes->lookup(class_name + 1, strlen(class_name) - 2); + _classes->lookupDuringDump(class_name + 1, strlen(class_name) - 2); method_name_id = 
_symbols.lookup(method_name); method_sig_id = _symbols.lookup(method_sig); } } else { Counters::increment(JMETHODID_SKIPPED); - class_name_id = _classes->lookup(""); + class_name_id = _classes->lookupDuringDump("", 0); method_name_id = _symbols.lookup("jvmtiError"); method_sig_id = _symbols.lookup("()L;"); } @@ -1172,10 +1179,9 @@ void Recording::writeCpool(Buffer *buf) { // constant pool count - bump each time a new pool is added buf->put8(12); - // classMap() is shared across the dump (this thread) and the JVMTI shared-lock - // writers (Profiler::lookupClass and friends). writeClasses() holds - // classMapSharedGuard() for its full duration; the exclusive classMap()->clear() - // in Profiler::dump runs only after this method returns. + // All three dictionaries are rotated by Profiler::rotateDictsAndRun() before any + // call that reaches finishChunk(); standby() returns the old-active snapshot which + // is stable for the lifetime of this call. Lookup lookup(this, &_method_map, Profiler::instance()->classMap()); writeFrameTypes(buf); writeThreadStates(buf); @@ -1187,9 +1193,9 @@ void Recording::writeCpool(Buffer *buf) { writePackages(buf, &lookup); writeConstantPoolSection(buf, T_SYMBOL, &lookup._symbols); writeConstantPoolSection(buf, T_STRING, - Profiler::instance()->stringLabelMap()); + Profiler::instance()->stringLabelMap()->standby()); writeConstantPoolSection(buf, T_ATTRIBUTE_VALUE, - Profiler::instance()->contextValueMap()); + Profiler::instance()->contextValueMap()->standby()); writeLogLevels(buf); flushIfNeeded(buf); } @@ -1389,12 +1395,7 @@ void Recording::writeMethods(Buffer *buf, Lookup *lookup) { void Recording::writeClasses(Buffer *buf, Lookup *lookup) { std::map classes; - // Hold classMapSharedGuard() for the full function. The const char* pointers - // stored in classes point into dictionary row storage; clear() frees that - // storage under the exclusive lock, so we must not release the shared lock - // until we have finished iterating. 
- auto guard = Profiler::instance()->classMapSharedGuard(); - lookup->_classes->collect(classes); + lookup->_classes->standby()->collect(classes); buf->putVar64(T_CLASS); buf->putVar64(classes.size()); @@ -1446,6 +1447,13 @@ void Recording::writeConstantPoolSection(Buffer *buf, JfrType type, writeConstantPoolSection(buf, type, constants); } +void Recording::writeConstantPoolSection(Buffer *buf, JfrType type, + StringDictionaryBuffer *buffer) { + std::map constants; + buffer->collect(constants); + writeConstantPoolSection(buf, type, constants); +} + void Recording::writeLogLevels(Buffer *buf) { buf->putVar64(T_LOG_LEVEL); buf->putVar64(LOG_ERROR - LOG_TRACE + 1); diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index b8af773ad..25ee51b3f 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -18,6 +18,7 @@ #include "buffers.h" #include "counters.h" #include "dictionary.h" +#include "stringDictionary.h" #include "event.h" #include "frame.h" #include "jfrMetadata.h" @@ -265,6 +266,8 @@ class Recording { void writeConstantPoolSection(Buffer *buf, JfrType type, Dictionary *dictionary); + void writeConstantPoolSection(Buffer *buf, JfrType type, + StringDictionaryBuffer *buffer); void writeLogLevels(Buffer *buf); @@ -305,7 +308,7 @@ class Lookup { public: Recording *_rec; MethodMap *_method_map; - Dictionary *_classes; + StringDictionary *_classes; Dictionary _packages; Dictionary _symbols; @@ -320,7 +323,7 @@ class Lookup { } public: - Lookup(Recording *rec, MethodMap *method_map, Dictionary *classes) + Lookup(Recording *rec, MethodMap *method_map, StringDictionary *classes) : _rec(rec), _method_map(method_map), _classes(classes), _packages(), _symbols() {} diff --git a/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp b/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp index cc6c6eb60..c3a97a2b1 100644 --- a/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp +++ 
b/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp @@ -531,22 +531,11 @@ __attribute__((no_sanitize("address"))) int HotspotSupport::walkVM(void* ucontex uintptr_t receiver = frame.jarg0(); if (receiver != 0) { VMSymbol* symbol = VMKlass::fromOop(receiver)->name(); - // walkVM runs in a signal handler. _class_map is mutated - // under _class_map_lock (shared by Profiler::lookupClass - // inserters, exclusive by _class_map.clear() in the dump - // path between unlockAll() and lock()). bounded_lookup - // with size_limit=0 never inserts (no malloc), but it - // still traverses row->next and reads row->keys, which - // clear() concurrently frees. Take the lock shared via - // try-lock; if an exclusive clear() is in progress, drop - // the synthetic frame rather than read freed memory. - auto guard = profiler->classMapTrySharedGuard(); - if (guard.ownsLock()) { - u32 class_id = profiler->classMap()->bounded_lookup( - symbol->body(), symbol->length(), 0); - if (class_id != INT_MAX) { - fillFrame(frames[depth++], BCI_ALLOC, class_id); - } + // bounded_lookup is a signal-safe read-only probe of active. 
+ u32 class_id = profiler->classMap()->bounded_lookup( + symbol->body(), symbol->length()); + if (class_id != 0) { + fillFrame(frames[depth++], BCI_ALLOC, class_id); } } } diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 6f70ae54e..5eadea442 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -203,7 +203,7 @@ Java_com_datadoghq_profiler_JavaProfiler_recordTrace0( JniString endpoint_str(env, endpoint); u32 endpointLabel = Profiler::instance()->stringLabelMap()->bounded_lookup( endpoint_str.c_str(), endpoint_str.length(), sizeLimit); - bool acceptValue = endpointLabel != INT_MAX; + bool acceptValue = endpointLabel != 0; if (acceptValue) { u32 operationLabel = 0; if (operation != NULL) { @@ -595,7 +595,7 @@ Java_com_datadoghq_profiler_ThreadContext_registerConstant0(JNIEnv* env, jclass JniString value_str(env, value); u32 encoding = Profiler::instance()->contextValueMap()->bounded_lookup( value_str.c_str(), value_str.length(), 1 << 16); - return encoding == INT_MAX ? -1 : encoding; + return encoding == 0 ? -1 : (jint)encoding; } extern "C" DLLEXPORT void JNICALL diff --git a/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp b/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp index c1ca9709d..234fa85fd 100644 --- a/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp +++ b/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp @@ -104,6 +104,19 @@ static void cleanup_unregister(void*) { unregister_and_release(ProfiledThread::currentTid()); } +// pthread_cleanup_push declares `struct __ptcb` in the caller's frame. If that +// frame is start_routine_wrapper_spec, the structure sits inside the ~224-byte +// DEOPT-corruption zone and pthread_cleanup_pop(1) would invoke a clobbered +// function pointer. This noinline + no_stack_protector helper hoists the +// cleanup-handler frame out of the corruption zone — its own frame lives +// safely above start_routine_wrapper_spec's. 
+__attribute__((noinline, no_stack_protector)) +static void run_with_musl_cleanup(func_start_routine routine, void* params) { + pthread_cleanup_push(cleanup_unregister, nullptr); + routine(params); + pthread_cleanup_pop(1); +} + // Wrapper around the real start routine. // The wrapper: // 1. Register the newly created thread to profiler @@ -154,11 +167,9 @@ static void* start_routine_wrapper_spec(void* args) { delete_routine_info(thr); init_tls_and_register(); // cleanup_unregister fires on pthread_exit() or cancellation from within - // routine(params). pthread_cleanup_pop(1) executes and removes the handler - // on the normal return path, so unregister_and_release() is not called twice. - pthread_cleanup_push(cleanup_unregister, nullptr); - routine(params); - pthread_cleanup_pop(1); + // routine(params). The push/pop pair lives inside run_with_musl_cleanup so + // that `struct __ptcb` does not land in this frame's DEOPT-corruption zone. + run_with_musl_cleanup(routine, params); // pthread_exit instead of 'return': the saved LR in this frame is corrupted // by DEOPT PACKING; returning would jump to a garbage address. pthread_exit(nullptr); diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index cbdc4ad8e..ff1b90953 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -1141,12 +1141,16 @@ Error Profiler::start(Arguments &args, bool reset) { _total_samples = 0; memset(_failures, 0, sizeof(_failures)); - // Reset dictionaries and bitmaps - // Reset class map under lock because ObjectSampler may try to use it while - // it is being cleaned up - _class_map_lock.lock(); - _class_map.clear(); - _class_map_lock.unlock(); + // Reset dictionaries. lockAll() gates signal-handler event writes; the + // pre-drain here reduces the work inside each clearAll(). 
clearAll() itself + // sets _accepting=false and drains again to close the window between this + // drain and the first clear call, then re-enables lookups after the reset. + lockAll(); + RefCountGuard::waitForAllRefCountsToClear(); + _class_map.clearAll(); + _string_label_map.clearAll(); + _context_value_map.clearAll(); + unlockAll(); // Reset call trace storage if (!_omit_stacktraces) { @@ -1400,10 +1404,7 @@ Error Profiler::stop() { // correct counts in the recording _thread_info.reportCounters(); - // Acquire all spinlocks to avoid race with remaining signals - lockAll(); - _jfr.stop(); // JFR serialization must complete before unpatching libraries - unlockAll(); + rotateDictsAndRun([&]{ _jfr.stop(); }); // Unpatch libraries AFTER JFR serialization completes // Remote symbolication RemoteFrameInfo structs contain pointers to build-ID strings @@ -1451,22 +1452,6 @@ Error Profiler::check(Arguments &args) { return error; } -Error Profiler::flushJfr() { - MutexLocker ml(_state_lock); - if (_state != RUNNING) { - return Error("Profiler is not active"); - } - - updateJavaThreadNames(); - updateNativeThreadNames(); - - lockAll(); - _jfr.flush(); - unlockAll(); - - return Error::OK; -} - Error Profiler::dump(const char *path, const int length) { MutexLocker ml(_state_lock); if (_state != IDLE && _state != RUNNING) { @@ -1488,18 +1473,11 @@ Error Profiler::dump(const char *path, const int length) { Counters::set(CODECACHE_RUNTIME_STUBS_SIZE_BYTES, native_libs.memoryUsage()); - lockAll(); - Error err = _jfr.dump(path, length); - __atomic_add_fetch(&_epoch, 1, __ATOMIC_SEQ_CST); - - // Note: No need to clear call trace storage here - the double buffering system - // in processTraces() already handles clearing old traces while preserving - // traces referenced by surviving LivenessTracker objects - unlockAll(); - // Reset classmap - _class_map_lock.lock(); - _class_map.clear(); - _class_map_lock.unlock(); + Error err = Error::OK; + rotateDictsAndRun([&]{ + err = 
_jfr.dump(path, length); + __atomic_add_fetch(&_epoch, 1, __ATOMIC_SEQ_CST); + }); _thread_info.clearAll(thread_ids); _thread_info.reportCounters(); @@ -1630,13 +1608,8 @@ void Profiler::shutdown(Arguments &args) { } int Profiler::lookupClass(const char *key, size_t length) { - if (_class_map_lock.tryLockShared()) { - int ret = _class_map.lookup(key, length); - _class_map_lock.unlockShared(); - return ret; - } - // unable to lookup the class - return -1; + u32 id = _class_map.lookup(key, length); + return id != 0 ? static_cast<int>(id) : -1; } int Profiler::status(char* status, int max_len) { diff --git a/ddprof-lib/src/main/cpp/profiler.h b/ddprof-lib/src/main/cpp/profiler.h index 703e7bc64..84aa9ed9e 100644 --- a/ddprof-lib/src/main/cpp/profiler.h +++ b/ddprof-lib/src/main/cpp/profiler.h @@ -13,9 +13,11 @@ #include "codeCache.h" #include "common.h" #include "dictionary.h" +#include "stringDictionary.h" #include "engine.h" #include "event.h" #include "flightRecorder.h" +#include "guards.h" #include "libraries.h" #include "log.h" #include "mutex.h" @@ -57,9 +59,8 @@ class VM; enum State { NEW, IDLE, RUNNING, TERMINATED }; -// Aligned to satisfy SpinLock member alignment requirement (64 bytes) -// Required because this class contains multiple SpinLock members: -// _class_map_lock and _locks[] +// Aligned to satisfy SpinLock member alignment requirement (64 bytes) +// Required because this class contains the _locks[] SpinLock array.
class alignas(alignof(SpinLock)) Profiler { friend VM; @@ -79,9 +80,9 @@ class alignas(alignof(SpinLock)) Profiler { // -- ThreadInfo _thread_info; - Dictionary _class_map; - Dictionary _string_label_map; - Dictionary _context_value_map; + StringDictionary _class_map{1}; + StringDictionary _string_label_map{2}; + StringDictionary _context_value_map{3}; ThreadFilter _thread_filter; CallTraceStorage _call_trace_storage; FlightRecorder _jfr; @@ -100,7 +101,6 @@ class alignas(alignof(SpinLock)) Profiler { alignas(DEFAULT_CACHE_LINE_SIZE) volatile u64 _sample_seq; u64 _failures[ASGCT_FAILURE_TYPES]; - SpinLock _class_map_lock; SpinLock _locks[CONCURRENCY_LEVEL]; CallTraceBuffer *_calltrace_buffer[CONCURRENCY_LEVEL]; int _max_stack_depth; @@ -143,6 +143,27 @@ class alignas(alignof(SpinLock)) Profiler { void lockAll(); void unlockAll(); + // Rotate all three dictionaries under a SignalBlocker + lockAll() critical + // section, invoke jfr_op (stop/dump), then clear standby buffers. + // This protocol is required so writeCpool() reads a consistent standby + // snapshot: without the rotate, values inserted into active after the last + // rotate remain invisible to writeCpool() and produce dangling cpool refs. + // lockAll() across jfr_op prevents JNI writers from racing TraceRootEvents + // into the chunk after writeCpool() has already read the snapshot. 
+ template <typename F> + void rotateDictsAndRun(F jfr_op) { + SignalBlocker blocker; + lockAll(); + _class_map.rotate(); + _string_label_map.rotate(); + _context_value_map.rotate(); + jfr_op(); + unlockAll(); + _class_map.clearStandby(); + _string_label_map.clearStandby(); + _context_value_map.clearStandby(); + } + static int crashHandlerInternal(int signo, siginfo_t *siginfo, void *ucontext); static void check_JDK_8313796_workaround(); @@ -156,7 +177,7 @@ class alignas(alignof(SpinLock)) Profiler { _call_trace_storage(), _jfr(), _cpu_engine(NULL), _wall_engine(NULL), _alloc_engine(NULL), _event_mask(0), _start_time(0), _stop_time(0), _epoch(0), _timer_id(NULL), - _total_samples(0), _sample_seq(0), _failures(), _class_map_lock(), + _total_samples(0), _sample_seq(0), _failures(), _max_stack_depth(0), _features(), _safe_mode(0), _cstack(CSTACK_NO), _thread_events_state(JVMTI_DISABLE), _libs(Libraries::instance()), _num_context_attributes(0), _omit_stacktraces(false), @@ -204,11 +225,9 @@ class alignas(alignof(SpinLock)) Profiler { Engine *cpuEngine() { return _cpu_engine; } Engine *wallEngine() { return _wall_engine; } - Dictionary *classMap() { return &_class_map; } - SharedLockGuard classMapSharedGuard() { return SharedLockGuard(&_class_map_lock); } - BoundedOptionalSharedLockGuard classMapTrySharedGuard() { return BoundedOptionalSharedLockGuard(&_class_map_lock); } - Dictionary *stringLabelMap() { return &_string_label_map; } - Dictionary *contextValueMap() { return &_context_value_map; } + StringDictionary *classMap() { return &_class_map; } + StringDictionary *stringLabelMap() { return &_string_label_map; } + StringDictionary *contextValueMap() { return &_context_value_map; } u32 numContextAttributes() { return _num_context_attributes; } ThreadFilter *threadFilter() { return &_thread_filter; } @@ -240,7 +259,6 @@ class alignas(alignof(SpinLock)) Profiler { Error check(Arguments &args); Error start(Arguments &args, bool reset); Error stop(); - Error flushJfr(); Error
dump(const char *path, const int length); void logStats(); void switchThreadEvents(jvmtiEventMode mode); diff --git a/ddprof-lib/src/main/cpp/refCountGuard.cpp b/ddprof-lib/src/main/cpp/refCountGuard.cpp new file mode 100644 index 000000000..e1cbcf636 --- /dev/null +++ b/ddprof-lib/src/main/cpp/refCountGuard.cpp @@ -0,0 +1,194 @@ +/* + * Copyright The async-profiler authors + * Copyright 2025, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "refCountGuard.h" +#include "arch.h" +#include "log.h" +#include "os.h" +#include "primeProbing.h" +#include "thread.h" +#include <time.h> + +// Static member definitions +RefCountSlot RefCountGuard::refcount_slots[RefCountGuard::MAX_THREADS]; +int RefCountGuard::slot_owners[RefCountGuard::MAX_THREADS]; + +int RefCountGuard::getThreadRefCountSlot() { + ProfiledThread* thrd = ProfiledThread::currentSignalSafe(); + int tid = thrd != nullptr ? thrd->tid() : OS::threadId(); + + HashProbe probe(static_cast<u64>(tid), MAX_THREADS); + + int slot = probe.slot(); + for (int i = 0; i < MAX_PROBE_DISTANCE; i++) { + int expected = 0; + if (__atomic_compare_exchange_n(&slot_owners[slot], &expected, tid, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) { + return slot; + } + + if (__atomic_load_n(&slot_owners[slot], __ATOMIC_ACQUIRE) == tid) { + // Only treat as reentrant if the outer guard is still active. + // When count==0 the outer guard has already decremented and is + // just clearing slot_owners; creating a "reentrant" guard on a + // dying slot would publish active_ptr while the outer destructor + // is about to overwrite it, causing waitForRefCountToClear to + // miss the new resource. + if (__atomic_load_n(&refcount_slots[slot].count, __ATOMIC_ACQUIRE) > 0) { + return slot + MAX_THREADS; + } + // Fall through: probe for a fresh slot instead.
+ } + + if (probe.hasNext()) { + slot = probe.next(); + } + } + + return -1; +} + +RefCountGuard::RefCountGuard(void* resource) : _active(true), _is_reentrant(false), _set_outer_ptr(false), _my_slot(-1), _saved_ptr(nullptr) { + int raw = getThreadRefCountSlot(); + + if (raw == -1) { + _active = false; + return; + } + + _is_reentrant = (raw >= MAX_THREADS); + _my_slot = _is_reentrant ? (raw - MAX_THREADS) : raw; + + if (_is_reentrant) { + _saved_ptr = __atomic_load_n(&refcount_slots[_my_slot].active_ptr, __ATOMIC_ACQUIRE); + // Reentrant: increment count first so the scanner always sees the outer + // resource while active_ptr is being updated. + __atomic_fetch_add(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); + // Only write outer_ptr on the FIRST level of reentrancy (when it is still + // null). If a second signal nests inside the first, outer_ptr already + // holds the outermost resource; overwriting it would lose that pointer and + // let waitForRefCountToClear miss it while all three guards are live. + void* existing_outer = __atomic_load_n(&refcount_slots[_my_slot].outer_ptr, __ATOMIC_ACQUIRE); + _set_outer_ptr = (existing_outer == nullptr); + if (_set_outer_ptr) { + __atomic_store_n(&refcount_slots[_my_slot].outer_ptr, _saved_ptr, __ATOMIC_RELEASE); + } + __atomic_store_n(&refcount_slots[_my_slot].active_ptr, resource, __ATOMIC_RELEASE); + } else { + // Non-reentrant (count was 0): store pointer first so the scanner skips + // this slot during the activation window (count=0 → treated as inactive). + __atomic_store_n(&refcount_slots[_my_slot].active_ptr, resource, __ATOMIC_RELEASE); + __atomic_fetch_add(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); + } +} + +RefCountGuard::~RefCountGuard() { + if (_active && _my_slot >= 0) { + if (_is_reentrant) { + // Restore outer active_ptr first, then (if we set outer_ptr) clear it, + // then decrement count; scanner always observes the outer resource while + // count > 0. 
We only clear outer_ptr when we were the one who set it + // (first reentrancy level); deeper levels must leave it pointing at the + // outermost resource so the scanner can still find it. + __atomic_store_n(&refcount_slots[_my_slot].active_ptr, _saved_ptr, __ATOMIC_RELEASE); + if (_set_outer_ptr) { + __atomic_store_n(&refcount_slots[_my_slot].outer_ptr, nullptr, __ATOMIC_RELEASE); + } + __atomic_fetch_sub(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); + } else { + __atomic_fetch_sub(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); + __atomic_store_n(&refcount_slots[_my_slot].active_ptr, nullptr, __ATOMIC_RELEASE); + __atomic_store_n(&slot_owners[_my_slot], 0, __ATOMIC_RELEASE); + } + } +} + +RefCountGuard::RefCountGuard(RefCountGuard&& other) noexcept + : _active(other._active), _is_reentrant(other._is_reentrant), + _set_outer_ptr(other._set_outer_ptr), + _my_slot(other._my_slot), _saved_ptr(other._saved_ptr) { + other._active = false; +} + +RefCountGuard& RefCountGuard::operator=(RefCountGuard&& other) noexcept { + if (this != &other) { + if (_active && _my_slot >= 0) { + if (_is_reentrant) { + __atomic_store_n(&refcount_slots[_my_slot].active_ptr, _saved_ptr, __ATOMIC_RELEASE); + if (_set_outer_ptr) { + __atomic_store_n(&refcount_slots[_my_slot].outer_ptr, nullptr, __ATOMIC_RELEASE); + } + __atomic_fetch_sub(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); + } else { + __atomic_fetch_sub(&refcount_slots[_my_slot].count, 1, __ATOMIC_RELEASE); + __atomic_store_n(&refcount_slots[_my_slot].active_ptr, nullptr, __ATOMIC_RELEASE); + __atomic_store_n(&slot_owners[_my_slot], 0, __ATOMIC_RELEASE); + } + } + _active = other._active; + _is_reentrant = other._is_reentrant; + _set_outer_ptr = other._set_outer_ptr; + _my_slot = other._my_slot; + _saved_ptr = other._saved_ptr; + other._active = false; + } + return *this; +} + +void RefCountGuard::waitForRefCountToClear(void* table_to_delete) { + const int SPIN_ITERATIONS = 100; + for (int spin = 0; spin < 
SPIN_ITERATIONS; ++spin) { + bool all_clear = true; + for (int i = 0; i < MAX_THREADS; ++i) { + uint32_t count = __atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE); + if (count == 0) continue; + void* table = __atomic_load_n(&refcount_slots[i].active_ptr, __ATOMIC_ACQUIRE); + void* outer = __atomic_load_n(&refcount_slots[i].outer_ptr, __ATOMIC_ACQUIRE); + if (table == table_to_delete || outer == table_to_delete) { all_clear = false; break; } + } + if (all_clear) return; + spinPause(); + } + + const int MAX_WAIT_ITERATIONS = 5000; + struct timespec sleep_time = {0, 100000}; + for (int wait_count = 0; wait_count < MAX_WAIT_ITERATIONS; ++wait_count) { + bool all_clear = true; + for (int i = 0; i < MAX_THREADS; ++i) { + uint32_t count = __atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE); + if (count == 0) continue; + void* table = __atomic_load_n(&refcount_slots[i].active_ptr, __ATOMIC_ACQUIRE); + void* outer = __atomic_load_n(&refcount_slots[i].outer_ptr, __ATOMIC_ACQUIRE); + if (table == table_to_delete || outer == table_to_delete) { all_clear = false; break; } + } + if (all_clear) return; + nanosleep(&sleep_time, nullptr); + } + + Log::warn("waitForRefCountToClear: timeout waiting for %p, proceeding", table_to_delete); +} + +void RefCountGuard::waitForAllRefCountsToClear() { + const int SPIN_ITERATIONS = 100; + for (int spin = 0; spin < SPIN_ITERATIONS; ++spin) { + bool any = false; + for (int i = 0; i < MAX_THREADS; ++i) { + if (__atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE) > 0) { any = true; break; } + } + if (!any) return; + spinPause(); + } + + const int MAX_WAIT_ITERATIONS = 5000; + struct timespec sleep_time = {0, 100000}; + for (int wait_count = 0; wait_count < MAX_WAIT_ITERATIONS; ++wait_count) { + bool any = false; + for (int i = 0; i < MAX_THREADS; ++i) { + if (__atomic_load_n(&refcount_slots[i].count, __ATOMIC_ACQUIRE) > 0) { any = true; break; } + } + if (!any) return; + nanosleep(&sleep_time, nullptr); + } +} diff --git 
a/ddprof-lib/src/main/cpp/refCountGuard.h b/ddprof-lib/src/main/cpp/refCountGuard.h new file mode 100644 index 000000000..7c0b0fed4 --- /dev/null +++ b/ddprof-lib/src/main/cpp/refCountGuard.h @@ -0,0 +1,115 @@ +/* + * Copyright The async-profiler authors + * Copyright 2025, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _REFCOUNTGUARD_H +#define _REFCOUNTGUARD_H + +#include "arch.h" +#include <stdint.h> + +/** + * Cache-aligned reference counting slot for thread-local reference counting. + * Each slot occupies a full cache line (64 bytes) to eliminate false sharing. + * + * ACTIVATION PROTOCOL (pointer-first): + * - Constructor: store active_ptr first, then increment count + * - Destructor: decrement count first, then clear active_ptr + * - Scanner: check count; if 0, skip slot (treats it as inactive) + * + * There is a brief activation window between the store of active_ptr and the + * increment of count where the scanner sees count=0 and may skip the slot. + * For signal handlers this window is never observed in practice: handlers + * complete within microseconds while a buffer can only be cleared after TWO + * full dump cycles (typically 60+ seconds). If the window were hit, the caller + * observes a miss (e.g. 0 or equivalent sentinel) and handles it gracefully + * — a dropped trace or generic vtable frame, not a crash. + * + * REENTRANT NESTING: when a signal fires inside an outer guard (same thread), + * outer_ptr preserves the outer resource so waitForRefCountToClear() can see + * it even while active_ptr is updated to the inner resource. + * Ordering: outer_ptr must be stored AFTER count++ but BEFORE active_ptr is + * overwritten; cleared AFTER active_ptr is restored but BEFORE count--.
+ */ +struct alignas(DEFAULT_CACHE_LINE_SIZE) RefCountSlot { + volatile uint32_t count; // Reference count (0 = inactive) + alignas(alignof(void*)) void* active_ptr; // Which resource is being referenced + void* outer_ptr; // Outer guard's resource when reentrant (else null) + // Remaining padding: accounts for the alignment gap between count and active_ptr. + // Formula: total - alignof(void*) - 2*sizeof(void*) on 64-bit (count=4+4gap, active=8, outer=8). + char padding[DEFAULT_CACHE_LINE_SIZE - alignof(void*) - 2 * sizeof(void*)]; + + RefCountSlot() : count(0), active_ptr(nullptr), outer_ptr(nullptr), padding{} { + static_assert(sizeof(RefCountSlot) == DEFAULT_CACHE_LINE_SIZE, + "RefCountSlot must be exactly one cache line"); + } +}; + +/** + * RAII guard for thread-local reference counting. + * + * Provides lock-free memory reclamation for any heap-allocated resource that + * may be accessed from signal handlers concurrently with deallocation. + * Uses the pointer-first protocol to avoid race conditions. + * + * Performance: ~44-94 cycles hot-path; thread-local cache line, zero contention. + * + * Correctness: + * - Pointer stored BEFORE count increment (activation) + * - Count decremented BEFORE pointer cleared (deactivation) + * - Scanner checks count first, ensuring consistent view + * + * Reentrancy: + * - A signal handler may create a RefCountGuard while a JNI thread already + * holds one on the same slot (same tid). getThreadRefCountSlot() returns + * slot + MAX_THREADS to signal this case. The inner guard saves and restores + * the outer guard's active_ptr instead of clearing it, so the scanner never + * sees a null pointer for an active outer guard. + * - Ordering invariants differ for the reentrant case: + * Constructor: count incremented BEFORE overwriting active_ptr (outer resource + * stays visible to the scanner until the new pointer is installed). 
+ * Destructor: active_ptr restored to saved outer pointer BEFORE decrementing + * count (scanner always sees outer resource while count is still elevated). + */ +class RefCountGuard { +public: + static constexpr int MAX_THREADS = 8192; + static constexpr int MAX_PROBE_DISTANCE = 32; + + static RefCountSlot refcount_slots[MAX_THREADS]; + static int slot_owners[MAX_THREADS]; + +private: + bool _active; + bool _is_reentrant; + bool _set_outer_ptr; // true if this guard wrote outer_ptr (first reentrancy level only) + int _my_slot; + void* _saved_ptr; + + // Returns slot index in [0, MAX_THREADS) on fresh claim. + // Returns slot + MAX_THREADS when the calling thread already owns that slot + // (reentrant signal delivery); the caller must save/restore active_ptr. + static int getThreadRefCountSlot(); + +public: + explicit RefCountGuard(void* resource); + ~RefCountGuard(); + + RefCountGuard(const RefCountGuard&) = delete; + RefCountGuard& operator=(const RefCountGuard&) = delete; + + RefCountGuard(RefCountGuard&& other) noexcept; + RefCountGuard& operator=(RefCountGuard&& other) noexcept; + + bool isActive() const { return _active; } + + // Wait for all in-flight guards protecting ptr_to_delete to be released. + static void waitForRefCountToClear(void* ptr_to_delete); + + // Wait for ALL reference counts to clear. + static void waitForAllRefCountsToClear(); +}; + +#endif // _REFCOUNTGUARD_H diff --git a/ddprof-lib/src/main/cpp/spinLock.h b/ddprof-lib/src/main/cpp/spinLock.h index 6314bee14..f1e825e30 100644 --- a/ddprof-lib/src/main/cpp/spinLock.h +++ b/ddprof-lib/src/main/cpp/spinLock.h @@ -60,21 +60,6 @@ class alignas(DEFAULT_CACHE_LINE_SIZE) SpinLock { return false; } - // Signal-safe variant: returns false after at most 5 CAS attempts. - // Use only in signal-handler paths where spinning indefinitely is unsafe. 
- bool tryLockSharedBounded() { - for (int attempts = 0; attempts < 5; ++attempts) { - int value = __atomic_load_n(&_lock, __ATOMIC_ACQUIRE); - if (value > 0) { - return false; - } - if (__sync_bool_compare_and_swap(&_lock, value, value - 1)) { - return true; - } - } - return false; - } - void lockShared() { int value; while ((value = __atomic_load_n(&_lock, __ATOMIC_ACQUIRE)) > 0 || @@ -127,33 +112,6 @@ class OptionalSharedLockGuard { OptionalSharedLockGuard& operator=(OptionalSharedLockGuard&&) = delete; }; -// Signal-safe variant of OptionalSharedLockGuard: uses bounded CAS retries -// and may fail spuriously when racing other shared lockers. Use ONLY in -// signal-handler paths where spinning indefinitely is unsafe; do NOT use -// in hot recording paths where silent acquisition failures would drop events. -class BoundedOptionalSharedLockGuard { - SpinLock* _lock; -public: - explicit BoundedOptionalSharedLockGuard(SpinLock* lock) : _lock(lock) { - if (!_lock->tryLockSharedBounded()) { - // Locking failed (bounded retries exhausted or exclusive lock held); no unlock needed. - _lock = nullptr; - } - } - ~BoundedOptionalSharedLockGuard() { - if (_lock != nullptr) { - _lock->unlockShared(); - } - } - bool ownsLock() { return _lock != nullptr; } - - // Non-copyable and non-movable - BoundedOptionalSharedLockGuard(const BoundedOptionalSharedLockGuard&) = delete; - BoundedOptionalSharedLockGuard& operator=(const BoundedOptionalSharedLockGuard&) = delete; - BoundedOptionalSharedLockGuard(BoundedOptionalSharedLockGuard&&) = delete; - BoundedOptionalSharedLockGuard& operator=(BoundedOptionalSharedLockGuard&&) = delete; -}; - class ExclusiveLockGuard { private: SpinLock* _lock; diff --git a/ddprof-lib/src/main/cpp/stringDictionary.h b/ddprof-lib/src/main/cpp/stringDictionary.h new file mode 100644 index 000000000..513617674 --- /dev/null +++ b/ddprof-lib/src/main/cpp/stringDictionary.h @@ -0,0 +1,400 @@ +/* + * Copyright 2026, Datadog, Inc. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _STRINGDICTIONARY_H +#define _STRINGDICTIONARY_H + +#include "counters.h" +#include "refCountGuard.h" +#include "tripleBuffer.h" +#include "arch.h" +#include <atomic> +#include <map> +#include <stddef.h> +#include <stdlib.h> +#include <string.h> + +// Reuse the same table geometry as Dictionary for cache-friendly layout. +#ifndef ROW_BITS +#define ROW_BITS 7 +#define ROWS (1 << ROW_BITS) +#define CELLS 3 +#endif + +// ─── Internal storage types ──────────────────────────────────────────────── + +struct SBTable; + +struct SBRow { + char* keys[CELLS]; // null = empty; CAS-claimed by the inserting thread + u32 ids[CELLS]; // set AFTER winning the key CAS (release); 0 until published + SBTable* next; // overflow chain; CAS-created on overflow +}; + +struct SBTable { + SBRow rows[ROWS]; +}; + +// ─── StringDictionaryBuffer ──────────────────────────────────────────────── +// +// Open-addressing concurrent hash table mapping string keys to u32 IDs. +// +// Concurrency model: +// - Inserts (insert_with_id, copyFrom): use __sync_bool_compare_and_swap on +// keys[c] (full fence on GCC) to claim a slot. The id is stored +// AFTER the CAS wins (release); a reader that observes keys[c] != null via +// an acquire load must also check ids[c] != 0 before using it. On x86 the +// CAS already implies a full memory barrier; on ARM the acquire/release +// pairing provides the required ordering. id remains 0 until published. +// - Reads (lookup): acquire-load keys[c]; if non-null and key matches, return +// the id — only if ids[c] != 0 (not yet published = miss). Null means +// "slot unclaimed" — treated as miss. There is a narrow window where an +// inserter has started but not yet committed the CAS; this is safe (returns +// miss = 0), identical to the existing Dictionary behaviour. +// - clear(): called only when no concurrent readers/writers are active. +// +// Not signal-safe for insert_with_id / copyFrom (call malloc).
+// Signal-safe for lookup (read-only, no allocation). +class StringDictionaryBuffer { +private: + SBTable* _table; + std::atomic<int> _size{0}; + + static unsigned int hash(const char* key, size_t length) { + unsigned int h = 2166136261U; + for (size_t i = 0; i < length; i++) h = (h ^ (unsigned char)key[i]) * 16777619; + return h; + } + + static bool keyEquals(const char* candidate, const char* key, size_t length) { + return strncmp(candidate, key, length) == 0 && candidate[length] == '\0'; + } + + // Iterative DFS walk of the overflow tree. Each frame tracks one table and + // the next row to visit. Because we descend into row->next immediately and + // only pop the frame after every row has been processed, the stack holds at + // most one frame per overflow-chain level. The hash rotation has period + // 32 / gcd(32, ROW_BITS) = 32: after 32 levels the row index cycles back to + // the original. In practice, reaching level N requires N*CELLS keys with + // identical 32-bit FNV hashes (gcd(7,32)=1 means any two keys that share + // the same row index across all 32 rotation steps must hash identically). + // A chain longer than 33 levels therefore requires 99+ exact FNV collisions + // — statistically negligible for real-world string inputs. stk[34] covers + // this practical bound; the top < 34 guard is safety-overflow protection. + // If it ever fires, entries beyond level 33 are silently dropped rather than + // crashing — acceptable given how the bound is reached only by adversarial + // or degenerate inputs.
+ static void freeTable(SBTable* table) { + struct Frame { SBTable* t; int row; }; + Frame stk[34]; + int top = 0; + stk[top++] = {table, 0}; + while (top > 0) { + Frame& f = stk[top - 1]; + if (f.row >= ROWS) { + if (f.t != table) free(f.t); + --top; + continue; + } + SBRow* row = &f.t->rows[f.row++]; + for (int j = 0; j < CELLS; j++) { + if (row->keys[j]) free(row->keys[j]); + } + if (row->next && top < 34) stk[top++] = {row->next, 0}; + } + } + + static void collectTable(const SBTable* table, + std::map<u32, const char*>& out) { + struct Frame { const SBTable* t; int row; }; + Frame stk[34]; + int top = 0; + stk[top++] = {table, 0}; + while (top > 0) { + Frame& f = stk[top - 1]; + if (f.row >= ROWS) { --top; continue; } + const SBRow* row = &f.t->rows[f.row++]; + for (int j = 0; j < CELLS; j++) { + const char* k = __atomic_load_n(&row->keys[j], __ATOMIC_ACQUIRE); + if (k) { + u32 eid = __atomic_load_n(&row->ids[j], __ATOMIC_ACQUIRE); + if (eid != 0) out[eid] = k; + } + } + const SBTable* next = __atomic_load_n(&row->next, __ATOMIC_ACQUIRE); + if (next && top < 34) stk[top++] = {next, 0}; + } + } + +public: + StringDictionaryBuffer() { + _table = (SBTable*)calloc(1, sizeof(SBTable)); + } + + ~StringDictionaryBuffer() { + if (_table != nullptr) { freeTable(_table); } + free(_table); + _table = nullptr; + } + + // Signal-safe read-only probe. Returns 0 on miss. + u32 lookup(const char* key, size_t len) const { + const SBTable* table = _table; + unsigned int h = hash(key, len); + while (table) { + const SBRow* row = &table->rows[h % ROWS]; + for (int c = 0; c < CELLS; c++) { + const char* k = __atomic_load_n(&row->keys[c], __ATOMIC_ACQUIRE); + if (!k) return 0; + if (keyEquals(k, key, len)) { + u32 id = __atomic_load_n(&row->ids[c], __ATOMIC_ACQUIRE); + return id; + } + } + table = __atomic_load_n(&row->next, __ATOMIC_ACQUIRE); + h = (h >> ROW_BITS) | (h << (32 - ROW_BITS)); + } + return 0; + } + + // Insert with the given id.
Returns the id stored for this key + // (either the given id on a new slot, or the existing id on a duplicate). + // NOT signal-safe (calls malloc). + u32 insert_with_id(const char* key, size_t len, u32 id) { + SBTable* table = _table; + unsigned int h = hash(key, len); + while (true) { + SBRow* row = &table->rows[h % ROWS]; + for (int c = 0; c < CELLS; c++) { + char* existing = __atomic_load_n(&row->keys[c], __ATOMIC_ACQUIRE); + if (!existing) { + char* new_key = (char*)malloc(len + 1); + if (!new_key) return 0; + memcpy(new_key, key, len); + new_key[len] = '\0'; + if (__sync_bool_compare_and_swap(&row->keys[c], nullptr, new_key)) { + __atomic_store_n(&row->ids[c], id, __ATOMIC_RELEASE); + _size.fetch_add(1, std::memory_order_relaxed); + return id; + } + free(new_key); + existing = __atomic_load_n(&row->keys[c], __ATOMIC_ACQUIRE); + } + if (existing && keyEquals(existing, key, len)) { + u32 stored_id; + while ((stored_id = __atomic_load_n(&row->ids[c], __ATOMIC_ACQUIRE)) == 0) { spinPause(); } + return stored_id; + } + } + if (!__atomic_load_n(&row->next, __ATOMIC_ACQUIRE)) { + SBTable* nt = (SBTable*)calloc(1, sizeof(SBTable)); + if (nt == nullptr) return 0; + if (!__sync_bool_compare_and_swap(&row->next, nullptr, nt)) free(nt); + } + table = __atomic_load_n(&row->next, __ATOMIC_ACQUIRE); + h = (h >> ROW_BITS) | (h << (32 - ROW_BITS)); + } + } + + // Copy all entries from src into this buffer preserving their ids. + // NOT signal-safe. + void copyFrom(const StringDictionaryBuffer& src) { + std::map<u32, const char*> entries; + src.collect(entries); + for (auto& kv : entries) { + insert_with_id(kv.second, strlen(kv.second), kv.first); + } + } + + // Populate out with {id -> key} for all entries in this buffer. + void collect(std::map<u32, const char*>& out) const { + collectTable(_table, out); + } + + // Free all keys and reset to empty. Call only with no concurrent accessors.
+ void clear() { + if (_table == nullptr) { _size.store(0, std::memory_order_relaxed); return; } + freeTable(_table); + memset(_table, 0, sizeof(SBTable)); + _size.store(0, std::memory_order_relaxed); + } + + int size() const { return _size.load(std::memory_order_relaxed); } +}; + +// ─── StringDictionary ───────────────────────────────────────────────────── +// +// Triple-buffered wrapper around StringDictionaryBuffer. +// +// Roles cycle through three buffers: +// active — receives new writes (lookup, insert_with_id) +// dump — stable snapshot for the current JFR chunk (after rotate()) +// scratch — two rotations behind; cleared by clearStandby() +// +// _next_id is a global monotonic counter that never resets until clearAll(). +// rotate() does a two-phase ID-preserving copy so no entry is lost due to +// concurrent inserts in the rotation window: +// phase 1: copy active → clearTarget (before rotate) +// phase 2: copy old_active → new_active (after drain, catch late inserts) +// lookupDuringDump(key): probes dump then active; inserts into both if new, +// using the existing global ID (or a new fetch-add if truly new everywhere). +// +// Signal safety: +// bounded_lookup acquires RefCountGuard on active before reading. +// lookup also acquires RefCountGuard before inserting (not signal-safe due to +// malloc, but the guard protects the buffer pointer lifetime). +// lookupDuringDump is NOT signal-safe; call from dump thread only. 
+class StringDictionary { + std::atomic<u32> _next_id{1}; // starts at 1; id=0 reserved as "no entry" + std::atomic<bool> _accepting{true}; // false while clearAll() is resetting buffers + StringDictionaryBuffer _a, _b, _c; + TripleBufferRotator<StringDictionaryBuffer> _rot; + int _counter_offset; // offset into DICTIONARY_KEYS / DICTIONARY_KEYS_BYTES counter rows + + u32 nextId() { + return _next_id.fetch_add(1, std::memory_order_relaxed); + } + + void countInsert(size_t len) { + Counters::increment(DICTIONARY_KEYS, 1, _counter_offset); + Counters::increment(DICTIONARY_KEYS_BYTES, (long long)(len + 1), _counter_offset); + } + +public: + explicit StringDictionary(int counter_offset = 0) + : _rot(&_a, &_b, &_c), _counter_offset(counter_offset) {} + + // Insert into active buffer; returns globally stable id. NOT signal-safe. + u32 lookup(const char* key, size_t len) { + // Bail immediately if clearAll() is resetting the buffers; creating a + // RefCountGuard here could race with freeTable() inside clear(). + if (!_accepting.load(std::memory_order_acquire)) return 0; + while (true) { + StringDictionaryBuffer* active = _rot.active(); + RefCountGuard guard(active); + if (!guard.isActive()) return 0; + if (_rot.active() != active) continue; + u32 id = active->lookup(key, len); + if (id != 0) return id; + // nextId() may be consumed without assignment if a concurrent insert wins + // the CAS for the same key; IDs are unique but not guaranteed to be dense. + u32 new_id = nextId(); + u32 result = active->insert_with_id(key, len, new_id); + if (result == new_id) countInsert(len); + return result; + } + } + + // Insert into active buffer if size < size_limit; returns 0 when at cap. + // NOT signal-safe. + u32 bounded_lookup(const char* key, size_t len, int size_limit) { + // Bail immediately if clearAll() is resetting the buffers; creating a + // RefCountGuard here could race with freeTable() inside clear().
+ if (!_accepting.load(std::memory_order_acquire)) return 0; + while (true) { + StringDictionaryBuffer* active = _rot.active(); + RefCountGuard guard(active); + if (!guard.isActive()) return 0; + if (_rot.active() != active) continue; + u32 id = active->lookup(key, len); + if (id != 0) return id; + if (active->size() >= size_limit) return 0; + u32 new_id = nextId(); + u32 result = active->insert_with_id(key, len, new_id); + if (result == new_id) countInsert(len); + return result; + } + } + + // Signal-safe read-only probe of active. Returns 0 on miss. + u32 bounded_lookup(const char* key, size_t len) { + if (!_accepting.load(std::memory_order_acquire)) return 0; + while (true) { + StringDictionaryBuffer* active = _rot.active(); + RefCountGuard guard(active); + if (!guard.isActive()) return 0; + if (_rot.active() != active) continue; + return active->lookup(key, len); + } + } + + // Returns the dump buffer (snapshot of old active after rotate()). + StringDictionaryBuffer* standby() { + return _rot.dumpBuffer(); + } + + // Two-phase ID-preserving rotate. + void rotate() { + StringDictionaryBuffer* old_active = _rot.active(); + // Phase 1: pre-populate clearTarget from active (before rotate). + _rot.clearTarget()->copyFrom(*old_active); + _rot.rotate(); + // Drain all in-flight accessors on old_active (now the dump buffer). + RefCountGuard::waitForRefCountToClear(old_active); + // Phase 2: copy old_active → new active to catch late inserts. + _rot.active()->copyFrom(*old_active); + } + + // Resolve a key during the dump phase. Safe to call from the dump thread + // after rotate(); must NOT be called from signal handlers or concurrently + // with another lookupDuringDump call. 
+ u32 lookupDuringDump(const char* key, size_t len) { + StringDictionaryBuffer* dump = _rot.dumpBuffer(); + + u32 id = dump->lookup(key, len); + if (id != 0) return id; + + { + StringDictionaryBuffer* active = _rot.active(); + RefCountGuard guard(active); + if (!guard.isActive()) return 0; + id = active->lookup(key, len); + } + if (id != 0) { + dump->insert_with_id(key, len, id); + return id; + } + + { + StringDictionaryBuffer* active = _rot.active(); + RefCountGuard guard(active); + if (!guard.isActive()) return 0; + u32 new_id = nextId(); + new_id = active->insert_with_id(key, len, new_id); + if (new_id != 0) dump->insert_with_id(key, len, new_id); + return new_id; + } + } + + // Clear the scratch buffer (two rotations behind active; safe to clear). + // Resets per-dump counters to 0 so they track only post-clearStandby inserts. + void clearStandby() { + _rot.clearTarget()->clear(); + Counters::set(DICTIONARY_KEYS, 0, _counter_offset); + Counters::set(DICTIONARY_KEYS_BYTES, 0, _counter_offset); + } + + // Reset all three buffers and restart the ID counter. + // Sets _accepting=false so that bounded_lookup callers that bypass lockAll() + // (JNI paths) do not create new RefCountGuards after the caller's initial + // waitForAllRefCountsToClear() drains. Drains again internally to catch any + // guard that started between the caller's drain and this flag flip, then + // clears, then re-enables lookups. + void clearAll() { + _accepting.store(false, std::memory_order_seq_cst); + // Drain guards that were already past the _accepting check when the flag + // was set. After this returns, no new guards will be created (they all + // see _accepting=false) so the clear below is safe. 
+ RefCountGuard::waitForAllRefCountsToClear(); + _a.clear(); _b.clear(); _c.clear(); + _rot.reset(); + _next_id.store(1, std::memory_order_relaxed); + Counters::set(DICTIONARY_KEYS, 0, _counter_offset); + Counters::set(DICTIONARY_KEYS_BYTES, 0, _counter_offset); + _accepting.store(true, std::memory_order_release); + } +}; + +#endif // _STRINGDICTIONARY_H diff --git a/ddprof-lib/src/main/cpp/tripleBuffer.h b/ddprof-lib/src/main/cpp/tripleBuffer.h new file mode 100644 index 000000000..422ee2300 --- /dev/null +++ b/ddprof-lib/src/main/cpp/tripleBuffer.h @@ -0,0 +1,82 @@ +/* + * Copyright 2025, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _TRIPLE_BUFFER_H +#define _TRIPLE_BUFFER_H + +/** + * Generic triple-buffer rotation manager. + * + * Manages three externally-owned objects that cycle through three roles: + * + * active — receives new writes (signal handlers, fill-path) + * dump — snapshot being read by the current dump (old active after rotate) + * scratch — two rotations behind active; ready to be cleared. At least one + * full dump cycle has elapsed since this buffer was last in the + * "dump" role, which gives any writer that loaded a stale active + * pointer time to complete its lookup before the buffer is freed. + * + * Lifecycle per dump cycle: + * rotate() — advance active index; dump thread reads dumpBuffer() + * ...dump runs, populating dumpBuffer() with fill-path data... + * ...caller clears clearTarget() (the scratch buffer)... + * + * Memory: at most two non-empty buffers at any time (active + dump). + * + * Thread safety: + * _active_index is accessed via __atomic_* with acquire/release ordering. + * Writers may briefly operate on a buffer that has just transitioned to + * "dump" or "scratch" (TOCTOU at rotate); this is safe because Dictionary + * (and similar) operations are lock-free internally and the scratch buffer + * is not cleared until one full dump cycle later. 
+ */
+template <typename T>
+class TripleBufferRotator {
+  T* const _buf[3];
+  volatile int _active_index;  // 0/1/2; cycles on rotate()
+
+  // Buffer sitting `offset` slots after the active one (mod 3), resolved from
+  // a single acquire load of _active_index.
+  T* slot(int offset) const {
+    return _buf[(__atomic_load_n(&_active_index, __ATOMIC_ACQUIRE) + offset) % 3];
+  }
+
+public:
+  // a/b/c must remain valid for the lifetime of this rotator.
+  // `a` starts out as the active buffer.
+  TripleBufferRotator(T* a, T* b, T* c)
+      : _buf{a, b, c}, _active_index(0) {}
+
+  // Buffer currently designated to receive writes.
+  T* active() const {
+    return slot(0);
+  }
+
+  // Buffer the dump thread reads from: (active_index + 2) % 3 after rotate().
+  T* dumpBuffer() const {
+    return slot(2);
+  }
+
+  // Buffer scheduled for the next clear: (active_index + 1) % 3.
+  // At least one full dump cycle has elapsed since this buffer was the
+  // "dump" role, so any stale writer pointer to it is no longer in use.
+  T* clearTarget() const {
+    return slot(1);
+  }
+
+  // Advance _active_index by 1 mod 3.
+  // After this call the old active is accessible via dumpBuffer().
+  // Uses a CAS loop so concurrent callers (e.g. stop() racing dump()) each
+  // advance by exactly one step without silently aliasing the same index.
+  void rotate() {
+    int old = __atomic_load_n(&_active_index, __ATOMIC_ACQUIRE);
+    int next = (old + 1) % 3;
+    // On failure the builtin refreshes `old` with the current value, so only
+    // `next` has to be recomputed before retrying.
+    while (!__atomic_compare_exchange_n(&_active_index, &old, next,
+                                        /*weak=*/false,
+                                        __ATOMIC_ACQ_REL,
+                                        __ATOMIC_ACQUIRE)) {
+      next = (old + 1) % 3;
+    }
+  }
+
+  // Reset to initial state (no concurrent writers/readers).
+  void reset() {
+    __atomic_store_n(&_active_index, 0, __ATOMIC_RELEASE);
+  }
+};
+
+#endif // _TRIPLE_BUFFER_H
diff --git a/ddprof-lib/src/test/cpp/dictionary_concurrent_ut.cpp b/ddprof-lib/src/test/cpp/dictionary_concurrent_ut.cpp
deleted file mode 100644
index ad2ccb17e..000000000
--- a/ddprof-lib/src/test/cpp/dictionary_concurrent_ut.cpp
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Copyright 2026, Datadog, Inc.
- * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "dictionary.h" -#include "spinLock.h" -#include "../../main/cpp/gtest_crash_handler.h" - -// Regression tests for the dictionary concurrency contracts. -// -// These tests pin down two contracts: -// (1) bounded_lookup(key, length, 0) is read-only (no malloc/calloc) and -// returns INT_MAX on miss. This is the contract -// HotspotSupport::walkVM relies on in its vtable-target lookup block to -// be async-signal-safe. -// (2) Dictionary::collect, when externally guarded by a SpinLock taken -// shared, can run concurrently with shared-lock inserters and is -// serialized against an exclusive-lock Dictionary::clear() — matching -// the protocol Recording::writeClasses now uses around _class_map_lock. -// -// Test name for crash handler -static constexpr char DICTIONARY_CONCURRENT_TEST_NAME[] = "DictionaryConcurrent"; - -namespace { - -class DictionaryConcurrentSetup { -public: - DictionaryConcurrentSetup() { - installGtestCrashHandler(); - } - ~DictionaryConcurrentSetup() { - restoreDefaultSignalHandlers(); - } -}; - -static DictionaryConcurrentSetup dictionary_concurrent_global_setup; - -} // namespace - -// (1a) bounded_lookup with size_limit=0 must not insert on miss. -TEST(DictionaryConcurrent, BoundedLookupMissReturnsSentinelAndDoesNotInsert) { - Dictionary dict(/*id=*/0); - - std::map before; - dict.collect(before); - ASSERT_TRUE(before.empty()); - - const char* key = "Lorg/example/Cold;"; - unsigned int id = dict.bounded_lookup(key, strlen(key), 0); - EXPECT_EQ(static_cast(INT_MAX), id); - - std::map after; - dict.collect(after); - EXPECT_TRUE(after.empty()); - - // A second miss on a different key must also leave the dictionary empty. 
- const char* key2 = "Lorg/example/Other;"; - unsigned int id2 = dict.bounded_lookup(key2, strlen(key2), 0); - EXPECT_EQ(static_cast(INT_MAX), id2); - - std::map after2; - dict.collect(after2); - EXPECT_TRUE(after2.empty()); -} - -// (1b) bounded_lookup with size_limit=0 must return the existing id when the -// key is already present (e.g. previously inserted from a non-signal context). -TEST(DictionaryConcurrent, BoundedLookupHitReturnsExistingId) { - Dictionary dict(/*id=*/0); - - const char* key = "Ljava/util/HashMap;"; - unsigned int inserted_id = dict.lookup(key, strlen(key)); - ASSERT_NE(0u, inserted_id); - ASSERT_NE(static_cast(INT_MAX), inserted_id); - - unsigned int looked_up_id = dict.bounded_lookup(key, strlen(key), 0); - EXPECT_EQ(inserted_id, looked_up_id); -} - -// (2) Stress test: shared-lock inserters + exclusive-lock clear + shared-lock -// collect, all driven from separate threads. This mirrors the discipline that -// Recording::writeClasses (shared-lock collect) and Profiler::dump (exclusive-lock -// clear) follow around _class_map_lock. Without the lock, this pattern races -// (and SIGSEGVs on dictionary corruption); with the lock it is well-formed and -// the test passes cleanly under TSan/ASan. 
-TEST(DictionaryConcurrent, ConcurrentInsertCollectClearWithExternalLock) { - Dictionary dict(/*id=*/0); - SpinLock lock; - - constexpr int kWriters = 4; - constexpr int kKeysPerWriter = 256; - const auto kDuration = std::chrono::milliseconds(500); - - std::atomic stop{false}; - std::atomic total_inserts{0}; - std::atomic total_collects{0}; - std::atomic total_clears{0}; - - std::vector writers; - writers.reserve(kWriters); - for (int w = 0; w < kWriters; ++w) { - writers.emplace_back([&, w]() { - char buf[64]; - int counter = 0; - while (!stop.load(std::memory_order_relaxed)) { - snprintf(buf, sizeof(buf), "Lcom/example/W%d/K%d;", - w, counter % kKeysPerWriter); - size_t len = strlen(buf); - lock.lockShared(); - unsigned int id = dict.lookup(buf, len); - lock.unlockShared(); - EXPECT_NE(static_cast(INT_MAX), id); - total_inserts.fetch_add(1, std::memory_order_relaxed); - ++counter; - } - }); - } - - std::thread collector([&]() { - while (!stop.load(std::memory_order_relaxed)) { - std::map snapshot; - lock.lockShared(); - dict.collect(snapshot); - lock.unlockShared(); - // The map may be empty if a clear() just ran; either way it must - // be a well-formed map of (id, key) pairs that we can iterate. - for (auto it = snapshot.begin(); it != snapshot.end(); ++it) { - ASSERT_NE(nullptr, it->second); - } - total_collects.fetch_add(1, std::memory_order_relaxed); - } - }); - - std::thread clearer([&]() { - while (!stop.load(std::memory_order_relaxed)) { - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - lock.lock(); - dict.clear(); - lock.unlock(); - total_clears.fetch_add(1, std::memory_order_relaxed); - } - }); - - std::this_thread::sleep_for(kDuration); - stop.store(true, std::memory_order_relaxed); - - for (auto& t : writers) { - t.join(); - } - collector.join(); - clearer.join(); - - // Sanity: each role made progress. 
- EXPECT_GT(total_inserts.load(), 0L); - EXPECT_GT(total_collects.load(), 0L); - EXPECT_GT(total_clears.load(), 0L); -} - -// (3) Regression for PROF-14550: the walkVM vtable-target lookup path uses -// OptionalSharedLockGuard (tryLockShared) + bounded_lookup(0). This must not -// race a concurrent exclusive dict.clear() (Profiler::dump path). -// -// Without the guard, bounded_lookup reads row->keys and row->next while clear() -// concurrently frees them — use-after-free / SIGSEGV. With the guard, clear() -// holds the exclusive lock and signal-side readers either finish before clear -// or fail tryLockShared and skip the lookup entirely. -TEST(DictionaryConcurrent, SignalHandlerBoundedLookupVsDumpClear) { - Dictionary dict(/*id=*/0); - SpinLock lock; - - // Pre-populate so bounded_lookup has real rows to traverse. - constexpr int kPreload = 64; - for (int i = 0; i < kPreload; ++i) { - char buf[64]; - snprintf(buf, sizeof(buf), "Lcom/example/Preloaded%d;", i); - lock.lock(); - dict.lookup(buf, strlen(buf)); - lock.unlock(); - } - - constexpr int kSignalThreads = 4; - const auto kDuration = std::chrono::milliseconds(500); - - std::atomic stop{false}; - std::atomic total_reads{0}; - std::atomic total_skips{0}; - std::atomic total_clears{0}; - - // Simulate walkVM signal-handler threads: tryLockShared → bounded_lookup(0) - // → unlockShared. Mirrors the OptionalSharedLockGuard + ownsLock() guard. - std::vector signal_threads; - signal_threads.reserve(kSignalThreads); - for (int w = 0; w < kSignalThreads; ++w) { - signal_threads.emplace_back([&, w]() { - char buf[64]; - int counter = 0; - while (!stop.load(std::memory_order_relaxed)) { - snprintf(buf, sizeof(buf), "Lcom/example/Preloaded%d;", - counter % kPreload); - size_t len = strlen(buf); - OptionalSharedLockGuard guard(&lock); - if (guard.ownsLock()) { - // Read-only; no malloc, no CAS — safe in signal context. 
- unsigned int id = dict.bounded_lookup(buf, len, 0); - // INT_MAX is fine: key was cleared by a concurrent dump. - (void)id; - total_reads.fetch_add(1, std::memory_order_relaxed); - } else { - // Exclusive clear() holds the lock; drop the frame. - total_skips.fetch_add(1, std::memory_order_relaxed); - } - ++counter; - } - }); - } - - // Simulate Profiler::dump: exclusive lock → _class_map.clear() → unlock. - std::thread dump_thread([&]() { - while (!stop.load(std::memory_order_relaxed)) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - lock.lock(); - dict.clear(); - lock.unlock(); - total_clears.fetch_add(1, std::memory_order_relaxed); - } - }); - - std::this_thread::sleep_for(kDuration); - stop.store(true, std::memory_order_relaxed); - - for (auto& t : signal_threads) { - t.join(); - } - dump_thread.join(); - - // Both roles must have made progress. - EXPECT_GT(total_reads.load() + total_skips.load(), 0L); - EXPECT_GT(total_clears.load(), 0L); -} - -// (4) Same race as (3) but using BoundedOptionalSharedLockGuard, which is the -// guard classMapTrySharedGuard() now returns in hotspotSupport.cpp. The bounded -// variant may fail spuriously under reader pressure (≤5 CAS attempts); this -// verifies it still prevents use-after-free without deadlocking. -TEST(DictionaryConcurrent, SignalHandlerBoundedOptionalLookupVsDumpClear) { - Dictionary dict(/*id=*/0); - SpinLock lock; - - constexpr int kPreload = 64; - for (int i = 0; i < kPreload; ++i) { - char buf[64]; - snprintf(buf, sizeof(buf), "Lcom/example/Preloaded%d;", i); - lock.lock(); - dict.lookup(buf, strlen(buf)); - lock.unlock(); - } - - constexpr int kSignalThreads = 4; - const auto kDuration = std::chrono::milliseconds(500); - - std::atomic stop{false}; - std::atomic total_reads{0}; - std::atomic total_skips{0}; - std::atomic total_clears{0}; - - // Simulate hotspotSupport.cpp: BoundedOptionalSharedLockGuard + bounded_lookup(0). 
- std::vector signal_threads; - signal_threads.reserve(kSignalThreads); - for (int w = 0; w < kSignalThreads; ++w) { - signal_threads.emplace_back([&, w]() { - char buf[64]; - int counter = 0; - while (!stop.load(std::memory_order_relaxed)) { - snprintf(buf, sizeof(buf), "Lcom/example/Preloaded%d;", - counter % kPreload); - size_t len = strlen(buf); - BoundedOptionalSharedLockGuard guard(&lock); - if (guard.ownsLock()) { - unsigned int id = dict.bounded_lookup(buf, len, 0); - (void)id; - total_reads.fetch_add(1, std::memory_order_relaxed); - } else { - total_skips.fetch_add(1, std::memory_order_relaxed); - } - ++counter; - } - }); - } - - std::thread dump_thread([&]() { - while (!stop.load(std::memory_order_relaxed)) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - lock.lock(); - dict.clear(); - lock.unlock(); - total_clears.fetch_add(1, std::memory_order_relaxed); - } - }); - - std::this_thread::sleep_for(kDuration); - stop.store(true, std::memory_order_relaxed); - - for (auto& t : signal_threads) { - t.join(); - } - dump_thread.join(); - - EXPECT_GT(total_reads.load() + total_skips.load(), 0L); - EXPECT_GT(total_clears.load(), 0L); -} diff --git a/ddprof-lib/src/test/cpp/dictionary_ut.cpp b/ddprof-lib/src/test/cpp/dictionary_ut.cpp new file mode 100644 index 000000000..9d6cf343e --- /dev/null +++ b/ddprof-lib/src/test/cpp/dictionary_ut.cpp @@ -0,0 +1,70 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include "dictionary.h" +#include +#include + +// ── Dictionary ───────────────────────────────────────────────────────────── + +TEST(DictionaryTest, LookupReturnsSameIdForSameKey) { + Dictionary d(0); + unsigned int id1 = d.lookup("hello", 5); + EXPECT_GT(id1, 0U); + EXPECT_EQ(id1, d.lookup("hello", 5)); +} + +TEST(DictionaryTest, LookupReturnsDifferentIdsForDifferentKeys) { + Dictionary d(0); + unsigned int a = d.lookup("alpha", 5); + unsigned int b = d.lookup("beta", 4); + EXPECT_NE(a, b); +} + +TEST(DictionaryTest, BoundedLookupSkipsInsertWhenAtLimit) { + Dictionary d(0); + d.lookup("key1", 4); + // size is 1, limit is 1 → insert not allowed + unsigned int r = d.bounded_lookup("key2", 4, 1); + EXPECT_EQ(r, static_cast(INT_MAX)); +} + +TEST(DictionaryTest, BoundedLookupReturnsExistingIdWhenAtLimit) { + Dictionary d(0); + unsigned int existing = d.lookup("key1", 4); + // size is 1, limit is 1 → existing key still found + EXPECT_EQ(existing, d.bounded_lookup("key1", 4, 1)); +} + +TEST(DictionaryTest, CollectReturnsAllInsertedEntries) { + Dictionary d(0); + d.lookup("a", 1); + d.lookup("b", 1); + d.lookup("c", 1); + std::map m; + d.collect(m); + EXPECT_EQ(m.size(), 3U); +} + +TEST(DictionaryTest, ClearResetsToEmpty) { + Dictionary d(0); + d.lookup("x", 1); + d.clear(); + std::map m; + d.collect(m); + EXPECT_EQ(m.size(), 0U); +} diff --git a/ddprof-lib/src/test/cpp/spinlock_bounded_ut.cpp b/ddprof-lib/src/test/cpp/spinlock_bounded_ut.cpp deleted file mode 100644 index 84e26be83..000000000 --- a/ddprof-lib/src/test/cpp/spinlock_bounded_ut.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2026, Datadog, Inc. 
- * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include -#include -#include -#include - -#include "spinLock.h" -#include "../../main/cpp/gtest_crash_handler.h" - -// Unit tests for tryLockSharedBounded() and BoundedOptionalSharedLockGuard — -// the signal-safe locking API introduced to replace unbounded spinning in -// hotspotSupport.cpp (classMapTrySharedGuard()). - -static constexpr char SPINLOCK_BOUNDED_TEST_NAME[] = "SpinLockBounded"; - -namespace { - -class SpinLockBoundedSetup { -public: - SpinLockBoundedSetup() { - installGtestCrashHandler(); - } - ~SpinLockBoundedSetup() { - restoreDefaultSignalHandlers(); - } -}; - -static SpinLockBoundedSetup spinlock_bounded_global_setup; - -} // namespace - -TEST(SpinLockBounded, BoundedTryLockSucceedsOnFreeLock) { - SpinLock lock; - EXPECT_TRUE(lock.tryLockSharedBounded()); - lock.unlockShared(); -} - -TEST(SpinLockBounded, BoundedTryLockFailsWhenExclusiveLockHeld) { - SpinLock lock; - lock.lock(); - EXPECT_FALSE(lock.tryLockSharedBounded()); - lock.unlock(); -} - -// Multiple shared holders must coexist — bounded acquire must not treat a -// concurrent reader's negative _lock value as an exclusive lock. -TEST(SpinLockBounded, BoundedTryLockAllowsMultipleSharedHolders) { - SpinLock lock; - ASSERT_TRUE(lock.tryLockSharedBounded()); - EXPECT_TRUE(lock.tryLockSharedBounded()); - lock.unlockShared(); - lock.unlockShared(); -} - -TEST(SpinLockBounded, BoundedGuardOwnsLockWhenFree) { - SpinLock lock; - BoundedOptionalSharedLockGuard guard(&lock); - EXPECT_TRUE(guard.ownsLock()); -} - -// When the exclusive lock is held the guard must not own the lock, and its -// destructor must not accidentally release the exclusive lock. -TEST(SpinLockBounded, BoundedGuardFailsWhenExclusiveLockHeld) { - SpinLock lock; - lock.lock(); - { - BoundedOptionalSharedLockGuard guard(&lock); - EXPECT_FALSE(guard.ownsLock()); - } - // Exclusive lock must still be held — unlock() must succeed exactly once. 
- lock.unlock(); -} - -// After the guard goes out of scope the lock must be fully released so that an -// exclusive acquire succeeds. -TEST(SpinLockBounded, BoundedGuardReleasesLockOnDestruction) { - SpinLock lock; - { - BoundedOptionalSharedLockGuard guard(&lock); - ASSERT_TRUE(guard.ownsLock()); - } - EXPECT_TRUE(lock.tryLock()); - lock.unlock(); -} - -// Stress: concurrent BoundedOptionalSharedLockGuard readers vs an exclusive -// locker. Mirrors the classMapTrySharedGuard() signal-handler path vs the -// Profiler::dump path, using the RAII type that hotspotSupport.cpp now uses. -TEST(SpinLockBounded, BoundedGuardConcurrentWithExclusiveLock) { - SpinLock lock; - constexpr int kReaders = 4; - const auto kDuration = std::chrono::milliseconds(300); - - std::atomic stop{false}; - std::atomic total_acquired{0}; - std::atomic total_skipped{0}; - std::atomic total_exclusive{0}; - - std::vector readers; - readers.reserve(kReaders); - for (int i = 0; i < kReaders; ++i) { - readers.emplace_back([&]() { - while (!stop.load(std::memory_order_relaxed)) { - BoundedOptionalSharedLockGuard guard(&lock); - if (guard.ownsLock()) { - total_acquired.fetch_add(1, std::memory_order_relaxed); - } else { - total_skipped.fetch_add(1, std::memory_order_relaxed); - } - } - }); - } - - std::thread exclusive_thread([&]() { - while (!stop.load(std::memory_order_relaxed)) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - lock.lock(); - lock.unlock(); - total_exclusive.fetch_add(1, std::memory_order_relaxed); - } - }); - - std::this_thread::sleep_for(kDuration); - stop.store(true, std::memory_order_relaxed); - for (auto& t : readers) t.join(); - exclusive_thread.join(); - - EXPECT_GT(total_acquired.load(), 0L); - EXPECT_GT(total_exclusive.load(), 0L); -} diff --git a/ddprof-lib/src/test/cpp/stress_stringDictionary.cpp b/ddprof-lib/src/test/cpp/stress_stringDictionary.cpp new file mode 100644 index 000000000..e8732afaa --- /dev/null +++ 
b/ddprof-lib/src/test/cpp/stress_stringDictionary.cpp @@ -0,0 +1,481 @@ +/* + * Copyright 2025, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include "stringDictionary.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ── helpers ─────────────────────────────────────────────────────────────── + +static std::vector makeKeys(int thread_id, int count) { + std::vector keys; + keys.reserve(count); + for (int i = 0; i < count; i++) { + keys.push_back("t" + std::to_string(thread_id) + "_k" + std::to_string(i)); + } + return keys; +} + +// ── StringDictionaryBuffer concurrent insert + read ──────────────────────── + +TEST(StressStringDictionaryBuffer, ConcurrentInsertNoCorruption) { + StringDictionaryBuffer buf; + constexpr int N_THREADS = 8; + constexpr int KEYS_PER_THREAD = 500; + + std::vector threads; + for (int t = 0; t < N_THREADS; t++) { + threads.emplace_back([&buf, t]() { + auto keys = makeKeys(t, KEYS_PER_THREAD); + for (int i = 0; i < (int)keys.size(); i++) { + u32 id = static_cast(t * KEYS_PER_THREAD + i + 1); + buf.insert_with_id(keys[i].c_str(), keys[i].size(), id); + } + }); + } + for (auto& th : threads) th.join(); + + EXPECT_LE(buf.size(), N_THREADS * KEYS_PER_THREAD); + EXPECT_GT(buf.size(), 0); +} + +TEST(StressStringDictionaryBuffer, ConcurrentInsertAndLookupNoCorruption) { + StringDictionaryBuffer buf; + constexpr int N_WRITERS = 4; + constexpr int N_READERS = 4; + constexpr int OPS = 1000; + std::atomic stop{false}; + + std::vector writers; + for (int t = 0; t < N_WRITERS; t++) { + writers.emplace_back([&buf, &stop, t]() { + auto keys = makeKeys(t, OPS); + for (int i = 0; i < OPS && !stop.load(std::memory_order_relaxed); i++) { + buf.insert_with_id(keys[i].c_str(), keys[i].size(), + static_cast(t * OPS + i + 1)); + } + }); + } + + std::vector readers; + for (int t = 0; t < N_READERS; t++) { + readers.emplace_back([&buf, &stop, t]() { + while 
(!stop.load(std::memory_order_relaxed)) { + std::string key = "t0_k" + std::to_string(t % OPS); + buf.lookup(key.c_str(), key.size()); + } + }); + } + + for (auto& th : writers) th.join(); + stop.store(true); + for (auto& th : readers) th.join(); + + SUCCEED(); +} + +// Same key inserted by multiple threads: exactly one ID must survive and +// all threads must return that same ID. +TEST(StressStringDictionaryBuffer, ConcurrentSameKeyInsertReturnsConsistentId) { + StringDictionaryBuffer buf; + constexpr int N_THREADS = 16; + std::vector results(N_THREADS, 0); + + std::vector threads; + for (int t = 0; t < N_THREADS; t++) { + threads.emplace_back([&buf, &results, t]() { + // All threads try to insert the same key with different ids. + results[t] = buf.insert_with_id("shared/Key", 10, static_cast(t + 1)); + }); + } + for (auto& th : threads) th.join(); + + // All results must be the same value (the winner's id). + u32 expected = results[0]; + EXPECT_GT(expected, 0u); + for (int t = 0; t < N_THREADS; t++) { + EXPECT_EQ(expected, results[t]) << "thread " << t << " got different id"; + } +} + +// ── StringDictionary concurrent stress ──────────────────────────────────── + +// Invariant: once a key is assigned an id, every subsequent bounded_lookup +// must return the same id, across any number of rotations. +TEST(StressStringDictionary, IdStabilityUnderConcurrentRotation) { + StringDictionary dict; + constexpr int N_INSERTERS = 4; + constexpr int KEYS_PER_THREAD = 200; + + // Phase 1: insert all keys and record their ids. + std::vector> recorded(N_INSERTERS); + { + std::vector inserters; + for (int t = 0; t < N_INSERTERS; t++) { + inserters.emplace_back([&dict, &recorded, t]() { + for (auto& key : makeKeys(t, KEYS_PER_THREAD)) { + u32 id = dict.lookup(key.c_str(), key.size()); + recorded[t][key] = id; + } + }); + } + for (auto& th : inserters) th.join(); + } + + // Phase 2: rotate many times; ids must remain stable. 
+ constexpr int N_ROTATIONS = 20; + for (int r = 0; r < N_ROTATIONS; r++) { + dict.rotate(); + dict.clearStandby(); + + for (int t = 0; t < N_INSERTERS; t++) { + for (auto& kv : recorded[t]) { + u32 current = dict.bounded_lookup(kv.first.c_str(), kv.first.size()); + EXPECT_EQ(kv.second, current) + << "id changed for key '" << kv.first << "' at rotation " << r; + } + } + } +} + +// Concurrent inserts AND rotations simultaneously. +TEST(StressStringDictionary, ConcurrentInsertAndRotateNoCorruption) { + StringDictionary dict; + constexpr int N_INSERTERS = 6; + constexpr int KEYS_PER_THREAD = 300; + std::atomic done{false}; + + std::thread rotator([&dict, &done]() { + while (!done.load(std::memory_order_relaxed)) { + dict.rotate(); + dict.clearStandby(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + }); + + std::vector inserters; + for (int t = 0; t < N_INSERTERS; t++) { + inserters.emplace_back([&dict, t]() { + for (auto& key : makeKeys(t, KEYS_PER_THREAD)) { + dict.lookup(key.c_str(), key.size()); + } + }); + } + for (auto& th : inserters) th.join(); + done.store(true); + rotator.join(); + + SUCCEED(); +} + +// bounded_lookup (simulating signal handlers) concurrent with inserts and rotation. +TEST(StressStringDictionary, BoundedLookupSafeUnderConcurrentRotation) { + StringDictionary dict; + constexpr int N_INSERTERS = 4; + constexpr int N_READERS = 4; + constexpr int KEYS_PER_THREAD = 200; + std::atomic done{false}; + + // Pre-insert known keys for readers to probe. 
+ auto base_keys = makeKeys(99, 50); + for (auto& key : base_keys) dict.lookup(key.c_str(), key.size()); + + std::thread rotator([&dict, &done]() { + while (!done.load(std::memory_order_relaxed)) { + dict.rotate(); + dict.clearStandby(); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + }); + + std::vector inserters; + for (int t = 0; t < N_INSERTERS; t++) { + inserters.emplace_back([&dict, t]() { + for (auto& key : makeKeys(t, KEYS_PER_THREAD)) { + dict.lookup(key.c_str(), key.size()); + } + }); + } + + std::vector readers; + for (int t = 0; t < N_READERS; t++) { + readers.emplace_back([&dict, &base_keys, &done]() { + while (!done.load(std::memory_order_relaxed)) { + for (auto& key : base_keys) { + dict.bounded_lookup(key.c_str(), key.size()); + } + } + }); + } + + for (auto& th : inserters) th.join(); + done.store(true); + rotator.join(); + for (auto& th : readers) th.join(); + + SUCCEED(); +} + +// lookupDuringDump called concurrently with inserts into active. +// Invariant: all keys found or inserted by lookupDuringDump must appear in standby. +TEST(StressStringDictionary, LookupDuringDumpSafeUnderConcurrentInserts) { + StringDictionary dict; + constexpr int N_INSERTERS = 4; + constexpr int KEYS_PER_THREAD = 100; + + // Pre-populate and rotate so lookupDuringDump has a dump buffer. + for (auto& key : makeKeys(99, 20)) dict.lookup(key.c_str(), key.size()); + dict.rotate(); + + // Start inserters FIRST so lookupDuringDump races with active inserts. + std::atomic done{false}; + std::vector inserters; + for (int t = 0; t < N_INSERTERS; t++) { + inserters.emplace_back([&dict, &done, t]() { + for (auto& key : makeKeys(t, KEYS_PER_THREAD)) { + if (done.load(std::memory_order_relaxed)) break; + dict.lookup(key.c_str(), key.size()); + } + }); + } + + // Dump thread probes pre-populated keys concurrently with active inserts. 
+ std::vector> dump_results; + for (auto& key : makeKeys(99, 20)) { + u32 id = dict.lookupDuringDump(key.c_str(), key.size()); + EXPECT_GT(id, 0u) << "lookupDuringDump returned 0 for pre-populated key"; + dump_results.push_back({key, id}); + } + + done.store(true); + for (auto& th : inserters) th.join(); + + // All keys returned by lookupDuringDump must be in the dump buffer (standby). + std::map snap; + dict.standby()->collect(snap); + for (auto& kv : dump_results) { + EXPECT_EQ(1u, snap.count(kv.second)) + << "key '" << kv.first << "' with id " << kv.second << " not in standby"; + } +} + +// ── Multi-dictionary atomic rotation ────────────────────────────────────── +// +// Mirrors the production pattern in Profiler::dump(): three independent +// StringDictionaries are rotated atomically under a single critical section +// while inserters and signal-style readers run concurrently against all three. +// +// Invariants asserted: +// - Seed keys recorded before the rotator starts retain stable ids in every +// dictionary across many rotation cycles (concurrent inserts must not +// shift them). +// - After each atomic rotate-of-all-three, every seed id is present in the +// standby buffer of its dictionary — i.e. the rotation snapshot is +// consistent across the three dictionaries simultaneously. +TEST(StressStringDictionary, MultiDictionaryAtomicRotation) { + StringDictionary d1, d2, d3; + StringDictionary* dicts[3] = {&d1, &d2, &d3}; + + constexpr int N_INSERTERS_PER_DICT = 2; + constexpr int N_READERS = 3; + constexpr int SEED_KEY_COUNT = 40; + constexpr int SOAK_MS = 1500; + + // Pre-insert seed keys into all three dicts and record their ids. 
+ auto seed_keys = makeKeys(99, SEED_KEY_COUNT); + std::unordered_map> seed_ids; + for (auto& k : seed_keys) { + seed_ids[k] = { + d1.lookup(k.c_str(), k.size()), + d2.lookup(k.c_str(), k.size()), + d3.lookup(k.c_str(), k.size()) + }; + } + + std::atomic done{false}; + std::mutex rotate_mutex; // simulates Profiler::lockAll() + + // Rotator: rotate all three atomically, then verify the rotation snapshot. + std::thread rotator([&]() { + while (!done.load(std::memory_order_relaxed)) { + { + std::lock_guard lk(rotate_mutex); + d1.rotate(); + d2.rotate(); + d3.rotate(); + + // Atomic snapshot: every seed id is present in every standby. + std::map s1, s2, s3; + d1.standby()->collect(s1); + d2.standby()->collect(s2); + d3.standby()->collect(s3); + for (auto& kv : seed_ids) { + EXPECT_EQ(1u, s1.count(kv.second[0])) + << "d1 standby missing seed id " << kv.second[0] << " for '" << kv.first << "'"; + EXPECT_EQ(1u, s2.count(kv.second[1])) + << "d2 standby missing seed id " << kv.second[1] << " for '" << kv.first << "'"; + EXPECT_EQ(1u, s3.count(kv.second[2])) + << "d3 standby missing seed id " << kv.second[2] << " for '" << kv.first << "'"; + } + } + d1.clearStandby(); + d2.clearStandby(); + d3.clearStandby(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + }); + + // Inserters: per dict, hammer a per-thread key set. Each inserter loops + // its key list to keep memory bounded while exercising the lookup path + // (first iteration inserts; subsequent iterations are hot-hit lookups). + std::vector inserters; + for (int d = 0; d < 3; d++) { + for (int t = 0; t < N_INSERTERS_PER_DICT; t++) { + inserters.emplace_back([dicts, d, t, &done]() { + auto keys = makeKeys(d * 10 + t, 100); + while (!done.load(std::memory_order_relaxed)) { + for (auto& k : keys) { + dicts[d]->lookup(k.c_str(), k.size()); + } + } + }); + } + } + + // Readers: signal-style bounded_lookup of seed keys on all three dicts. + // Ids must remain stable across rotations. 
+ std::vector readers; + for (int r = 0; r < N_READERS; r++) { + readers.emplace_back([&]() { + while (!done.load(std::memory_order_relaxed)) { + for (auto& kv : seed_ids) { + u32 i1 = d1.bounded_lookup(kv.first.c_str(), kv.first.size()); + u32 i2 = d2.bounded_lookup(kv.first.c_str(), kv.first.size()); + u32 i3 = d3.bounded_lookup(kv.first.c_str(), kv.first.size()); + EXPECT_EQ(kv.second[0], i1) << "d1 id drifted for '" << kv.first << "'"; + EXPECT_EQ(kv.second[1], i2) << "d2 id drifted for '" << kv.first << "'"; + EXPECT_EQ(kv.second[2], i3) << "d3 id drifted for '" << kv.first << "'"; + } + } + }); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(SOAK_MS)); + done.store(true); + rotator.join(); + for (auto& th : inserters) th.join(); + for (auto& th : readers) th.join(); +} + +// ── clearAll under concurrent readers ───────────────────────────────────── +// +// StringDictionary::clearAll() frees every malloc'd key in all three buffers. +// Its contract is that the caller must quiesce signal handlers first +// (cf. RefCountGuard::waitForAllRefCountsToClear in the production callsite). +// This test models that protocol using a std::shared_mutex barrier: +// readers/inserters acquire it shared, the clearer acquires it exclusive. +// +// Under ASan/TSan, this test catches: UAF on freed keys, lost stores from a +// torn clearAll, or any heap corruption from clear-and-reinsert cycles. +// +// Invariants asserted: +// - No use-after-free, no heap corruption (caught by sanitizers). +// - Each clearAll-then-reinsert cycle yields a self-consistent id mapping: +// the same key resolves to one id through all reads in that epoch. +TEST(StressStringDictionary, ClearAllUnderConcurrentReaders) { + StringDictionary dict; + + constexpr int N_READERS = 4; + constexpr int N_INSERTERS = 2; + constexpr int N_CLEAR_OPS = 20; + constexpr int SOAK_MS = 1500; + + auto seed_keys = makeKeys(99, 30); + + // shared_mutex: shared = workers; unique = clearer. 
Mirrors the production + // requirement that clearAll runs only after all signal handlers are quiesced. + std::shared_mutex epoch_mtx; + std::atomic epoch{0}; // bumped every clearAll; readers re-snapshot ids per epoch + + // Re-seed under exclusive lock. Returns the per-key id map for this epoch. + auto reseed = [&](std::unordered_map& out) { + out.clear(); + for (auto& k : seed_keys) { + out[k] = dict.lookup(k.c_str(), k.size()); + } + }; + + std::unordered_map seed_ids; + reseed(seed_ids); // initial epoch 0 + + std::atomic done{false}; + + // Clearer: bounded number of clearAll cycles, then exits. + std::thread clearer([&]() { + for (int i = 0; i < N_CLEAR_OPS && !done.load(std::memory_order_relaxed); i++) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::unordered_map new_ids; + { + std::unique_lock lk(epoch_mtx); + dict.clearAll(); + reseed(new_ids); + seed_ids = std::move(new_ids); + epoch.fetch_add(1, std::memory_order_release); + } + } + }); + + // Inserters: arbitrary keys, just to stress the malloc/CAS paths under + // contention with clearAll cycles. + std::vector inserters; + for (int t = 0; t < N_INSERTERS; t++) { + inserters.emplace_back([&, t]() { + auto ks = makeKeys(t, 60); + while (!done.load(std::memory_order_relaxed)) { + std::shared_lock lk(epoch_mtx); + for (auto& k : ks) { + dict.lookup(k.c_str(), k.size()); + } + } + }); + } + + // Readers: snapshot the current-epoch id map, then verify lookups within + // the epoch are consistent. An epoch bump invalidates the snapshot, so + // re-read it under shared lock on each loop. 
+ std::vector readers; + for (int t = 0; t < N_READERS; t++) { + readers.emplace_back([&]() { + while (!done.load(std::memory_order_relaxed)) { + std::shared_lock lk(epoch_mtx); + u64 my_epoch = epoch.load(std::memory_order_acquire); + auto local = seed_ids; // snapshot under lock + for (auto& kv : local) { + u32 id = dict.bounded_lookup(kv.first.c_str(), kv.first.size()); + // While the shared lock is held, no clearAll can run, so + // the id must equal what reseed recorded for this epoch. + EXPECT_EQ(kv.second, id) + << "epoch " << my_epoch << " key '" << kv.first + << "' expected " << kv.second << " got " << id; + } + } + }); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(SOAK_MS)); + done.store(true); + clearer.join(); + for (auto& th : inserters) th.join(); + for (auto& th : readers) th.join(); +} diff --git a/ddprof-lib/src/test/cpp/stringDictionary_ut.cpp b/ddprof-lib/src/test/cpp/stringDictionary_ut.cpp new file mode 100644 index 000000000..68fca8863 --- /dev/null +++ b/ddprof-lib/src/test/cpp/stringDictionary_ut.cpp @@ -0,0 +1,183 @@ +#include +#include "stringDictionary.h" +#include +#include +#include +#include +#include + +// ── StringDictionaryBuffer ───────────────────────────────────────────────── + +TEST(StringDictionaryBufferTest, InsertWithIdReturnsSameIdForSameKey) { + StringDictionaryBuffer buf; + u32 id = buf.insert_with_id("hello", 5, 42); + EXPECT_EQ(42u, id); + EXPECT_EQ(42u, buf.insert_with_id("hello", 5, 42)); +} + +TEST(StringDictionaryBufferTest, InsertPreservesExistingIdOnDuplicate) { + StringDictionaryBuffer buf; + buf.insert_with_id("key", 3, 7); + // Second insert of same key must return 7, not some other value + EXPECT_EQ(7u, buf.insert_with_id("key", 3, 99)); +} + +TEST(StringDictionaryBufferTest, LookupReturnZeroOnMiss) { + StringDictionaryBuffer buf; + EXPECT_EQ(0u, buf.lookup("absent", 6)); +} + +TEST(StringDictionaryBufferTest, LookupFindsInsertedKey) { + StringDictionaryBuffer buf; + 
buf.insert_with_id("java/lang/String", 16, 1); + EXPECT_EQ(1u, buf.lookup("java/lang/String", 16)); +} + +TEST(StringDictionaryBufferTest, LookupDoesNotInsert) { + StringDictionaryBuffer buf; + buf.lookup("ghost", 5); + std::map out; + buf.collect(out); + EXPECT_EQ(0u, out.size()); +} + +TEST(StringDictionaryBufferTest, CollectReturnsAllInsertedEntries) { + StringDictionaryBuffer buf; + buf.insert_with_id("a", 1, 1); + buf.insert_with_id("b", 1, 2); + buf.insert_with_id("c", 1, 3); + std::map out; + buf.collect(out); + ASSERT_EQ(3u, out.size()); + EXPECT_STREQ("a", out[1]); + EXPECT_STREQ("b", out[2]); + EXPECT_STREQ("c", out[3]); +} + +TEST(StringDictionaryBufferTest, CopyFromPreservesAllEntriesWithIds) { + StringDictionaryBuffer src; + src.insert_with_id("java/lang/String", 16, 10); + src.insert_with_id("java/lang/Integer", 17, 20); + + StringDictionaryBuffer dst; + dst.copyFrom(src); + + EXPECT_EQ(10u, dst.lookup("java/lang/String", 16)); + EXPECT_EQ(20u, dst.lookup("java/lang/Integer", 17)); +} + +TEST(StringDictionaryBufferTest, ClearResetsToEmpty) { + StringDictionaryBuffer buf; + buf.insert_with_id("x", 1, 5); + buf.clear(); + EXPECT_EQ(0u, buf.lookup("x", 1)); + std::map out; + buf.collect(out); + EXPECT_EQ(0u, out.size()); +} + +// ── StringDictionary (persistent, global IDs) ───────────────────────────── + +class StringDictionaryTest : public ::testing::Test { +protected: + StringDictionary dict; +}; + +TEST_F(StringDictionaryTest, LookupAssignsGlobalId) { + u32 id = dict.lookup("java/lang/String", 16); + EXPECT_GT(id, 0u); + EXPECT_EQ(id, dict.lookup("java/lang/String", 16)); +} + +TEST_F(StringDictionaryTest, BoundedLookupFindsActiveEntry) { + u32 id = dict.lookup("Foo", 3); + EXPECT_EQ(id, dict.bounded_lookup("Foo", 3)); +} + +TEST_F(StringDictionaryTest, BoundedLookupReturnsZeroOnMiss) { + EXPECT_EQ(0u, dict.bounded_lookup("Absent", 6)); +} + +TEST_F(StringDictionaryTest, IdStableAcrossRotations) { + u32 id = dict.lookup("java/lang/String", 16); + for 
(int cycle = 0; cycle < 10; cycle++) { + dict.rotate(); + dict.clearStandby(); + EXPECT_EQ(id, dict.bounded_lookup("java/lang/String", 16)) + << "id changed at cycle " << cycle; + } +} + +TEST_F(StringDictionaryTest, AllEntriesPresentInStandbyAfterRotate) { + u32 id1 = dict.lookup("a", 1); + u32 id2 = dict.lookup("b", 1); + dict.rotate(); + + std::map snap; + dict.standby()->collect(snap); + ASSERT_EQ(2u, snap.size()); + EXPECT_EQ(snap[id1], std::string("a")); + EXPECT_EQ(snap[id2], std::string("b")); +} + +TEST_F(StringDictionaryTest, NewEntryAfterRotateIsInNewActive) { + dict.lookup("early", 5); + dict.rotate(); + u32 id = dict.lookup("late", 4); + + EXPECT_EQ(id, dict.bounded_lookup("late", 4)); + + dict.rotate(); + std::map snap; + dict.standby()->collect(snap); + bool found = false; + for (auto& kv : snap) if (strcmp(kv.second, "late") == 0) { found = true; break; } + EXPECT_TRUE(found); +} + +TEST_F(StringDictionaryTest, LookupDuringDumpFindsPreregisteredKey) { + u32 id = dict.lookup("java/lang/String", 16); + dict.rotate(); + EXPECT_EQ(id, dict.lookupDuringDump("java/lang/String", 16)); +} + +TEST_F(StringDictionaryTest, LookupDuringDumpAlsoAddsToStandby) { + dict.rotate(); + u32 id = dict.lookup("late/Class", 10); + + u32 found = dict.lookupDuringDump("late/Class", 10); + EXPECT_EQ(id, found); + + std::map snap; + dict.standby()->collect(snap); + EXPECT_EQ(1u, snap.count(id)); +} + +TEST_F(StringDictionaryTest, ClearAllResetsEverything) { + u32 id = dict.lookup("x", 1); + (void)id; + dict.rotate(); + dict.clearAll(); + EXPECT_EQ(0u, dict.bounded_lookup("x", 1)); + dict.rotate(); + std::map snap; + dict.standby()->collect(snap); + EXPECT_EQ(0u, snap.size()); + u32 new_id = dict.lookup("x", 1); + EXPECT_EQ(1u, new_id); +} + +TEST_F(StringDictionaryTest, LookupDuringDumpInsertsNewKeyIntoActiveAndStandby) { + dict.rotate(); // empty active becomes dump, fresh active + // Key is not in dump and not in active — lookupDuringDump must insert into both. 
+ u32 id = dict.lookupDuringDump("brand/New", 9); + EXPECT_GT(id, 0u); + + // Must be in dump (standby) + std::map snap; + dict.standby()->collect(snap); + EXPECT_EQ(1u, snap.count(id)); + + // Must be in active (bounded_lookup is a probe of active) + EXPECT_EQ(id, dict.bounded_lookup("brand/New", 9)); +} diff --git a/ddprof-lib/src/test/fuzz/corpus/fuzz_stringDictionary/basic_rotation b/ddprof-lib/src/test/fuzz/corpus/fuzz_stringDictionary/basic_rotation new file mode 100644 index 000000000..e1545539b Binary files /dev/null and b/ddprof-lib/src/test/fuzz/corpus/fuzz_stringDictionary/basic_rotation differ diff --git a/ddprof-lib/src/test/fuzz/fuzz_stringDictionary.cpp b/ddprof-lib/src/test/fuzz/fuzz_stringDictionary.cpp new file mode 100644 index 000000000..49df7ac7d --- /dev/null +++ b/ddprof-lib/src/test/fuzz/fuzz_stringDictionary.cpp @@ -0,0 +1,153 @@ +/* + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + * + * libFuzzer fuzz target for the triple-buffered StringDictionary. + * + * The fuzzer interprets each input as a sequence of dictionary operations and + * verifies the documented sequential invariants of StringDictionary: + * + * I1. Once a key has an id, every subsequent successful lookup of that key + * returns the same id — across any number of rotate()/clearStandby() cycles. + * + * I2. The signal-safe read-only bounded_lookup(key, len) never returns an id + * for a key that was never inserted into the active buffer. + * + * I3. lookupDuringDump(key) either returns 0 or returns an id that is also + * resolvable from standby()->collect(). + * + * I4. clearAll() resets state — after clearAll(), no previously recorded id + * must be observable via any read path. + * + * Address/UB sanitizer is expected to be enabled by the fuzz build; UAFs and + * heap corruption from the malloc'd key storage will be caught automatically. 
+ */ + +#include +#include +#include + +#include +#include +#include + +#include "stringDictionary.h" + +namespace { + +// Bound the working set so a pathological corpus does not OOM the fuzzer. +constexpr int kMaxUniqueKeys = 4096; +constexpr int kBoundedSizeLimit = 8192; +constexpr size_t kMaxKeyLen = 31; // mask of the length byte + +// Read a length-prefixed key from the input. '\0' bytes are mapped to '_' so +// the key is a valid C string (StringDictionary uses NUL-terminated comparison). +// Advances pos. Returns an empty key when the input is exhausted. +std::string readKey(const uint8_t *data, size_t size, size_t &pos) { + if (pos >= size) return {}; + size_t len = data[pos++] & kMaxKeyLen; + if (pos + len > size) len = size - pos; + std::string out; + out.reserve(len); + for (size_t i = 0; i < len; i++) { + char c = (char)data[pos + i]; + out.push_back(c == '\0' ? '_' : c); + } + pos += len; + return out; +} + +} // namespace + +extern "C" int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { + if (size < 2) return 0; + + StringDictionary dict; + std::unordered_map shadow; // key -> expected id + bool has_dump_buffer = false; // true after first rotate() + + size_t pos = 0; + while (pos < size) { + uint8_t op = data[pos++]; + + if (op < 0x40) { + // lookup (insert into active) + std::string k = readKey(data, size, pos); + if (shadow.size() >= kMaxUniqueKeys && shadow.find(k) == shadow.end()) { + continue; + } + u32 id = dict.lookup(k.c_str(), k.size()); + if (id == 0) continue; + auto it = shadow.find(k); + if (it == shadow.end()) { + shadow.emplace(k, id); + } else if (it->second != id) { + __builtin_trap(); // I1: id changed for known key + } + } else if (op < 0x60) { + // bounded_lookup with insert (high cap) + std::string k = readKey(data, size, pos); + if (shadow.size() >= kMaxUniqueKeys && shadow.find(k) == shadow.end()) { + continue; + } + u32 id = dict.bounded_lookup(k.c_str(), k.size(), kBoundedSizeLimit); + if (id == 0) continue; // at 
cap is legitimate + auto it = shadow.find(k); + if (it == shadow.end()) { + shadow.emplace(k, id); + } else if (it->second != id) { + __builtin_trap(); // I1 + } + } else if (op < 0x70) { + // bounded_lookup signal-safe (read-only) + std::string k = readKey(data, size, pos); + u32 id = dict.bounded_lookup(k.c_str(), k.size()); + auto it = shadow.find(k); + if (it == shadow.end()) { + if (id != 0) __builtin_trap(); // I2: phantom id + } else if (id != 0 && id != it->second) { + __builtin_trap(); // I1: id changed + } + // Note: id may legitimately be 0 if the key only ever existed in + // standby/dump (e.g. inserted, then clearAll, then nothing); we + // already cleared the shadow in that case, so shadow.find would miss. + } else if (op < 0x80) { + // lookupDuringDump — only legal once we have a dump buffer. + std::string k = readKey(data, size, pos); + if (!has_dump_buffer) continue; + if (shadow.size() >= kMaxUniqueKeys && shadow.find(k) == shadow.end()) { + continue; + } + u32 id = dict.lookupDuringDump(k.c_str(), k.size()); + if (id == 0) continue; + auto it = shadow.find(k); + if (it == shadow.end()) { + shadow.emplace(k, id); + } else if (it->second != id) { + __builtin_trap(); // I1 + } + // I3: lookupDuringDump must leave the key resolvable from standby(). + std::map snap; + dict.standby()->collect(snap); + auto found = snap.find(id); + if (found == snap.end() || k != found->second) { + __builtin_trap(); + } + } else if (op < 0x90) { + // rotate + dict.rotate(); + has_dump_buffer = true; + } else if (op < 0xA0) { + // clearStandby (clears scratch; does not affect active) + dict.clearStandby(); + } else if (op < 0xA8) { + // clearAll — sequential fuzz only; not exercising signal-time semantics. + dict.clearAll(); + shadow.clear(); + has_dump_buffer = false; + } + // 0xA8-0xFF: noop / spacer — keeps the corpus density adjustable. 
+ } + + return 0; +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/ContendedCallTraceStorageTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/ContendedCallTraceStorageTest.java index 80da81c91..b95b58d60 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/ContendedCallTraceStorageTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/ContendedCallTraceStorageTest.java @@ -43,7 +43,10 @@ protected String getProfilerCommand() { @Override protected boolean isPlatformSupported() { - return !Platform.isJ9(); // Avoid J9-specific issues + // CTimer::unregisterThread races with concurrent thread teardown on musl-aarch64 debug; + // tracked separately as a pre-existing native bug: + // https://github.com/DataDog/java-profiler/issues/534 + return !Platform.isJ9() && !(Platform.isMusl() && Platform.isAarch64()); } @Test diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/endpoints/EndpointTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/endpoints/EndpointTest.java index 06e400c25..cbb9ff1a0 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/endpoints/EndpointTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/endpoints/EndpointTest.java @@ -40,6 +40,8 @@ public void testEndpoints() { // reject above size limit record(new Endpoint(0, UUID.randomUUID().toString(), UUID.randomUUID().toString()), false, sizeLimit); + Map debugCounters = profiler.getDebugCounters(); + assertEquals(endpoints.length, debugCounters.get("dictionary_endpoints_keys")); stopProfiler(); IItemCollection events = verifyEvents("datadog.Endpoint"); IAttribute endpointAttribute = attr("endpoint", "endpoint", "endpoint", @@ -64,8 +66,6 @@ public void testEndpoints() { for (int i = 0; i < endpoints.length; i++) { assertTrue(recovered.get(i), i + " not tested"); } - Map debugCounters = profiler.getDebugCounters(); - assertEquals(endpoints.length, debugCounters.get("dictionary_endpoints_keys")); 
assertEquals(Arrays.stream(endpoints).mapToInt(ep -> ep.endpoint.length() + 1).sum(), debugCounters.get("dictionary_endpoints_keys_bytes")); assertBoundedBy(debugCounters.get("dictionary_endpoints_pages"), 300, "endpoint storage too many pages"); assertBoundedBy(debugCounters.get("dictionary_endpoints_bytes"), 300 * DICTIONARY_PAGE_SIZE, "endpoint storage too many pages"); diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/metadata/BoundMethodHandleMetadataSizeTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/metadata/BoundMethodHandleProfilerTest.java similarity index 90% rename from ddprof-test/src/test/java/com/datadoghq/profiler/metadata/BoundMethodHandleMetadataSizeTest.java rename to ddprof-test/src/test/java/com/datadoghq/profiler/metadata/BoundMethodHandleProfilerTest.java index d7b795d96..8cb8f6186 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/metadata/BoundMethodHandleMetadataSizeTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/metadata/BoundMethodHandleProfilerTest.java @@ -8,12 +8,11 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.util.Map; - import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeFalse; -public class BoundMethodHandleMetadataSizeTest extends AbstractProfilerTest { +public class BoundMethodHandleProfilerTest extends AbstractProfilerTest { @Override protected String getProfilerCommand() { return Platform.isJ9() ? 
"wall=100ms" : "wall=100us"; @@ -34,8 +33,7 @@ public void test() throws Throwable { stopProfiler(); verifyEvents("datadog.MethodSample"); Map counters = profiler.getDebugCounters(); - assertFalse(counters.isEmpty()); - // assert about the size of metadata here + assertFalse(counters.isEmpty(), "profiler debug counters must not be empty after BoundMethodHandle workload"); } @@ -47,7 +45,7 @@ public static String append(String string, int suffix) { public static int generateBoundMethodHandles(int howMany) throws Throwable { int total = 0; MethodHandle append = MethodHandles.lookup() - .findStatic(BoundMethodHandleMetadataSizeTest.class, + .findStatic(BoundMethodHandleProfilerTest.class, "append", MethodType.methodType(String.class, String.class, int.class)); for (int i = 0; i < howMany; i++) { diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/metadata/DictionaryRotationTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/metadata/DictionaryRotationTest.java new file mode 100644 index 000000000..6b8c40415 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/metadata/DictionaryRotationTest.java @@ -0,0 +1,118 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datadoghq.profiler.metadata; + +import com.datadoghq.profiler.AbstractProfilerTest; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.*; +import static org.openjdk.jmc.common.item.Attribute.attr; +import static org.openjdk.jmc.common.unit.UnitLookup.PLAIN_TEXT; + +/** + * Verifies that the dictionary rotate+clearStandby cycle correctly: + * - Exposes only pre-dump entries in the dump snapshot. + * - Recalibrates the live counter to reflect the active buffer after clearStandby(). + * - Accumulates post-dump entries in the new active buffer. + */ +public class DictionaryRotationTest extends AbstractProfilerTest { + + private static final IAttribute ENDPOINT_ATTR = + attr("endpoint", "endpoint", "endpoint", PLAIN_TEXT); + + @Test + public void dumpCycleSeparatesPreAndPostDumpEntries() throws Exception { + String[] preDump = { "ep_pre_0", "ep_pre_1", "ep_pre_2" }; + String[] postDump = { "ep_post_0", "ep_post_1" }; + int sizeLimit = 100; + + for (int i = 0; i < preDump.length; i++) { + profiler.recordTraceRoot(i, preDump[i], null, sizeLimit); + } + + // dump() triggers: lockAll() → rotate() → jfr.dump(snapshot) → unlockAll() + // → clearStandby() (resets per-dump counters to 0, frees scratch buffer) + Path snapshot = Files.createTempFile("DictionaryRotation_snapshot_", ".jfr"); + try { + dump(snapshot); + + // Counter reset to 0 by clearStandby() — tracks only post-clearStandby inserts + Map afterDump = profiler.getDebugCounters(); + assertEquals(0L, afterDump.getOrDefault("dictionary_endpoints_keys", -1L), + "dictionary_endpoints_keys must be 
0 after clearStandby"); + + for (int i = 0; i < postDump.length; i++) { + profiler.recordTraceRoot(preDump.length + i, postDump[i], null, sizeLimit); + } + + // Live counter reflects only post-dump insertions + Map live = profiler.getDebugCounters(); + assertEquals((long) postDump.length, live.get("dictionary_endpoints_keys"), + "Live counter must equal number of post-dump endpoints"); + + stopProfiler(); + + // Snapshot contains pre-dump endpoints only + Set inSnapshot = endpointNames(verifyEvents(snapshot, "datadog.Endpoint", true)); + for (String ep : preDump) { + assertTrue(inSnapshot.contains(ep), "Snapshot must contain pre-dump endpoint: " + ep); + } + for (String ep : postDump) { + assertFalse(inSnapshot.contains(ep), "Snapshot must NOT contain post-dump endpoint: " + ep); + } + + // Main recording contains post-dump endpoints only + Set inRecording = endpointNames(verifyEvents("datadog.Endpoint")); + for (String ep : postDump) { + assertTrue(inRecording.contains(ep), "Recording must contain post-dump endpoint: " + ep); + } + for (String ep : preDump) { + assertFalse(inRecording.contains(ep), "Recording must NOT contain pre-dump endpoint: " + ep); + } + } finally { + Files.deleteIfExists(snapshot); + } + } + + @Override + protected String getProfilerCommand() { + return "wall=~1ms"; + } + + private static Set endpointNames(IItemCollection events) { + Set names = new HashSet<>(); + for (IItemIterable it : events) { + IMemberAccessor accessor = ENDPOINT_ATTR.getAccessor(it.getType()); + if (accessor == null) continue; + for (IItem item : it) { + String v = accessor.getMember(item); + if (v != null) names.add(v); + } + } + return names; + } +}