Skip to content
18 changes: 18 additions & 0 deletions sentry_sdk/integrations/asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,17 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
params_list = args[2] if len(args) > 2 else None
with _record(None, query, params_list, executemany=executemany) as span:
_set_db_data(span, args[0])

res = await f(*args, **kwargs)

if isinstance(span, StreamedSpan):
with capture_internal_exceptions():
add_query_source(span)

if not isinstance(span, StreamedSpan):
with capture_internal_exceptions():
add_query_source(span)

return res

return _inner
Expand All @@ -163,8 +172,17 @@ def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807
executemany=False,
) as span:
_set_db_data(span, args[0])

res = f(*args, **kwargs)

if isinstance(span, StreamedSpan):
with capture_internal_exceptions():
add_query_source(span)

if not isinstance(span, StreamedSpan):
with capture_internal_exceptions():
add_query_source(span)

return res

return _inner
Expand Down
331 changes: 311 additions & 20 deletions tests/integrations/asyncpg/test_asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,10 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str


@pytest.mark.asyncio
async def test_query_source_with_module_in_search_path(sentry_init, capture_events):
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_query_source_with_module_in_search_path(
sentry_init, capture_events, capture_items, span_streaming
):
"""
Test that query source is relative to the path of the module it ran in
"""
Expand All @@ -740,40 +743,71 @@ async def test_query_source_with_module_in_search_path(sentry_init, capture_even
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

events = capture_events()

from asyncpg_helpers.helpers import execute_query_in_connection

with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)
if span_streaming:
items = capture_items("span")
with sentry_sdk.traces.start_span(name="test_transaction"):
conn: Connection = await connect(PG_CONNECTION_URI)

await execute_query_in_connection(
"INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')",
conn,
)
await execute_query_in_connection(
"INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')",
conn,
)

await conn.close()
await conn.close()
sentry_sdk.flush()

(event,) = events
spans = [item.payload for item in items]

span = event["spans"][-1]
assert span["description"].startswith("INSERT INTO")
assert len(spans) == 3

data = span.get("data", {})
connect_span = spans[0]
insert_span = spans[1]
segment = spans[2]

assert SPANDATA.CODE_LINENO in data
assert segment["name"] == "test_transaction"
assert insert_span["name"].startswith("INSERT INTO")
assert connect_span["name"] == "connect"
data = insert_span.get("attributes", {})
else:
events = capture_events()

with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)

await execute_query_in_connection(
"INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')",
conn,
)

await conn.close()

(event,) = events

span = event["spans"][-1]
assert span["description"].startswith("INSERT INTO")
data = span.get("data", {})

lineno_key = "code.line.number" if span_streaming else SPANDATA.CODE_LINENO
filepath_key = "code.file.path" if span_streaming else SPANDATA.CODE_FILEPATH

assert lineno_key in data
assert filepath_key in data
assert SPANDATA.CODE_NAMESPACE in data
assert SPANDATA.CODE_FILEPATH in data
assert SPANDATA.CODE_FUNCTION in data

assert type(data.get(SPANDATA.CODE_LINENO)) == int
assert data.get(SPANDATA.CODE_LINENO) > 0
assert type(data.get(lineno_key)) == int
assert data.get(lineno_key) > 0
assert data.get(filepath_key) == "asyncpg_helpers/helpers.py"
assert data.get(SPANDATA.CODE_NAMESPACE) == "asyncpg_helpers.helpers"
assert data.get(SPANDATA.CODE_FILEPATH) == "asyncpg_helpers/helpers.py"

is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep
is_relative_path = data.get(filepath_key)[0] != os.sep
assert is_relative_path

assert data.get(SPANDATA.CODE_FUNCTION) == "execute_query_in_connection"
Expand Down Expand Up @@ -1102,3 +1136,260 @@ def before_send_transaction(event, hint):

assert len(spans) == 1
assert spans[0]["description"] == "filtered"


def _assert_query_source(span, span_streaming, expected_function):
if span_streaming:
data = span.get("attributes", {})
lineno_key = "code.line.number"
filepath_key = "code.file.path"
else:
data = span.get("data", {})
lineno_key = SPANDATA.CODE_LINENO
filepath_key = SPANDATA.CODE_FILEPATH

assert lineno_key in data
assert filepath_key in data
assert SPANDATA.CODE_NAMESPACE in data
assert SPANDATA.CODE_FUNCTION in data

assert type(data.get(lineno_key)) == int
assert data.get(lineno_key) > 0
assert data[SPANDATA.CODE_NAMESPACE] == "tests.integrations.asyncpg.test_asyncpg"
assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py")
assert data.get(filepath_key)[0] != os.sep
assert data[SPANDATA.CODE_FUNCTION] == expected_function


@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_query_source_execute(
sentry_init, capture_events, capture_items, span_streaming
):
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")
with sentry_sdk.traces.start_span(name="test_transaction"):
conn: Connection = await connect(PG_CONNECTION_URI)
await conn.execute(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
"Alice",
"pw",
datetime.date(1990, 12, 25),
)
await conn.close()
sentry_sdk.flush()

spans = [item.payload for item in items]
assert len(spans) == 3

connect_span = spans[0]
query_span = spans[1]
segment = spans[2]

assert connect_span["name"] == "connect"
assert query_span["name"].startswith("INSERT INTO")
assert segment["name"] == "test_transaction"
assert segment["is_segment"] is True
else:
events = capture_events()
with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)
await conn.execute(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
"Alice",
"pw",
datetime.date(1990, 12, 25),
)
await conn.close()

(event,) = events
spans = event["spans"]
assert len(spans) == 2
assert spans[0]["description"] == "connect"
assert spans[1]["description"].startswith("INSERT INTO")
query_span = spans[1]

_assert_query_source(query_span, span_streaming, "test_query_source_execute")


@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_query_source_executemany(
sentry_init, capture_events, capture_items, span_streaming
):
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")
with sentry_sdk.traces.start_span(name="test_transaction"):
conn: Connection = await connect(PG_CONNECTION_URI)
await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[("Bob", "secret_pw", datetime.date(1984, 3, 1))],
)
await conn.close()
sentry_sdk.flush()

spans = [item.payload for item in items]
assert len(spans) == 3

connect_span = spans[0]
query_span = spans[1]
segment = spans[2]

assert connect_span["name"] == "connect"
assert query_span["name"].startswith("INSERT INTO")
assert segment["name"] == "test_transaction"
else:
events = capture_events()
with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)
await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[("Bob", "secret_pw", datetime.date(1984, 3, 1))],
)
await conn.close()

(event,) = events
spans = event["spans"]
assert len(spans) == 2
assert spans[0]["description"] == "connect"
assert spans[1]["description"].startswith("INSERT INTO")
query_span = spans[1]

_assert_query_source(query_span, span_streaming, "test_query_source_executemany")


@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_query_source_prepare(
sentry_init, capture_events, capture_items, span_streaming
):
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")
with sentry_sdk.traces.start_span(name="test_transaction"):
conn: Connection = await connect(PG_CONNECTION_URI)
await conn.prepare("SELECT * FROM users WHERE name = $1")
await conn.close()
sentry_sdk.flush()

spans = [item.payload for item in items]
assert len(spans) == 3
connect_span = spans[0]
query_span = spans[1]
segment = spans[2]

assert connect_span["name"] == "connect"
assert query_span["name"] == "SELECT * FROM users WHERE name = $1"
assert segment["name"] == "test_transaction"
else:
events = capture_events()
with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)
await conn.prepare("SELECT * FROM users WHERE name = $1")
await conn.close()

(event,) = events
spans = event["spans"]
assert len(spans) == 2
assert spans[0]["description"] == "connect"
assert spans[1]["description"] == "SELECT * FROM users WHERE name = $1"
query_span = spans[1]

_assert_query_source(query_span, span_streaming, "test_query_source_prepare")


@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_query_source_cursor(
sentry_init, capture_events, capture_items, span_streaming
):
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")
with sentry_sdk.traces.start_span(name="test_transaction"):
conn: Connection = await connect(PG_CONNECTION_URI)
async with conn.transaction():
async for _ in conn.cursor(
"SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1)
):
pass
await conn.close()
sentry_sdk.flush()

spans = [item.payload for item in items]
assert len(spans) == 5
connect_span = spans[0]
begin_span = spans[1]
query_span = spans[2]
commit_span = spans[3]
segment = spans[4]

assert connect_span["name"] == "connect"
assert begin_span["name"] == "BEGIN;"
assert query_span["name"] == "SELECT * FROM users WHERE dob > $1"
assert commit_span["name"] == "COMMIT;"
assert segment["name"] == "test_transaction"
else:
events = capture_events()
with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)
async with conn.transaction():
async for _ in conn.cursor(
"SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1)
):
pass
await conn.close()

(event,) = events
spans = event["spans"]
assert len(spans) == 4

connect_span = spans[0]
begin_span = spans[1]
query_span = spans[2]
commit_span = spans[3]

assert connect_span["description"] == "connect"
assert begin_span["description"] == "BEGIN;"
assert query_span["description"] == "SELECT * FROM users WHERE dob > $1"
assert commit_span["description"] == "COMMIT;"

_assert_query_source(query_span, span_streaming, "test_query_source_cursor")
Loading