Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions profiling/src/profiling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{Clocks, CLOCKS, TAGS};
use chrono::Utc;
use core::mem::forget;
use core::{ptr, str};
use cpu_time::ThreadTime;
use crossbeam_channel::{Receiver, Sender, TrySendError};
use libdd_profiling::api::{
Function, Label as ApiLabel, Location, Period, Sample, UpscalingInfo, ValueType as ApiValueType,
Expand All @@ -36,7 +37,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::hash::Hash;
use std::num::NonZeroI64;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Barrier, OnceLock};
use std::thread::JoinHandle;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
Expand All @@ -61,6 +62,32 @@ const UPLOAD_CHANNEL_CAPACITY: usize = 8;
/// minit, and is destroyed on mshutdown.
static mut PROFILER: OnceLock<Profiler> = OnceLock::new();

pub static STACK_WALK_COUNT: AtomicU64 = AtomicU64::new(0);
pub static STACK_WALK_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
pub static DDPROF_TIME_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
pub static DDPROF_UPLOAD_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);

/// Returns the time between two `ThreadTime` readings, in nanoseconds.
///
/// Yields 0 when `now` is not later than `prev`, and clamps to `u64::MAX`
/// when the nanosecond count does not fit in a `u64`.
fn cpu_time_delta_ns(now: ThreadTime, prev: ThreadTime) -> u64 {
    // A saturating subtraction bottoms out at a zero duration, which is the
    // same 0 ns result as treating an underflow explicitly.
    let elapsed = now.as_duration().saturating_sub(prev.as_duration());
    elapsed.as_nanos().try_into().unwrap_or(u64::MAX)
}

/// Adds the time elapsed since the reading stored in `last` to `counter`,
/// then stores the fresh reading in `last`.
///
/// When `last` is `None` the call only records a baseline reading and adds
/// nothing. When the clock cannot be read, both `last` and `counter` are
/// left exactly as they were.
pub(crate) fn update_cpu_time_counter(last: &mut Option<ThreadTime>, counter: &AtomicU64) {
    let Ok(current) = ThreadTime::try_now() else {
        // No new reading: keep whatever baseline (if any) we already had.
        return;
    };
    // Install the new baseline; a previous one, if present, closes an interval.
    if let Some(previous) = last.replace(current) {
        counter.fetch_add(cpu_time_delta_ns(current, previous), Ordering::Relaxed);
    }
}

/// Order this array this way:
/// 1. Always enabled types.
/// 2. On by default types.
Expand Down Expand Up @@ -569,6 +596,7 @@ impl TimeCollector {
let upload_tick = crossbeam_channel::tick(self.upload_period);
let never = crossbeam_channel::never();
let mut running = true;
let mut last_cpu = ThreadTime::try_now().ok();

while running {
// The crossbeam_channel::select! doesn't have the ability to
Expand All @@ -594,6 +622,7 @@ impl TimeCollector {
Self::handle_resource_message(message, &mut profiles),
ProfilerMessage::Cancel => {
// flush what we have before exiting
update_cpu_time_counter(&mut last_cpu, &DDPROF_TIME_CPU_TIME_NS);
last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
running = false;
},
Expand Down Expand Up @@ -631,6 +660,7 @@ impl TimeCollector {

recv(upload_tick) -> message => {
if message.is_ok() {
update_cpu_time_counter(&mut last_cpu, &DDPROF_TIME_CPU_TIME_NS);
last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
}
},
Expand Down Expand Up @@ -903,13 +933,29 @@ impl Profiler {
unsafe { (*system_settings).profiling_timeline_enabled }
}

/// Runs `collect_stack_sample` and records its cost in the stack-walk
/// counters.
///
/// `STACK_WALK_COUNT` is incremented on every call, whether or not the walk
/// succeeded. `STACK_WALK_CPU_TIME_NS` only grows when the clock could be
/// read both before and after the walk.
fn collect_stack_sample_timed(
    &self,
    execute_data: *mut zend_execute_data,
) -> Result<Backtrace, CollectStackSampleError> {
    let began = ThreadTime::try_now().ok();
    let outcome = collect_stack_sample(execute_data);
    STACK_WALK_COUNT.fetch_add(1, Ordering::Relaxed);

    // The closing clock read is attempted only if the opening one succeeded.
    let spent_ns = began.and_then(|began| {
        ThreadTime::try_now()
            .ok()
            .map(|finished| cpu_time_delta_ns(finished, began))
    });
    if let Some(spent_ns) = spent_ns {
        STACK_WALK_CPU_TIME_NS.fetch_add(spent_ns, Ordering::Relaxed);
    }
    outcome
}

/// Collect a stack sample with elapsed wall time. Collects CPU time if
/// it's enabled and available.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "debug"))]
pub fn collect_time(&self, execute_data: *mut zend_execute_data, interrupt_count: u32) {
// todo: should probably exclude the wall and CPU time used by collecting the sample.
let interrupt_count = interrupt_count as i64;
let result = collect_stack_sample(execute_data);
let result = self.collect_stack_sample_timed(execute_data);
match result {
Ok(frames) => {
let depth = frames.len();
Expand Down Expand Up @@ -957,7 +1003,7 @@ impl Profiler {
alloc_size: i64,
interrupt_count: Option<u32>,
) {
let result = collect_stack_sample(execute_data);
let result = self.collect_stack_sample_timed(execute_data);
match result {
Ok(frames) => {
let depth = frames.len();
Expand Down Expand Up @@ -1010,7 +1056,7 @@ impl Profiler {
exception: String,
message: Option<String>,
) {
let result = collect_stack_sample(execute_data);
let result = self.collect_stack_sample_timed(execute_data);
match result {
Ok(frames) => {
let depth = frames.len();
Expand Down Expand Up @@ -1410,7 +1456,7 @@ impl Profiler {
where
F: FnOnce(&mut SampleValues),
{
let result = collect_stack_sample(execute_data);
let result = self.collect_stack_sample_timed(execute_data);
match result {
Ok(frames) => {
let depth = frames.len();
Expand Down
91 changes: 77 additions & 14 deletions profiling/src/profiling/uploader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::config::AgentEndpoint;
use crate::profiling::{UploadMessage, UploadRequest};
use crate::profiling::{
update_cpu_time_counter, UploadMessage, UploadRequest, DDPROF_TIME_CPU_TIME_NS,
DDPROF_UPLOAD_CPU_TIME_NS, STACK_WALK_COUNT, STACK_WALK_CPU_TIME_NS,
};
use crate::{PROFILER_NAME_STR, PROFILER_VERSION_STR};
use chrono::{DateTime, Utc};
use cpu_time::ThreadTime;
use crossbeam_channel::{select, Receiver};
use libdd_common::Endpoint;
use log::{debug, info, warn};
Expand All @@ -14,7 +18,6 @@ use std::sync::{Arc, Barrier};
use crate::allocation::{ALLOCATION_PROFILING_COUNT, ALLOCATION_PROFILING_SIZE};
#[cfg(feature = "debug_stats")]
use crate::exception::EXCEPTION_PROFILING_EXCEPTION_COUNT;
#[cfg(feature = "debug_stats")]
use std::sync::atomic::Ordering;

pub struct Uploader {
Expand Down Expand Up @@ -44,13 +47,41 @@ impl Uploader {

/// This function not only creates the internal metadata JSON representation, but is also
/// in charge of resetting all those counters back to 0.
#[cfg(feature = "debug_stats")]
fn create_internal_metadata() -> Option<serde_json::Value> {
Some(json!({
"exceptions_count": EXCEPTION_PROFILING_EXCEPTION_COUNT.swap(0, Ordering::Relaxed),
"allocations_count": ALLOCATION_PROFILING_COUNT.swap(0, Ordering::Relaxed),
"allocations_size": ALLOCATION_PROFILING_SIZE.swap(0, Ordering::Relaxed),
}))
let capacity = 4 + cfg!(feature = "debug_stats") as usize * 3;
let mut metadata = serde_json::Map::with_capacity(capacity);
metadata.insert(
"stack_walk_count".to_string(),
json!(STACK_WALK_COUNT.swap(0, Ordering::Relaxed)),
);
metadata.insert(
"stack_walk_cpu_time_ns".to_string(),
json!(STACK_WALK_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
);
metadata.insert(
"ddprof_time_cpu_time_ns".to_string(),
json!(DDPROF_TIME_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
);
metadata.insert(
"ddprof_upload_cpu_time_ns".to_string(),
json!(DDPROF_UPLOAD_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
);
#[cfg(feature = "debug_stats")]
{
metadata.insert(
"exceptions_count".to_string(),
json!(EXCEPTION_PROFILING_EXCEPTION_COUNT.swap(0, Ordering::Relaxed)),
);
metadata.insert(
"allocations_count".to_string(),
json!(ALLOCATION_PROFILING_COUNT.swap(0, Ordering::Relaxed)),
);
metadata.insert(
"allocations_size".to_string(),
json!(ALLOCATION_PROFILING_SIZE.swap(0, Ordering::Relaxed)),
);
}
Some(serde_json::Value::Object(metadata))
}

fn create_profiler_info(&self) -> Option<serde_json::Value> {
Expand All @@ -62,7 +93,11 @@ impl Uploader {
Some(metadata)
}

fn upload(&self, message: Box<UploadRequest>) -> anyhow::Result<u16> {
fn upload(
&self,
message: Box<UploadRequest>,
last_cpu: &mut Option<ThreadTime>,
) -> anyhow::Result<u16> {
let index = message.index;
let profile = message.profile;

Expand All @@ -83,16 +118,18 @@ impl Uploader {
let serialized =
profile.serialize_into_compressed_pprof(Some(message.end_time), message.duration)?;
exporter.set_timeout(10000); // 10 seconds in milliseconds

// Capture CPU time up to this point. Note: metadata generation, exporter
// building, and HTTP request time will be attributed to the next profile.
update_cpu_time_counter(last_cpu, &DDPROF_UPLOAD_CPU_TIME_NS);

let request = exporter.build(
serialized,
&[],
&[],
None,
None,
#[cfg(feature = "debug_stats")]
Self::create_internal_metadata(),
#[cfg(not(feature = "debug_stats"))]
None,
self.create_profiler_info(),
)?;
debug!("Sending profile to: {agent_endpoint}");
Expand All @@ -106,7 +143,7 @@ impl Uploader {
*/
let pprof_filename = &self.output_pprof;
let mut i = 0;

let mut last_cpu = ThreadTime::try_now().ok();
loop {
/* Since profiling uploads are going over the Internet and not just
* the local network, it would be ideal if they were the lowest
Expand All @@ -132,7 +169,7 @@ impl Uploader {
std::fs::write(&name, r.buffer).expect("write to succeed");
info!("Successfully wrote profile to {name}");
},
None => match self.upload(request) {
None => match self.upload(request, &mut last_cpu) {
Ok(status) => {
if status >= 400 {
warn!("Unexpected HTTP status when sending profile (HTTP {status}).")
Expand Down Expand Up @@ -169,6 +206,10 @@ mod tests {
#[test]
fn test_create_internal_metadata() {
// Set up all counters with known values
STACK_WALK_COUNT.store(7, Ordering::Relaxed);
STACK_WALK_CPU_TIME_NS.store(9000, Ordering::Relaxed);
DDPROF_TIME_CPU_TIME_NS.store(1234, Ordering::Relaxed);
DDPROF_UPLOAD_CPU_TIME_NS.store(5678, Ordering::Relaxed);
EXCEPTION_PROFILING_EXCEPTION_COUNT.store(42, Ordering::Relaxed);
ALLOCATION_PROFILING_COUNT.store(100, Ordering::Relaxed);
ALLOCATION_PROFILING_SIZE.store(1024, Ordering::Relaxed);
Expand All @@ -181,6 +222,28 @@ mod tests {
let metadata = metadata.unwrap();

// The metadata should contain all counts
assert_eq!(
metadata.get("stack_walk_count").and_then(|v| v.as_u64()),
Some(7)
);
assert_eq!(
metadata
.get("stack_walk_cpu_time_ns")
.and_then(|v| v.as_u64()),
Some(9000)
);
assert_eq!(
metadata
.get("ddprof_time_cpu_time_ns")
.and_then(|v| v.as_u64()),
Some(1234)
);
assert_eq!(
metadata
.get("ddprof_upload_cpu_time_ns")
.and_then(|v| v.as_u64()),
Some(5678)
);

assert_eq!(
metadata.get("exceptions_count").and_then(|v| v.as_u64()),
Expand Down
Loading