---
name: voice-ai-engine-development
description: "Build real-time conversational AI voice engines using async worker pipelines, streaming transcription, LLM agents, and TTS synthesis with interrupt handling and multi-provider support"
---

# Voice AI Engine Development

## Overview

This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.

The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via asyncio.Queue objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.

## When to Use This Skill

Use this skill when:

  • Building real-time voice conversation systems

  • Implementing voice assistants or chatbots

  • Creating voice-enabled customer service agents

  • Developing voice AI applications with interrupt capabilities

  • Integrating multiple transcription, LLM, or TTS providers

  • Working with streaming audio processing pipelines

  • The user mentions Vocode, voice engines, or conversational AI

## Core Architecture Principles

### The Worker Pipeline Pattern

Every voice AI engine follows this pipeline:

```
Audio In → Transcriber → Agent → Synthesizer → Audio Out
           (Worker 1)    (Worker 2)  (Worker 3)
```

Key Benefits:

  • Decoupling: Workers only know about their input/output queues

  • Concurrency: All workers run simultaneously via asyncio

  • Backpressure: Queues automatically handle rate differences

  • Interruptibility: Everything can be stopped mid-stream

### Base Worker Pattern

Every worker follows this pattern:

```python
import asyncio

class BaseWorker:
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue    # asyncio.Queue to consume from
        self.output_queue = output_queue  # asyncio.Queue to produce to
        self.active = False

    def start(self):
        """Start the worker's processing loop"""
        self.active = True
        asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        """Main processing loop - runs forever until terminated"""
        while self.active:
            item = await self.input_queue.get()  # Block until item arrives
            await self.process(item)             # Process the item

    async def process(self, item):
        """Override this - does the actual work"""
        raise NotImplementedError

    def terminate(self):
        """Stop the worker"""
        self.active = False
```
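
To see how the pieces compose, here is a minimal wiring sketch built on the BaseWorker above. UppercaseWorker is a hypothetical stand-in for real transcription/LLM/TTS work; the queues are the only coupling between stages.

```python
import asyncio

class UppercaseWorker(BaseWorker):
    async def process(self, item: str):
        # Stand-in for real work (transcribe, generate, synthesize, ...)
        self.output_queue.put_nowait(item.upper())

async def main():
    q1, q2, q3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    # Chain two workers: q1 → worker_a → q2 → worker_b → q3
    worker_a = UppercaseWorker(q1, q2)
    worker_b = UppercaseWorker(q2, q3)
    worker_a.start()
    worker_b.start()

    q1.put_nowait("hello")
    print(await q3.get())  # "HELLO"

    worker_a.terminate()
    worker_b.terminate()

asyncio.run(main())
```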

## Component Implementation Guide

### 1. Transcriber (Audio → Text)

Purpose: Converts incoming audio chunks to text transcriptions

Interface Requirements:

```python
import asyncio

class BaseTranscriber:
    def __init__(self, transcriber_config):
        self.input_queue = asyncio.Queue()   # Audio chunks (bytes)
        self.output_queue = asyncio.Queue()  # Transcriptions
        self.is_muted = False

    def send_audio(self, chunk: bytes):
        """Client calls this to send audio"""
        if not self.is_muted:
            self.input_queue.put_nowait(chunk)
        else:
            # Send silence instead (prevents echo during bot speech)
            self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))

    def create_silent_chunk(self, size: int) -> bytes:
        """Zeroed PCM bytes, same length as the original chunk"""
        return b"\x00" * size

    def mute(self):
        """Called when bot starts speaking (prevents echo)"""
        self.is_muted = True

    def unmute(self):
        """Called when bot stops speaking"""
        self.is_muted = False
```

Output Format:

```python
from dataclasses import dataclass

@dataclass
class Transcription:
    message: str        # "Hello, how are you?"
    confidence: float   # 0.95
    is_final: bool      # True = complete sentence, False = partial
    is_interrupt: bool  # Set by TranscriptionsWorker
```

Supported Providers:

  • Deepgram - Fast, accurate, streaming

  • AssemblyAI - High accuracy, good for accents

  • Azure Speech - Enterprise-grade

  • Google Cloud Speech - Multi-language support

Critical Implementation Details:

  • Use WebSocket for bidirectional streaming

  • Run sender and receiver tasks concurrently with asyncio.gather() (see the sketch after this list)

  • Mute transcriber when bot speaks to prevent echo/feedback loops

  • Handle both final and partial transcriptions
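
The sender/receiver split looks roughly like this. This is a hedged sketch, not any specific provider's protocol: the URL, JSON fields, and run() entry point are placeholders to adapt to your provider's streaming API.

```python
import asyncio
import json
import websockets

class StreamingTranscriber(BaseTranscriber):
    async def run(self, url: str):
        async with websockets.connect(url) as ws:
            async def sender():
                # Forward audio chunks from the input queue to the socket
                while True:
                    chunk = await self.input_queue.get()
                    await ws.send(chunk)

            async def receiver():
                # Parse transcription messages as they arrive
                async for raw in ws:
                    msg = json.loads(raw)
                    self.output_queue.put_nowait(Transcription(
                        message=msg["text"],
                        confidence=msg.get("confidence", 1.0),
                        is_final=msg.get("is_final", False),
                        is_interrupt=False,
                    ))

            # Both directions run concurrently over one connection
            await asyncio.gather(sender(), receiver())
```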

### 2. Agent (Text → Response)

Purpose: Processes user input and generates conversational responses

Interface Requirements:

```python
import asyncio

class BaseAgent:
    def __init__(self, agent_config):
        self.input_queue = asyncio.Queue()   # TranscriptionAgentInput
        self.output_queue = asyncio.Queue()  # AgentResponse
        self.transcript = None               # Conversation history

    async def generate_response(self, human_input, is_interrupt, conversation_id):
        """Override this - returns AsyncGenerator of responses"""
        raise NotImplementedError
```

Why Streaming Responses?

  • Lower latency: Start speaking as soon as first sentence is ready

  • Better interrupts: Can stop mid-response

  • Sentence-by-sentence: More natural conversation flow

Supported Providers:

  • OpenAI (GPT-4, GPT-3.5) - High quality, fast

  • Google Gemini - Multimodal, cost-effective

  • Anthropic Claude - Long context, nuanced responses

Critical Implementation Details:

  • Maintain conversation history in Transcript object

  • Stream responses using AsyncGenerator

  • IMPORTANT: Buffer the entire LLM response before yielding to the synthesizer to prevent audio jumping (see the sketch after this list)

  • Handle interrupts by canceling current generation task

  • Update conversation history with partial messages on interrupt
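
A concrete shape for such an agent, sketched against the OpenAI async client (openai-python v1 API; the model name and the plain-string yield are assumptions to adapt to your response types):

```python
from openai import AsyncOpenAI

class OpenAIAgent(BaseAgent):
    def __init__(self, agent_config):
        super().__init__(agent_config)
        self.client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async def generate_response(self, human_input, is_interrupt, conversation_id):
        # Assumes the conversation attached a Transcript to self.transcript
        self.transcript.add_human_message(human_input)

        stream = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=self.transcript.to_openai_messages(),
            stream=True,
        )

        # Stream from the LLM, but buffer before yielding (see the note above):
        # yielding fragments causes one TTS call per fragment and audio jumps.
        full_response = ""
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                full_response += delta

        self.transcript.add_bot_message(full_response)
        yield full_response  # wrap in your AgentResponse type as needed
```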

### 3. Synthesizer (Text → Audio)

Purpose: Converts agent text responses to speech audio

Interface Requirements:

```python
class BaseSynthesizer:
    async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
        """
        Returns a SynthesisResult containing:
        - chunk_generator: AsyncGenerator that yields audio chunks
        - get_message_up_to: Function to get partial text (for interrupts)
        """
        raise NotImplementedError
```

SynthesisResult Structure:

```python
from dataclasses import dataclass
from typing import AsyncGenerator, Callable

@dataclass
class SynthesisResult:
    chunk_generator: AsyncGenerator["ChunkResult", None]
    get_message_up_to: Callable[[float], str]  # seconds → partial text

@dataclass
class ChunkResult:
    chunk: bytes  # Raw PCM audio
    is_last_chunk: bool
```
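
One plausible way to build get_message_up_to, assuming roughly uniform speaking pace; providers with word-level timestamps can do this exactly. The helper below is an illustration, not a library API:

```python
def make_get_message_up_to(text: str, total_seconds: float):
    """Estimate the spoken prefix of `text` after `seconds` of playback."""
    words = text.split()

    def get_message_up_to(seconds: float) -> str:
        if total_seconds <= 0 or seconds >= total_seconds:
            return text
        fraction = max(seconds, 0.0) / total_seconds
        n_words = max(1, int(len(words) * fraction))
        return " ".join(words[:n_words])

    return get_message_up_to
```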

    Supported Providers:

  • ElevenLabs - Most natural voices, streaming

  • Azure TTS - Enterprise-grade, many languages

  • Google Cloud TTS - Cost-effective, good quality

  • Amazon Polly - AWS integration

  • Play.ht - Voice cloning

Critical Implementation Details:

  • Stream audio chunks as they're generated

  • Convert audio to LINEAR16 PCM format (16kHz sample rate)

  • Implement get_message_up_to() for interrupt handling

  • Handle audio format conversion (MP3 → PCM), as sketched below
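
pydub (listed under Resources; it requires ffmpeg on the system) can handle the MP3 → PCM step. A minimal sketch:

```python
import io
from pydub import AudioSegment

def mp3_to_linear16(mp3_bytes: bytes) -> bytes:
    """Convert an MP3 payload to 16 kHz mono LINEAR16 PCM bytes."""
    segment = AudioSegment.from_mp3(io.BytesIO(mp3_bytes))
    segment = (
        segment.set_frame_rate(16000)  # 16 kHz sample rate
               .set_channels(1)        # mono
               .set_sample_width(2)    # 16-bit samples
    )
    return segment.raw_data  # raw PCM, no container header
```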

### 4. Output Device (Audio → Client)

Purpose: Sends synthesized audio back to the client

CRITICAL: Rate Limiting for Interrupts

```python
import asyncio
import time

async def send_speech_to_output(self, message, synthesis_result,
                                stop_event, seconds_per_chunk):
    chunk_idx = 0
    async for chunk_result in synthesis_result.chunk_generator:
        # Check for interrupt
        if stop_event.is_set():
            logger.debug(f"Interrupted after {chunk_idx} chunks")
            message_sent = synthesis_result.get_message_up_to(
                chunk_idx * seconds_per_chunk
            )
            return message_sent, True  # cut_off = True

        start_time = time.time()

        # Send chunk to output device
        self.output_device.consume_nonblocking(chunk_result.chunk)

        # CRITICAL: Wait for chunk to play before sending next one
        # This is what makes interrupts work!
        speech_length = seconds_per_chunk
        processing_time = time.time() - start_time
        await asyncio.sleep(max(speech_length - processing_time, 0))

        chunk_idx += 1

    return message, False  # cut_off = False
```

Why Rate Limiting?

Without rate limiting, all audio chunks would be sent immediately, which would:

  • Buffer entire message on client side

  • Make interrupts impossible (all audio already sent)

  • Cause timing issues

By sending one chunk every N seconds (computing N from the audio format is sketched after this list):

  • Real-time playback is maintained

  • Interrupts can stop mid-sentence

  • Natural conversation flow is preserved
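
For LINEAR16 audio, the per-chunk duration N follows directly from the chunk size and format:

```python
SAMPLE_RATE = 16000   # Hz
BYTES_PER_SAMPLE = 2  # 16-bit PCM, mono

def chunk_duration_seconds(chunk: bytes) -> float:
    """Play time of a raw PCM chunk: bytes / (rate × bytes per sample)."""
    return len(chunk) / (SAMPLE_RATE * BYTES_PER_SAMPLE)

# e.g. an 8000-byte chunk at 16 kHz, 16-bit mono plays for 0.25 seconds
assert chunk_duration_seconds(b"\x00" * 8000) == 0.25
```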

## The Interrupt System

The interrupt system is critical for natural conversations.

### How Interrupts Work

Scenario: Bot is saying "I think the weather will be nice today and tomorrow and—" when user interrupts with "Stop".

Step 1: User starts speaking

```python
# TranscriptionsWorker detects new transcription while bot speaking
async def process(self, transcription):
    if not self.conversation.is_human_speaking:  # Bot was speaking!
        # Broadcast interrupt to all in-flight events
        interrupted = self.conversation.broadcast_interrupt()
        transcription.is_interrupt = interrupted
```

Step 2: broadcast_interrupt() stops everything

```python
import queue

def broadcast_interrupt(self):
    num_interrupts = 0
    # Interrupt all queued events
    while True:
        try:
            interruptible_event = self.interruptible_events.get_nowait()
            if interruptible_event.interrupt():  # Sets interruption_event
                num_interrupts += 1
        except queue.Empty:
            break

    # Cancel current tasks
    self.agent.cancel_current_task()                   # Stop generating text
    self.agent_responses_worker.cancel_current_task()  # Stop synthesizing
    return num_interrupts > 0
```

Step 3: SynthesisResultsWorker detects interrupt

```python
async def send_speech_to_output(self, synthesis_result, stop_event, ...):
    async for chunk_result in synthesis_result.chunk_generator:
        # Check stop_event (this is the interruption_event)
        if stop_event.is_set():
            logger.debug("Interrupted! Stopping speech.")
            # Calculate what was actually spoken
            seconds_spoken = chunk_idx * seconds_per_chunk
            partial_message = synthesis_result.get_message_up_to(seconds_spoken)
            # e.g., "I think the weather will be nice today"
            return partial_message, True  # cut_off = True
```

Step 4: Agent updates history

```python
if cut_off:
    # Update conversation history with partial message
    self.agent.update_last_bot_message_on_cut_off(message_sent)
    # History now shows:
    # Bot: "I think the weather will be nice today" (incomplete)
```

### InterruptibleEvent Pattern

Every event in the pipeline is wrapped in an InterruptibleEvent:

```python
import threading

class InterruptibleEvent:
    def __init__(self, payload, is_interruptible=True):
        self.payload = payload
        self.is_interruptible = is_interruptible
        self.interruption_event = threading.Event()  # Initially not set
        self.interrupted = False

    def interrupt(self) -> bool:
        """Interrupt this event"""
        if not self.is_interruptible:
            return False
        if not self.interrupted:
            self.interruption_event.set()  # Signal to stop!
            self.interrupted = True
            return True
        return False

    def is_interrupted(self) -> bool:
        return self.interruption_event.is_set()
```
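
A hedged usage sketch tying the pattern together (queue names mirror the broadcast_interrupt() code above and are illustrative, not a library API): the producer wraps each payload and registers it, and the consumer threads the event's interruption_event downstream as the stop_event.

```python
import asyncio
import queue

interruptible_events: queue.Queue = queue.Queue()  # scanned by broadcast_interrupt()
synthesis_queue: asyncio.Queue = asyncio.Queue()   # consumed by the synthesizer worker

def produce(payload):
    event = InterruptibleEvent(payload, is_interruptible=True)
    interruptible_events.put_nowait(event)  # reachable by broadcast_interrupt()
    synthesis_queue.put_nowait(event)

async def consume():
    event = await synthesis_queue.get()
    if event.is_interrupted():
        return None  # interrupted before synthesis even started
    # event.interruption_event becomes the stop_event in send_speech_to_output()
    return event.payload, event.interruption_event
```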

## Multi-Provider Factory Pattern

Support multiple providers with a factory pattern:

```python
from typing import Dict

class VoiceHandler:
    """Multi-provider factory for voice components"""

    def create_transcriber(self, agent_config: Dict):
        """Create transcriber based on transcriberProvider"""
        provider = agent_config.get("transcriberProvider", "deepgram")

        if provider == "deepgram":
            return self._create_deepgram_transcriber(agent_config)
        elif provider == "assemblyai":
            return self._create_assemblyai_transcriber(agent_config)
        elif provider == "azure":
            return self._create_azure_transcriber(agent_config)
        elif provider == "google":
            return self._create_google_transcriber(agent_config)
        else:
            raise ValueError(f"Unknown transcriber provider: {provider}")

    def create_agent(self, agent_config: Dict):
        """Create LLM agent based on llmProvider"""
        provider = agent_config.get("llmProvider", "openai")

        if provider == "openai":
            return self._create_openai_agent(agent_config)
        elif provider == "gemini":
            return self._create_gemini_agent(agent_config)
        else:
            raise ValueError(f"Unknown LLM provider: {provider}")

    def create_synthesizer(self, agent_config: Dict):
        """Create voice synthesizer based on voiceProvider"""
        provider = agent_config.get("voiceProvider", "elevenlabs")

        if provider == "elevenlabs":
            return self._create_elevenlabs_synthesizer(agent_config)
        elif provider == "azure":
            return self._create_azure_synthesizer(agent_config)
        elif provider == "google":
            return self._create_google_synthesizer(agent_config)
        elif provider == "polly":
            return self._create_polly_synthesizer(agent_config)
        elif provider == "playht":
            return self._create_playht_synthesizer(agent_config)
        else:
            raise ValueError(f"Unknown voice provider: {provider}")
```

## WebSocket Integration

Voice AI engines typically use WebSocket for bidirectional audio streaming:

    @app.websocket("/conversation")
    async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Create voice components
    voice_handler = VoiceHandler()
    transcriber = voice_handler.create_transcriber(agent_config)
    agent = voice_handler.create_agent(agent_config)
    synthesizer = voice_handler.create_synthesizer(agent_config)

    # Create output device
    output_device = WebsocketOutputDevice(
    ws=websocket,
    sampling_rate=16000,
    audio_encoding=AudioEncoding.LINEAR16
    )

    # Create conversation orchestrator
    conversation = StreamingConversation(
    output_device=output_device,
    transcriber=transcriber,
    agent=agent,
    synthesizer=synthesizer
    )

    # Start all workers
    await conversation.start()

    try:
    # Receive audio from client
    async for message in websocket.iter_bytes():
    conversation.receive_audio(message)
    except WebSocketDisconnect:
    logger.info("Client disconnected")
    finally:
    await conversation.terminate()
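
For local testing, a minimal client sketch (hypothetical URL and file framing; assumes raw 16 kHz, 16-bit mono PCM on both legs):

```python
import asyncio
import websockets

async def run_client(path: str = "input_16khz.raw"):
    async with websockets.connect("ws://localhost:8000/conversation") as ws:
        async def send_audio():
            with open(path, "rb") as f:
                while chunk := f.read(8000):   # 0.25 s per chunk
                    await ws.send(chunk)
                    await asyncio.sleep(0.25)  # pace like a live microphone

        async def receive_audio():
            async for message in ws:
                print(f"received {len(message)} bytes of synthesized audio")

        await asyncio.gather(send_audio(), receive_audio())

asyncio.run(run_client())
```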

## Common Pitfalls and Solutions

### 1. Audio Jumping/Cutting Off

Problem: Bot's audio jumps or cuts off mid-response.

Cause: Sending text to synthesizer in small chunks causes multiple TTS calls.

Solution: Buffer the entire LLM response before sending to synthesizer:

```python
# ❌ Bad: yields sentence-by-sentence, one TTS call per fragment
async for sentence in llm_stream:
    yield GeneratedResponse(message=BaseMessage(text=sentence))

# ✅ Good: buffer the entire response, then yield once
full_response = ""
async for chunk in llm_stream:
    full_response += chunk
yield GeneratedResponse(message=BaseMessage(text=full_response))
```

### 2. Echo/Feedback Loop

Problem: Bot hears itself speaking and responds to its own audio.

Cause: Transcriber not muted during bot speech.

Solution: Mute transcriber when bot starts speaking:

```python
# Before sending audio to output
self.transcriber.mute()

# After audio playback completes
self.transcriber.unmute()
```

### 3. Interrupts Not Working

Problem: User can't interrupt bot mid-sentence.

Cause: All audio chunks sent at once instead of rate-limited.

Solution: Rate-limit audio chunks to match real-time playback:

```python
async for chunk in synthesis_result.chunk_generator:
    start_time = time.time()

    # Send chunk
    output_device.consume_nonblocking(chunk)

    # Wait for chunk duration before sending next
    processing_time = time.time() - start_time
    await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))
```

### 4. Memory Leaks from Unclosed Streams

Problem: Memory usage grows over time.

Cause: WebSocket connections or API streams not properly closed.

Solution: Always use context managers and cleanup:

```python
try:
    async with websockets.connect(url) as ws:
        # Use websocket
        pass
finally:
    # Cleanup
    await conversation.terminate()
    await transcriber.terminate()
```

## Production Considerations

### 1. Error Handling

```python
async def _run_loop(self):
    while self.active:
        try:
            item = await self.input_queue.get()
            await self.process(item)
        except Exception as e:
            logger.error(f"Worker error: {e}", exc_info=True)
            # Don't crash the worker, continue processing
```

### 2. Graceful Shutdown

```python
async def terminate(self):
    """Gracefully shut down all workers"""
    self.active = False

    # Stop all workers
    self.transcriber.terminate()
    self.agent.terminate()
    self.synthesizer.terminate()

    # Wait for queues to drain
    await asyncio.sleep(0.5)

    # Close connections
    if self.websocket:
        await self.websocket.close()
```

### 3. Monitoring and Logging

```python
# Log key events
logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'")
logger.info("🤖 [AGENT] Generating response...")
logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters")
logger.info("⚠️ [INTERRUPT] User interrupted bot")

# Track metrics
metrics.increment("transcriptions.count")
metrics.timing("agent.response_time", duration)
metrics.gauge("active_conversations", count)
```

### 4. Rate Limiting and Quotas

```python
# Implement rate limiting for API calls
from aiolimiter import AsyncLimiter

rate_limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 calls/second

async def call_api(self, data):
    async with rate_limiter:
        return await self.client.post(data)
```

## Key Design Patterns

### 1. Producer-Consumer with Queues

```python
# Producer
async def producer(queue):
    while True:
        item = await generate_item()
        queue.put_nowait(item)

# Consumer
async def consumer(queue):
    while True:
        item = await queue.get()
        await process_item(item)
```

### 2. Streaming Generators

Instead of returning complete results:

```python
# ❌ Bad: Wait for entire response
async def generate_response(prompt):
    response = await openai.complete(prompt)  # 5 seconds
    return response

# ✅ Good: Stream chunks as they arrive
async def generate_response(prompt):
    async for chunk in openai.complete(prompt, stream=True):
        yield chunk  # Yield after 0.1s, 0.2s, etc.
```

### 3. Conversation State Management

Maintain conversation history for context:

```python
from typing import List

class Transcript:
    def __init__(self):
        # Per-instance history (a mutable class attribute would be
        # shared across all conversations)
        self.event_logs: List[Message] = []

    def add_human_message(self, text):
        self.event_logs.append(Message(sender=Sender.HUMAN, text=text))

    def add_bot_message(self, text):
        self.event_logs.append(Message(sender=Sender.BOT, text=text))

    def to_openai_messages(self):
        return [
            {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
             "content": msg.text}
            for msg in self.event_logs
        ]
```

## Testing Strategies

### 1. Unit Test Workers in Isolation

```python
async def test_transcriber():
    transcriber = DeepgramTranscriber(config)

    # Mock audio input
    audio_chunk = b'\x00\x01\x02...'
    transcriber.send_audio(audio_chunk)

    # Check output
    transcription = await transcriber.output_queue.get()
    assert transcription.message == "expected text"
```

### 2. Integration Test Pipeline

```python
async def test_full_pipeline():
    # Create all components
    conversation = create_test_conversation()

    # Send test audio
    conversation.receive_audio(test_audio_chunk)

    # Wait for response
    response = await wait_for_audio_output(timeout=5)

    assert response is not None
```

### 3. Test Interrupts

```python
async def test_interrupt():
    conversation = create_test_conversation()

    # Start bot speaking
    await conversation.agent.generate_response("Tell me a long story")

    # Interrupt mid-response
    await asyncio.sleep(1)  # Let it speak for 1 second
    conversation.broadcast_interrupt()

    # Verify partial message in transcript
    last_message = conversation.transcript.event_logs[-1]
    assert last_message.text != full_expected_message
```

## Implementation Workflow

When implementing a voice AI engine:

1. Start with Base Workers: Implement the base worker pattern first

2. Add Transcriber: Choose a provider and implement streaming transcription

3. Add Agent: Implement LLM integration with streaming responses

4. Add Synthesizer: Implement TTS with audio streaming

5. Connect Pipeline: Wire all workers together with queues

6. Add Interrupts: Implement the interrupt system

7. Add WebSocket: Create WebSocket endpoint for client communication

8. Test Components: Unit test each worker in isolation

9. Test Integration: Test the full pipeline end-to-end

10. Add Error Handling: Implement robust error handling and logging

11. Optimize: Add rate limiting, monitoring, and performance optimizations

## Related Skills

  • @websocket-patterns - For WebSocket implementation details

  • @async-python - For asyncio and async patterns

  • @streaming-apis - For streaming API integration

  • @audio-processing - For audio format conversion and processing

  • @systematic-debugging - For debugging complex async pipelines

## Resources

Libraries:

  • asyncio - Async programming

  • websockets - WebSocket client/server

  • FastAPI - WebSocket server framework

  • pydub - Audio manipulation

  • numpy - Audio data processing

API Providers:

  • Transcription: Deepgram, AssemblyAI, Azure Speech, Google Cloud Speech

  • LLM: OpenAI, Google Gemini, Anthropic Claude

  • TTS: ElevenLabs, Azure TTS, Google Cloud TTS, Amazon Polly, Play.ht

## Summary

Building a voice AI engine requires:

  • ✅ Async worker pipeline for concurrent processing

  • ✅ Queue-based communication between components

  • ✅ Streaming at every stage (transcription, LLM, synthesis)

  • ✅ Interrupt system for natural conversations

  • ✅ Rate limiting for real-time audio playback

  • ✅ Multi-provider support for flexibility

  • ✅ Proper error handling and graceful shutdown

The key insight: Everything must stream and everything must be interruptible for natural, real-time conversations.
