Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions examples/listen/14-transcription-live-websocket-v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
Example: Live Transcription with WebSocket V2 (Listen V2)

This example shows how to use Listen V2 for advanced conversational speech recognition
with contextual turn detection.
"""

import os
import threading
import time
from pathlib import Path
from typing import Union

from dotenv import load_dotenv

load_dotenv()

from deepgram import DeepgramClient
from deepgram.core.events import EventType
from deepgram.extensions.types.sockets import (
ListenV2ConnectedEvent,
ListenV2ControlMessage,
ListenV2FatalErrorEvent,
ListenV2TurnInfoEvent,
)

# Union of every message type the Listen V2 socket can deliver to a
# MESSAGE handler: connection acknowledgement, turn updates, and fatal errors.
ListenV2SocketClientResponse = Union[ListenV2ConnectedEvent, ListenV2TurnInfoEvent, ListenV2FatalErrorEvent]

# Reads DEEPGRAM_API_KEY from the environment (populated above by load_dotenv()).
client = DeepgramClient(api_key=os.environ.get("DEEPGRAM_API_KEY"))

try:
    # Open a Listen V2 websocket; the context manager closes the socket on exit.
    with client.listen.v2.connect(
        model="flux-general-en",
        encoding="linear16",
        sample_rate="16000",
    ) as connection:

        def handle_message(message: ListenV2SocketClientResponse) -> None:
            """Log every incoming event; expand TurnInfo events with transcript details."""
            label = getattr(message, "type", type(message).__name__)
            print(f"Received {label} event ({type(message).__name__})")

            if not isinstance(message, ListenV2TurnInfoEvent):
                return
            # TurnInfo events carry the actual transcription payload.
            print(f" transcript: {message.transcript}")
            print(f" event: {message.event}")
            print(f" turn_index: {message.turn_index}")

        connection.on(EventType.OPEN, lambda _: print("Connection opened"))
        connection.on(EventType.MESSAGE, handle_message)
        connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
        connection.on(EventType.ERROR, lambda error: print(f"Error: {type(error).__name__}: {error}"))

        def stream_audio() -> None:
            """Feed the fixture WAV to the socket in paced chunks, then end the stream.

            Runs on a background thread so start_listening() below can
            consume server responses concurrently.
            """
            wav_path = Path(__file__).parent.parent / "fixtures" / "audio.wav"
            payload = wav_path.read_bytes()

            step = 4096
            offset = 0
            while offset < len(payload):
                connection.send_media(payload[offset : offset + step])
                offset += step
                time.sleep(0.01)  # pace the sending

            # Give the server a moment to flush, then signal end of audio.
            time.sleep(2)
            connection.send_control(ListenV2ControlMessage(type="CloseStream"))

        audio_thread = threading.Thread(target=stream_audio, daemon=True)
        audio_thread.start()

        # Blocks until the connection closes.
        connection.start_listening()

except Exception as e:
    print(f"Error: {type(e).__name__}: {e}")
6 changes: 3 additions & 3 deletions src/deepgram/agent/v1/socket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import websockets
import websockets.sync.connection as websockets_sync_connection
from ...core.events import EventEmitterMixin, EventType
from ...core.pydantic_utilities import parse_obj_as
from ...core.unchecked_base_model import construct_type

try:
from websockets.legacy.client import WebSocketClientProtocol # type: ignore
Expand Down Expand Up @@ -84,7 +84,7 @@ def _handle_binary_message(self, message: bytes) -> typing.Any:
def _handle_json_message(self, message: str) -> typing.Any:
"""Handle a JSON message by parsing it."""
json_data = json.loads(message)
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore

def _process_message(self, raw_message: typing.Any) -> typing.Tuple[typing.Any, bool]:
"""Process a raw message, detecting if it's binary or JSON."""
Expand Down Expand Up @@ -199,7 +199,7 @@ def _handle_binary_message(self, message: bytes) -> typing.Any:
def _handle_json_message(self, message: str) -> typing.Any:
"""Handle a JSON message by parsing it."""
json_data = json.loads(message)
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore

def _process_message(self, raw_message: typing.Any) -> typing.Tuple[typing.Any, bool]:
"""Process a raw message, detecting if it's binary or JSON."""
Expand Down
7 changes: 7 additions & 0 deletions src/deepgram/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .remove_none_from_dict import remove_none_from_dict
from .request_options import RequestOptions
from .serialization import FieldMetadata, convert_and_respect_annotation_metadata
from .unchecked_base_model import UncheckedBaseModel, UnionMetadata, construct_type
_dynamic_imports: typing.Dict[str, str] = {
"ApiError": ".api_error",
"AsyncClientWrapper": ".client_wrapper",
Expand All @@ -44,6 +45,9 @@
"SyncClientWrapper": ".client_wrapper",
"UniversalBaseModel": ".pydantic_utilities",
"UniversalRootModel": ".pydantic_utilities",
"UncheckedBaseModel": ".unchecked_base_model",
"UnionMetadata": ".unchecked_base_model",
"construct_type": ".unchecked_base_model",
"convert_and_respect_annotation_metadata": ".serialization",
"convert_file_dict_to_httpx_tuples": ".file",
"encode_query": ".query_encoder",
Expand Down Expand Up @@ -94,8 +98,11 @@ def __dir__():
"IS_PYDANTIC_V2",
"RequestOptions",
"SyncClientWrapper",
"UncheckedBaseModel",
"UnionMetadata",
"UniversalBaseModel",
"UniversalRootModel",
"construct_type",
"convert_and_respect_annotation_metadata",
"convert_file_dict_to_httpx_tuples",
"encode_query",
Expand Down
Loading