|
7 | 7 | import threading
|
8 | 8 | from typing import Any, Dict, List, Optional, Union
|
9 | 9 |
|
| 10 | +import numpy as np |
10 | 11 | import websockets
|
11 | 12 | from asgiref.sync import sync_to_async
|
12 | 13 | from pydantic import BaseModel, ConfigDict
|
| 14 | +from pyrnnoise import RNNoise |
13 | 15 |
|
14 | 16 | from llmstack.apps.types.agent import AgentConfigSchema
|
15 | 17 | from llmstack.apps.types.voice_agent import VoiceAgentConfigSchema
|
@@ -143,6 +145,7 @@ def __init__(self, output_queue: asyncio.Queue, config: AgentControllerConfig):
|
143 | 145 | self._input_metadata = {}
|
144 | 146 | self._output_audio_stream = None
|
145 | 147 | self._output_transcript_stream = None
|
| 148 | + self._rnnoise = RNNoise(sample_rate=24000) |
146 | 149 |
|
147 | 150 | self._input_messages_queue = queue.Queue()
|
148 | 151 | self._loop = asyncio.new_event_loop()
|
@@ -270,10 +273,33 @@ async def _process_input_audio_stream(self):
|
270 | 273 | await self._send_websocket_message({"type": "response.create"})
|
271 | 274 | break
|
272 | 275 |
|
| 276 | + # Convert bytes to numpy array and normalize to float32 |
| 277 | + try: |
| 278 | + audio_data = np.frombuffer(chunk, dtype=np.int16) |
| 279 | + # Convert int16 to float32 and normalize to [-1, 1] |
| 280 | + audio_data = audio_data.astype(np.float32) / 32768.0 |
| 281 | + frame_iterator = self._rnnoise.process_chunk(audio_data) |
| 282 | + except Exception as e: |
| 283 | + logger.exception(f"Error processing chunk with rnnoise: {e}") |
| 284 | + frame_iterator = [] |
| 285 | + |
| 286 | + # Rebuild the chunk from the denoised frames |
| 287 | + denoised_chunk = b"" |
| 288 | + try: |
| 289 | + for _, denoised_frame in frame_iterator: |
| 290 | + # Convert float32 [-1, 1] back to int16 range and then to bytes |
| 291 | + int16_data = (denoised_frame * 32768.0).astype(np.int16) |
| 292 | + denoised_chunk += int16_data.tobytes() |
| 293 | + except Exception as e: |
| 294 | + logger.exception(f"Error joining denoised frames: {e}") |
| 295 | + |
| 296 | + logger.debug(f"Denoised chunk size to original chunk size: {len(denoised_chunk)} vs {len(chunk)}") |
| 297 | + |
273 | 298 | # Base64 encode and send
|
274 |
| - await self._send_websocket_message( |
275 |
| - {"type": "input_audio_buffer.append", "audio": base64.b64encode(chunk).decode("utf-8")} |
276 |
| - ) |
| 299 | + if len(denoised_chunk) > 0: |
| 300 | + await self._send_websocket_message( |
| 301 | + {"type": "input_audio_buffer.append", "audio": base64.b64encode(denoised_chunk).decode("utf-8")} |
| 302 | + ) |
277 | 303 |
|
278 | 304 | async def _process_input_text_stream(self):
|
279 | 305 | if self._input_text_stream:
|
|
0 commit comments