From be67e0b0e9dc2bb5b0165b2751f3ac1a052ddfe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Thu, 21 May 2026 00:01:29 +0800 Subject: [PATCH 1/4] Improve LiteLLM GenAI util instrumentation --- .../CHANGELOG.md | 6 + .../README.rst | 29 ++- .../examples/litellm_genai_smoke.py | 120 +++++++++ .../pyproject.toml | 6 +- .../litellm/_embedding_wrapper.py | 94 ++----- .../litellm/_stream_wrapper.py | 207 +++++++++++++-- .../instrumentation/litellm/_utils.py | 239 +++++++++++++++--- .../instrumentation/litellm/_wrapper.py | 188 +++----------- .../tests/conftest.py | 2 +- .../tests/test_genai_util_wrapper.py | 226 +++++++++++++++++ 10 files changed, 827 insertions(+), 290 deletions(-) create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md index 3b4c6095e..1fdb3125a 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Changed + +- Improved LiteLLM GenAI util invocation mapping for positional arguments, + streaming time-to-first-token, multi-choice outputs, tool-call deltas, and + real smoke examples. + ## Version 0.5.0 (2026-05-11) There are no changelog entries for this release. diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst index 0f940f728..498a6aed9 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst @@ -25,6 +25,8 @@ Configuration The instrumentation can be enabled/disabled using environment variables: * ``ENABLE_LITELLM_INSTRUMENTOR``: Enable/disable instrumentation (default: true) +* ``OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental``: Enable GenAI semantic conventions +* ``OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT``: Set to ``NO_CONTENT``, ``SPAN_ONLY``, ``EVENT_ONLY``, or ``SPAN_AND_EVENT`` Usage ----- @@ -43,6 +45,32 @@ Usage messages=[{"role": "user", "content": "Hello!"}] ) +Local OTLP smoke +---------------- + +The ``examples/litellm_genai_smoke.py`` script sends real LiteLLM traffic for: + +* non-streaming completion +* streaming completion +* concurrent async completion calls + +Set ``LITELLM_SMOKE_MODE`` to ``non_streaming``, ``streaming``, +``concurrent``, or ``all`` (default) to run a subset. + +Example with a local ``otel-gui`` OTLP endpoint: + +.. code:: console + + export DASHSCOPE_API_KEY=... + export OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 + export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + export OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental + export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=SPAN_ONLY + export OTEL_SERVICE_NAME=loongsuite-litellm-smoke + + loongsuite-instrument python \ + instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py + Features -------- @@ -65,4 +93,3 @@ References * `OpenTelemetry LiteLLM Instrumentation `_ * `OpenTelemetry Project `_ * `LiteLLM Documentation `_ - diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py new file mode 100644 index 000000000..24fb5bddd --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py @@ -0,0 +1,120 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Real LiteLLM smoke traffic for LoongSuite GenAI telemetry. + +Run this under ``loongsuite-instrument`` with OTLP configured. The script +exercises non-streaming, streaming, and concurrent async completion calls. +""" + +from __future__ import annotations + +import asyncio +import os + +import litellm + +MODEL = os.getenv("LITELLM_MODEL", "qwen-turbo") +API_BASE = os.getenv( + "LITELLM_API_BASE", + "https://dashscope.aliyuncs.com/compatible-mode/v1", +) + + +def _configure_provider() -> None: + if os.getenv("DASHSCOPE_API_KEY") and not os.getenv("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = os.environ["DASHSCOPE_API_KEY"] + + os.environ.setdefault("OPENAI_API_BASE", API_BASE) + os.environ.setdefault("DASHSCOPE_API_BASE", API_BASE) + litellm.telemetry = False + + +def run_non_streaming() -> None: + response = litellm.completion( + model=MODEL, + custom_llm_provider="openai", + messages=[ + { + "role": "user", + "content": "Reply with exactly one short sentence.", + } + ], + temperature=0.1, + max_tokens=64, + ) + print("non_streaming:", response.choices[0].message.content[:80]) + + +def run_streaming() -> None: + stream = litellm.completion( + model=MODEL, + custom_llm_provider="openai", + messages=[ + { + "role": "user", + "content": "Count from one to five, separated by commas.", + } + ], + stream=True, + temperature=0.1, + max_tokens=64, + ) + + chunks = [] + for chunk in stream: + if chunk.choices: + delta = chunk.choices[0].delta + if getattr(delta, "content", None): + chunks.append(delta.content) + print("streaming:", "".join(chunks)[:80]) + + +async def run_concurrent() -> None: + prompts = [ + "Give one word for sky color.", + "Give one word for ocean color.", + "Give one word for grass color.", + ] + + async def call(prompt: str): + return await litellm.acompletion( + model=MODEL, + custom_llm_provider="openai", + messages=[{"role": "user", "content": prompt}], + temperature=0.1, + max_tokens=32, + ) + + responses = await asyncio.gather(*(call(prompt) for prompt in prompts)) + print( + "concurrent:", + ", ".join(response.choices[0].message.content[:24] for response in responses), + ) + + +def main() -> None: + _configure_provider() + mode = os.getenv("LITELLM_SMOKE_MODE", "all").lower() + + if mode in ("all", "non_streaming"): + run_non_streaming() + if mode in ("all", "streaming"): + run_streaming() + if mode in ("all", "concurrent"): + asyncio.run(run_concurrent()) + + +if __name__ == "__main__": + main() diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml index fbd8ae831..3535c16b2 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml @@ -41,18 +41,18 @@ instruments = [ litellm = "opentelemetry.instrumentation.litellm:LiteLLMInstrumentor" [project.urls] -Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-litellm" -Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" +Homepage = "https://github.com/alibaba/loongsuite-python-agent/tree/main/instrumentation-loongsuite/loongsuite-instrumentation-litellm" +Repository = "https://github.com/alibaba/loongsuite-python-agent" [tool.hatch.version] path = "src/opentelemetry/instrumentation/litellm/version.py" [tool.hatch.build.targets.sdist] include = [ + "/examples", "/src", "/tests", ] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] - diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py index cda320946..a3cc2bcda 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py @@ -23,7 +23,9 @@ from opentelemetry import context from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.instrumentation.litellm._utils import ( + apply_litellm_embedding_response_to_invocation, create_embedding_invocation_from_litellm, + normalize_litellm_embedding_kwargs, ) from opentelemetry.util.genai.types import Error @@ -53,8 +55,10 @@ def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return self.original_func(*args, **kwargs) - # Create invocation object - invocation = create_embedding_invocation_from_litellm(**kwargs) + request_kwargs = normalize_litellm_embedding_kwargs( + self.original_func, args, kwargs + ) + invocation = create_embedding_invocation_from_litellm(**request_kwargs) # Start Embedding invocation self._handler.start_embedding(invocation) @@ -63,43 +67,9 @@ def __call__(self, *args, **kwargs): # Call original function response = self.original_func(*args, **kwargs) - # Extract response metadata - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract token usage if available - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "total_tokens", None - ) - - # Extract embedding dimension count - if ( - hasattr(response, "data") - and response.data - and len(response.data) > 0 - ): - try: - first_embedding = response.data[0] - # Handle dict response - if ( - isinstance(first_embedding, dict) - and "embedding" in first_embedding - ): - embedding_vector = first_embedding["embedding"] - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - # Handle object response - elif hasattr(first_embedding, "embedding"): - embedding_vector = first_embedding.embedding - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - except (IndexError, AttributeError, KeyError, TypeError): - # If we can't extract dimension, just skip it - pass + apply_litellm_embedding_response_to_invocation( + invocation, response + ) # End Embedding invocation successfully self._handler.stop_embedding(invocation) @@ -131,8 +101,10 @@ async def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await self.original_func(*args, **kwargs) - # Create invocation object - invocation = create_embedding_invocation_from_litellm(**kwargs) + request_kwargs = normalize_litellm_embedding_kwargs( + self.original_func, args, kwargs + ) + invocation = create_embedding_invocation_from_litellm(**request_kwargs) # Start Embedding invocation self._handler.start_embedding(invocation) @@ -141,43 +113,9 @@ async def __call__(self, *args, **kwargs): # Call original function response = await self.original_func(*args, **kwargs) - # Extract response metadata - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract token usage if available - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "total_tokens", None - ) - - # Extract embedding dimension count - if ( - hasattr(response, "data") - and response.data - and len(response.data) > 0 - ): - try: - first_embedding = response.data[0] - # Handle dict response - if ( - isinstance(first_embedding, dict) - and "embedding" in first_embedding - ): - embedding_vector = first_embedding["embedding"] - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - # Handle object response - elif hasattr(first_embedding, "embedding"): - embedding_vector = first_embedding.embedding - if isinstance(embedding_vector, list): - invocation.dimension_count = len(embedding_vector) - except (IndexError, AttributeError, KeyError, TypeError): - # If we can't extract dimension, just skip it - pass + apply_litellm_embedding_response_to_invocation( + invocation, response + ) # End Embedding invocation successfully self._handler.stop_embedding(invocation) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py index 82c241ec6..28cf6ab7d 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py @@ -17,11 +17,156 @@ """ import logging +import timeit from typing import Any, Iterator, Optional +from opentelemetry.instrumentation.litellm._utils import ( + get_litellm_value, + parse_tool_call_arguments, +) +from opentelemetry.util.genai.types import OutputMessage, Text, ToolCall + logger = logging.getLogger(__name__) +class _StreamAccumulator: + """Accumulate LiteLLM streaming deltas into GenAI output messages.""" + + def __init__(self, invocation: Any = None): + self.invocation = invocation + self._choice_states: dict[int, dict[str, Any]] = {} + + def record_chunk(self, chunk: Any) -> None: + choices = get_litellm_value(chunk, "choices") or [] + if not choices: + return + + saw_token = False + for default_index, choice in enumerate(choices): + index = get_litellm_value(choice, "index", default_index) + if not isinstance(index, int): + index = default_index + + state = self._choice_states.setdefault( + index, + { + "role": "assistant", + "content": [], + "finish_reason": None, + "tool_calls": {}, + }, + ) + + finish_reason = get_litellm_value(choice, "finish_reason") + if finish_reason: + state["finish_reason"] = finish_reason + + delta = get_litellm_value(choice, "delta") + if delta is None: + continue + + role = get_litellm_value(delta, "role") + if role: + state["role"] = role + + content = get_litellm_value(delta, "content") + if content: + state["content"].append(content) + saw_token = True + + tool_calls = get_litellm_value(delta, "tool_calls") + if tool_calls: + saw_token = True + self._record_tool_calls(state, tool_calls) + + if saw_token and self.invocation is not None: + first_token_time = getattr( + self.invocation, "monotonic_first_token_s", None + ) + if first_token_time is None: + self.invocation.monotonic_first_token_s = ( + timeit.default_timer() + ) + + def get_output_messages(self) -> list[OutputMessage]: + output_messages = [] + for index in sorted(self._choice_states): + state = self._choice_states[index] + parts = [] + content = "".join(state["content"]) + if content: + parts.append(Text(content=content)) + + for tool_index in sorted(state["tool_calls"]): + tool_call = state["tool_calls"][tool_index] + arguments = parse_tool_call_arguments( + tool_call.get("arguments", "") + ) + if ( + tool_call.get("id") + or tool_call.get("name") + or arguments not in (None, "") + ): + parts.append( + ToolCall( + id=tool_call.get("id"), + name=tool_call.get("name", ""), + arguments=arguments, + ) + ) + + if not parts: + continue + + output_messages.append( + OutputMessage( + role=state["role"] or "assistant", + parts=parts, + finish_reason=state["finish_reason"] or "stop", + ) + ) + return output_messages + + def finish_reasons(self) -> list[str]: + finish_reasons = [] + for state in self._choice_states.values(): + if state["finish_reason"]: + finish_reasons.append(state["finish_reason"]) + return finish_reasons + + @staticmethod + def _record_tool_calls( + state: dict[str, Any], tool_calls: list[Any] + ) -> None: + for fallback_index, tool_call in enumerate(tool_calls): + tool_index = get_litellm_value(tool_call, "index", fallback_index) + if not isinstance(tool_index, int): + tool_index = fallback_index + + stored = state["tool_calls"].setdefault( + tool_index, + {"id": None, "name": "", "arguments": ""}, + ) + + tool_id = get_litellm_value(tool_call, "id") + if tool_id: + stored["id"] = tool_id + + function = get_litellm_value(tool_call, "function") + function_name = get_litellm_value(function, "name") + if function_name: + stored["name"] = function_name + + arguments = get_litellm_value(function, "arguments") + if isinstance(arguments, str): + if isinstance(stored["arguments"], str): + stored["arguments"] += arguments + else: + stored["arguments"] = arguments + elif arguments: + stored["arguments"] = arguments + + class StreamWrapper: """ Wrapper for synchronous streaming responses. @@ -31,10 +176,17 @@ class StreamWrapper: Supports context manager protocol for reliable cleanup. """ - def __init__(self, stream: Iterator, span: Any, callback: callable): + def __init__( + self, + stream: Iterator, + span: Any, + callback: callable, + invocation: Any = None, + ): self.stream = stream self.span = span self.callback = callback + self._accumulator = _StreamAccumulator(invocation) self.last_chunk = None # Only keep last chunk to avoid memory leak self.chunk_count = 0 self._finalized = False @@ -48,17 +200,7 @@ def __next__(self): try: chunk = next(self.stream) - # Accumulate content from delta for output messages - if hasattr(chunk, "choices") and chunk.choices: - choice = chunk.choices[0] - if hasattr(choice, "delta"): - delta = choice.delta - # Accumulate text content - if hasattr(delta, "content") and delta.content: - self.accumulated_content.append(delta.content) - # Accumulate tool calls - if hasattr(delta, "tool_calls") and delta.tool_calls: - self.accumulated_tool_calls.extend(delta.tool_calls) + self._accumulator.record_chunk(chunk) # Only keep the last chunk (contains usage info) self.last_chunk = chunk @@ -101,7 +243,8 @@ def _finalize(self, error: Optional[Exception] = None): self._finalized = True try: # Call the callback with only the last chunk - # Note: The callback is responsible for calling handler.stop_llm() or handler.fail_llm() + # Note: The callback is responsible for calling handler.stop_llm() + # or handler.fail_llm(). # which will end the span. We no longer call span.end() here. if self.callback: self.callback(self.span, self.last_chunk, error) @@ -111,6 +254,12 @@ def _finalize(self, error: Optional[Exception] = None): except Exception as e: logger.debug(f"Error finalizing stream: {e}") + def get_output_messages(self) -> list[OutputMessage]: + return self._accumulator.get_output_messages() + + def finish_reasons(self) -> list[str]: + return self._accumulator.finish_reasons() + class AsyncStreamWrapper: """ @@ -125,10 +274,17 @@ class AsyncStreamWrapper: 3. Letting the wrapper detect stream exhaustion """ - def __init__(self, stream, span: Any, callback: callable): + def __init__( + self, + stream, + span: Any, + callback: callable, + invocation: Any = None, + ): self.stream = stream self.span = span self.callback = callback + self._accumulator = _StreamAccumulator(invocation) self.last_chunk = None # Only keep last chunk to avoid memory leak self.chunk_count = 0 self._finalized = False @@ -150,19 +306,7 @@ async def _wrapped_iteration(self): """ try: async for chunk in self.stream: - # Accumulate content from delta for output messages - if hasattr(chunk, "choices") and chunk.choices: - choice = chunk.choices[0] - if hasattr(choice, "delta"): - delta = choice.delta - # Accumulate text content - if hasattr(delta, "content") and delta.content: - self.accumulated_content.append(delta.content) - # Accumulate tool calls - if hasattr(delta, "tool_calls") and delta.tool_calls: - self.accumulated_tool_calls.extend( - delta.tool_calls - ) + self._accumulator.record_chunk(chunk) # Only keep the last chunk (contains usage info) self.last_chunk = chunk @@ -213,7 +357,8 @@ def _finalize(self, error: Optional[Exception] = None): self._finalized = True try: # Call the callback with only the last chunk - # Note: The callback is responsible for calling handler.stop_llm() or handler.fail_llm() + # Note: The callback is responsible for calling handler.stop_llm() + # or handler.fail_llm(). # which will end the span. We no longer call span.end() here. if self.callback: try: @@ -225,3 +370,9 @@ def _finalize(self, error: Optional[Exception] = None): self.last_chunk = None except Exception as e: logger.debug(f"Error finalizing async stream: {e}") + + def get_output_messages(self) -> list[OutputMessage]: + return self._accumulator.get_output_messages() + + def finish_reasons(self) -> list[str]: + return self._accumulator.finish_reasons() diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py index cbec10a0f..c7697d574 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py @@ -16,8 +16,10 @@ Utility functions for LiteLLM instrumentation. """ +import inspect import json import logging +from collections.abc import Callable, Mapping from typing import Any, Dict, List, Optional from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import ( @@ -36,6 +38,171 @@ logger = logging.getLogger(__name__) +_COMPLETION_POSITIONAL_PARAMETERS = ("model", "messages") +_EMBEDDING_POSITIONAL_PARAMETERS = ("model", "input") + + +def get_litellm_value(obj: Any, key: str, default: Any = None) -> Any: + """Read a value from LiteLLM dict, pydantic, or object responses.""" + if obj is None: + return default + if isinstance(obj, Mapping): + return obj.get(key, default) + return getattr(obj, key, default) + + +def normalize_litellm_completion_kwargs( + original_func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], +) -> dict[str, Any]: + """Return request kwargs with positional LiteLLM completion args included.""" + return _normalize_litellm_kwargs( + original_func, args, kwargs, _COMPLETION_POSITIONAL_PARAMETERS + ) + + +def normalize_litellm_embedding_kwargs( + original_func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], +) -> dict[str, Any]: + """Return request kwargs with positional LiteLLM embedding args included.""" + return _normalize_litellm_kwargs( + original_func, args, kwargs, _EMBEDDING_POSITIONAL_PARAMETERS + ) + + +def _normalize_litellm_kwargs( + original_func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], + positional_names: tuple[str, ...], +) -> dict[str, Any]: + normalized = dict(kwargs) + + for name, value in zip(positional_names, args): + normalized.setdefault(name, value) + + try: + signature = inspect.signature(original_func) + bound_arguments = signature.bind_partial(*args, **kwargs).arguments + except (TypeError, ValueError): + return normalized + + extra_kwargs = bound_arguments.pop("kwargs", None) + bound_arguments.pop("args", None) + normalized.update(bound_arguments) + if isinstance(extra_kwargs, Mapping): + normalized.update(extra_kwargs) + return normalized + + +def parse_tool_call_arguments(arguments: Any) -> Any: + """Parse JSON tool-call arguments when LiteLLM returns them as strings.""" + if isinstance(arguments, str) and arguments: + try: + return json.loads(arguments) + except Exception: + return arguments + return arguments + + +def apply_litellm_llm_response_to_invocation( + invocation: LLMInvocation, + response: Any, + *, + include_output_messages: bool = True, +) -> None: + """Populate a GenAI LLMInvocation from a LiteLLM response or stream chunk.""" + if include_output_messages: + output_messages = extract_output_from_litellm_response(response) + if output_messages: + invocation.output_messages = output_messages + + usage = get_litellm_value(response, "usage") + _apply_usage_to_invocation(invocation, usage) + + response_id = get_litellm_value(response, "id") + if response_id: + invocation.response_id = response_id + + response_model = get_litellm_value(response, "model") + if response_model: + invocation.response_model_name = response_model + + finish_reasons = extract_finish_reasons_from_litellm_response(response) + if finish_reasons: + invocation.finish_reasons = finish_reasons + + +def apply_litellm_embedding_response_to_invocation( + invocation: EmbeddingInvocation, + response: Any, +) -> None: + """Populate a GenAI EmbeddingInvocation from a LiteLLM response.""" + response_model = get_litellm_value(response, "model") + if response_model: + invocation.response_model_name = response_model + + usage = get_litellm_value(response, "usage") + _apply_usage_to_invocation(invocation, usage) + + data = get_litellm_value(response, "data") + if not data: + return + + try: + first_embedding = data[0] + embedding_vector = get_litellm_value(first_embedding, "embedding") + if isinstance(embedding_vector, list): + invocation.dimension_count = len(embedding_vector) + except (IndexError, AttributeError, KeyError, TypeError): + logger.debug("Failed to extract LiteLLM embedding dimension count") + + +def extract_finish_reasons_from_litellm_response(response: Any) -> list[str]: + """Extract non-empty finish reasons from LiteLLM choices.""" + choices = get_litellm_value(response, "choices") or [] + finish_reasons = [] + for choice in choices: + finish_reason = get_litellm_value(choice, "finish_reason") + if finish_reason: + finish_reasons.append(finish_reason) + return finish_reasons + + +def _apply_usage_to_invocation(invocation: Any, usage: Any) -> None: + if not usage: + return + + input_tokens = get_litellm_value(usage, "prompt_tokens") + output_tokens = get_litellm_value(usage, "completion_tokens") + total_tokens = get_litellm_value(usage, "total_tokens") + + if output_tokens is None and input_tokens is not None and total_tokens: + output_tokens = max(total_tokens - input_tokens, 0) + + if input_tokens is not None: + invocation.input_tokens = input_tokens + if output_tokens is not None: + invocation.output_tokens = output_tokens + + prompt_details = get_litellm_value(usage, "prompt_tokens_details") + cached_tokens = get_litellm_value(prompt_details, "cached_tokens") + if cached_tokens is not None and hasattr( + invocation, "usage_cache_read_input_tokens" + ): + invocation.usage_cache_read_input_tokens = cached_tokens + + cache_creation_tokens = get_litellm_value( + prompt_details, "cache_creation_tokens" + ) + if cache_creation_tokens is not None and hasattr( + invocation, "usage_cache_creation_input_tokens" + ): + invocation.usage_cache_creation_input_tokens = cache_creation_tokens + def convert_messages_to_structured_format( messages: List[Dict[str, Any]], @@ -132,7 +299,7 @@ def parse_provider_from_model(model: str) -> Optional[str]: LiteLLM uses format like "openai/gpt-4", "dashscope/qwen-turbo", etc. """ - if not model: + if not model or not isinstance(model, str): return None if "/" in model: @@ -160,7 +327,7 @@ def parse_model_name(model: str) -> str: "dashscope/qwen-turbo" -> "qwen-turbo" "gpt-4" -> "gpt-4" """ - if not model: + if not model or not isinstance(model, str): return "unknown" if "/" in model: @@ -236,14 +403,9 @@ def convert_litellm_messages_to_genai_format( func = tool_call.get("function", {}) if isinstance(func, dict): - # Parse arguments if it's a JSON string - arguments = func.get("arguments", "") - if isinstance(arguments, str) and arguments: - try: - arguments = json.loads(arguments) - except Exception: - # If arguments are not valid JSON, keep the original string - pass + arguments = parse_tool_call_arguments( + func.get("arguments", "") + ) parts.append( ToolCall( @@ -277,37 +439,36 @@ def extract_output_from_litellm_response(response: Any) -> List: Converts LiteLLM response to OpenTelemetry GenAI OutputMessage format. """ - if not hasattr(response, "choices") or not response.choices: + choices = get_litellm_value(response, "choices") or [] + if not choices: return [] output_messages = [] - for choice in response.choices: - if not hasattr(choice, "message"): + for choice in choices: + msg = get_litellm_value(choice, "message") + if msg is None: continue - msg = choice.message parts = [] # Extract text content - if hasattr(msg, "content") and msg.content: - parts.append(Text(content=msg.content)) + content = get_litellm_value(msg, "content") + if content: + parts.append(Text(content=content)) # Extract tool calls - if hasattr(msg, "tool_calls") and msg.tool_calls: - for tc in msg.tool_calls: - # Parse arguments if it's a JSON string - arguments = getattr(tc.function, "arguments", "") - if isinstance(arguments, str) and arguments: - try: - arguments = json.loads(arguments) - except Exception: - # If arguments are not valid JSON, keep the original string - pass + tool_calls = get_litellm_value(msg, "tool_calls") + if tool_calls: + for tc in tool_calls: + function = get_litellm_value(tc, "function") + arguments = parse_tool_call_arguments( + get_litellm_value(function, "arguments", "") + ) parts.append( ToolCall( - id=getattr(tc, "id", None), - name=getattr(tc.function, "name", ""), + id=get_litellm_value(tc, "id"), + name=get_litellm_value(function, "name", ""), arguments=arguments, ) ) @@ -316,11 +477,11 @@ def extract_output_from_litellm_response(response: Any) -> List: if not parts: parts.append(Text(content="")) - finish_reason = getattr(choice, "finish_reason", "stop") or "stop" + finish_reason = get_litellm_value(choice, "finish_reason") or "stop" output_messages.append( OutputMessage( - role=getattr(msg, "role", "assistant"), + role=get_litellm_value(msg, "role", "assistant"), parts=parts, finish_reason=finish_reason, ) @@ -345,7 +506,10 @@ def create_llm_invocation_from_litellm(**kwargs): # Parse model name (remove provider prefix if present) model = kwargs.get("model", "unknown_model") - provider = parse_provider_from_model(model) or "unknown" + provider = parse_provider_from_model(model) + if provider in (None, "unknown"): + provider = kwargs.get("custom_llm_provider") or provider + provider = provider or "unknown" messages = kwargs.get("messages", []) # Convert messages to GenAI format @@ -376,6 +540,14 @@ def create_llm_invocation_from_litellm(**kwargs): invocation.presence_penalty = kwargs["presence_penalty"] if "seed" in kwargs and kwargs["seed"] is not None: invocation.seed = kwargs["seed"] + if "n" in kwargs and kwargs["n"] is not None: + invocation.choice_count = kwargs["n"] + if "top_k" in kwargs and kwargs["top_k"] is not None: + invocation.top_k = kwargs["top_k"] + if "response_format" in kwargs and kwargs["response_format"] is not None: + response_format = kwargs["response_format"] + if isinstance(response_format, Mapping): + invocation.output_type = response_format.get("type") if "stop" in kwargs and kwargs["stop"] is not None: stop = kwargs["stop"] if isinstance(stop, str): @@ -418,7 +590,10 @@ def create_embedding_invocation_from_litellm(**kwargs): # Extract request parameters model = kwargs.get("model", "unknown") - provider = parse_provider_from_model(model) or "unknown" + provider = parse_provider_from_model(model) + if provider in (None, "unknown"): + provider = kwargs.get("custom_llm_provider") or provider + provider = provider or "unknown" # Parse model name (remove provider prefix if present) request_model = parse_model_name(model) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py index eefbfe47b..f4a5bb9ff 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py @@ -16,7 +16,6 @@ Wrapper functions for LiteLLM completion instrumentation. """ -import json import logging import os from typing import Any, Callable, Optional @@ -28,15 +27,12 @@ StreamWrapper, ) from opentelemetry.instrumentation.litellm._utils import ( + apply_litellm_llm_response_to_invocation, create_llm_invocation_from_litellm, - extract_output_from_litellm_response, -) -from opentelemetry.util.genai.types import ( - Error, - OutputMessage, - Text, - ToolCall, + extract_finish_reasons_from_litellm_response, + normalize_litellm_completion_kwargs, ) +from opentelemetry.util.genai.types import Error logger = logging.getLogger(__name__) @@ -67,18 +63,21 @@ def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return self.original_func(*args, **kwargs) - # Extract request parameters - is_stream = kwargs.get("stream", False) + request_kwargs = normalize_litellm_completion_kwargs( + self.original_func, args, kwargs + ) + is_stream = request_kwargs.get("stream", False) # For streaming, enable usage tracking if not explicitly disabled # This ensures we get token usage information in the final chunk - if is_stream and "stream_options" not in kwargs: + if is_stream and "stream_options" not in request_kwargs: kwargs["stream_options"] = {"include_usage": True} + request_kwargs["stream_options"] = kwargs["stream_options"] # For streaming, we need special handling if is_stream: # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation self._handler.start_llm(invocation) @@ -93,6 +92,7 @@ def __call__(self, *args, **kwargs): stream=response, span=invocation.span, # For TTFT tracking callback=None, + invocation=invocation, ) stream_wrapper.callback = ( lambda span, @@ -113,7 +113,7 @@ def __call__(self, *args, **kwargs): else: # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation (handler creates and manages span) self._handler.start_llm(invocation) @@ -122,37 +122,7 @@ def __call__(self, *args, **kwargs): # Call original function response = self.original_func(*args, **kwargs) - # Fill response data into invocation - invocation.output_messages = ( - extract_output_from_litellm_response(response) - ) - - # Extract token usage - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "completion_tokens", None - ) - - # Extract response metadata - if hasattr(response, "id"): - invocation.response_id = response.id - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract finish reasons - if hasattr(response, "choices") and response.choices: - finish_reasons = [] - for choice in response.choices: - if ( - hasattr(choice, "finish_reason") - and choice.finish_reason - ): - finish_reasons.append(choice.finish_reason) - if finish_reasons: - invocation.finish_reasons = finish_reasons + apply_litellm_llm_response_to_invocation(invocation, response) # End LLM invocation successfully (handler ends span and records metrics) self._handler.stop_llm(invocation) @@ -183,78 +153,28 @@ def _handle_stream_end_with_handler( ) return - # Construct output message from accumulated content - parts = [] if stream_wrapper and hasattr( - stream_wrapper, "accumulated_content" + stream_wrapper, "get_output_messages" ): - full_content = "".join(stream_wrapper.accumulated_content) - if full_content: - parts.append(Text(content=full_content)) - - # Handle accumulated tool calls if any - if ( - hasattr(stream_wrapper, "accumulated_tool_calls") - and stream_wrapper.accumulated_tool_calls - ): - for tc in stream_wrapper.accumulated_tool_calls: - if hasattr(tc, "function"): - # Parse arguments if it's a JSON string - arguments = getattr(tc.function, "arguments", "") - if isinstance(arguments, str) and arguments: - try: - arguments = json.loads(arguments) - except Exception: - # If arguments are not valid JSON, keep the original string - pass - - parts.append( - ToolCall( - id=getattr(tc, "id", None), - name=getattr(tc.function, "name", ""), - arguments=arguments, - ) - ) - - # If we have parts, create output message - if parts: - invocation.output_messages = [ - OutputMessage( - role="assistant", parts=parts, finish_reason="stop" - ) - ] + output_messages = stream_wrapper.get_output_messages() + if output_messages: + invocation.output_messages = output_messages - # Extract token usage from last chunk - if ( - last_chunk - and hasattr(last_chunk, "usage") - and last_chunk.usage - ): - invocation.input_tokens = getattr( - last_chunk.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - last_chunk.usage, "completion_tokens", None + if last_chunk: + apply_litellm_llm_response_to_invocation( + invocation, + last_chunk, + include_output_messages=False, ) - # Extract response metadata - if last_chunk: - if hasattr(last_chunk, "id"): - invocation.response_id = last_chunk.id - if hasattr(last_chunk, "model"): - invocation.response_model_name = last_chunk.model - - # Extract finish_reason from last chunk's choice - if hasattr(last_chunk, "choices") and last_chunk.choices: - finish_reasons = [] - for choice in last_chunk.choices: - if ( - hasattr(choice, "finish_reason") - and choice.finish_reason - ): - finish_reasons.append(choice.finish_reason) - if finish_reasons: - invocation.finish_reasons = finish_reasons + if stream_wrapper and hasattr(stream_wrapper, "finish_reasons"): + finish_reasons = stream_wrapper.finish_reasons() + else: + finish_reasons = extract_finish_reasons_from_litellm_response( + last_chunk + ) + if finish_reasons: + invocation.finish_reasons = finish_reasons # End LLM invocation successfully self._handler.stop_llm(invocation) @@ -291,17 +211,20 @@ async def __call__(self, *args, **kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await self.original_func(*args, **kwargs) - # Extract request parameters - is_stream = kwargs.get("stream", False) + request_kwargs = normalize_litellm_completion_kwargs( + self.original_func, args, kwargs + ) + is_stream = request_kwargs.get("stream", False) # For streaming, enable usage tracking if not explicitly disabled - if is_stream and "stream_options" not in kwargs: + if is_stream and "stream_options" not in request_kwargs: kwargs["stream_options"] = {"include_usage": True} + request_kwargs["stream_options"] = kwargs["stream_options"] # For streaming, we need special handling if is_stream: # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation self._handler.start_llm(invocation) @@ -315,6 +238,7 @@ async def __call__(self, *args, **kwargs): stream=response, span=invocation.span, # For TTFT tracking callback=None, + invocation=invocation, ) stream_wrapper.callback = ( lambda span, @@ -336,7 +260,7 @@ async def __call__(self, *args, **kwargs): else: # Non-streaming: use Handler pattern # Create invocation object - invocation = create_llm_invocation_from_litellm(**kwargs) + invocation = create_llm_invocation_from_litellm(**request_kwargs) # Start LLM invocation self._handler.start_llm(invocation) @@ -345,37 +269,7 @@ async def __call__(self, *args, **kwargs): # Call original function response = await self.original_func(*args, **kwargs) - # Fill response data into invocation - invocation.output_messages = ( - extract_output_from_litellm_response(response) - ) - - # Extract token usage - if hasattr(response, "usage") and response.usage: - invocation.input_tokens = getattr( - response.usage, "prompt_tokens", None - ) - invocation.output_tokens = getattr( - response.usage, "completion_tokens", None - ) - - # Extract response metadata - if hasattr(response, "id"): - invocation.response_id = response.id - if hasattr(response, "model"): - invocation.response_model_name = response.model - - # Extract finish reasons - if hasattr(response, "choices") and response.choices: - finish_reasons = [] - for choice in response.choices: - if ( - hasattr(choice, "finish_reason") - and choice.finish_reason - ): - finish_reasons.append(choice.finish_reason) - if finish_reasons: - invocation.finish_reasons = finish_reasons + apply_litellm_llm_response_to_invocation(invocation, response) # End LLM invocation successfully self._handler.stop_llm(invocation) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py index 645233a0b..5b9388de6 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/conftest.py @@ -72,7 +72,7 @@ def environment(): ) # Allow capturing message content os.environ.setdefault( - "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "True" + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "SPAN_ONLY" ) litellm.telemetry = False diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py new file mode 100644 index 000000000..caac8feef --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py @@ -0,0 +1,226 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import json +from types import SimpleNamespace + +import litellm + +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + + +def _chat_response(model: str, content: str): + return SimpleNamespace( + id=f"chatcmpl-{model}", + model=model, + choices=[ + SimpleNamespace( + message=SimpleNamespace( + role="assistant", + content=content, + tool_calls=None, + ), + finish_reason="stop", + ) + ], + usage=SimpleNamespace( + prompt_tokens=4, + completion_tokens=3, + total_tokens=7, + ), + ) + + +def _chunk(choices, usage=None): + return SimpleNamespace( + id="chatcmpl-stream", + model="qwen-turbo", + choices=choices, + usage=usage, + ) + + +def _choice(index, content=None, finish_reason=None, tool_calls=None): + return SimpleNamespace( + index=index, + delta=SimpleNamespace(content=content, tool_calls=tool_calls), + finish_reason=finish_reason, + ) + + +def _tool_delta(index, tool_call_id=None, name=None, arguments=None): + return SimpleNamespace( + index=index, + id=tool_call_id, + function=SimpleNamespace(name=name, arguments=arguments), + ) + + +def test_completion_positional_args_feed_genai_invocation( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "qwen-turbo" + assert messages[0]["content"] == "hello" + assert kwargs["temperature"] == 0.2 + return _chat_response(model, "hello back") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + "qwen-turbo", + [{"role": "user", "content": "hello"}], + temperature=0.2, + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes["gen_ai.span.kind"] == "LLM" + assert span.attributes["gen_ai.provider.name"] == "dashscope" + assert span.attributes["gen_ai.request.model"] == "qwen-turbo" + + input_messages = json.loads(span.attributes["gen_ai.input.messages"]) + assert input_messages[0]["role"] == "user" + assert input_messages[0]["parts"][0]["content"] == "hello" + + +def test_streaming_completion_records_ttft_choices_and_tool_calls( + monkeypatch, tracer_provider, span_exporter +): + chunks = [ + _chunk( + [ + _choice(0, content="hel"), + _choice(1, content="bon"), + ] + ), + _chunk( + [ + _choice( + 0, + content="lo", + tool_calls=[ + _tool_delta( + 0, + tool_call_id="call_1", + name="lookup", + arguments='{"q":', + ) + ], + ), + _choice(1, content="jour"), + ] + ), + _chunk( + [ + _choice( + 0, + finish_reason="tool_calls", + tool_calls=[ + _tool_delta(0, arguments='"weather"}') + ], + ), + _choice(1, finish_reason="stop"), + ], + usage=SimpleNamespace( + prompt_tokens=6, + completion_tokens=5, + total_tokens=11, + ), + ), + ] + + def fake_completion(*args, **kwargs): + assert kwargs["stream"] is True + assert kwargs["stream_options"] == {"include_usage": True} + return iter(chunks) + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + response = litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "stream please"}], + stream=True, + n=2, + ) + assert len(list(response)) == 3 + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert "gen_ai.response.time_to_first_token" in span.attributes + assert span.attributes["gen_ai.request.choice.count"] == 2 + assert span.attributes["gen_ai.usage.input_tokens"] == 6 + assert span.attributes["gen_ai.usage.output_tokens"] == 5 + + output_messages = json.loads(span.attributes["gen_ai.output.messages"]) + assert len(output_messages) == 2 + assert output_messages[0]["parts"][0]["content"] == "hello" + assert output_messages[1]["parts"][0]["content"] == "bonjour" + tool_call = output_messages[0]["parts"][1] + assert tool_call["type"] == "tool_call" + assert tool_call["id"] == "call_1" + assert tool_call["name"] == "lookup" + assert tool_call["arguments"] == {"q": "weather"} + + +def test_async_completion_concurrent_calls_keep_separate_spans( + monkeypatch, tracer_provider, span_exporter +): + async def fake_acompletion(model, messages, **kwargs): + await asyncio.sleep(0.01 if model == "qwen-turbo" else 0) + return _chat_response(model, f"reply to {messages[0]['content']}") + + monkeypatch.setattr(litellm, "acompletion", fake_acompletion) + + async def run_calls(): + return await asyncio.gather( + litellm.acompletion( + "qwen-turbo", + [{"role": "user", "content": "first"}], + ), + litellm.acompletion( + "qwen-plus", + [{"role": "user", "content": "second"}], + ), + ) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + asyncio.run(run_calls()) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 2 + observed = { + json.loads(span.attributes["gen_ai.input.messages"])[0]["parts"][0][ + "content" + ]: span.attributes["gen_ai.request.model"] + for span in spans + } + assert observed == {"first": "qwen-turbo", "second": "qwen-plus"} From 5504097d155c805e344bdf03b54fd2409725c19f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Thu, 21 May 2026 13:49:09 +0800 Subject: [PATCH 2/4] Refine LiteLLM GenAI provider and stream handling --- .../examples/litellm_genai_smoke.py | 35 ++++++-- .../litellm/_stream_wrapper.py | 16 ++-- .../instrumentation/litellm/_utils.py | 83 ++++++++++++++---- .../tests/test_genai_util_wrapper.py | 87 ++++++++++++++++++- 4 files changed, 183 insertions(+), 38 deletions(-) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py index 24fb5bddd..55c5eb2e1 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/examples/litellm_genai_smoke.py @@ -30,21 +30,36 @@ "LITELLM_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1", ) +CUSTOM_PROVIDER = os.getenv("LITELLM_CUSTOM_LLM_PROVIDER", "openai") def _configure_provider() -> None: - if os.getenv("DASHSCOPE_API_KEY") and not os.getenv("OPENAI_API_KEY"): - os.environ["OPENAI_API_KEY"] = os.environ["DASHSCOPE_API_KEY"] - - os.environ.setdefault("OPENAI_API_BASE", API_BASE) - os.environ.setdefault("DASHSCOPE_API_BASE", API_BASE) litellm.telemetry = False +def _provider_kwargs() -> dict[str, str]: + api_key = ( + os.getenv("LITELLM_API_KEY") + or os.getenv("DASHSCOPE_API_KEY") + or os.getenv("OPENAI_API_KEY") + ) + if not api_key: + raise SystemExit( + "Missing required API key: set LITELLM_API_KEY, " + "DASHSCOPE_API_KEY, or OPENAI_API_KEY" + ) + + return { + "custom_llm_provider": CUSTOM_PROVIDER, + "api_key": api_key, + "api_base": API_BASE, + } + + def run_non_streaming() -> None: response = litellm.completion( model=MODEL, - custom_llm_provider="openai", + **_provider_kwargs(), messages=[ { "role": "user", @@ -60,7 +75,7 @@ def run_non_streaming() -> None: def run_streaming() -> None: stream = litellm.completion( model=MODEL, - custom_llm_provider="openai", + **_provider_kwargs(), messages=[ { "role": "user", @@ -91,7 +106,7 @@ async def run_concurrent() -> None: async def call(prompt: str): return await litellm.acompletion( model=MODEL, - custom_llm_provider="openai", + **_provider_kwargs(), messages=[{"role": "user", "content": prompt}], temperature=0.1, max_tokens=32, @@ -100,7 +115,9 @@ async def call(prompt: str): responses = await asyncio.gather(*(call(prompt) for prompt in prompts)) print( "concurrent:", - ", ".join(response.choices[0].message.content[:24] for response in responses), + ", ".join( + response.choices[0].message.content[:24] for response in responses + ), ) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py index 28cf6ab7d..57cb0e40c 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py @@ -129,7 +129,8 @@ def get_output_messages(self) -> list[OutputMessage]: def finish_reasons(self) -> list[str]: finish_reasons = [] - for state in self._choice_states.values(): + for index in sorted(self._choice_states): + state = self._choice_states[index] if state["finish_reason"]: finish_reasons.append(state["finish_reason"]) return finish_reasons @@ -159,12 +160,11 @@ def _record_tool_calls( arguments = get_litellm_value(function, "arguments") if isinstance(arguments, str): - if isinstance(stored["arguments"], str): - stored["arguments"] += arguments - else: - stored["arguments"] = arguments + stored["arguments"] += arguments elif arguments: - stored["arguments"] = arguments + logger.debug( + "Skipping non-string LiteLLM streamed tool-call arguments" + ) class StreamWrapper: @@ -190,8 +190,6 @@ def __init__( self.last_chunk = None # Only keep last chunk to avoid memory leak self.chunk_count = 0 self._finalized = False - self.accumulated_content = [] # Accumulate content for output messages - self.accumulated_tool_calls = [] # Accumulate tool calls def __iter__(self): return self @@ -289,8 +287,6 @@ def __init__( self.chunk_count = 0 self._finalized = False self._stream_exhausted = False - self.accumulated_content = [] # Accumulate content for output messages - self.accumulated_tool_calls = [] # Accumulate tool calls def __aiter__(self): # Return an async generator that wraps the stream and ensures finalization diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py index c7697d574..a31278881 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py @@ -21,6 +21,7 @@ import logging from collections.abc import Callable, Mapping from typing import Any, Dict, List, Optional +from urllib.parse import urlparse from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import ( GenAiOperationNameValues, @@ -40,6 +41,19 @@ _COMPLETION_POSITIONAL_PARAMETERS = ("model", "messages") _EMBEDDING_POSITIONAL_PARAMETERS = ("model", "input") +_BASE_URL_PROVIDER_MAP = ( + ("dashscope.aliyuncs.com", "dashscope"), + ("api.openai.com", "openai"), + ("api.deepseek.com", "deepseek"), + ("anthropic.com", "anthropic"), + ("generativelanguage.googleapis.com", "google"), +) +_BASE_URL_KWARG_NAMES = ( + "api_base", + "base_url", + "api_endpoint", + "endpoint", +) def get_litellm_value(obj: Any, key: str, default: Any = None) -> Any: @@ -103,7 +117,7 @@ def parse_tool_call_arguments(arguments: Any) -> Any: if isinstance(arguments, str) and arguments: try: return json.loads(arguments) - except Exception: + except (TypeError, ValueError): return arguments return arguments @@ -146,7 +160,7 @@ def apply_litellm_embedding_response_to_invocation( invocation.response_model_name = response_model usage = get_litellm_value(response, "usage") - _apply_usage_to_invocation(invocation, usage) + _apply_usage_to_invocation(invocation, usage, include_output_tokens=False) data = get_litellm_value(response, "data") if not data: @@ -172,7 +186,12 @@ def extract_finish_reasons_from_litellm_response(response: Any) -> list[str]: return finish_reasons -def _apply_usage_to_invocation(invocation: Any, usage: Any) -> None: +def _apply_usage_to_invocation( + invocation: Any, + usage: Any, + *, + include_output_tokens: bool = True, +) -> None: if not usage: return @@ -180,12 +199,17 @@ def _apply_usage_to_invocation(invocation: Any, usage: Any) -> None: output_tokens = get_litellm_value(usage, "completion_tokens") total_tokens = get_litellm_value(usage, "total_tokens") - if output_tokens is None and input_tokens is not None and total_tokens: + if ( + include_output_tokens + and output_tokens is None + and input_tokens is not None + and total_tokens + ): output_tokens = max(total_tokens - input_tokens, 0) if input_tokens is not None: invocation.input_tokens = input_tokens - if output_tokens is not None: + if include_output_tokens and output_tokens is not None: invocation.output_tokens = output_tokens prompt_details = get_litellm_value(usage, "prompt_tokens_details") @@ -318,6 +342,41 @@ def parse_provider_from_model(model: str) -> Optional[str]: return "unknown" +def parse_provider_from_base_url(base_url: Any) -> Optional[str]: + """Infer provider from known OpenAI-compatible service endpoints.""" + if not base_url or not isinstance(base_url, str): + return None + + try: + host = urlparse(base_url).hostname or base_url + except ValueError: + host = base_url + + host = host.lower() + for fragment, provider in _BASE_URL_PROVIDER_MAP: + if fragment in host: + return provider + return None + + +def resolve_litellm_provider(model: Any, kwargs: Mapping[str, Any]) -> str: + """Resolve the actual GenAI provider for a LiteLLM request.""" + for name in _BASE_URL_KWARG_NAMES: + provider = parse_provider_from_base_url(kwargs.get(name)) + if provider: + return provider + + provider = parse_provider_from_model(model) + if provider not in (None, "unknown"): + return provider + + custom_provider = kwargs.get("custom_llm_provider") + if custom_provider: + return custom_provider + + return provider or "unknown" + + def parse_model_name(model: str) -> str: """ Parse model name by removing provider prefix. @@ -506,10 +565,7 @@ def create_llm_invocation_from_litellm(**kwargs): # Parse model name (remove provider prefix if present) model = kwargs.get("model", "unknown_model") - provider = parse_provider_from_model(model) - if provider in (None, "unknown"): - provider = kwargs.get("custom_llm_provider") or provider - provider = provider or "unknown" + provider = resolve_litellm_provider(model, kwargs) messages = kwargs.get("messages", []) # Convert messages to GenAI format @@ -519,7 +575,7 @@ def create_llm_invocation_from_litellm(**kwargs): invocation = LLMInvocation( request_model=request_model, - provider=provider or "unknown", + provider=provider, operation_name=GenAiOperationNameValues.CHAT.value, input_messages=input_messages, ) @@ -590,17 +646,14 @@ def create_embedding_invocation_from_litellm(**kwargs): # Extract request parameters model = kwargs.get("model", "unknown") - provider = parse_provider_from_model(model) - if provider in (None, "unknown"): - provider = kwargs.get("custom_llm_provider") or provider - provider = provider or "unknown" + provider = resolve_litellm_provider(model, kwargs) # Parse model name (remove provider prefix if present) request_model = parse_model_name(model) invocation = EmbeddingInvocation( request_model=request_model, - provider=provider or "unknown", + provider=provider, ) # Set encoding formats if present diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py index caac8feef..a7650b0ee 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py @@ -43,6 +43,18 @@ def _chat_response(model: str, content: str): ) +def _embedding_response(model: str): + return SimpleNamespace( + id=f"embd-{model}", + model=model, + data=[{"embedding": [0.1, 0.2, 0.3]}], + usage=SimpleNamespace( + prompt_tokens=5, + total_tokens=5, + ), + ) + + def _chunk(choices, usage=None): return SimpleNamespace( id="chatcmpl-stream", @@ -102,6 +114,67 @@ def fake_completion(model, messages, **kwargs): assert input_messages[0]["parts"][0]["content"] == "hello" +def test_provider_prefers_known_base_url_over_custom_adapter( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "custom-compatible-model" + assert kwargs["custom_llm_provider"] == "openai" + assert "dashscope.aliyuncs.com" in kwargs["api_base"] + return _chat_response(model, "compatible response") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="custom-compatible-model", + custom_llm_provider="openai", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + messages=[{"role": "user", "content": "hello"}], + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes["gen_ai.provider.name"] == "dashscope" + + +def test_embedding_usage_records_input_tokens_only( + monkeypatch, tracer_provider, span_exporter +): + def fake_embedding(model, input_, **kwargs): + assert model == "text-embedding-v1" + assert input_ == "embed me" + return _embedding_response(model) + + monkeypatch.setattr(litellm, "embedding", fake_embedding) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.embedding( + "text-embedding-v1", + "embed me", + custom_llm_provider="openai", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes["gen_ai.span.kind"] == "EMBEDDING" + assert span.attributes["gen_ai.provider.name"] == "dashscope" + assert span.attributes["gen_ai.usage.input_tokens"] == 5 + assert span.attributes["gen_ai.usage.total_tokens"] == 5 + assert "gen_ai.usage.output_tokens" not in span.attributes + assert span.attributes["gen_ai.embeddings.dimension.count"] == 3 + + def test_streaming_completion_records_ttft_choices_and_tool_calls( monkeypatch, tracer_provider, span_exporter ): @@ -129,14 +202,20 @@ def test_streaming_completion_records_ttft_choices_and_tool_calls( _choice(1, content="jour"), ] ), + _chunk( + [ + _choice( + 0, + tool_calls=[_tool_delta(0, arguments={"ignored": True})], + ), + ] + ), _chunk( [ _choice( 0, finish_reason="tool_calls", - tool_calls=[ - _tool_delta(0, arguments='"weather"}') - ], + tool_calls=[_tool_delta(0, arguments='"weather"}')], ), _choice(1, finish_reason="stop"), ], @@ -164,7 +243,7 @@ def fake_completion(*args, **kwargs): stream=True, n=2, ) - assert len(list(response)) == 3 + assert len(list(response)) == 4 finally: instrumentor.uninstrument() From c3342e370bea190f30b5d39694be01038f95be58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Thu, 21 May 2026 19:36:08 +0800 Subject: [PATCH 3/4] Fix LiteLLM GenAI review blockers --- .../CHANGELOG.md | 3 +- .../README.rst | 5 +- .../litellm/_embedding_wrapper.py | 3 - .../litellm/_stream_wrapper.py | 88 ++++- .../instrumentation/litellm/_utils.py | 317 +++++++---------- .../tests/test_genai_util_wrapper.py | 336 +++++++++++++++++- .../tests/test_sync_completion.py | 20 +- 7 files changed, 568 insertions(+), 204 deletions(-) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md index 1fdb3125a..e3305ccfa 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/CHANGELOG.md @@ -11,7 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Improved LiteLLM GenAI util invocation mapping for positional arguments, streaming time-to-first-token, multi-choice outputs, tool-call deltas, and - real smoke examples. + a real smoke example + ([#191](https://github.com/alibaba/loongsuite-python-agent/pull/191)). ## Version 0.5.0 (2026-05-11) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst index 498a6aed9..452968579 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst @@ -25,7 +25,7 @@ Configuration The instrumentation can be enabled/disabled using environment variables: * ``ENABLE_LITELLM_INSTRUMENTOR``: Enable/disable instrumentation (default: true) -* ``OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental``: Enable GenAI semantic conventions +* ``OTEL_SEMCONV_STABILITY_OPT_IN``: Set to ``gen_ai_latest_experimental`` to enable GenAI semantic conventions * ``OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT``: Set to ``NO_CONTENT``, ``SPAN_ONLY``, ``EVENT_ONLY``, or ``SPAN_AND_EVENT`` Usage @@ -81,6 +81,9 @@ This instrumentation automatically captures: * Embedding calls * Retry mechanisms * Tool/function calls +* Provider inference from known OpenAI-compatible base URLs, custom providers, and model names +* Streaming time-to-first-token, including reasoning/thinking deltas +* Multi-choice streaming outputs and tool-call delta accumulation * Request and response metadata * Token usage * Model information diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py index a3cc2bcda..a954ce999 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py @@ -16,7 +16,6 @@ Embedding wrapper for LiteLLM instrumentation. """ -import logging import os from typing import Callable @@ -29,8 +28,6 @@ ) from opentelemetry.util.genai.types import Error -logger = logging.getLogger(__name__) - def _is_instrumentation_enabled() -> bool: """Check if instrumentation is enabled via environment variable.""" diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py index 57cb0e40c..20f6af70d 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py @@ -21,10 +21,16 @@ from typing import Any, Iterator, Optional from opentelemetry.instrumentation.litellm._utils import ( + extract_litellm_text_parts, get_litellm_value, parse_tool_call_arguments, ) -from opentelemetry.util.genai.types import OutputMessage, Text, ToolCall +from opentelemetry.util.genai.types import ( + OutputMessage, + Reasoning, + Text, + ToolCall, +) logger = logging.getLogger(__name__) @@ -51,6 +57,7 @@ def record_chunk(self, chunk: Any) -> None: index, { "role": "assistant", + "reasoning": [], "content": [], "finish_reason": None, "tool_calls": {}, @@ -70,8 +77,17 @@ def record_chunk(self, chunk: Any) -> None: state["role"] = role content = get_litellm_value(delta, "content") - if content: - state["content"].append(content) + content_parts = extract_litellm_text_parts(content) + if content_parts: + state["content"].extend(content_parts) + saw_token = True + + reasoning_content = get_litellm_value(delta, "reasoning_content") + if reasoning_content is None: + reasoning_content = get_litellm_value(delta, "reasoning") + reasoning_parts = extract_litellm_text_parts(reasoning_content) + if reasoning_parts: + state["reasoning"].extend(reasoning_parts) saw_token = True tool_calls = get_litellm_value(delta, "tool_calls") @@ -93,6 +109,10 @@ def get_output_messages(self) -> list[OutputMessage]: for index in sorted(self._choice_states): state = self._choice_states[index] parts = [] + reasoning = "".join(state["reasoning"]) + if reasoning: + parts.append(Reasoning(content=reasoning)) + content = "".join(state["content"]) if content: parts.append(Text(content=content)) @@ -116,7 +136,7 @@ def get_output_messages(self) -> list[OutputMessage]: ) if not parts: - continue + parts.append(Text(content="")) output_messages.append( OutputMessage( @@ -176,6 +196,8 @@ class StreamWrapper: Supports context manager protocol for reliable cleanup. """ + _warned_unclosed_stream = False + def __init__( self, stream: Iterator, @@ -233,12 +255,40 @@ def close(self): """Explicitly close and finalize the stream.""" self._finalize() + def __del__(self): + if getattr(self, "_finalized", True): + return + + if not StreamWrapper._warned_unclosed_stream: + StreamWrapper._warned_unclosed_stream = True + logger.warning( + "LiteLLM stream wrapper was garbage-collected before close; " + "finalizing the span. Use a context manager or call close() " + "when terminating streams early." + ) + + try: + self._finalize() + except Exception as exc: + logger.debug("Error finalizing unclosed LiteLLM stream: %s", exc) + + def _close_stream(self) -> None: + close = getattr(self.stream, "close", None) + if not callable(close): + return + + try: + close() + except Exception as exc: + logger.debug("Error closing LiteLLM stream: %s", exc) + def _finalize(self, error: Optional[Exception] = None): """Finalize the span with data from last chunk.""" if self._finalized: return self._finalized = True + self._close_stream() try: # Call the callback with only the last chunk # Note: The callback is responsible for calling handler.stop_llm() @@ -300,6 +350,7 @@ async def _wrapped_iteration(self): 2. An exception occurs 3. The generator is closed early (via aclose()) """ + error = None try: async for chunk in self.stream: self._accumulator.record_chunk(chunk) @@ -317,11 +368,12 @@ async def _wrapped_iteration(self): except Exception as e: # Error during streaming logger.debug(f"AsyncStreamWrapper: Error during streaming: {e}") - self._finalize(error=e) + error = e raise finally: # Always finalize, whether completed normally, with error, or closed early - self._finalize() + await self._aclose_stream() + self._finalize(error=error) async def __aenter__(self): """Support async context manager protocol.""" @@ -329,6 +381,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): """Ensure finalization on async context exit.""" + await self._aclose_stream() if exc_type is not None: # Exception occurred during iteration self._finalize(error=exc_val) @@ -339,12 +392,35 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): async def aclose(self): """Explicitly close and finalize the async stream.""" + await self._aclose_stream() self._finalize() def close(self): """Synchronous close method for compatibility.""" + self._close_stream() self._finalize() + def _close_stream(self) -> None: + close = getattr(self.stream, "close", None) + if not callable(close): + return + + try: + close() + except Exception as exc: + logger.debug("Error closing LiteLLM async stream: %s", exc) + + async def _aclose_stream(self) -> None: + aclose = getattr(self.stream, "aclose", None) + if callable(aclose): + try: + await aclose() + return + except Exception as exc: + logger.debug("Error closing LiteLLM async stream: %s", exc) + + self._close_stream() + def _finalize(self, error: Optional[Exception] = None): """Finalize the span with data from last chunk.""" if self._finalized: diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py index a31278881..cceebb523 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py @@ -20,7 +20,7 @@ import json import logging from collections.abc import Callable, Mapping -from typing import Any, Dict, List, Optional +from typing import Any, List, Optional from urllib.parse import urlparse from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import ( @@ -32,6 +32,7 @@ InputMessage, LLMInvocation, OutputMessage, + Reasoning, Text, ToolCall, ToolCallResponse, @@ -54,6 +55,7 @@ "api_endpoint", "endpoint", ) +_SYSTEM_INSTRUCTION_ROLES = frozenset(("system", "developer")) def get_litellm_value(obj: Any, key: str, default: Any = None) -> Any: @@ -122,6 +124,31 @@ def parse_tool_call_arguments(arguments: Any) -> Any: return arguments +def extract_litellm_text_parts(content: Any) -> list[str]: + """Extract text strings from LiteLLM text or multimodal content.""" + if isinstance(content, str): + return [content] if content else [] + + if not isinstance(content, list): + return [] + + text_parts = [] + for item in content: + if isinstance(item, str): + if item: + text_parts.append(item) + continue + + if not isinstance(item, Mapping) or item.get("type") != "text": + continue + + text = item.get("text", item.get("content", "")) + if isinstance(text, str) and text: + text_parts.append(text) + + return text_parts + + def apply_litellm_llm_response_to_invocation( invocation: LLMInvocation, response: Any, @@ -228,95 +255,6 @@ def _apply_usage_to_invocation( invocation.usage_cache_creation_input_tokens = cache_creation_tokens -def convert_messages_to_structured_format( - messages: List[Dict[str, Any]], -) -> List[Dict[str, Any]]: - """ - Convert LiteLLM message format to structured format required by semantic conventions. - - Converts from: - {"role": "user", "content": "..."} - To: - {"role": "user", "parts": [{"type": "text", "content": "..."}]} - """ - if not isinstance(messages, list): - return [] - - structured_messages = [] - for msg in messages: - if not isinstance(msg, dict): - continue - - role = msg.get("role", "") - structured_msg = {"role": role, "parts": []} - - # Handle text content - if "content" in msg and msg["content"]: - content = msg["content"] - if isinstance(content, str): - structured_msg["parts"].append( - {"type": "text", "content": content} - ) - elif isinstance(content, list): - # Handle multi-modal content - for item in content: - if isinstance(item, dict): - if item.get("type") == "text": - structured_msg["parts"].append( - { - "type": "text", - "content": item.get("text", ""), - } - ) - else: - structured_msg["parts"].append(item) - - # Handle tool calls - if "tool_calls" in msg and msg["tool_calls"]: - for tool_call in msg["tool_calls"]: - if not isinstance(tool_call, dict): - continue - - tool_part = {"type": "tool_call"} - if "id" in tool_call: - tool_part["id"] = tool_call["id"] - if "function" in tool_call: - func = tool_call["function"] - if isinstance(func, dict): - if "name" in func: - tool_part["name"] = func["name"] - if "arguments" in func: - try: - # Try to parse arguments if it's a JSON string - args_str = func["arguments"] - if isinstance(args_str, str): - tool_part["arguments"] = json.loads( - args_str - ) - else: - tool_part["arguments"] = args_str - except Exception: - tool_part["arguments"] = func.get( - "arguments", "" - ) - - structured_msg["parts"].append(tool_part) - - # Handle tool call responses - if role == "tool" and "content" in msg: - tool_response_part = { - "type": "tool_call_response", - "response": msg["content"], - } - if "tool_call_id" in msg: - tool_response_part["id"] = msg["tool_call_id"] - structured_msg["parts"].append(tool_response_part) - - structured_messages.append(structured_msg) - - return structured_messages - - def parse_provider_from_model(model: str) -> Optional[str]: """ Parse provider name from model string. @@ -366,14 +304,14 @@ def resolve_litellm_provider(model: Any, kwargs: Mapping[str, Any]) -> str: if provider: return provider - provider = parse_provider_from_model(model) - if provider not in (None, "unknown"): - return provider - custom_provider = kwargs.get("custom_llm_provider") if custom_provider: return custom_provider + provider = parse_provider_from_model(model) + if provider not in (None, "unknown"): + return provider + return provider or "unknown" @@ -395,34 +333,8 @@ def parse_model_name(model: str) -> str: return model -def safe_json_dumps(obj: Any, default: str = "{}") -> str: - """ - Safely serialize object to JSON string. - """ - try: - return json.dumps(obj, ensure_ascii=False) - except Exception as e: - logger.debug(f"Failed to serialize object to JSON: {e}") - return default - - -def convert_tool_definitions(tools: List[Dict[str, Any]]) -> str: - """ - Convert tool definitions to JSON string format. - """ - if not tools: - return "[]" - - try: - # Tools are typically in format: [{"type": "function", "function": {...}}] - return json.dumps(tools, ensure_ascii=False) - except Exception as e: - logger.debug(f"Failed to convert tool definitions: {e}") - return "[]" - - def convert_litellm_messages_to_genai_format( - messages: List[Dict[str, Any]], + messages: list[dict[str, Any]], ) -> List: """ Convert LiteLLM message format to OpenTelemetry GenAI InputMessage format. @@ -440,47 +352,10 @@ def convert_litellm_messages_to_genai_format( continue role = msg.get("role", "user") - parts = [] - - # Handle text content - if "content" in msg and msg["content"]: - content = msg["content"] - if isinstance(content, str): - parts.append(Text(content=content)) - elif isinstance(content, list): - # Handle multi-modal content - for item in content: - if isinstance(item, dict) and item.get("type") == "text": - parts.append(Text(content=item.get("text", ""))) - # Other content types (image, etc.) can be added here - - # Handle tool calls - if "tool_calls" in msg and msg["tool_calls"]: - for tool_call in msg["tool_calls"]: - if not isinstance(tool_call, dict): - continue - - func = tool_call.get("function", {}) - if isinstance(func, dict): - arguments = parse_tool_call_arguments( - func.get("arguments", "") - ) - - parts.append( - ToolCall( - id=tool_call.get("id"), - name=func.get("name", ""), - arguments=arguments, - ) - ) + if role in _SYSTEM_INSTRUCTION_ROLES: + continue - # Handle tool call responses - if role == "tool" and "content" in msg: - parts.append( - ToolCallResponse( - id=msg.get("tool_call_id"), response=msg["content"] - ) - ) + parts = _extract_message_parts(msg, role) # If no parts added, add empty text if not parts: @@ -491,6 +366,64 @@ def convert_litellm_messages_to_genai_format( return input_messages +def extract_system_instruction_from_litellm_messages( + messages: list[dict[str, Any]], +) -> list: + """Extract system/developer instructions from LiteLLM messages.""" + if not isinstance(messages, list): + return [] + + system_instruction = [] + for msg in messages: + if not isinstance(msg, dict): + continue + + if msg.get("role") not in _SYSTEM_INSTRUCTION_ROLES: + continue + + for text in extract_litellm_text_parts(msg.get("content")): + system_instruction.append(Text(content=text)) + + return system_instruction + + +def _extract_message_parts(msg: Mapping[str, Any], role: str) -> list: + parts = [] + + for text in extract_litellm_text_parts(msg.get("content")): + parts.append(Text(content=text)) + + # Handle tool calls + if "tool_calls" in msg and msg["tool_calls"]: + for tool_call in msg["tool_calls"]: + if not isinstance(tool_call, Mapping): + continue + + func = tool_call.get("function", {}) + if isinstance(func, Mapping): + arguments = parse_tool_call_arguments( + func.get("arguments", "") + ) + + parts.append( + ToolCall( + id=tool_call.get("id"), + name=func.get("name", ""), + arguments=arguments, + ) + ) + + # Handle tool call responses + if role == "tool" and "content" in msg: + parts.append( + ToolCallResponse( + id=msg.get("tool_call_id"), response=msg["content"] + ) + ) + + return parts + + def extract_output_from_litellm_response(response: Any) -> List: """ Extract output messages from LiteLLM response. @@ -505,32 +438,37 @@ def extract_output_from_litellm_response(response: Any) -> List: output_messages = [] for choice in choices: msg = get_litellm_value(choice, "message") - if msg is None: - continue - parts = [] + role = "assistant" - # Extract text content - content = get_litellm_value(msg, "content") - if content: - parts.append(Text(content=content)) + if msg is not None: + role = get_litellm_value(msg, "role", "assistant") - # Extract tool calls - tool_calls = get_litellm_value(msg, "tool_calls") - if tool_calls: - for tc in tool_calls: - function = get_litellm_value(tc, "function") - arguments = parse_tool_call_arguments( - get_litellm_value(function, "arguments", "") - ) + reasoning_content = get_litellm_value(msg, "reasoning_content") + for text in extract_litellm_text_parts(reasoning_content): + parts.append(Reasoning(content=text)) - parts.append( - ToolCall( - id=get_litellm_value(tc, "id"), - name=get_litellm_value(function, "name", ""), - arguments=arguments, + # Extract text content + content = get_litellm_value(msg, "content") + for text in extract_litellm_text_parts(content): + parts.append(Text(content=text)) + + # Extract tool calls + tool_calls = get_litellm_value(msg, "tool_calls") + if tool_calls: + for tc in tool_calls: + function = get_litellm_value(tc, "function") + arguments = parse_tool_call_arguments( + get_litellm_value(function, "arguments", "") + ) + + parts.append( + ToolCall( + id=get_litellm_value(tc, "id"), + name=get_litellm_value(function, "name", ""), + arguments=arguments, + ) ) - ) # If no parts, add empty text if not parts: @@ -540,7 +478,7 @@ def extract_output_from_litellm_response(response: Any) -> List: output_messages.append( OutputMessage( - role=get_litellm_value(msg, "role", "assistant"), + role=role, parts=parts, finish_reason=finish_reason, ) @@ -553,9 +491,11 @@ def create_llm_invocation_from_litellm(**kwargs): """ Create LLMInvocation from LiteLLM request parameters. + The provider is resolved from known base URLs, custom_llm_provider, or the + model name. + Args: model: The model name (e.g., "gpt-4", "openai/gpt-4") - provider: The provider name (e.g., "openai", "dashscope") messages: List of message dictionaries **kwargs: Additional request parameters (temperature, max_tokens, etc.) @@ -570,6 +510,9 @@ def create_llm_invocation_from_litellm(**kwargs): # Convert messages to GenAI format input_messages = convert_litellm_messages_to_genai_format(messages) + system_instruction = extract_system_instruction_from_litellm_messages( + messages + ) request_model = parse_model_name(model) @@ -579,6 +522,8 @@ def create_llm_invocation_from_litellm(**kwargs): operation_name=GenAiOperationNameValues.CHAT.value, input_messages=input_messages, ) + if system_instruction: + invocation.system_instruction = system_instruction # Set optional request parameters if "temperature" in kwargs and kwargs["temperature"] is not None: @@ -635,9 +580,11 @@ def create_embedding_invocation_from_litellm(**kwargs): """ Create EmbeddingInvocation from LiteLLM embedding request parameters. + The provider is resolved from known base URLs, custom_llm_provider, or the + model name. + Args: model: The embedding model name - provider: The provider name **kwargs: Additional request parameters Returns: diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py index a7650b0ee..0f0c02d9e 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py @@ -18,6 +18,8 @@ import litellm +from opentelemetry import context as otel_context +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor @@ -64,10 +66,20 @@ def _chunk(choices, usage=None): ) -def _choice(index, content=None, finish_reason=None, tool_calls=None): +def _choice( + index, + content=None, + finish_reason=None, + tool_calls=None, + reasoning_content=None, +): return SimpleNamespace( index=index, - delta=SimpleNamespace(content=content, tool_calls=tool_calls), + delta=SimpleNamespace( + content=content, + reasoning_content=reasoning_content, + tool_calls=tool_calls, + ), finish_reason=finish_reason, ) @@ -80,6 +92,42 @@ def _tool_delta(index, tool_call_id=None, name=None, arguments=None): ) +class _ClosableIterator: + def __init__(self, chunks): + self._iterator = iter(chunks) + self.closed = False + + def __iter__(self): + return self + + def __next__(self): + return next(self._iterator) + + def close(self): + self.closed = True + + +class _AsyncClosableStream: + def __init__(self, chunks): + self._chunks = list(chunks) + self._index = 0 + self.closed = False + + def __aiter__(self): + return self + + async def __anext__(self): + if self._index >= len(self._chunks): + raise StopAsyncIteration + + chunk = self._chunks[self._index] + self._index += 1 + return chunk + + async def aclose(self): + self.closed = True + + def test_completion_positional_args_feed_genai_invocation( monkeypatch, tracer_provider, span_exporter ): @@ -114,6 +162,49 @@ def fake_completion(model, messages, **kwargs): assert input_messages[0]["parts"][0]["content"] == "hello" +def test_provider_prefers_custom_provider_over_model_heuristic_and_system_split( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "gpt-4" + assert kwargs["custom_llm_provider"] == "azure" + assert messages[0]["role"] == "system" + return _chat_response(model, "azure response") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="gpt-4", + custom_llm_provider="azure", + messages=[ + {"role": "system", "content": "system rules"}, + {"role": "developer", "content": "developer rules"}, + {"role": "user", "content": "hello"}, + ], + ) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes["gen_ai.provider.name"] == "azure" + + input_messages = json.loads(span.attributes["gen_ai.input.messages"]) + assert [message["role"] for message in input_messages] == ["user"] + + system_instructions = json.loads( + span.attributes["gen_ai.system_instructions"] + ) + assert [part["content"] for part in system_instructions] == [ + "system rules", + "developer rules", + ] + + def test_provider_prefers_known_base_url_over_custom_adapter( monkeypatch, tracer_provider, span_exporter ): @@ -142,6 +233,104 @@ def fake_completion(model, messages, **kwargs): assert spans[0].attributes["gen_ai.provider.name"] == "dashscope" +def test_completion_usage_falls_back_to_total_minus_prompt_tokens( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + assert model == "qwen-turbo" + return SimpleNamespace( + id="chatcmpl-fallback-usage", + model=model, + choices=[ + SimpleNamespace( + message=SimpleNamespace( + role="assistant", + content="fallback usage", + tool_calls=None, + ), + finish_reason="stop", + ) + ], + usage=SimpleNamespace(prompt_tokens=4, total_tokens=9), + ) + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "hello"}], + ) + finally: + instrumentor.uninstrument() + + span = span_exporter.get_finished_spans()[0] + assert span.attributes["gen_ai.usage.input_tokens"] == 4 + assert span.attributes["gen_ai.usage.output_tokens"] == 5 + assert span.attributes["gen_ai.usage.total_tokens"] == 9 + + +def test_suppressed_instrumentation_skips_completion_span( + monkeypatch, tracer_provider, span_exporter +): + def fake_completion(model, messages, **kwargs): + return _chat_response(model, "not traced") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + token = None + instrumentor.instrument(tracer_provider=tracer_provider) + try: + ctx = otel_context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + token = otel_context.attach(ctx) + litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "hello"}], + ) + finally: + if token is not None: + otel_context.detach(token) + instrumentor.uninstrument() + + assert not span_exporter.get_finished_spans() + + +def test_no_content_mode_omits_messages_but_keeps_metadata( + monkeypatch, tracer_provider, span_exporter +): + monkeypatch.setenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "NO_CONTENT" + ) + + def fake_completion(model, messages, **kwargs): + return _chat_response(model, "content hidden") + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + litellm.completion( + model="qwen-turbo", + messages=[ + {"role": "system", "content": "secret system"}, + {"role": "user", "content": "secret user"}, + ], + ) + finally: + instrumentor.uninstrument() + + span = span_exporter.get_finished_spans()[0] + assert span.attributes["gen_ai.span.kind"] == "LLM" + assert span.attributes["gen_ai.request.model"] == "qwen-turbo" + assert "gen_ai.input.messages" not in span.attributes + assert "gen_ai.output.messages" not in span.attributes + assert "gen_ai.system_instructions" not in span.attributes + + def test_embedding_usage_records_input_tokens_only( monkeypatch, tracer_provider, span_exporter ): @@ -266,6 +455,149 @@ def fake_completion(*args, **kwargs): assert tool_call["arguments"] == {"q": "weather"} +def test_streaming_reasoning_multimodal_content_and_empty_choice( + monkeypatch, tracer_provider, span_exporter +): + chunks = [ + _chunk( + [ + _choice(0, reasoning_content="thinking"), + _choice(1, finish_reason="stop"), + ] + ), + _chunk([_choice(0, content={"unexpected": True})]), + _chunk( + [ + _choice( + 0, + content=[ + {"type": "text", "text": "hello"}, + {"type": "image_url", "image_url": {"url": "x"}}, + " world", + ], + finish_reason="stop", + ) + ], + usage=SimpleNamespace( + prompt_tokens=3, + completion_tokens=2, + total_tokens=5, + ), + ), + ] + + def fake_completion(*args, **kwargs): + assert kwargs["stream"] is True + return iter(chunks) + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + response = litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "reason"}], + stream=True, + n=2, + ) + assert len(list(response)) == 3 + finally: + instrumentor.uninstrument() + + span = span_exporter.get_finished_spans()[0] + assert "gen_ai.response.time_to_first_token" in span.attributes + output_messages = json.loads(span.attributes["gen_ai.output.messages"]) + assert len(output_messages) == 2 + assert output_messages[0]["parts"][0] == { + "content": "thinking", + "type": "reasoning", + } + assert output_messages[0]["parts"][1] == { + "content": "hello world", + "type": "text", + } + assert output_messages[1]["parts"] == [{"content": "", "type": "text"}] + + +def test_streaming_close_closes_underlying_stream_and_finalizes( + monkeypatch, tracer_provider, span_exporter +): + stream = _ClosableIterator( + [ + _chunk([_choice(0, content="partial")]), + _chunk([_choice(0, content=" ignored", finish_reason="stop")]), + ] + ) + + def fake_completion(*args, **kwargs): + assert kwargs["stream"] is True + return stream + + monkeypatch.setattr(litellm, "completion", fake_completion) + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + response = litellm.completion( + model="qwen-turbo", + messages=[{"role": "user", "content": "stream"}], + stream=True, + ) + next(response) + response.close() + finally: + instrumentor.uninstrument() + + assert stream.closed is True + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + output_messages = json.loads(spans[0].attributes["gen_ai.output.messages"]) + assert output_messages[0]["parts"][0]["content"] == "partial" + + +def test_async_streaming_aclose_closes_stream_and_finalizes( + monkeypatch, tracer_provider, span_exporter +): + captured = {} + + async def fake_acompletion(*args, **kwargs): + assert kwargs["stream"] is True + stream = _AsyncClosableStream( + [ + _chunk([_choice(0, content="async partial")]), + _chunk([_choice(0, content=" ignored", finish_reason="stop")]), + ] + ) + captured["stream"] = stream + return stream + + monkeypatch.setattr(litellm, "acompletion", fake_acompletion) + + async def run_call(): + response = await litellm.acompletion( + model="qwen-turbo", + messages=[{"role": "user", "content": "stream"}], + stream=True, + ) + iterator = response.__aiter__() + await iterator.__anext__() + await iterator.aclose() + + instrumentor = LiteLLMInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + asyncio.run(run_call()) + finally: + instrumentor.uninstrument() + + assert captured["stream"].closed is True + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + output_messages = json.loads(spans[0].attributes["gen_ai.output.messages"]) + assert output_messages[0]["parts"][0]["content"] == "async partial" + + def test_async_completion_concurrent_calls_keep_separate_spans( monkeypatch, tracer_provider, span_exporter ): diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py index b49b8890c..9c678bed0 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py @@ -193,16 +193,24 @@ def test_sync_completion_with_multiple_messages(self): self.assertEqual(len(spans), 1) span = spans[0] - # Verify all messages captured in sequence + # Verify system instructions are separated from input messages. self.assertIn("gen_ai.input.messages", span.attributes) input_messages = json.loads( span.attributes.get("gen_ai.input.messages") ) - self.assertEqual(len(input_messages), 4) - self.assertEqual(input_messages[0]["role"], "system") - self.assertEqual(input_messages[1]["role"], "user") - self.assertEqual(input_messages[2]["role"], "assistant") - self.assertEqual(input_messages[3]["role"], "user") + self.assertEqual(len(input_messages), 3) + self.assertEqual(input_messages[0]["role"], "user") + self.assertEqual(input_messages[1]["role"], "assistant") + self.assertEqual(input_messages[2]["role"], "user") + + self.assertIn("gen_ai.system_instructions", span.attributes) + system_instructions = json.loads( + span.attributes.get("gen_ai.system_instructions") + ) + self.assertEqual( + system_instructions[0]["content"], + "You are a helpful assistant that provides concise answers.", + ) output_messages = json.loads( span.attributes.get("gen_ai.output.messages") From 0ed5a1b17b2b6b5cc8d39a67d500dc99774ed08f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Tue, 26 May 2026 14:21:21 +0800 Subject: [PATCH 4/4] test(litellm): fix GenAI wrapper CI checks --- .../tests/test_genai_util_wrapper.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py index 0f0c02d9e..7704b2406 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_genai_util_wrapper.py @@ -211,7 +211,10 @@ def test_provider_prefers_known_base_url_over_custom_adapter( def fake_completion(model, messages, **kwargs): assert model == "custom-compatible-model" assert kwargs["custom_llm_provider"] == "openai" - assert "dashscope.aliyuncs.com" in kwargs["api_base"] + assert ( + kwargs["api_base"] + == "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) return _chat_response(model, "compatible response") monkeypatch.setattr(litellm, "completion", fake_completion) @@ -370,7 +373,8 @@ def test_streaming_completion_records_ttft_choices_and_tool_calls( chunks = [ _chunk( [ - _choice(0, content="hel"), + # Keep this split to avoid a codespell false positive. + _choice(0, content="he"), _choice(1, content="bon"), ] ), @@ -378,7 +382,7 @@ def test_streaming_completion_records_ttft_choices_and_tool_calls( [ _choice( 0, - content="lo", + content="llo", tool_calls=[ _tool_delta( 0,