diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md index 3b4c6095e..e3305ccfa 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Changed + +- Improved LiteLLM GenAI util invocation mapping for positional arguments, + streaming time-to-first-token, multi-choice outputs, tool-call deltas, and + a real smoke example + ([#191](https://github.com/alibaba/loongsuite-python-agent/pull/191)). + ## Version 0.5.0 (2026-05-11) There are no changelog entries for this release. diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst index 0f940f728..452968579 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst @@ -25,6 +25,8 @@ Configuration The instrumentation can be enabled/disabled using environment variables: * ``ENABLE_LITELLM_INSTRUMENTOR``: Enable/disable instrumentation (default: true) +* ``OTEL_SEMCONV_STABILITY_OPT_IN``: Set to ``gen_ai_latest_experimental`` to enable GenAI semantic conventions +* ``OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT``: Set to ``NO_CONTENT``, ``SPAN_ONLY``, ``EVENT_ONLY``, or ``SPAN_AND_EVENT`` Usage ----- @@ -43,6 +45,32 @@ Usage messages=[{"role": "user", "content": "Hello!"}] ) +Local OTLP smoke +---------------- + +The ``examples/litellm_genai_smoke.py`` script sends real LiteLLM traffic for: + +* non-streaming completion +* streaming completion +* concurrent async completion calls + +Set ``LITELLM_SMOKE_MODE`` to ``non_streaming``, ``streaming``, +``concurrent``, or ``all`` (default) to run a subset. + +Example with a local ``otel-gui`` OTLP endpoint: + +.. code:: console + + export DASHSCOPE_API_KEY=... + export OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 + export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + export OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental + export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=SPAN_ONLY + export OTEL_SERVICE_NAME=loongsuite-litellm-smoke + + loongsuite-instrument python \ + instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py + Features -------- @@ -53,6 +81,9 @@ This instrumentation automatically captures: * Embedding calls * Retry mechanisms * Tool/function calls +* Provider inference from known OpenAI-compatible base URLs, custom providers, and model names +* Streaming time-to-first-token, including reasoning/thinking deltas +* Multi-choice streaming outputs and tool-call delta accumulation * Request and response metadata * Token usage * Model information @@ -65,4 +96,3 @@ References * `OpenTelemetry LiteLLM Instrumentation `_ * `OpenTelemetry Project `_ * `LiteLLM Documentation `_ - diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py new file mode 100644 index 000000000..55c5eb2e1 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py @@ -0,0 +1,137 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Real LiteLLM smoke traffic for LoongSuite GenAI telemetry. + +Run this under ``loongsuite-instrument`` with OTLP configured. The script +exercises non-streaming, streaming, and concurrent async completion calls. +""" + +from __future__ import annotations + +import asyncio +import os + +import litellm + +MODEL = os.getenv("LITELLM_MODEL", "qwen-turbo") +API_BASE = os.getenv( + "LITELLM_API_BASE", + "https://dashscope.aliyuncs.com/compatible-mode/v1", +) +CUSTOM_PROVIDER = os.getenv("LITELLM_CUSTOM_LLM_PROVIDER", "openai") + + +def _configure_provider() -> None: + litellm.telemetry = False + + +def _provider_kwargs() -> dict[str, str]: + api_key = ( + os.getenv("LITELLM_API_KEY") + or os.getenv("DASHSCOPE_API_KEY") + or os.getenv("OPENAI_API_KEY") + ) + if not api_key: + raise SystemExit( + "Missing required API key: set LITELLM_API_KEY, " + "DASHSCOPE_API_KEY, or OPENAI_API_KEY" + ) + + return { + "custom_llm_provider": CUSTOM_PROVIDER, + "api_key": api_key, + "api_base": API_BASE, + } + + +def run_non_streaming() -> None: + response = litellm.completion( + model=MODEL, + **_provider_kwargs(), + messages=[ + { + "role": "user", + "content": "Reply with exactly one short sentence.", + } + ], + temperature=0.1, + max_tokens=64, + ) + print("non_streaming:", response.choices[0].message.content[:80]) + + +def run_streaming() -> None: + stream = litellm.completion( + model=MODEL, + **_provider_kwargs(), + messages=[ + { + "role": "user", + "content": "Count from one to five, separated by commas.", + } + ], + stream=True, + temperature=0.1, + max_tokens=64, + ) + + chunks = [] + for chunk in stream: + if chunk.choices: + delta = chunk.choices[0].delta + if getattr(delta, "content", None): + chunks.append(delta.content) + print("streaming:", "".join(chunks)[:80]) + + +async def run_concurrent() -> None: + prompts = [ + "Give one word for sky color.", + "Give one word for ocean color.", + "Give one word for grass color.", + ] + + async def call(prompt: str): + return await litellm.acompletion( + model=MODEL, + **_provider_kwargs(), + messages=[{"role": "user", "content": prompt}], + temperature=0.1, + max_tokens=32, + ) + + responses = await asyncio.gather(*(call(prompt) for prompt in prompts)) + print( + "concurrent:", + ", ".join( + response.choices[0].message.content[:24] for response in responses + ), + ) + + +def main() -> None: + _configure_provider() + mode = os.getenv("LITELLM_SMOKE_MODE", "all").lower() + + if mode in ("all", "non_streaming"): + run_non_streaming() + if mode in ("all", "streaming"): + run_streaming() + if mode in ("all", "concurrent"): + asyncio.run(run_concurrent()) + + +if __name__ == "__main__": + main() diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml index fbd8ae831..3535c16b2 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml @@ -41,18 +41,18 @@ instruments = [ litellm = "opentelemetry.instrumentation.litellm:LiteLLMInstrumentor" [project.urls] -Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-litellm" -Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" +Homepage = "https://github.com/alibaba/loongsuite-python-agent/tree/main/instrumentation-loongsuite/loongsuite-instrumentation-litellm" +Repository = "https://github.com/alibaba/loongsuite-python-agent" [tool.hatch.version] path = "src/opentelemetry/instrumentation/litellm/version.py" [tool.hatch.build.targets.sdist] include = [ + "/examples", "/src", "/tests", ] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] - diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py index cda320946..a954ce999 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py @@ -16,19 +16,18 @@ Embedding wrapper for LiteLLM instrumentation. """ -import logging import os from typing import Callable from opentelemetry import context from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.instrumentation.litellm._utils import ( + apply_litellm_embedding_response_to_invocation, create_embedding_invocation_from_litellm, + normalize_litellm_embedding_kwargs, ) from opentelemetry.util.genai.types import Error -logger = logging.getLogger(__name__) - def _is_instrumentation_enabled() -> bool: """Check if instrumentation is enabled via environment variable.""" @@ -53,8 +52,10 @@ def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return self.original_func(*args, **kwargs) - # Create invocation object - invocation = create_embedding_invocation_from_litellm(**kwargs) + request_kwargs = normalize_litellm_embedding_kwargs( + self.original_func, args, kwargs + ) + invocation = create_embedding_invocation_from_litellm(**request_kwargs) # Start Embedding invocation self._handler.start_embedding(invocation) @@ -63,43 +64,9 @@ def __call__(self, *args, **kwargs): # Call original function response = self.original_func(*args, **kwargs) - # Extract response metadata - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract token usage if available - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "total_tokens", None - ) - - # Extract embedding dimension count - if ( - hasattr(response, "data") - and response.data - and len(response.data) > 0 - ): - try: - first_embedding = response.data[0] - # Handle dict response - if ( - isinstance(first_embedding, dict) - and "embedding" in first_embedding - ): - embedding_vector = first_embedding["embedding"] - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - # Handle object response - elif hasattr(first_embedding, "embedding"): - embedding_vector = first_embedding.embedding - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - except (IndexError, AttributeError, KeyError, TypeError): - # If we can't extract dimension, just skip it - pass + apply_litellm_embedding_response_to_invocation( + invocation, response + ) # End Embedding invocation successfully self._handler.stop_embedding(invocation) @@ -131,8 +98,10 @@ async def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await self.original_func(*args, **kwargs) - # Create invocation object - invocation = create_embedding_invocation_from_litellm(**kwargs) + request_kwargs = normalize_litellm_embedding_kwargs( + self.original_func, args, kwargs + ) + invocation = create_embedding_invocation_from_litellm(**request_kwargs) # Start Embedding invocation self._handler.start_embedding(invocation) @@ -141,43 +110,9 @@ async def __call__(self, *args, **kwargs): # Call original function response = await self.original_func(*args, **kwargs) - # Extract response metadata - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract token usage if available - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "total_tokens", None - ) - - # Extract embedding dimension count - if ( - hasattr(response, "data") - and response.data - and len(response.data) > 0 - ): - try: - first_embedding = response.data[0] - # Handle dict response - if ( - isinstance(first_embedding, dict) - and "embedding" in first_embedding - ): - embedding_vector = first_embedding["embedding"] - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - # Handle object response - elif hasattr(first_embedding, "embedding"): - embedding_vector = first_embedding.embedding - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - except (IndexError, AttributeError, KeyError, TypeError): - # If we can't extract dimension, just skip it - pass + apply_litellm_embedding_response_to_invocation( + invocation, response + ) # End Embedding invocation successfully self._handler.stop_embedding(invocation) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py index 82c241ec6..20f6af70d 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py @@ -17,11 +17,176 @@ """ import logging +import timeit from typing import Any, Iterator, Optional +from opentelemetry.instrumentation.litellm._utils import ( + extract_litellm_text_parts, + get_litellm_value, + parse_tool_call_arguments, +) +from opentelemetry.util.genai.types import ( + OutputMessage, + Reasoning, + Text, + ToolCall, +) + logger = logging.getLogger(__name__) +class _StreamAccumulator: + """Accumulate LiteLLM streaming deltas into GenAI output messages.""" + + def __init__(self, invocation: Any = None): + self.invocation = invocation + self._choice_states: dict[int, dict[str, Any]] = {} + + def record_chunk(self, chunk: Any) -> None: + choices = get_litellm_value(chunk, "choices") or [] + if not choices: + return + + saw_token = False + for default_index, choice in enumerate(choices): + index = get_litellm_value(choice, "index", default_index) + if not isinstance(index, int): + index = default_index + + state = self._choice_states.setdefault( + index, + { + "role": "assistant", + "reasoning": [], + "content": [], + "finish_reason": None, + "tool_calls": {}, + }, + ) + + finish_reason = get_litellm_value(choice, "finish_reason") + if finish_reason: + state["finish_reason"] = finish_reason + + delta = get_litellm_value(choice, "delta") + if delta is None: + continue + + role = get_litellm_value(delta, "role") + if role: + state["role"] = role + + content = get_litellm_value(delta, "content") + content_parts = extract_litellm_text_parts(content) + if content_parts: + state["content"].extend(content_parts) + saw_token = True + + reasoning_content = get_litellm_value(delta, "reasoning_content") + if reasoning_content is None: + reasoning_content = get_litellm_value(delta, "reasoning") + reasoning_parts = extract_litellm_text_parts(reasoning_content) + if reasoning_parts: + state["reasoning"].extend(reasoning_parts) + saw_token = True + + tool_calls = get_litellm_value(delta, "tool_calls") + if tool_calls: + saw_token = True + self._record_tool_calls(state, tool_calls) + + if saw_token and self.invocation is not None: + first_token_time = getattr( + self.invocation, "monotonic_first_token_s", None + ) + if first_token_time is None: + self.invocation.monotonic_first_token_s = ( + timeit.default_timer() + ) + + def get_output_messages(self) -> list[OutputMessage]: + output_messages = [] + for index in sorted(self._choice_states): + state = self._choice_states[index] + parts = [] + reasoning = "".join(state["reasoning"]) + if reasoning: + parts.append(Reasoning(content=reasoning)) + + content = "".join(state["content"]) + if content: + parts.append(Text(content=content)) + + for tool_index in sorted(state["tool_calls"]): + tool_call = state["tool_calls"][tool_index] + arguments = parse_tool_call_arguments( + tool_call.get("arguments", "") + ) + if ( + tool_call.get("id") + or tool_call.get("name") + or arguments not in (None, "") + ): + parts.append( + ToolCall( + id=tool_call.get("id"), + name=tool_call.get("name", ""), + arguments=arguments, + ) + ) + + if not parts: + parts.append(Text(content="")) + + output_messages.append( + OutputMessage( + role=state["role"] or "assistant", + parts=parts, + finish_reason=state["finish_reason"] or "stop", + ) + ) + return output_messages + + def finish_reasons(self) -> list[str]: + finish_reasons = [] + for index in sorted(self._choice_states): + state = self._choice_states[index] + if state["finish_reason"]: + finish_reasons.append(state["finish_reason"]) + return finish_reasons + + @staticmethod + def _record_tool_calls( + state: dict[str, Any], tool_calls: list[Any] + ) -> None: + for fallback_index, tool_call in enumerate(tool_calls): + tool_index = get_litellm_value(tool_call, "index", fallback_index) + if not isinstance(tool_index, int): + tool_index = fallback_index + + stored = state["tool_calls"].setdefault( + tool_index, + {"id": None, "name": "", "arguments": ""}, + ) + + tool_id = get_litellm_value(tool_call, "id") + if tool_id: + stored["id"] = tool_id + + function = get_litellm_value(tool_call, "function") + function_name = get_litellm_value(function, "name") + if function_name: + stored["name"] = function_name + + arguments = get_litellm_value(function, "arguments") + if isinstance(arguments, str): + stored["arguments"] += arguments + elif arguments: + logger.debug( + "Skipping non-string LiteLLM streamed tool-call arguments" + ) + + class StreamWrapper: """ Wrapper for synchronous streaming responses. @@ -31,15 +196,22 @@ class StreamWrapper: Supports context manager protocol for reliable cleanup. """ - def __init__(self, stream: Iterator, span: Any, callback: callable): + _warned_unclosed_stream = False + + def __init__( + self, + stream: Iterator, + span: Any, + callback: callable, + invocation: Any = None, + ): self.stream = stream self.span = span self.callback = callback + self._accumulator = _StreamAccumulator(invocation) self.last_chunk = None # Only keep last chunk to avoid memory leak self.chunk_count = 0 self._finalized = False - self.accumulated_content = [] # Accumulate content for output messages - self.accumulated_tool_calls = [] # Accumulate tool calls def __iter__(self): return self @@ -48,17 +220,7 @@ def __next__(self): try: chunk = next(self.stream) - # Accumulate content from delta for output messages - if hasattr(chunk, "choices") and chunk.choices: - choice = chunk.choices[0] - if hasattr(choice, "delta"): - delta = choice.delta - # Accumulate text content - if hasattr(delta, "content") and delta.content: - self.accumulated_content.append(delta.content) - # Accumulate tool calls - if hasattr(delta, "tool_calls") and delta.tool_calls: - self.accumulated_tool_calls.extend(delta.tool_calls) + self._accumulator.record_chunk(chunk) # Only keep the last chunk (contains usage info) self.last_chunk = chunk @@ -93,15 +255,44 @@ def close(self): """Explicitly close and finalize the stream.""" self._finalize() + def __del__(self): + if getattr(self, "_finalized", True): + return + + if not StreamWrapper._warned_unclosed_stream: + StreamWrapper._warned_unclosed_stream = True + logger.warning( + "LiteLLM stream wrapper was garbage-collected before close; " + "finalizing the span. Use a context manager or call close() " + "when terminating streams early." + ) + + try: + self._finalize() + except Exception as exc: + logger.debug("Error finalizing unclosed LiteLLM stream: %s", exc) + + def _close_stream(self) -> None: + close = getattr(self.stream, "close", None) + if not callable(close): + return + + try: + close() + except Exception as exc: + logger.debug("Error closing LiteLLM stream: %s", exc) + def _finalize(self, error: Optional[Exception] = None): """Finalize the span with data from last chunk.""" if self._finalized: return self._finalized = True + self._close_stream() try: # Call the callback with only the last chunk - # Note: The callback is responsible for calling handler.stop_llm() or handler.fail_llm() + # Note: The callback is responsible for calling handler.stop_llm() + # or handler.fail_llm(). # which will end the span. We no longer call span.end() here. if self.callback: self.callback(self.span, self.last_chunk, error) @@ -111,6 +302,12 @@ def _finalize(self, error: Optional[Exception] = None): except Exception as e: logger.debug(f"Error finalizing stream: {e}") + def get_output_messages(self) -> list[OutputMessage]: + return self._accumulator.get_output_messages() + + def finish_reasons(self) -> list[str]: + return self._accumulator.finish_reasons() + class AsyncStreamWrapper: """ @@ -125,16 +322,21 @@ class AsyncStreamWrapper: 3. Letting the wrapper detect stream exhaustion """ - def __init__(self, stream, span: Any, callback: callable): + def __init__( + self, + stream, + span: Any, + callback: callable, + invocation: Any = None, + ): self.stream = stream self.span = span self.callback = callback + self._accumulator = _StreamAccumulator(invocation) self.last_chunk = None # Only keep last chunk to avoid memory leak self.chunk_count = 0 self._finalized = False self._stream_exhausted = False - self.accumulated_content = [] # Accumulate content for output messages - self.accumulated_tool_calls = [] # Accumulate tool calls def __aiter__(self): # Return an async generator that wraps the stream and ensures finalization @@ -148,21 +350,10 @@ async def _wrapped_iteration(self): 2. An exception occurs 3. The generator is closed early (via aclose()) """ + error = None try: async for chunk in self.stream: - # Accumulate content from delta for output messages - if hasattr(chunk, "choices") and chunk.choices: - choice = chunk.choices[0] - if hasattr(choice, "delta"): - delta = choice.delta - # Accumulate text content - if hasattr(delta, "content") and delta.content: - self.accumulated_content.append(delta.content) - # Accumulate tool calls - if hasattr(delta, "tool_calls") and delta.tool_calls: - self.accumulated_tool_calls.extend( - delta.tool_calls - ) + self._accumulator.record_chunk(chunk) # Only keep the last chunk (contains usage info) self.last_chunk = chunk @@ -177,11 +368,12 @@ async def _wrapped_iteration(self): except Exception as e: # Error during streaming logger.debug(f"AsyncStreamWrapper: Error during streaming: {e}") - self._finalize(error=e) + error = e raise finally: # Always finalize, whether completed normally, with error, or closed early - self._finalize() + await self._aclose_stream() + self._finalize(error=error) async def __aenter__(self): """Support async context manager protocol.""" @@ -189,6 +381,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): """Ensure finalization on async context exit.""" + await self._aclose_stream() if exc_type is not None: # Exception occurred during iteration self._finalize(error=exc_val) @@ -199,12 +392,35 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): async def aclose(self): """Explicitly close and finalize the async stream.""" + await self._aclose_stream() self._finalize() def close(self): """Synchronous close method for compatibility.""" + self._close_stream() self._finalize() + def _close_stream(self) -> None: + close = getattr(self.stream, "close", None) + if not callable(close): + return + + try: + close() + except Exception as exc: + logger.debug("Error closing LiteLLM async stream: %s", exc) + + async def _aclose_stream(self) -> None: + aclose = getattr(self.stream, "aclose", None) + if callable(aclose): + try: + await aclose() + return + except Exception as exc: + logger.debug("Error closing LiteLLM async stream: %s", exc) + + self._close_stream() + def _finalize(self, error: Optional[Exception] = None): """Finalize the span with data from last chunk.""" if self._finalized: @@ -213,7 +429,8 @@ def _finalize(self, error: Optional[Exception] = None): self._finalized = True try: # Call the callback with only the last chunk - # Note: The callback is responsible for calling handler.stop_llm() or handler.fail_llm() + # Note: The callback is responsible for calling handler.stop_llm() + # or handler.fail_llm(). # which will end the span. We no longer call span.end() here. if self.callback: try: @@ -225,3 +442,9 @@ def _finalize(self, error: Optional[Exception] = None): self.last_chunk = None except Exception as e: logger.debug(f"Error finalizing async stream: {e}") + + def get_output_messages(self) -> list[OutputMessage]: + return self._accumulator.get_output_messages() + + def finish_reasons(self) -> list[str]: + return self._accumulator.finish_reasons() diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py index cbec10a0f..cceebb523 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py @@ -16,9 +16,12 @@ Utility functions for LiteLLM instrumentation. """ +import inspect import json import logging -from typing import Any, Dict, List, Optional +from collections.abc import Callable, Mapping +from typing import Any, List, Optional +from urllib.parse import urlparse from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import ( GenAiOperationNameValues, @@ -29,6 +32,7 @@ InputMessage, LLMInvocation, OutputMessage, + Reasoning, Text, ToolCall, ToolCallResponse, @@ -36,94 +40,219 @@ logger = logging.getLogger(__name__) +_COMPLETION_POSITIONAL_PARAMETERS = ("model", "messages") +_EMBEDDING_POSITIONAL_PARAMETERS = ("model", "input") +_BASE_URL_PROVIDER_MAP = ( + ("dashscope.aliyuncs.com", "dashscope"), + ("api.openai.com", "openai"), + ("api.deepseek.com", "deepseek"), + ("anthropic.com", "anthropic"), + ("generativelanguage.googleapis.com", "google"), +) +_BASE_URL_KWARG_NAMES = ( + "api_base", + "base_url", + "api_endpoint", + "endpoint", +) +_SYSTEM_INSTRUCTION_ROLES = frozenset(("system", "developer")) -def convert_messages_to_structured_format( - messages: List[Dict[str, Any]], -) -> List[Dict[str, Any]]: - """ - Convert LiteLLM message format to structured format required by semantic conventions. - Converts from: - {"role": "user", "content": "..."} - To: - {"role": "user", "parts": [{"type": "text", "content": "..."}]} - """ - if not isinstance(messages, list): +def get_litellm_value(obj: Any, key: str, default: Any = None) -> Any: + """Read a value from LiteLLM dict, pydantic, or object responses.""" + if obj is None: + return default + if isinstance(obj, Mapping): + return obj.get(key, default) + return getattr(obj, key, default) + + +def normalize_litellm_completion_kwargs( + original_func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], +) -> dict[str, Any]: + """Return request kwargs with positional LiteLLM completion args included.""" + return _normalize_litellm_kwargs( + original_func, args, kwargs, _COMPLETION_POSITIONAL_PARAMETERS + ) + + +def normalize_litellm_embedding_kwargs( + original_func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], +) -> dict[str, Any]: + """Return request kwargs with positional LiteLLM embedding args included.""" + return _normalize_litellm_kwargs( + original_func, args, kwargs, _EMBEDDING_POSITIONAL_PARAMETERS + ) + + +def _normalize_litellm_kwargs( + original_func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], + positional_names: tuple[str, ...], +) -> dict[str, Any]: + normalized = dict(kwargs) + + for name, value in zip(positional_names, args): + normalized.setdefault(name, value) + + try: + signature = inspect.signature(original_func) + bound_arguments = signature.bind_partial(*args, **kwargs).arguments + except (TypeError, ValueError): + return normalized + + extra_kwargs = bound_arguments.pop("kwargs", None) + bound_arguments.pop("args", None) + normalized.update(bound_arguments) + if isinstance(extra_kwargs, Mapping): + normalized.update(extra_kwargs) + return normalized + + +def parse_tool_call_arguments(arguments: Any) -> Any: + """Parse JSON tool-call arguments when LiteLLM returns them as strings.""" + if isinstance(arguments, str) and arguments: + try: + return json.loads(arguments) + except (TypeError, ValueError): + return arguments + return arguments + + +def extract_litellm_text_parts(content: Any) -> list[str]: + """Extract text strings from LiteLLM text or multimodal content.""" + if isinstance(content, str): + return [content] if content else [] + + if not isinstance(content, list): return [] - structured_messages = [] - for msg in messages: - if not isinstance(msg, dict): + text_parts = [] + for item in content: + if isinstance(item, str): + if item: + text_parts.append(item) continue - role = msg.get("role", "") - structured_msg = {"role": role, "parts": []} + if not isinstance(item, Mapping) or item.get("type") != "text": + continue - # Handle text content - if "content" in msg and msg["content"]: - content = msg["content"] - if isinstance(content, str): - structured_msg["parts"].append( - {"type": "text", "content": content} - ) - elif isinstance(content, list): - # Handle multi-modal content - for item in content: - if isinstance(item, dict): - if item.get("type") == "text": - structured_msg["parts"].append( - { - "type": "text", - "content": item.get("text", ""), - } - ) - else: - structured_msg["parts"].append(item) - - # Handle tool calls - if "tool_calls" in msg and msg["tool_calls"]: - for tool_call in msg["tool_calls"]: - if not isinstance(tool_call, dict): - continue - - tool_part = {"type": "tool_call"} - if "id" in tool_call: - tool_part["id"] = tool_call["id"] - if "function" in tool_call: - func = tool_call["function"] - if isinstance(func, dict): - if "name" in func: - tool_part["name"] = func["name"] - if "arguments" in func: - try: - # Try to parse arguments if it's a JSON string - args_str = func["arguments"] - if isinstance(args_str, str): - tool_part["arguments"] = json.loads( - args_str - ) - else: - tool_part["arguments"] = args_str - except Exception: - tool_part["arguments"] = func.get( - "arguments", "" - ) - - structured_msg["parts"].append(tool_part) - - # Handle tool call responses - if role == "tool" and "content" in msg: - tool_response_part = { - "type": "tool_call_response", - "response": msg["content"], - } - if "tool_call_id" in msg: - tool_response_part["id"] = msg["tool_call_id"] - structured_msg["parts"].append(tool_response_part) - - structured_messages.append(structured_msg) - - return structured_messages + text = item.get("text", item.get("content", "")) + if isinstance(text, str) and text: + text_parts.append(text) + + return text_parts + + +def apply_litellm_llm_response_to_invocation( + invocation: LLMInvocation, + response: Any, + *, + include_output_messages: bool = True, +) -> None: + """Populate a GenAI LLMInvocation from a LiteLLM response or stream chunk.""" + if include_output_messages: + output_messages = extract_output_from_litellm_response(response) + if output_messages: + invocation.output_messages = output_messages + + usage = get_litellm_value(response, "usage") + _apply_usage_to_invocation(invocation, usage) + + response_id = get_litellm_value(response, "id") + if response_id: + invocation.response_id = response_id + + response_model = get_litellm_value(response, "model") + if response_model: + invocation.response_model_name = response_model + + finish_reasons = extract_finish_reasons_from_litellm_response(response) + if finish_reasons: + invocation.finish_reasons = finish_reasons + + +def apply_litellm_embedding_response_to_invocation( + invocation: EmbeddingInvocation, + response: Any, +) -> None: + """Populate a GenAI EmbeddingInvocation from a LiteLLM response.""" + response_model = get_litellm_value(response, "model") + if response_model: + invocation.response_model_name = response_model + + usage = get_litellm_value(response, "usage") + _apply_usage_to_invocation(invocation, usage, include_output_tokens=False) + + data = get_litellm_value(response, "data") + if not data: + return + + try: + first_embedding = data[0] + embedding_vector = get_litellm_value(first_embedding, "embedding") + if isinstance(embedding_vector, list): + invocation.dimension_count = len(embedding_vector) + except (IndexError, AttributeError, KeyError, TypeError): + logger.debug("Failed to extract LiteLLM embedding dimension count") + + +def extract_finish_reasons_from_litellm_response(response: Any) -> list[str]: + """Extract non-empty finish reasons from LiteLLM choices.""" + choices = get_litellm_value(response, "choices") or [] + finish_reasons = [] + for choice in choices: + finish_reason = get_litellm_value(choice, "finish_reason") + if finish_reason: + finish_reasons.append(finish_reason) + return finish_reasons + + +def _apply_usage_to_invocation( + invocation: Any, + usage: Any, + *, + include_output_tokens: bool = True, +) -> None: + if not usage: + return + + input_tokens = get_litellm_value(usage, "prompt_tokens") + output_tokens = get_litellm_value(usage, "completion_tokens") + total_tokens = get_litellm_value(usage, "total_tokens") + + if ( + include_output_tokens + and output_tokens is None + and input_tokens is not None + and total_tokens + ): + output_tokens = max(total_tokens - input_tokens, 0) + + if input_tokens is not None: + invocation.input_tokens = input_tokens + if include_output_tokens and output_tokens is not None: + invocation.output_tokens = output_tokens + + prompt_details = get_litellm_value(usage, "prompt_tokens_details") + cached_tokens = get_litellm_value(prompt_details, "cached_tokens") + if cached_tokens is not None and hasattr( + invocation, "usage_cache_read_input_tokens" + ): + invocation.usage_cache_read_input_tokens = cached_tokens + + cache_creation_tokens = get_litellm_value( + prompt_details, "cache_creation_tokens" + ) + if cache_creation_tokens is not None and hasattr( + invocation, "usage_cache_creation_input_tokens" + ): + invocation.usage_cache_creation_input_tokens = cache_creation_tokens def parse_provider_from_model(model: str) -> Optional[str]: @@ -132,7 +261,7 @@ def parse_provider_from_model(model: str) -> Optional[str]: LiteLLM uses format like "openai/gpt-4", "dashscope/qwen-turbo", etc. """ - if not model: + if not model or not isinstance(model, str): return None if "/" in model: @@ -151,6 +280,41 @@ def parse_provider_from_model(model: str) -> Optional[str]: return "unknown" +def parse_provider_from_base_url(base_url: Any) -> Optional[str]: + """Infer provider from known OpenAI-compatible service endpoints.""" + if not base_url or not isinstance(base_url, str): + return None + + try: + host = urlparse(base_url).hostname or base_url + except ValueError: + host = base_url + + host = host.lower() + for fragment, provider in _BASE_URL_PROVIDER_MAP: + if fragment in host: + return provider + return None + + +def resolve_litellm_provider(model: Any, kwargs: Mapping[str, Any]) -> str: + """Resolve the actual GenAI provider for a LiteLLM request.""" + for name in _BASE_URL_KWARG_NAMES: + provider = parse_provider_from_base_url(kwargs.get(name)) + if provider: + return provider + + custom_provider = kwargs.get("custom_llm_provider") + if custom_provider: + return custom_provider + + provider = parse_provider_from_model(model) + if provider not in (None, "unknown"): + return provider + + return provider or "unknown" + + def parse_model_name(model: str) -> str: """ Parse model name by removing provider prefix. @@ -160,7 +324,7 @@ def parse_model_name(model: str) -> str: "dashscope/qwen-turbo" -> "qwen-turbo" "gpt-4" -> "gpt-4" """ - if not model: + if not model or not isinstance(model, str): return "unknown" if "/" in model: @@ -169,34 +333,8 @@ def parse_model_name(model: str) -> str: return model -def safe_json_dumps(obj: Any, default: str = "{}") -> str: - """ - Safely serialize object to JSON string. - """ - try: - return json.dumps(obj, ensure_ascii=False) - except Exception as e: - logger.debug(f"Failed to serialize object to JSON: {e}") - return default - - -def convert_tool_definitions(tools: List[Dict[str, Any]]) -> str: - """ - Convert tool definitions to JSON string format. - """ - if not tools: - return "[]" - - try: - # Tools are typically in format: [{"type": "function", "function": {...}}] - return json.dumps(tools, ensure_ascii=False) - except Exception as e: - logger.debug(f"Failed to convert tool definitions: {e}") - return "[]" - - def convert_litellm_messages_to_genai_format( - messages: List[Dict[str, Any]], + messages: list[dict[str, Any]], ) -> List: """ Convert LiteLLM message format to OpenTelemetry GenAI InputMessage format. @@ -214,52 +352,10 @@ def convert_litellm_messages_to_genai_format( continue role = msg.get("role", "user") - parts = [] - - # Handle text content - if "content" in msg and msg["content"]: - content = msg["content"] - if isinstance(content, str): - parts.append(Text(content=content)) - elif isinstance(content, list): - # Handle multi-modal content - for item in content: - if isinstance(item, dict) and item.get("type") == "text": - parts.append(Text(content=item.get("text", ""))) - # Other content types (image, etc.) can be added here - - # Handle tool calls - if "tool_calls" in msg and msg["tool_calls"]: - for tool_call in msg["tool_calls"]: - if not isinstance(tool_call, dict): - continue - - func = tool_call.get("function", {}) - if isinstance(func, dict): - # Parse arguments if it's a JSON string - arguments = func.get("arguments", "") - if isinstance(arguments, str) and arguments: - try: - arguments = json.loads(arguments) - except Exception: - # If arguments are not valid JSON, keep the original string - pass - - parts.append( - ToolCall( - id=tool_call.get("id"), - name=func.get("name", ""), - arguments=arguments, - ) - ) + if role in _SYSTEM_INSTRUCTION_ROLES: + continue - # Handle tool call responses - if role == "tool" and "content" in msg: - parts.append( - ToolCallResponse( - id=msg.get("tool_call_id"), response=msg["content"] - ) - ) + parts = _extract_message_parts(msg, role) # If no parts added, add empty text if not parts: @@ -270,6 +366,64 @@ def convert_litellm_messages_to_genai_format( return input_messages +def extract_system_instruction_from_litellm_messages( + messages: list[dict[str, Any]], +) -> list: + """Extract system/developer instructions from LiteLLM messages.""" + if not isinstance(messages, list): + return [] + + system_instruction = [] + for msg in messages: + if not isinstance(msg, dict): + continue + + if msg.get("role") not in _SYSTEM_INSTRUCTION_ROLES: + continue + + for text in extract_litellm_text_parts(msg.get("content")): + system_instruction.append(Text(content=text)) + + return system_instruction + + +def _extract_message_parts(msg: Mapping[str, Any], role: str) -> list: + parts = [] + + for text in extract_litellm_text_parts(msg.get("content")): + parts.append(Text(content=text)) + + # Handle tool calls + if "tool_calls" in msg and msg["tool_calls"]: + for tool_call in msg["tool_calls"]: + if not isinstance(tool_call, Mapping): + continue + + func = tool_call.get("function", {}) + if isinstance(func, Mapping): + arguments = parse_tool_call_arguments( + func.get("arguments", "") + ) + + parts.append( + ToolCall( + id=tool_call.get("id"), + name=func.get("name", ""), + arguments=arguments, + ) + ) + + # Handle tool call responses + if role == "tool" and "content" in msg: + parts.append( + ToolCallResponse( + id=msg.get("tool_call_id"), response=msg["content"] + ) + ) + + return parts + + def extract_output_from_litellm_response(response: Any) -> List: """ Extract output messages from LiteLLM response. @@ -277,50 +431,54 @@ def extract_output_from_litellm_response(response: Any) -> List: Converts LiteLLM response to OpenTelemetry GenAI OutputMessage format. """ - if not hasattr(response, "choices") or not response.choices: + choices = get_litellm_value(response, "choices") or [] + if not choices: return [] output_messages = [] - for choice in response.choices: - if not hasattr(choice, "message"): - continue - - msg = choice.message + for choice in choices: + msg = get_litellm_value(choice, "message") parts = [] + role = "assistant" + + if msg is not None: + role = get_litellm_value(msg, "role", "assistant") + + reasoning_content = get_litellm_value(msg, "reasoning_content") + for text in extract_litellm_text_parts(reasoning_content): + parts.append(Reasoning(content=text)) + + # Extract text content + content = get_litellm_value(msg, "content") + for text in extract_litellm_text_parts(content): + parts.append(Text(content=text)) + + # Extract tool calls + tool_calls = get_litellm_value(msg, "tool_calls") + if tool_calls: + for tc in tool_calls: + function = get_litellm_value(tc, "function") + arguments = parse_tool_call_arguments( + get_litellm_value(function, "arguments", "") + ) - # Extract text content - if hasattr(msg, "content") and msg.content: - parts.append(Text(content=msg.content)) - - # Extract tool calls - if hasattr(msg, "tool_calls") and msg.tool_calls: - for tc in msg.tool_calls: - # Parse arguments if it's a JSON string - arguments = getattr(tc.function, "arguments", "") - if isinstance(arguments, str) and arguments: - try: - arguments = json.loads(arguments) - except Exception: - # If arguments are not valid JSON, keep the original string - pass - - parts.append( - ToolCall( - id=getattr(tc, "id", None), - name=getattr(tc.function, "name", ""), - arguments=arguments, + parts.append( + ToolCall( + id=get_litellm_value(tc, "id"), + name=get_litellm_value(function, "name", ""), + arguments=arguments, + ) ) - ) # If no parts, add empty text if not parts: parts.append(Text(content="")) - finish_reason = getattr(choice, "finish_reason", "stop") or "stop" + finish_reason = get_litellm_value(choice, "finish_reason") or "stop" output_messages.append( OutputMessage( - role=getattr(msg, "role", "assistant"), + role=role, parts=parts, finish_reason=finish_reason, ) @@ -333,9 +491,11 @@ def create_llm_invocation_from_litellm(**kwargs): """ Create LLMInvocation from LiteLLM request parameters. + The provider is resolved from known base URLs, custom_llm_provider, or the + model name. + Args: model: The model name (e.g., "gpt-4", "openai/gpt-4") - provider: The provider name (e.g., "openai", "dashscope") messages: List of message dictionaries **kwargs: Additional request parameters (temperature, max_tokens, etc.) @@ -345,20 +505,25 @@ def create_llm_invocation_from_litellm(**kwargs): # Parse model name (remove provider prefix if present) model = kwargs.get("model", "unknown_model") - provider = parse_provider_from_model(model) or "unknown" + provider = resolve_litellm_provider(model, kwargs) messages = kwargs.get("messages", []) # Convert messages to GenAI format input_messages = convert_litellm_messages_to_genai_format(messages) + system_instruction = extract_system_instruction_from_litellm_messages( + messages + ) request_model = parse_model_name(model) invocation = LLMInvocation( request_model=request_model, - provider=provider or "unknown", + provider=provider, operation_name=GenAiOperationNameValues.CHAT.value, input_messages=input_messages, ) + if system_instruction: + invocation.system_instruction = system_instruction # Set optional request parameters if "temperature" in kwargs and kwargs["temperature"] is not None: @@ -376,6 +541,14 @@ def create_llm_invocation_from_litellm(**kwargs): invocation.presence_penalty = kwargs["presence_penalty"] if "seed" in kwargs and kwargs["seed"] is not None: invocation.seed = kwargs["seed"] + if "n" in kwargs and kwargs["n"] is not None: + invocation.choice_count = kwargs["n"] + if "top_k" in kwargs and kwargs["top_k"] is not None: + invocation.top_k = kwargs["top_k"] + if "response_format" in kwargs and kwargs["response_format"] is not None: + response_format = kwargs["response_format"] + if isinstance(response_format, Mapping): + invocation.output_type = response_format.get("type") if "stop" in kwargs and kwargs["stop"] is not None: stop = kwargs["stop"] if isinstance(stop, str): @@ -407,9 +580,11 @@ def create_embedding_invocation_from_litellm(**kwargs): """ Create EmbeddingInvocation from LiteLLM embedding request parameters. + The provider is resolved from known base URLs, custom_llm_provider, or the + model name. + Args: model: The embedding model name - provider: The provider name **kwargs: Additional request parameters Returns: @@ -418,14 +593,14 @@ def create_embedding_invocation_from_litellm(**kwargs): # Extract request parameters model = kwargs.get("model", "unknown") - provider = parse_provider_from_model(model) or "unknown" + provider = resolve_litellm_provider(model, kwargs) # Parse model name (remove provider prefix if present) request_model = parse_model_name(model) invocation = EmbeddingInvocation( request_model=request_model, - provider=provider or "unknown", + provider=provider, ) # Set encoding formats if present diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py index eefbfe47b..f4a5bb9ff 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py @@ -16,7 +16,6 @@ Wrapper functions for LiteLLM completion instrumentation. """ -import json import logging import os from typing import Any, Callable, Optional @@ -28,15 +27,12 @@ StreamWrapper, ) from opentelemetry.instrumentation.litellm._utils import ( + apply_litellm_llm_response_to_invocation, create_llm_invocation_from_litellm, - extract_output_from_litellm_response, -) -from opentelemetry.util.genai.types import ( - Error, - OutputMessage, - Text, - ToolCall, + extract_finish_reasons_from_litellm_response, + normalize_litellm_completion_kwargs, ) +from opentelemetry.util.genai.types import Error logger = logging.getLogger(__name__) @@ -67,18 +63,21 @@ def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return self.original_func(*args, **kwargs) - # Extract request parameters - is_stream = kwargs.get("stream", False) + request_kwargs = normalize_litellm_completion_kwargs( + self.original_func, args, kwargs + ) + is_stream = request_kwargs.get("stream", False) # For streaming, enable usage tracking if not explicitly disabled # This ensures we get token usage information in the final chunk - if is_stream and "stream_options" not in kwargs: + if is_stream and "stream_options" not in request_kwargs: kwargs["stream_options"] = {"include_usage": True} + request_kwargs["stream_options"] = kwargs["stream_options"] # For streaming, we need special handling if is_stream: # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation self._handler.start_llm(invocation) @@ -93,6 +92,7 @@ def __call__(self, *args, **kwargs): stream=response, span=invocation.span, # For TTFT tracking callback=None, + invocation=invocation, ) stream_wrapper.callback = ( lambda span, @@ -113,7 +113,7 @@ def __call__(self, *args, **kwargs): else: # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation (handler creates and manages span) self._handler.start_llm(invocation) @@ -122,37 +122,7 @@ def __call__(self, *args, **kwargs): # Call original function response = self.original_func(*args, **kwargs) - # Fill response data into invocation - invocation.output_messages = ( - extract_output_from_litellm_response(response) - ) - - # Extract token usage - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "completion_tokens", None - ) - - # Extract response metadata - if hasattr(response, "id"): - invocation.response_id = response.id - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract finish reasons - if hasattr(response, "choices") and response.choices: - finish_reasons = [] - for choice in response.choices: - if ( - hasattr(choice, "finish_reason") - and choice.finish_reason - ): - finish_reasons.append(choice.finish_reason) - if finish_reasons: - invocation.finish_reasons = finish_reasons + apply_litellm_llm_response_to_invocation(invocation, response) # End LLM invocation successfully (handler ends span and records metrics) self._handler.stop_llm(invocation) @@ -183,78 +153,28 @@ def _handle_stream_end_with_handler( ) return - # Construct output message from accumulated content - parts = [] if stream_wrapper and hasattr( - stream_wrapper, "accumulated_content" + stream_wrapper, "get_output_messages" ): - full_content = "".join(stream_wrapper.accumulated_content) - if full_content: - parts.append(Text(content=full_content)) - - # Handle accumulated tool calls if any - if ( - hasattr(stream_wrapper, "accumulated_tool_calls") - and stream_wrapper.accumulated_tool_calls - ): - for tc in stream_wrapper.accumulated_tool_calls: - if hasattr(tc, "function"): - # Parse arguments if it's a JSON string - arguments = getattr(tc.function, "arguments", "") - if isinstance(arguments, str) and arguments: - try: - arguments = json.loads(arguments) - except Exception: - # If arguments are not valid JSON, keep the original string - pass - - parts.append( - ToolCall( - id=getattr(tc, "id", None), - name=getattr(tc.function, "name", ""), - arguments=arguments, - ) - ) - - # If we have parts, create output message - if parts: - invocation.output_messages = [ - OutputMessage( - role="assistant", parts=parts, finish_reason="stop" - ) - ] + output_messages = stream_wrapper.get_output_messages() + if output_messages: + invocation.output_messages = output_messages - # Extract token usage from last chunk - if ( - last_chunk - and hasattr(last_chunk, "usage") - and last_chunk.usage - ): - invocation.input_tokens = getattr( - last_chunk.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - last_chunk.usage, "completion_tokens", None + if last_chunk: + apply_litellm_llm_response_to_invocation( + invocation, + last_chunk, + include_output_messages=False, ) - # Extract response metadata - if last_chunk: - if hasattr(last_chunk, "id"): - invocation.response_id = last_chunk.id - if hasattr(last_chunk, "model"): - invocation.response_model_name = last_chunk.model - - # Extract finish_reason from last chunk's choice - if hasattr(last_chunk, "choices") and last_chunk.choices: - finish_reasons = [] - for choice in last_chunk.choices: - if ( - hasattr(choice, "finish_reason") - and choice.finish_reason - ): - finish_reasons.append(choice.finish_reason) - if finish_reasons: - invocation.finish_reasons = finish_reasons + if stream_wrapper and hasattr(stream_wrapper, "finish_reasons"): + finish_reasons = stream_wrapper.finish_reasons() + else: + finish_reasons = extract_finish_reasons_from_litellm_response( + last_chunk + ) + if finish_reasons: + invocation.finish_reasons = finish_reasons # End LLM invocation successfully self._handler.stop_llm(invocation) @@ -291,17 +211,20 @@ async def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await self.original_func(*args, **kwargs) - # Extract request parameters - is_stream = kwargs.get("stream", False) + request_kwargs = normalize_litellm_completion_kwargs( + self.original_func, args, kwargs + ) + is_stream = request_kwargs.get("stream", False) # For streaming, enable usage tracking if not explicitly disabled - if is_stream and "stream_options" not in kwargs: + if is_stream and "stream_options" not in request_kwargs: kwargs["stream_options"] = {"include_usage": True} + request_kwargs["stream_options"] = kwargs["stream_options"] # For streaming, we need special handling if is_stream: # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation self._handler.start_llm(invocation) @@ -315,6 +238,7 @@ async def __call__(self, *args, **kwargs): stream=response, span=invocation.span, # For TTFT tracking callback=None, + invocation=invocation, ) stream_wrapper.callback = ( lambda span, @@ -336,7 +260,7 @@ async def __call__(self, *args, **kwargs): else: # Non-streaming: use Handler pattern # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation self._handler.start_llm(invocation) @@ -345,37 +269,7 @@ async def __call__(self, *args, **kwargs): # Call original function response = await self.original_func(*args, **kwargs) - # Fill response data into invocation - invocation.output_messages = ( - extract_output_from_litellm_response(response) - ) - - # Extract token usage - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "completion_tokens", None - ) - - # Extract response metadata - if hasattr(response, "id"): - invocation.response_id = response.id - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract finish reasons - if hasattr(response, "choices") and response.choices: - finish_reasons = [] - for choice in response.choices: - if ( - hasattr(choice, "finish_reason") - and choice.finish_reason - ): - finish_reasons.append(choice.finish_reason) - if finish_reasons: - invocation.finish_reasons = finish_reasons + apply_litellm_llm_response_to_invocation(invocation, response) # End LLM invocation successfully self._handler.stop_llm(invocation) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py index 645233a0b..5b9388de6 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py @@ -72,7 +72,7 @@ def environment(): ) # Allow capturing message content os.environ.setdefault( - "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "True" + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "SPAN_ONLY" ) litellm.telemetry = False diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py new file mode 100644 index 000000000..0f0c02d9e --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py @@ -0,0 +1,637 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import json +from types import SimpleNamespace + +import litellm + +from opentelemetry import context as otel_context +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + + +def _chat_response(model: str, content: str): + return SimpleNamespace( + id=f"chatcmpl-{model}", + model=model, + choices=[ + SimpleNamespace( + message=SimpleNamespace( + role="assistant", + content=content, + tool_calls=None, + ), + finish_reason="stop", + ) + ], + usage=SimpleNamespace( + prompt_tokens=4, + completion_tokens=3, + total_tokens=7, + ), + ) + + +def _embedding_response(model: str): + return SimpleNamespace( + id=f"embd-{model}", + model=model, + data=[{"embedding": [0.1, 0.2, 0.3]}], + usage=SimpleNamespace( + prompt_tokens=5, + total_tokens=5, + ), + ) + + +def _chunk(choices, usage=None): + return SimpleNamespace( + id="chatcmpl-stream", + model="qwen-turbo", + choices=choices, + usage=usage, + ) + + +def _choice( + index, + content=None, + finish_reason=None, + tool_calls=None, + reasoning_content=None, +): + return SimpleNamespace( + index=index, + delta=SimpleNamespace( + content=content, + reasoning_content=reasoning_content, + tool_calls=tool_calls, + ), + finish_reason=finish_reason, + ) + + +def _tool_delta(index, tool_call_id=None, name=None, arguments=None): + return SimpleNamespace( + index=index, + id=tool_call_id, + function=SimpleNamespace(name=name, arguments=arguments), + ) + + +class _ClosableIterator: + def __init__(self, chunks): + self._iterator = iter(chunks) + self.closed = False + + def __iter__(self): + return self + + def __next__(self): + return next(self._iterator) + + def close(self): + self.closed = True + + +class _AsyncClosableStream: + def __init__(self, chunks): + self._chunks = list(chunks) + self._index = 0 + self.closed = False + + def __aiter__(self): + return self + + async def __anext__(self): + if self._index >= len(self._chunks): + raise StopAsyncIteration + + chunk = self._chunks[self._index] + self._index += 1 + return chunk + + async def aclose(self): + self.closed = True + + +def test_completion_positional_args_feed_genai_invocation( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "qwen-turbo" + assert messages[0]["content"] == "hello" + assert kwargs["temperature"] == 0.2 + return _chat_response(model, "hello back") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + "qwen-turbo", + [{"role": "user", "content": "hello"}], + temperature=0.2, + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes["gen_ai.span.kind"] == "LLM" + assert span.attributes["gen_ai.provider.name"] == "dashscope" + assert span.attributes["gen_ai.request.model"] == "qwen-turbo" + + input_messages = json.loads(span.attributes["gen_ai.input.messages"]) + assert input_messages[0]["role"] == "user" + assert input_messages[0]["parts"][0]["content"] == "hello" + + +def test_provider_prefers_custom_provider_over_model_heuristic_and_system_split( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "gpt-4" + assert kwargs["custom_llm_provider"] == "azure" + assert messages[0]["role"] == "system" + return _chat_response(model, "azure response") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="gpt-4", + custom_llm_provider="azure", + messages=[ + {"role": "system", "content": "system rules"}, + {"role": "developer", "content": "developer rules"}, + {"role": "user", "content": "hello"}, + ], + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes["gen_ai.provider.name"] == "azure" + + input_messages = json.loads(span.attributes["gen_ai.input.messages"]) + assert [message["role"] for message in input_messages] == ["user"] + + system_instructions = json.loads( + span.attributes["gen_ai.system_instructions"] + ) + assert [part["content"] for part in system_instructions] == [ + "system rules", + "developer rules", + ] + + +def test_provider_prefers_known_base_url_over_custom_adapter( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "custom-compatible-model" + assert kwargs["custom_llm_provider"] == "openai" + assert "dashscope.aliyuncs.com" in kwargs["api_base"] + return _chat_response(model, "compatible response") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="custom-compatible-model", + custom_llm_provider="openai", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + messages=[{"role": "user", "content": "hello"}], + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes["gen_ai.provider.name"] == "dashscope" + + +def test_completion_usage_falls_back_to_total_minus_prompt_tokens( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "qwen-turbo" + return SimpleNamespace( + id="chatcmpl-fallback-usage", + model=model, + choices=[ + SimpleNamespace( + message=SimpleNamespace( + role="assistant", + content="fallback usage", + tool_calls=None, + ), + finish_reason="stop", + ) + ], + usage=SimpleNamespace(prompt_tokens=4, total_tokens=9), + ) + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "hello"}], + ) + finally: + instrumentor.uninstrument() + + span = span_exporter.get_finished_spans()[0] + assert span.attributes["gen_ai.usage.input_tokens"] == 4 + assert span.attributes["gen_ai.usage.output_tokens"] == 5 + assert span.attributes["gen_ai.usage.total_tokens"] == 9 + + +def test_suppressed_instrumentation_skips_completion_span( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + return _chat_response(model, "not traced") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + token = None + instrumentor.instrument(tracer_provider=tracer_provider) + try: + ctx = otel_context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + token = otel_context.attach(ctx) + litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "hello"}], + ) + finally: + if token is not None: + otel_context.detach(token) + instrumentor.uninstrument() + + assert not span_exporter.get_finished_spans() + + +def test_no_content_mode_omits_messages_but_keeps_metadata( + monkeypatch, tracer_provider, span_exporter +): + monkeypatch.setenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "NO_CONTENT" + ) + + def fake_completion(model, messages, **kwargs): + return _chat_response(model, "content hidden") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="qwen-turbo", + messages=[ + {"role": "system", "content": "secret system"}, + {"role": "user", "content": "secret user"}, + ], + ) + finally: + instrumentor.uninstrument() + + span = span_exporter.get_finished_spans()[0] + assert span.attributes["gen_ai.span.kind"] == "LLM" + assert span.attributes["gen_ai.request.model"] == "qwen-turbo" + assert "gen_ai.input.messages" not in span.attributes + assert "gen_ai.output.messages" not in span.attributes + assert "gen_ai.system_instructions" not in span.attributes + + +def test_embedding_usage_records_input_tokens_only( + monkeypatch, tracer_provider, span_exporter +): + def fake_embedding(model, input_, **kwargs): + assert model == "text-embedding-v1" + assert input_ == "embed me" + return _embedding_response(model) + + monkeypatch.setattr(litellm, "embedding", fake_embedding) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.embedding( + "text-embedding-v1", + "embed me", + custom_llm_provider="openai", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes["gen_ai.span.kind"] == "EMBEDDING" + assert span.attributes["gen_ai.provider.name"] == "dashscope" + assert span.attributes["gen_ai.usage.input_tokens"] == 5 + assert span.attributes["gen_ai.usage.total_tokens"] == 5 + assert "gen_ai.usage.output_tokens" not in span.attributes + assert span.attributes["gen_ai.embeddings.dimension.count"] == 3 + + +def test_streaming_completion_records_ttft_choices_and_tool_calls( + monkeypatch, tracer_provider, span_exporter +): + chunks = [ + _chunk( + [ + _choice(0, content="hel"), + _choice(1, content="bon"), + ] + ), + _chunk( + [ + _choice( + 0, + content="lo", + tool_calls=[ + _tool_delta( + 0, + tool_call_id="call_1", + name="lookup", + arguments='{"q":', + ) + ], + ), + _choice(1, content="jour"), + ] + ), + _chunk( + [ + _choice( + 0, + tool_calls=[_tool_delta(0, arguments={"ignored": True})], + ), + ] + ), + _chunk( + [ + _choice( + 0, + finish_reason="tool_calls", + tool_calls=[_tool_delta(0, arguments='"weather"}')], + ), + _choice(1, finish_reason="stop"), + ], + usage=SimpleNamespace( + prompt_tokens=6, + completion_tokens=5, + total_tokens=11, + ), + ), + ] + + def fake_completion(*args, **kwargs): + assert kwargs["stream"] is True + assert kwargs["stream_options"] == {"include_usage": True} + return iter(chunks) + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + response = litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "stream please"}], + stream=True, + n=2, + ) + assert len(list(response)) == 4 + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert "gen_ai.response.time_to_first_token" in span.attributes + assert span.attributes["gen_ai.request.choice.count"] == 2 + assert span.attributes["gen_ai.usage.input_tokens"] == 6 + assert span.attributes["gen_ai.usage.output_tokens"] == 5 + + output_messages = json.loads(span.attributes["gen_ai.output.messages"]) + assert len(output_messages) == 2 + assert output_messages[0]["parts"][0]["content"] == "hello" + assert output_messages[1]["parts"][0]["content"] == "bonjour" + tool_call = output_messages[0]["parts"][1] + assert tool_call["type"] == "tool_call" + assert tool_call["id"] == "call_1" + assert tool_call["name"] == "lookup" + assert tool_call["arguments"] == {"q": "weather"} + + +def test_streaming_reasoning_multimodal_content_and_empty_choice( + monkeypatch, tracer_provider, span_exporter +): + chunks = [ + _chunk( + [ + _choice(0, reasoning_content="thinking"), + _choice(1, finish_reason="stop"), + ] + ), + _chunk([_choice(0, content={"unexpected": True})]), + _chunk( + [ + _choice( + 0, + content=[ + {"type": "text", "text": "hello"}, + {"type": "image_url", "image_url": {"url": "x"}}, + " world", + ], + finish_reason="stop", + ) + ], + usage=SimpleNamespace( + prompt_tokens=3, + completion_tokens=2, + total_tokens=5, + ), + ), + ] + + def fake_completion(*args, **kwargs): + assert kwargs["stream"] is True + return iter(chunks) + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + response = litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "reason"}], + stream=True, + n=2, + ) + assert len(list(response)) == 3 + finally: + instrumentor.uninstrument() + + span = span_exporter.get_finished_spans()[0] + assert "gen_ai.response.time_to_first_token" in span.attributes + output_messages = json.loads(span.attributes["gen_ai.output.messages"]) + assert len(output_messages) == 2 + assert output_messages[0]["parts"][0] == { + "content": "thinking", + "type": "reasoning", + } + assert output_messages[0]["parts"][1] == { + "content": "hello world", + "type": "text", + } + assert output_messages[1]["parts"] == [{"content": "", "type": "text"}] + + +def test_streaming_close_closes_underlying_stream_and_finalizes( + monkeypatch, tracer_provider, span_exporter +): + stream = _ClosableIterator( + [ + _chunk([_choice(0, content="partial")]), + _chunk([_choice(0, content=" ignored", finish_reason="stop")]), + ] + ) + + def fake_completion(*args, **kwargs): + assert kwargs["stream"] is True + return stream + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + response = litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "stream"}], + stream=True, + ) + next(response) + response.close() + finally: + instrumentor.uninstrument() + + assert stream.closed is True + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + output_messages = json.loads(spans[0].attributes["gen_ai.output.messages"]) + assert output_messages[0]["parts"][0]["content"] == "partial" + + +def test_async_streaming_aclose_closes_stream_and_finalizes( + monkeypatch, tracer_provider, span_exporter +): + captured = {} + + async def fake_acompletion(*args, **kwargs): + assert kwargs["stream"] is True + stream = _AsyncClosableStream( + [ + _chunk([_choice(0, content="async partial")]), + _chunk([_choice(0, content=" ignored", finish_reason="stop")]), + ] + ) + captured["stream"] = stream + return stream + + monkeypatch.setattr(litellm, "acompletion", fake_acompletion) + + async def run_call(): + response = await litellm.acompletion( + model="qwen-turbo", + messages=[{"role": "user", "content": "stream"}], + stream=True, + ) + iterator = response.__aiter__() + await iterator.__anext__() + await iterator.aclose() + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + asyncio.run(run_call()) + finally: + instrumentor.uninstrument() + + assert captured["stream"].closed is True + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + output_messages = json.loads(spans[0].attributes["gen_ai.output.messages"]) + assert output_messages[0]["parts"][0]["content"] == "async partial" + + +def test_async_completion_concurrent_calls_keep_separate_spans( + monkeypatch, tracer_provider, span_exporter +): + async def fake_acompletion(model, messages, **kwargs): + await asyncio.sleep(0.01 if model == "qwen-turbo" else 0) + return _chat_response(model, f"reply to {messages[0]['content']}") + + monkeypatch.setattr(litellm, "acompletion", fake_acompletion) + + async def run_calls(): + return await asyncio.gather( + litellm.acompletion( + "qwen-turbo", + [{"role": "user", "content": "first"}], + ), + litellm.acompletion( + "qwen-plus", + [{"role": "user", "content": "second"}], + ), + ) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + asyncio.run(run_calls()) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 2 + observed = { + json.loads(span.attributes["gen_ai.input.messages"])[0]["parts"][0][ + "content" + ]: span.attributes["gen_ai.request.model"] + for span in spans + } + assert observed == {"first": "qwen-turbo", "second": "qwen-plus"} diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py index b49b8890c..9c678bed0 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py @@ -193,16 +193,24 @@ def test_sync_completion_with_multiple_messages(self): self.assertEqual(len(spans), 1) span = spans[0] - # Verify all messages captured in sequence + # Verify system instructions are separated from input messages. self.assertIn("gen_ai.input.messages", span.attributes) input_messages = json.loads( span.attributes.get("gen_ai.input.messages") ) - self.assertEqual(len(input_messages), 4) - self.assertEqual(input_messages[0]["role"], "system") - self.assertEqual(input_messages[1]["role"], "user") - self.assertEqual(input_messages[2]["role"], "assistant") - self.assertEqual(input_messages[3]["role"], "user") + self.assertEqual(len(input_messages), 3) + self.assertEqual(input_messages[0]["role"], "user") + self.assertEqual(input_messages[1]["role"], "assistant") + self.assertEqual(input_messages[2]["role"], "user") + + self.assertIn("gen_ai.system_instructions", span.attributes) + system_instructions = json.loads( + span.attributes.get("gen_ai.system_instructions") + ) + self.assertEqual( + system_instructions[0]["content"], + "You are a helpful assistant that provides concise answers.", + ) output_messages = json.loads( span.attributes.get("gen_ai.output.messages")