Skip to content

Commit 876532f

Browse files
authored
ref: Per-bucket limits, fix envelope chunking (8) (#5595)
### Description - correctly split spans into multiple envelopes if a bucket contains more than 1000 spans from the same `trace_id` - add weight-based flushing (actual limit is 10 MB, we flush at 5 MB) - apply the weight and amount limits per `trace_id` rather than globally - add `trace_id`, `parent_span_id` to the transport format #### Issues <!-- * resolves: #1234 * resolves: LIN-1234 --> #### Reminders - Please add tests to validate your changes, and lint your code using `tox -e linters`. - Add GH Issue ID _&_ Linear ID (if applicable) - PR title should use [conventional commit](https://develop.sentry.dev/engineering-practices/commit-messages/#type) style (`feat:`, `fix:`, `ref:`, `meta:`) - For external contributors: [CONTRIBUTING.md](https://github.com/getsentry/sentry-python/blob/master/CONTRIBUTING.md), [Sentry SDK development docs](https://develop.sentry.dev/sdk/), [Discord community](https://discord.gg/Ww9hbqr)
1 parent 57f8181 commit 876532f

1 file changed

Lines changed: 58 additions & 32 deletions

File tree

sentry_sdk/_span_batcher.py

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515

1616

1717
class SpanBatcher(Batcher["StreamedSpan"]):
18-
# TODO[span-first]: size-based flushes
19-
# TODO[span-first]: adjust flush/drop defaults
18+
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
19+
# a bit of a buffer for spans that appear between setting the flush event
20+
# and actually flushing the buffer.
21+
#
22+
# The max limits are all per trace.
23+
MAX_ENVELOPE_SIZE = 1000 # spans
2024
MAX_BEFORE_FLUSH = 1000
21-
MAX_BEFORE_DROP = 5000
25+
MAX_BEFORE_DROP = 2000
26+
MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB
2227
FLUSH_WAIT_TIME = 5.0
2328

2429
TYPE = "span"
@@ -35,6 +40,7 @@ def __init__(
3540
# envelope.
3641
# trace_id -> span buffer
3742
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
43+
self._running_size: dict[str, int] = defaultdict(lambda: 0)
3844
self._capture_func = capture_func
3945
self._record_lost_func = record_lost_func
4046
self._running = True
@@ -45,16 +51,12 @@ def __init__(
4551
self._flusher: "Optional[threading.Thread]" = None
4652
self._flusher_pid: "Optional[int]" = None
4753

48-
def get_size(self) -> int:
49-
# caller is responsible for locking before checking this
50-
return sum(len(buffer) for buffer in self._span_buffer.values())
51-
5254
def add(self, span: "StreamedSpan") -> None:
5355
if not self._ensure_thread() or self._flusher is None:
5456
return None
5557

5658
with self._lock:
57-
size = self.get_size()
59+
size = len(self._span_buffer[span.trace_id])
5860
if size >= self.MAX_BEFORE_DROP:
5961
self._record_lost_func(
6062
reason="queue_overflow",
@@ -64,18 +66,36 @@ def add(self, span: "StreamedSpan") -> None:
6466
return None
6567

6668
self._span_buffer[span.trace_id].append(span)
69+
self._running_size[span.trace_id] += self._estimate_size(span)
70+
6771
if size + 1 >= self.MAX_BEFORE_FLUSH:
6872
self._flush_event.set()
73+
return
74+
75+
if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
76+
self._flush_event.set()
77+
return
78+
79+
@staticmethod
80+
def _estimate_size(item: "StreamedSpan") -> int:
81+
# Rough estimate of serialized span size that's quick to compute.
82+
# 210 is the rough size of the payload without attributes, and we
83+
# estimate additional 70 bytes on top of that per attribute.
84+
return 210 + 70 * len(item._attributes)
6985

7086
@staticmethod
7187
def _to_transport_format(item: "StreamedSpan") -> "Any":
7288
# TODO[span-first]
7389
res: "dict[str, Any]" = {
90+
"trace_id": item.trace_id,
7491
"span_id": item.span_id,
7592
"name": item._name,
7693
"status": item._status,
7794
}
7895

96+
if item._parent_span_id:
97+
res["parent_span_id"] = item._parent_span_id
98+
7999
if item._attributes:
80100
res["attributes"] = {
81101
k: serialize_attribute(v) for (k, v) in item._attributes.items()
@@ -86,7 +106,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
86106
def _flush(self) -> None:
87107
with self._lock:
88108
if len(self._span_buffer) == 0:
89-
return None
109+
return
90110

91111
envelopes = []
92112
for trace_id, spans in self._span_buffer.items():
@@ -95,34 +115,40 @@ def _flush(self) -> None:
95115
# dsc = spans[0].dynamic_sampling_context()
96116
dsc = None
97117

98-
envelope = Envelope(
99-
headers={
100-
"sent_at": format_timestamp(datetime.now(timezone.utc)),
101-
"trace": dsc,
102-
}
103-
)
104-
105-
envelope.add_item(
106-
Item(
107-
type="span",
108-
content_type="application/vnd.sentry.items.span.v2+json",
118+
# Max per envelope is 1000, so if we happen to have more than
119+
# 1000 spans in one bucket, we'll need to separate them.
120+
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
121+
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
122+
123+
envelope = Envelope(
109124
headers={
110-
"item_count": len(spans),
111-
},
112-
payload=PayloadRef(
113-
json={
114-
"items": [
115-
self._to_transport_format(span)
116-
for span in spans
117-
]
118-
}
119-
),
125+
"sent_at": format_timestamp(datetime.now(timezone.utc)),
126+
"trace": dsc,
127+
}
128+
)
129+
130+
envelope.add_item(
131+
Item(
132+
type=self.TYPE,
133+
content_type=self.CONTENT_TYPE,
134+
headers={
135+
"item_count": end - start,
136+
},
137+
payload=PayloadRef(
138+
json={
139+
"items": [
140+
self._to_transport_format(spans[j])
141+
for j in range(start, end)
142+
]
143+
}
144+
),
145+
)
120146
)
121-
)
122147

123-
envelopes.append(envelope)
148+
envelopes.append(envelope)
124149

125150
self._span_buffer.clear()
151+
self._running_size.clear()
126152

127153
for envelope in envelopes:
128154
self._capture_func(envelope)

0 commit comments

Comments
 (0)