feat: add CAMB AI integration with 8 components#11876
feat: add CAMB AI integration with 8 components#11876neilruaro-camb wants to merge 2 commits intolangflow-ai:mainfrom
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughThis pull request introduces a complete CAMB.AI integration for Langflow with eight new components: Text-to-Speech, Translation, Transcription, Translated TTS, Voice Cloning, Voice Listing, Text-to-Sound, and Audio Separation. Each component includes async execution, error handling, and CAMB SDK integration, along with comprehensive unit and integration tests. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 6 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (6 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (4)
src/lfx/src/lfx/components/camb/camb_text_to_sound.py (1)
57-79: Preferis not Nonefor optional numeric inputs (duration).Line 61 uses truthiness, so
duration=0.0would be ignored. Small tweak:Proposed tweak
kwargs: dict[str, Any] = {"prompt": self.prompt} - if self.duration: + if self.duration is not None: kwargs["duration"] = self.duration(Optional) If generated audio can be large, consider writing chunks directly to a temp file instead of accumulating all bytes in memory first.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/lfx/src/lfx/components/camb/camb_text_to_sound.py` around lines 57 - 79, In _generate_sound_async, the duration check uses truthiness so a valid 0.0 value would be skipped; change the condition that adds duration to kwargs to use "if self.duration is not None" (referencing the local variable duration in the _generate_sound_async method) so zero is preserved; optionally, to avoid high memory use, stream chunks to a temporary file as they're iterated from client.text_to_audio.get_text_to_audio_result instead of accumulating them in the chunks list before calling save_audio.src/lfx/tests/unit/components/camb/test_camb_components.py (3)
256-259: Wrong# noqatag —RET504doesn't apply here.
RET504flags "unnecessary assignment beforereturn", which is unrelated to making a function an async generator. The real intent is just preventing Ruff from flaggingyieldas unreachable code afterreturn. If Ruff flags it at all, the correct suppression is# noqa: B901(return-in-generator) or simply leave the comment as a plain explanation without a noqa tag.♻️ Proposed fix
- yield # noqa: RET504 – make it an async generator + yield # make it an async generator🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/lfx/tests/unit/components/camb/test_camb_components.py` around lines 256 - 259, The fake_tts async function uses a trailing unreachable yield to make it an async generator but the noqa tag is wrong; update the suppression on the yield in fake_tts to the correct rule (replace "# noqa: RET504" with "# noqa: B901") or remove the noqa and use a plain explanatory comment so Ruff doesn't incorrectly flag return-in-generator; locate the fake_tts definition and adjust the comment on the yield accordingly.
337-376: Missing test forrun_id-absent case inTestTranscribeComponent.
TestTranslatedTTSComponentexplicitly covers the no-run_idscenario (Mock(spec=[])) and asserts an"error"payload is returned.TestTranscribeComponenthas no equivalent test, leaving theAttributeErrorpath fromstatus.run_id(line 74 ofcamb_transcribe.py) uncovered. Add atest_transcribe_missing_run_idcase to match the coverage pattern used in the other component tests.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/lfx/tests/unit/components/camb/test_camb_components.py` around lines 337 - 376, Add a new unit test in TestTranscribeComponent named test_transcribe_missing_run_id that mirrors the other tests' patching of lfx.components.camb.camb_transcribe.get_async_client and .poll_task, return a status object created with Mock(spec=[]) (so it has no run_id attribute), and call CambTranscribeComponent().transcribe_audio() (after setting api_key, language and audio_url) and assert that result.data contains an "error" key; this covers the AttributeError path from accessing status.run_id in camb_transcribe.py.
87-92: Hardcoded magic number== 8will break on the next component addition.
assert len(camb_entries) == 8is fragile. Prefer asserting>= 8or comparing againstlen(camb.__all__)to stay in sync automatically.♻️ Proposed fix
- assert len(camb_entries) == 8 + import lfx.components.camb as camb_mod + assert len(camb_entries) == len(camb_mod.__all__)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/lfx/tests/unit/components/camb/test_camb_components.py` around lines 87 - 92, The test test_component_discovery_from_parent asserts a hardcoded length (len(camb_entries) == 8) which is brittle; update the assertion in the test to be resilient by either asserting a minimum expected count (e.g., assert len(camb_entries) >= 8) or by importing the camb package and comparing against its export list (e.g., assert len(camb_entries) == len(camb.__all__)). Locate the test using _discover_components_from_module and _dynamic_imports and replace the equality check on camb_entries with one of the two suggested assertions so the test won’t break when components are added.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/lfx/src/lfx/components/camb/__init__.py`:
- Around line 28-37: The __all__ list in this module is unsorted and triggers
Ruff RUF022; sort the entries in the __all__ list alphabetically (by string) so
the exported names like "CambAudioSeparationComponent",
"CambTextToSoundComponent", "CambTranscribeComponent", "CambTranslateComponent",
"CambTranslatedTTSComponent", "CambTTSComponent", "CambVoiceCloneComponent", and
"CambVoiceListComponent" appear in sorted order; update the __all__ declaration
accordingly to satisfy the linter.
In `@src/lfx/src/lfx/components/camb/_helpers.py`:
- Around line 53-57: The temp file created by save_audio uses
tempfile.NamedTemporaryFile(..., delete=False) and is never auto-reclaimed;
update save_audio's contract: either (A) document in save_audio's docstring that
callers are responsible for removing the returned path and show recommended
cleanup (e.g., os.remove), or (B) provide an alternative context-manager API
(e.g., save_audio_tempfile or make_save_audio_context) that yields the temp-file
path and ensures automatic deletion on exit; refer to the save_audio function
and its use of tempfile.NamedTemporaryFile when implementing or documenting this
change.
- Around line 11-23: Remove the unused client parameter from the function
signature of poll_task and update every call site that passes a client (e.g.,
where poll_task(...) is invoked in components like camb_transcribe) to drop that
argument; inside poll_task keep using get_status_fn(task_id) and shorten long
lines accordingly. Replace direct f-string messages inside raise statements by
first assigning the message to a local variable (e.g., msg = f"...") and then
raise the exception with that variable (raise RuntimeError(msg) and raise
TimeoutError(msg)) to satisfy the TRY003/EM102 checks. Ensure you reference the
poll_task function name when making edits so all callers are updated.
In `@src/lfx/src/lfx/components/camb/camb_audio_separation.py`:
- Around line 47-72: The async helper _separate_audio_async blocks the event
loop by using open() synchronously and also assumes status.run_id always exists;
change the file read to asynchronous I/O (e.g., use aiofiles to open the file
and pass the async file-like object or its bytes to get_async_client usage)
inside _separate_audio_async (refer to method _separate_audio_async and the
branch that sets kwargs["media_file"]) and add a guard after poll_task to check
that status has a run_id (and return a Data error if missing) before calling
client.audio_separation.get_audio_separation_run_info; ensure you also
defensively access result.foreground_audio_url and result.background_audio_url
(via getattr or conditional) and keep separate_audio calling asyncio.run as-is.
In `@src/lfx/src/lfx/components/camb/camb_transcribe.py`:
- Around line 72-74: The code calls status.run_id without guarding for its
presence after awaiting poll_task (which uses
client.transcription.get_transcription_task_status), risking AttributeError;
update the logic in the task handling flow (around task_id, poll_task, status,
and the call to client.transcription.get_transcription_result) to check whether
status.run_id is present/truthy before calling get_transcription_result, and if
absent return an error Data payload (consistent with CambTranslatedTTSComponent)
that includes a helpful error message and the task_id/status for diagnostics.
In `@src/lfx/src/lfx/components/camb/camb_translated_tts.py`:
- Around line 71-112: The _translated_tts_async method needs network timeout and
exception handling around the httpx request and must save audio using the
detected format instead of forcing "wav"; wrap the httpx.AsyncClient() call (and
the await http.get(...)) in a try/except that catches
httpx.RequestError/HTTPError and returns a Data error on failure, supply an
explicit timeout to the AsyncClient or request, use
detect_audio_format(audio_data) to set fmt and only call add_wav_header when fmt
== "wav" and the header is missing, then call save_audio(audio_data, fmt) and
return Data with the actual format variable instead of the hardcoded "wav"
(references: _translated_tts_async, httpx.AsyncClient, detect_audio_format,
add_wav_header, save_audio).
In `@src/lfx/src/lfx/components/camb/camb_voice_clone.py`:
- Around line 75-100: The file is opened inside the async function
_clone_voice_async which blocks the event loop; move the open(self.audio_file,
"rb") context manager into the synchronous clone_voice() method, open the file
there and pass the file object (or its bytes) into _clone_voice_async as an
argument, then call asyncio.run(self._clone_voice_async(file_handle)) and ensure
the file is closed after asyncio.run returns; keep the existing voice_id
extraction logic (getattr(result, "voice_id", getattr(result, "id", None))) and
do not replace it with a dict-based fallback.
---
Nitpick comments:
In `@src/lfx/src/lfx/components/camb/camb_text_to_sound.py`:
- Around line 57-79: In _generate_sound_async, the duration check uses
truthiness so a valid 0.0 value would be skipped; change the condition that adds
duration to kwargs to use "if self.duration is not None" (referencing the local
variable duration in the _generate_sound_async method) so zero is preserved;
optionally, to avoid high memory use, stream chunks to a temporary file as
they're iterated from client.text_to_audio.get_text_to_audio_result instead of
accumulating them in the chunks list before calling save_audio.
In `@src/lfx/tests/unit/components/camb/test_camb_components.py`:
- Around line 256-259: The fake_tts async function uses a trailing unreachable
yield to make it an async generator but the noqa tag is wrong; update the
suppression on the yield in fake_tts to the correct rule (replace "# noqa:
RET504" with "# noqa: B901") or remove the noqa and use a plain explanatory
comment so Ruff doesn't incorrectly flag return-in-generator; locate the
fake_tts definition and adjust the comment on the yield accordingly.
- Around line 337-376: Add a new unit test in TestTranscribeComponent named
test_transcribe_missing_run_id that mirrors the other tests' patching of
lfx.components.camb.camb_transcribe.get_async_client and .poll_task, return a
status object created with Mock(spec=[]) (so it has no run_id attribute), and
call CambTranscribeComponent().transcribe_audio() (after setting api_key,
language and audio_url) and assert that result.data contains an "error" key;
this covers the AttributeError path from accessing status.run_id in
camb_transcribe.py.
- Around line 87-92: The test test_component_discovery_from_parent asserts a
hardcoded length (len(camb_entries) == 8) which is brittle; update the assertion
in the test to be resilient by either asserting a minimum expected count (e.g.,
assert len(camb_entries) >= 8) or by importing the camb package and comparing
against its export list (e.g., assert len(camb_entries) == len(camb.__all__)).
Locate the test using _discover_components_from_module and _dynamic_imports and
replace the equality check on camb_entries with one of the two suggested
assertions so the test won’t break when components are added.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
src/lfx/src/lfx/components/__init__.pysrc/lfx/src/lfx/components/camb/__init__.pysrc/lfx/src/lfx/components/camb/_helpers.pysrc/lfx/src/lfx/components/camb/camb_audio_separation.pysrc/lfx/src/lfx/components/camb/camb_text_to_sound.pysrc/lfx/src/lfx/components/camb/camb_transcribe.pysrc/lfx/src/lfx/components/camb/camb_translate.pysrc/lfx/src/lfx/components/camb/camb_translated_tts.pysrc/lfx/src/lfx/components/camb/camb_tts.pysrc/lfx/src/lfx/components/camb/camb_voice_clone.pysrc/lfx/src/lfx/components/camb/camb_voice_list.pysrc/lfx/tests/unit/components/camb/__init__.pysrc/lfx/tests/unit/components/camb/test_camb_components.py
| __all__ = [ | ||
| "CambAudioSeparationComponent", | ||
| "CambTextToSoundComponent", | ||
| "CambTranscribeComponent", | ||
| "CambTranslateComponent", | ||
| "CambTranslatedTTSComponent", | ||
| "CambTTSComponent", | ||
| "CambVoiceCloneComponent", | ||
| "CambVoiceListComponent", | ||
| ] |
There was a problem hiding this comment.
CI blocker: sort __all__ to satisfy Ruff RUF022.
This is currently failing the Ruff check, so it’ll block merge.
Proposed fix (sorted __all__)
__all__ = [
"CambAudioSeparationComponent",
+ "CambTTSComponent",
"CambTextToSoundComponent",
"CambTranscribeComponent",
"CambTranslateComponent",
"CambTranslatedTTSComponent",
- "CambTTSComponent",
"CambVoiceCloneComponent",
"CambVoiceListComponent",
]📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| __all__ = [ | |
| "CambAudioSeparationComponent", | |
| "CambTextToSoundComponent", | |
| "CambTranscribeComponent", | |
| "CambTranslateComponent", | |
| "CambTranslatedTTSComponent", | |
| "CambTTSComponent", | |
| "CambVoiceCloneComponent", | |
| "CambVoiceListComponent", | |
| ] | |
| __all__ = [ | |
| "CambAudioSeparationComponent", | |
| "CambTTSComponent", | |
| "CambTextToSoundComponent", | |
| "CambTranscribeComponent", | |
| "CambTranslateComponent", | |
| "CambTranslatedTTSComponent", | |
| "CambVoiceCloneComponent", | |
| "CambVoiceListComponent", | |
| ] |
🧰 Tools
🪛 GitHub Actions: Ruff Style Check
[error] 28-28: RUF022 'all' is not sorted.
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 28-37: Ruff (RUF022)
src/lfx/src/lfx/components/camb/init.py:28:11: RUF022 __all__ is not sorted
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/components/camb/__init__.py` around lines 28 - 37, The
__all__ list in this module is unsorted and triggers Ruff RUF022; sort the
entries in the __all__ list alphabetically (by string) so the exported names
like "CambAudioSeparationComponent", "CambTextToSoundComponent",
"CambTranscribeComponent", "CambTranslateComponent",
"CambTranslatedTTSComponent", "CambTTSComponent", "CambVoiceCloneComponent", and
"CambVoiceListComponent" appear in sorted order; update the __all__ declaration
accordingly to satisfy the linter.
| async def poll_task(client: Any, get_status_fn: Any, task_id: str, max_attempts: int = 60, interval: float = 2.0) -> Any: | ||
| """Poll a CAMB.AI async task until completion.""" | ||
| for _ in range(max_attempts): | ||
| status = await get_status_fn(task_id) | ||
| if hasattr(status, "status"): | ||
| val = status.status | ||
| if val in ("completed", "SUCCESS", "complete"): | ||
| return status | ||
| if val in ("failed", "FAILED", "error", "ERROR", "TIMEOUT", "PAYMENT_REQUIRED"): | ||
| reason = getattr(status, "exception_reason", "") or getattr(status, "error", "Unknown error") | ||
| raise RuntimeError(f"CAMB.AI task failed: {val}. {reason}") | ||
| await asyncio.sleep(interval) | ||
| raise TimeoutError(f"CAMB.AI task {task_id} did not complete within {max_attempts * interval}s") |
There was a problem hiding this comment.
CI-blocking Ruff failures: unused client parameter + inline exception strings + line too long.
The static analysis run flags six failures that block the CI check:
ARG001(line 11) —clientis passed in but never referenced in the function body; onlyget_status_fnis called. Remove the parameter and update all call sites.E501(line 11) — line length 121 > 120; removingclientalso fixes this.EM102/TRY003(lines 21 and 23) — f-string literals used directly inraisestatements.EM101/TRY003(line 65) — string literal used directly inraise ImportError(...).
🔧 Proposed fix
-async def poll_task(client: Any, get_status_fn: Any, task_id: str, max_attempts: int = 60, interval: float = 2.0) -> Any:
+async def poll_task(get_status_fn: Any, task_id: str, max_attempts: int = 60, interval: float = 2.0) -> Any:
"""Poll a CAMB.AI async task until completion."""
for _ in range(max_attempts):
status = await get_status_fn(task_id)
if hasattr(status, "status"):
val = status.status
if val in ("completed", "SUCCESS", "complete"):
return status
if val in ("failed", "FAILED", "error", "ERROR", "TIMEOUT", "PAYMENT_REQUIRED"):
reason = getattr(status, "exception_reason", "") or getattr(status, "error", "Unknown error")
- raise RuntimeError(f"CAMB.AI task failed: {val}. {reason}")
+ msg = f"CAMB.AI task failed: {val}. {reason}"
+ raise RuntimeError(msg)
await asyncio.sleep(interval)
- raise TimeoutError(f"CAMB.AI task {task_id} did not complete within {max_attempts * interval}s")
+ msg = f"CAMB.AI task {task_id} did not complete within {max_attempts * interval}s"
+ raise TimeoutError(msg) except ImportError as e:
- raise ImportError("The 'camb' package is required. Install it with: pip install camb") from e
+ msg = "The 'camb' package is required. Install it with: pip install camb"
+ raise ImportError(msg) from eAfter removing client from poll_task, update every call site (e.g. camb_transcribe.py line 73, and similar lines in the other components) to drop the client argument.
🧰 Tools
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 23-23: Ruff (EM102)
src/lfx/src/lfx/components/camb/_helpers.py:23:24: EM102 Exception must not use an f-string literal, assign to variable first
[failure] 23-23: Ruff (TRY003)
src/lfx/src/lfx/components/camb/_helpers.py:23:11: TRY003 Avoid specifying long messages outside the exception class
[failure] 21-21: Ruff (EM102)
src/lfx/src/lfx/components/camb/_helpers.py:21:36: EM102 Exception must not use an f-string literal, assign to variable first
[failure] 21-21: Ruff (TRY003)
src/lfx/src/lfx/components/camb/_helpers.py:21:23: TRY003 Avoid specifying long messages outside the exception class
[failure] 11-11: Ruff (E501)
src/lfx/src/lfx/components/camb/_helpers.py:11:121: E501 Line too long (121 > 120)
[failure] 11-11: Ruff (ARG001)
src/lfx/src/lfx/components/camb/_helpers.py:11:21: ARG001 Unused function argument: client
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/components/camb/_helpers.py` around lines 11 - 23, Remove the
unused client parameter from the function signature of poll_task and update
every call site that passes a client (e.g., where poll_task(...) is invoked in
components like camb_transcribe) to drop that argument; inside poll_task keep
using get_status_fn(task_id) and shorten long lines accordingly. Replace direct
f-string messages inside raise statements by first assigning the message to a
local variable (e.g., msg = f"...") and then raise the exception with that
variable (raise RuntimeError(msg) and raise TimeoutError(msg)) to satisfy the
TRY003/EM102 checks. Ensure you reference the poll_task function name when
making edits so all callers are updated.
| def save_audio(data: bytes, extension: str = "wav") -> str: | ||
| """Save audio data to a temporary file and return the file path.""" | ||
| with tempfile.NamedTemporaryFile(suffix=f".{extension}", delete=False) as f: | ||
| f.write(data) | ||
| return f.name |
There was a problem hiding this comment.
Temp files from save_audio are never automatically reclaimed.
delete=False is necessary to return a usable path, but if the caller raises an exception before deleting the file, the temp file is leaked. Consider documenting the cleanup contract in the docstring, or — for callers that don't need persistence — use a context-manager wrapper.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/components/camb/_helpers.py` around lines 53 - 57, The temp
file created by save_audio uses tempfile.NamedTemporaryFile(..., delete=False)
and is never auto-reclaimed; update save_audio's contract: either (A) document
in save_audio's docstring that callers are responsible for removing the returned
path and show recommended cleanup (e.g., os.remove), or (B) provide an
alternative context-manager API (e.g., save_audio_tempfile or
make_save_audio_context) that yields the temp-file path and ensures automatic
deletion on exit; refer to the save_audio function and its use of
tempfile.NamedTemporaryFile when implementing or documenting this change.
| def separate_audio(self) -> Data: | ||
| return asyncio.run(self._separate_audio_async()) | ||
|
|
||
| async def _separate_audio_async(self) -> Data: | ||
| client = get_async_client(self.api_key) | ||
| kwargs: dict[str, Any] = {} | ||
|
|
||
| if self.audio_file: | ||
| with open(self.audio_file, "rb") as f: | ||
| kwargs["media_file"] = f | ||
| task = await client.audio_separation.create_audio_separation(**kwargs) | ||
| elif self.audio_url: | ||
| kwargs["media_url"] = self.audio_url | ||
| task = await client.audio_separation.create_audio_separation(**kwargs) | ||
| else: | ||
| return Data(data={"error": "Provide either an audio file or audio URL"}) | ||
|
|
||
| status = await poll_task(client, client.audio_separation.get_audio_separation_status, task.task_id) | ||
| result = await client.audio_separation.get_audio_separation_run_info(status.run_id) | ||
|
|
||
| output = { | ||
| "foreground_audio_url": getattr(result, "foreground_audio_url", None), | ||
| "background_audio_url": getattr(result, "background_audio_url", None), | ||
| } | ||
| self.status = "Audio separation complete" | ||
| return Data(data=output) |
There was a problem hiding this comment.
Fix Ruff ASYNC230 (blocking open() in async) and guard missing run_id.
Ruff flags Line 55 because _separate_audio_async() opens files via blocking I/O. Also, Line 65 assumes status.run_id always exists.
Proposed fix
class CambAudioSeparationComponent(Component):
@@
outputs = [
Output(display_name="Separation Result", name="separation_output", method="separate_audio"),
]
def separate_audio(self) -> Data:
- return asyncio.run(self._separate_audio_async())
+ # Open files outside the event loop (Ruff ASYNC230) and keep them open
+ # for the duration of the async call.
+ if self.audio_file:
+ with open(self.audio_file, "rb") as f:
+ return asyncio.run(self._separate_audio_async(media_file=f))
+ return asyncio.run(self._separate_audio_async(media_url=self.audio_url))
- async def _separate_audio_async(self) -> Data:
+ async def _separate_audio_async(self, media_file: Any | None = None, media_url: str | None = None) -> Data:
client = get_async_client(self.api_key)
- kwargs: dict[str, Any] = {}
-
- if self.audio_file:
- with open(self.audio_file, "rb") as f:
- kwargs["media_file"] = f
- task = await client.audio_separation.create_audio_separation(**kwargs)
- elif self.audio_url:
- kwargs["media_url"] = self.audio_url
- task = await client.audio_separation.create_audio_separation(**kwargs)
+ if media_file is not None:
+ task = await client.audio_separation.create_audio_separation(media_file=media_file)
+ elif media_url:
+ task = await client.audio_separation.create_audio_separation(media_url=media_url)
else:
return Data(data={"error": "Provide either an audio file or audio URL"})
status = await poll_task(client, client.audio_separation.get_audio_separation_status, task.task_id)
- result = await client.audio_separation.get_audio_separation_run_info(status.run_id)
+ run_id = getattr(status, "run_id", None)
+ if run_id is None:
+ return Data(data={"error": "Audio separation task completed but no run_id was returned."})
+ result = await client.audio_separation.get_audio_separation_run_info(run_id)🧰 Tools
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 55-55: Ruff (ASYNC230)
src/lfx/src/lfx/components/camb/camb_audio_separation.py:55:18: ASYNC230 Async functions should not open files with blocking methods like open
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/components/camb/camb_audio_separation.py` around lines 47 -
72, The async helper _separate_audio_async blocks the event loop by using open()
synchronously and also assumes status.run_id always exists; change the file read
to asynchronous I/O (e.g., use aiofiles to open the file and pass the async
file-like object or its bytes to get_async_client usage) inside
_separate_audio_async (refer to method _separate_audio_async and the branch that
sets kwargs["media_file"]) and add a guard after poll_task to check that status
has a run_id (and return a Data error if missing) before calling
client.audio_separation.get_audio_separation_run_info; ensure you also
defensively access result.foreground_audio_url and result.background_audio_url
(via getattr or conditional) and keep separate_audio calling asyncio.run as-is.
| task_id = result.task_id | ||
| status = await poll_task(client, client.transcription.get_transcription_task_status, task_id) | ||
| transcription = await client.transcription.get_transcription_result(status.run_id) |
There was a problem hiding this comment.
Missing run_id guard may raise AttributeError.
status.run_id is accessed unconditionally, but the status returned by poll_task is not guaranteed to carry that attribute. Other components in this PR (e.g. CambTranslatedTTSComponent) explicitly guard against this and return an error Data payload when run_id is absent — this component should be consistent.
🛡️ Proposed fix
task_id = result.task_id
status = await poll_task(client, client.transcription.get_transcription_task_status, task_id)
+if not hasattr(status, "run_id"):
+ return Data(data={"error": "Transcription task completed but returned no run_id"})
transcription = await client.transcription.get_transcription_result(status.run_id)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/components/camb/camb_transcribe.py` around lines 72 - 74, The
code calls status.run_id without guarding for its presence after awaiting
poll_task (which uses client.transcription.get_transcription_task_status),
risking AttributeError; update the logic in the task handling flow (around
task_id, poll_task, status, and the call to
client.transcription.get_transcription_result) to check whether status.run_id is
present/truthy before calling get_transcription_result, and if absent return an
error Data payload (consistent with CambTranslatedTTSComponent) that includes a
helpful error message and the task_id/status for diagnostics.
| async def _translated_tts_async(self) -> Data: | ||
| import httpx | ||
|
|
||
| client = get_async_client(self.api_key) | ||
|
|
||
| kwargs: dict[str, Any] = { | ||
| "text": self.text, | ||
| "voice_id": self.voice_id, | ||
| "source_language": self.source_language, | ||
| "target_language": self.target_language, | ||
| } | ||
| if self.formality: | ||
| kwargs["formality"] = self.formality | ||
|
|
||
| result = await client.translated_tts.create_translated_tts(**kwargs) | ||
| status = await poll_task(client, client.translated_tts.get_translated_tts_task_status, result.task_id) | ||
|
|
||
| run_id = getattr(status, "run_id", None) | ||
| if run_id is None: | ||
| return Data(data={"error": "Translated TTS task completed but no run_id was returned."}) | ||
|
|
||
| # The CAMB SDK doesn't expose a streaming endpoint for translated TTS | ||
| # results, so we fetch the audio directly from the REST API. | ||
| url = f"https://client.camb.ai/apis/tts-result/{run_id}" | ||
| async with httpx.AsyncClient() as http: | ||
| resp = await http.get(url, headers={"x-api-key": self.api_key}) | ||
| if resp.status_code != 200: | ||
| return Data( | ||
| data={"error": f"Failed to fetch TTS audio: HTTP {resp.status_code}"} | ||
| ) | ||
| audio_data = resp.content | ||
|
|
||
| if not audio_data: | ||
| return Data(data={"error": "TTS result returned empty audio data."}) | ||
|
|
||
| fmt = detect_audio_format(audio_data) | ||
| if fmt == "wav" and audio_data[:4] != b"RIFF": | ||
| audio_data = add_wav_header(audio_data) | ||
|
|
||
| file_path = save_audio(audio_data, "wav") | ||
| self.status = f"Audio saved to {file_path}" | ||
| return Data(data={"file_path": file_path, "format": "wav"}) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
CAMB.AI translated TTS REST API endpoint audio format tts-result
💡 Result:
For CAMB.AI Translated TTS, once your task reaches SUCCESS and you get a run_id (via GET /translated-tts/{task_id}), you retrieve the audio using:
- GET
/tts-result/{run_id}(docs.camb.ai)
Audio format (tts-result)
- Default response (
output_type=raw_bytes) streams FLAC audio bytes with response Content-Type:audio/flac. (docs.camb.ai)
If you want a downloadable URL instead of bytes
- Call
GET /tts-result/{run_id}?output_type=file_urlto receive JSON containing a temporaryoutput_url(instead of raw audio bytes). (docs.camb.ai)
Citations:
- 1: https://docs.camb.ai/api-reference/endpoint/poll-translated-tts-result?utm_source=openai
- 2: https://docs.camb.ai/api-reference/endpoint/get-tts-run-info?utm_source=openai
- 3: https://docs.camb.ai/api-reference/endpoint/get-tts-run-info?utm_source=openai
🏁 Script executed:
# First, let's locate and examine the related utility functions
fd -type f -name "*.py" | xargs rg -l "detect_audio_format|save_audio" | head -20Repository: langflow-ai/langflow
Length of output: 507
🏁 Script executed:
# Also check for httpx usage patterns and timeout conventions in the codebase
rg "httpx\." -A 3 -B 1 --max-count=15Repository: langflow-ai/langflow
Length of output: 50377
🏁 Script executed:
# Let's examine the camb_translated_tts.py file more completely
cat -n src/lfx/src/lfx/components/camb/camb_translated_tts.pyRepository: langflow-ai/langflow
Length of output: 4610
🏁 Script executed:
# Check the _helpers.py file for detect_audio_format and save_audio implementations
cat -n src/lfx/src/lfx/components/camb/_helpers.pyRepository: langflow-ai/langflow
Length of output: 3174
🏁 Script executed:
# Check timeout patterns in other CAMB components
rg "httpx\|timeout" src/lfx/src/lfx/components/camb/ -A 3 -B 1Repository: langflow-ai/langflow
Length of output: 46
🏁 Script executed:
# Check test file for this component
cat -n src/lfx/tests/unit/components/camb/test_camb_components.pyRepository: langflow-ai/langflow
Length of output: 33648
Add HTTP timeout/exception handling and save audio using the detected format instead of forcing WAV.
- Line 95:
httpx.AsyncClient()lacks an explicit timeout and exception handling. Without a timeout, network delays can cause indefinite hangs. Without exception handling,httpx.HTTPErrorand other network errors will propagate unhandled. - Lines 104–110: The code correctly detects audio format (FLAC, MP3, OGG, etc.) via
detect_audio_format(), but then ignores the result and forcessave_audio(audio_data, "wav")with a hardcoded format and returns"format": "wav". Per CAMB.AI API documentation, the/tts-result/{run_id}endpoint returns FLAC audio by default, not WAV. Saving FLAC bytes with a.wavextension produces a mislabeled file that will fail playback.
Suggested changes
async def _translated_tts_async(self) -> Data:
import httpx
@@
# The CAMB SDK doesn't expose a streaming endpoint for translated TTS
# results, so we fetch the audio directly from the REST API.
url = f"https://client.camb.ai/apis/tts-result/{run_id}"
- async with httpx.AsyncClient() as http:
- resp = await http.get(url, headers={"x-api-key": self.api_key})
- if resp.status_code != 200:
- return Data(
- data={"error": f"Failed to fetch TTS audio: HTTP {resp.status_code}"}
- )
- audio_data = resp.content
+ try:
+ async with httpx.AsyncClient(timeout=60.0) as http:
+ resp = await http.get(url, headers={"x-api-key": self.api_key})
+ resp.raise_for_status()
+ audio_data = resp.content
+ except httpx.HTTPError as e:
+ return Data(data={"error": f"Failed to fetch TTS audio: {e}"})
@@
fmt = detect_audio_format(audio_data)
if fmt == "wav" and audio_data[:4] != b"RIFF":
audio_data = add_wav_header(audio_data)
- file_path = save_audio(audio_data, "wav")
+ file_path = save_audio(audio_data, fmt)
self.status = f"Audio saved to {file_path}"
- return Data(data={"file_path": file_path, "format": "wav"})
+ return Data(data={"file_path": file_path, "format": fmt})🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/components/camb/camb_translated_tts.py` around lines 71 -
112, The _translated_tts_async method needs network timeout and exception
handling around the httpx request and must save audio using the detected format
instead of forcing "wav"; wrap the httpx.AsyncClient() call (and the await
http.get(...)) in a try/except that catches httpx.RequestError/HTTPError and
returns a Data error on failure, supply an explicit timeout to the AsyncClient
or request, use detect_audio_format(audio_data) to set fmt and only call
add_wav_header when fmt == "wav" and the header is missing, then call
save_audio(audio_data, fmt) and return Data with the actual format variable
instead of the hardcoded "wav" (references: _translated_tts_async,
httpx.AsyncClient, detect_audio_format, add_wav_header, save_audio).
| def clone_voice(self) -> Data: | ||
| return asyncio.run(self._clone_voice_async()) | ||
|
|
||
| async def _clone_voice_async(self) -> Data: | ||
| client = get_async_client(self.api_key) | ||
| gender_code = self._GENDER_MAP.get(self.gender, 0) | ||
|
|
||
| with open(self.audio_file, "rb") as f: | ||
| kwargs: dict[str, Any] = { | ||
| "voice_name": self.voice_name, | ||
| "gender": gender_code, | ||
| "file": f, | ||
| } | ||
| if self.description: | ||
| kwargs["description"] = self.description | ||
| if self.age: | ||
| kwargs["age"] = self.age | ||
| if self.language: | ||
| kwargs["language"] = self.language | ||
| result = await client.voice_cloning.create_custom_voice(**kwargs) | ||
|
|
||
| voice_id = getattr(result, "voice_id", getattr(result, "id", None)) | ||
| if voice_id is None: | ||
| return Data(data={"error": "Voice clone succeeded but no voice_id was returned"}) | ||
| self.status = f"Voice cloned: {self.voice_name} (ID: {voice_id})" | ||
| return Data(data={"voice_id": voice_id, "voice_name": self.voice_name, "status": "created"}) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
CAMB.AI voice cloning Python SDK create_custom_voice response format
💡 Result:
In the CAMB.AI Python SDK, client.voice_cloning.create_custom_voice(...) returns a CreateCustomVoiceOut object (a Pydantic model) with this shape:
{ "voice_id": 123 }So you can read the cloned voice’s ID as:
result.voice_id(preferred; it’s a model field)- or, if you convert to dict/JSON first, you’ll see the same
{"voice_id": ...}payload.
Sources: CAMB.AI “Create a Custom Voice” endpoint example response ({"voice_id": 123}) [1]; SDK type definition CreateCustomVoiceOut(voice_id: int) and create_custom_voice(...) -> CreateCustomVoiceOut [2].
🏁 Script executed:
# Search for CAMB SDK imports and usage patterns in the codebase
fd -e py | xargs rg "camb|voice_cloning|create_custom_voice" -A 3 -B 1 | head -100Repository: langflow-ai/langflow
Length of output: 7030
🏁 Script executed:
# Check for the CAMB client import and initialization
rg "get_async_client|AsyncCambAI" -B 2 -A 5Repository: langflow-ai/langflow
Length of output: 24666
🏁 Script executed:
# Look for camb-related files and structure
fd camb -t f | head -20Repository: langflow-ai/langflow
Length of output: 536
🏁 Script executed:
# Check the voice clone test to understand the expected response behavior
cat -n src/lfx/tests/unit/components/camb/test_camb_components.py | grep -A 20 "class TestVoiceCloneComponent"Repository: langflow-ai/langflow
Length of output: 982
🏁 Script executed:
# Look at the full test for voice cloning
sed -n '/class TestVoiceCloneComponent/,/class Test[A-Z]/p' src/lfx/tests/unit/components/camb/test_camb_components.py | head -60Repository: langflow-ai/langflow
Length of output: 1864
🏁 Script executed:
# Check the actual camb_voice_clone.py file to see the full implementation
cat -n src/lfx/src/lfx/components/camb/camb_voice_clone.pyRepository: langflow-ai/langflow
Length of output: 4116
🏁 Script executed:
# Check if blocking file I/O in async is flagged by ruff elsewhere
rg "ASYNC230|async.*open\(" -B 2 -A 2 | head -40Repository: langflow-ai/langflow
Length of output: 3000
Move file open outside async context to avoid blocking I/O.
Line 82: Using open() inside an async def function blocks the event loop. Move file opening to clone_voice() and pass the file handle to _clone_voice_async().
Regarding voice_id extraction: The CAMB.AI SDK returns a CreateCustomVoiceOut Pydantic model object, not a dict. The current getattr(result, "voice_id", ...) approach correctly handles this typed response, so no dict-safe fallback is needed. The existing logic is correct.
Proposed fix for blocking I/O
def clone_voice(self) -> Data:
- return asyncio.run(self._clone_voice_async())
+ with open(self.audio_file, "rb") as f:
+ return asyncio.run(self._clone_voice_async(f))
- async def _clone_voice_async(self) -> Data:
+ async def _clone_voice_async(self, audio_fp: Any) -> Data:
client = get_async_client(self.api_key)
gender_code = self._GENDER_MAP.get(self.gender, 0)
- with open(self.audio_file, "rb") as f:
- kwargs: dict[str, Any] = {
- "voice_name": self.voice_name,
- "gender": gender_code,
- "file": f,
- }
- if self.description:
- kwargs["description"] = self.description
- if self.age:
- kwargs["age"] = self.age
- if self.language:
- kwargs["language"] = self.language
- result = await client.voice_cloning.create_custom_voice(**kwargs)
+ kwargs: dict[str, Any] = {
+ "voice_name": self.voice_name,
+ "gender": gender_code,
+ "file": audio_fp,
+ }
+ if self.description:
+ kwargs["description"] = self.description
+ if self.age:
+ kwargs["age"] = self.age
+ if self.language:
+ kwargs["language"] = self.language
+ result = await client.voice_cloning.create_custom_voice(**kwargs)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def clone_voice(self) -> Data: | |
| return asyncio.run(self._clone_voice_async()) | |
| async def _clone_voice_async(self) -> Data: | |
| client = get_async_client(self.api_key) | |
| gender_code = self._GENDER_MAP.get(self.gender, 0) | |
| with open(self.audio_file, "rb") as f: | |
| kwargs: dict[str, Any] = { | |
| "voice_name": self.voice_name, | |
| "gender": gender_code, | |
| "file": f, | |
| } | |
| if self.description: | |
| kwargs["description"] = self.description | |
| if self.age: | |
| kwargs["age"] = self.age | |
| if self.language: | |
| kwargs["language"] = self.language | |
| result = await client.voice_cloning.create_custom_voice(**kwargs) | |
| voice_id = getattr(result, "voice_id", getattr(result, "id", None)) | |
| if voice_id is None: | |
| return Data(data={"error": "Voice clone succeeded but no voice_id was returned"}) | |
| self.status = f"Voice cloned: {self.voice_name} (ID: {voice_id})" | |
| return Data(data={"voice_id": voice_id, "voice_name": self.voice_name, "status": "created"}) | |
| def clone_voice(self) -> Data: | |
| with open(self.audio_file, "rb") as f: | |
| return asyncio.run(self._clone_voice_async(f)) | |
| async def _clone_voice_async(self, audio_fp: Any) -> Data: | |
| client = get_async_client(self.api_key) | |
| gender_code = self._GENDER_MAP.get(self.gender, 0) | |
| kwargs: dict[str, Any] = { | |
| "voice_name": self.voice_name, | |
| "gender": gender_code, | |
| "file": audio_fp, | |
| } | |
| if self.description: | |
| kwargs["description"] = self.description | |
| if self.age: | |
| kwargs["age"] = self.age | |
| if self.language: | |
| kwargs["language"] = self.language | |
| result = await client.voice_cloning.create_custom_voice(**kwargs) | |
| voice_id = getattr(result, "voice_id", getattr(result, "id", None)) | |
| if voice_id is None: | |
| return Data(data={"error": "Voice clone succeeded but no voice_id was returned"}) | |
| self.status = f"Voice cloned: {self.voice_name} (ID: {voice_id})" | |
| return Data(data={"voice_id": voice_id, "voice_name": self.voice_name, "status": "created"}) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/components/camb/camb_voice_clone.py` around lines 75 - 100,
The file is opened inside the async function _clone_voice_async which blocks the
event loop; move the open(self.audio_file, "rb") context manager into the
synchronous clone_voice() method, open the file there and pass the file object
(or its bytes) into _clone_voice_async as an argument, then call
asyncio.run(self._clone_voice_async(file_handle)) and ensure the file is closed
after asyncio.run returns; keep the existing voice_id extraction logic
(getattr(result, "voice_id", getattr(result, "id", None))) and do not replace it
with a dict-based fallback.
Summary
_helpers.pywith async task polling, audio format detection, WAV header construction, and temp file managementcomponents/__init__.pyComponents
CambTranslateComponentCambTTSComponentCambTranscribeComponentCambTranslatedTTSComponentCambVoiceCloneComponentCambVoiceListComponentCambTextToSoundComponentCambAudioSeparationComponentSummary by CodeRabbit
New Features
Tests