Voice technology is transforming how we interact with machines, making conversations with AI feel more natural than ever. With the public beta release of the GPT-4o Realtime API, developers now have the tools to create low-latency, multimodal voice experiences in their applications, opening up endless possibilities for innovation.

In the past, building a voice bot meant stitching together multiple models for speech recognition, reasoning, and text-to-speech. With the Realtime API, developers can now accomplish the whole flow with a single API call, enabling fluid, natural voice conversations. This is a game changer for industries such as customer support, education, and real-time language translation, where fast, seamless interaction is critical.

This post walks you through building your first real-time voice bot from scratch using the GPT-4o Realtime model. We will cover the key capabilities of the Realtime API, how to set up a WebSocket connection for audio streaming, and how to use the API's ability to handle interruptions and call functions. By the end, you will be ready to build a voice bot that responds to users with near-human accuracy and emotion. Whether you are a newcomer or an experienced developer, this guide will help you start creating responsive, engaging, and immersive voice interactions.

Key Features

  • Low-latency streaming: enables real-time audio input and output for natural, fluid conversations.
  • Multimodal support: handles both text and audio input and output, allowing flexible interaction patterns.
  • Preset voices: supports six preset voices, ensuring consistent, high-quality responses.
  • Function calling: lets the voice assistant dynamically perform actions or fetch specific information based on context.
  • Safety and privacy: employs multiple layers of protection, including automated monitoring and adherence to privacy policies.

How Does the GPT-4o Realtime API Work?

Traditionally, building a voice assistant meant chaining several models together: an automatic speech recognition (ASR) model such as Whisper to transcribe audio, a text-based model to reason about the response, and a text-to-speech (TTS) model to generate audio output. This multi-step pipeline often introduced latency and lost emotional nuance.

The GPT-4o Realtime API changes this by consolidating these capabilities into a single API call. By establishing a persistent WebSocket connection, developers can stream audio input and output directly, dramatically reducing latency and making conversations feel more natural. The API's function-calling capability also lets the voice bot take actions such as placing an order or looking up customer information when needed.
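Under the hood, the protocol is a stream of JSON events exchanged over that WebSocket. As a rough illustration (the field subset here mirrors the session configuration used later in this post and is not exhaustive), a client-to-server session.update event looks like this:

```python
import json

# Sketch of a client -> server "session.update" event as a JSON payload.
# The full set of session fields is defined by the Realtime API; this
# subset mirrors the configuration used later in this post.
session_update = {
    "type": "session.update",
    "session": {
        "modalities": ["text", "audio"],
        "voice": "shimmer",
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        "turn_detection": {"type": "server_vad"},
    },
}

# Events are serialized to JSON before being sent over the WebSocket.
wire_message = json.dumps(session_update)
```

Every event the client sends or receives has a `type` field, which is what the event-dispatch code later in this post keys on.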

Building a Real-Time Voice Bot

Before you begin, make sure you have the following:

  • An Azure subscription: create one for free.
  • An Azure OpenAI resource: set up in a supported region (East US 2 or Sweden Central).
  • A development environment: familiarity with Python and basic asynchronous programming.
  • Client libraries: tools such as LiveKit, Agora, or Twilio can extend your bot's capabilities.

Setting Up the API

  1. Deploy the GPT-4o Realtime model:

    • Navigate to Azure AI Studio.
    • Open the model catalog and search for "gpt-4o-realtime-preview".
    • Deploy the model by selecting your Azure OpenAI resource and configuring the deployment settings.
  2. Configure audio input and output:

    • The API supports several audio formats, primarily PCM16.
    • Set up your client to handle audio streams, making sure they are compatible with the API's requirements.
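PCM16 means raw 16-bit signed little-endian samples. If your audio source produces floating-point samples in [-1.0, 1.0] (as many capture libraries do), a minimal stdlib-only conversion sketch looks like this:

```python
import array

def float_to_pcm16(samples):
    """Convert float samples in [-1.0, 1.0] to raw PCM16 bytes.

    Values are clamped and scaled to the signed 16-bit range; the
    'h' typecode gives 16-bit signed integers in native byte order
    (little-endian on typical x86/ARM machines, as PCM16 expects).
    """
    ints = array.array('h')
    for s in samples:
        s = max(-1.0, min(1.0, s))   # clamp to the valid range
        ints.append(int(s * 32767))  # scale to int16
    return ints.tobytes()
```

In practice you would feed the resulting bytes into the input audio buffer in fixed-size chunks rather than all at once.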

This example shows how to build an advanced real-time conversational AI system with Azure OpenAI. Using a WebSocket connection and an event-driven architecture, the system delivers responsive, context-aware customer support in any language. The approach adapts to many languages and scenarios, making it a flexible solution for businesses looking to upgrade their customer service. The project has three main components:

  • Realtime API: manages the WebSocket connection to the Azure OpenAI Realtime API.
  • Tools: defines customer-support functions such as checking order status and processing returns.
  • App: manages the interaction flow and integrates the realtime client with the UI layer.

Environment Setup

Create a file named .env and set the following environment variables:

AZURE_OPENAI_API_KEY=XXXX
# replace with your Azure OpenAI API Key

AZURE_OPENAI_ENDPOINT=https://xxxx.openai.azure.com/
# replace with your Azure OpenAI Endpoint

AZURE_OPENAI_DEPLOYMENT=gpt-4o-realtime-preview
#Create a deployment for the gpt-4o-realtime-preview model and place the 
#deployment name here. You can name the deployment as per your choice and 
#put the name here.

AZURE_OPENAI_CHAT_DEPLOYMENT_VERSION=2024-10-01-preview
#You don't need to change this unless you are willing to try other versions.
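The project reads these values via python-dotenv's load_dotenv(). Conceptually, loading a .env file amounts to the stdlib-only sketch below; python-dotenv additionally handles quoting, variable interpolation, and other edge cases, so prefer it in real code.

```python
import os

def load_env_file(path=".env"):
    """Minimal .env loader: KEY=VALUE lines, '#' comments, blank lines.

    This only approximates what python-dotenv's load_dotenv() does;
    it is shown for illustration of the mechanism.
    """
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # setdefault so real environment variables take precedence
            os.environ.setdefault(key.strip(), value.strip())
```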

Create a file named requirements.txt with the following contents:

chainlit==1.3.0rc1
openai
beautifulsoup4
lxml
python-dotenv
websockets
aiohttp

Implementing the Realtime Client

The heart of your voice bot is the realtime client, which manages the WebSocket connection and handles communication with the GPT-4o Realtime API. The RealtimeAPI class maintains that connection: it sends and receives messages, dispatches events, and tracks connection state. Its main responsibilities are:

  • Connection management: establishes and maintains the WebSocket connection.
  • Event dispatching: uses an event-driven architecture to handle incoming and outgoing messages.
  • Audio handling: uses utility functions to convert Base64-encoded audio to array buffers and back, managing audio streams efficiently for low-latency, high-quality voice interaction.
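The helpers referenced here and in the code below, base64_to_array_buffer and array_buffer_to_base64, are not reproduced in this post (the project uses NumPy arrays for them). Assuming audio travels as Base64-encoded PCM16, minimal stdlib versions with the same .tobytes() interface could look like this:

```python
import array
import base64

def base64_to_array_buffer(b64_audio):
    """Decode a Base64 string into an int16 sample buffer.

    array.array('h') stands in for the NumPy int16 array the project
    uses; both expose .tobytes() for downstream consumers.
    """
    raw = base64.b64decode(b64_audio)
    buf = array.array('h')
    buf.frombytes(raw)
    return buf

def array_buffer_to_base64(buf):
    """Encode an int16 sample buffer (or raw bytes) as Base64 text."""
    raw = buf.tobytes() if hasattr(buf, "tobytes") else bytes(buf)
    return base64.b64encode(raw).decode("ascii")
```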

Key Components

RealtimeAPI class
  • Establishes and maintains the WebSocket connection.
  • Sends and receives messages.
  • Dispatches the various conversation events.
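The class in init.py subclasses RealtimeEventHandler, which the post does not reproduce. A minimal sketch of what such a base class might provide (the on / dispatch / wait_for_next / clear_event_handlers surface that the code below relies on) is shown here; this is an assumed implementation, not the project's actual code:

```python
import asyncio
import inspect
from collections import defaultdict

class RealtimeEventHandler:
    """Minimal event-handler base class (assumed implementation)."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_name, handler):
        """Register a sync or async handler for an event name."""
        self._handlers[event_name].append(handler)

    def clear_event_handlers(self):
        self._handlers.clear()

    def dispatch(self, event_name, event):
        """Invoke all handlers registered under this exact event name."""
        for handler in list(self._handlers[event_name]):
            result = handler(event)
            if inspect.isawaitable(result):
                # Async handlers are scheduled on the running loop.
                asyncio.ensure_future(result)

    async def wait_for_next(self, event_name):
        """Wait until the next occurrence of event_name and return it."""
        future = asyncio.get_running_loop().create_future()

        def once(event):
            if not future.done():
                future.set_result(event)

        self.on(event_name, once)
        try:
            return await future
        finally:
            self._handlers[event_name].remove(once)
```

Note that wildcard names such as "server.*" are dispatched explicitly by the RealtimeAPI code below, so plain exact-key lookup is sufficient here.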

Reference: init.py

class RealtimeAPI(RealtimeEventHandler):
    def __init__(self):
        super().__init__()
        self.default_url = 'wss://api.openai.com/v1/realtime'
        self.url = os.environ["AZURE_OPENAI_ENDPOINT"]
        self.api_key = os.environ["AZURE_OPENAI_API_KEY"]
        self.api_version = "2024-10-01-preview"
        self.azure_deployment = os.environ["AZURE_OPENAI_DEPLOYMENT"]
        self.ws = None

    def is_connected(self):
        return self.ws is not None

    def log(self, *args):
        # Join extra args into the message; passing them straight to
        # logger.debug would be treated as %-format arguments.
        logger.debug(f"[Websocket/{datetime.utcnow().isoformat()}] " + " ".join(map(str, args)))

    async def connect(self, model='gpt-4o-realtime-preview'):
        if self.is_connected():
            raise Exception("Already connected")
        uri = (f"{self.url}/openai/realtime?api-version={self.api_version}"
               f"&deployment={model}&api-key={self.api_key}")
        self.ws = await websockets.connect(uri, extra_headers={
            'Authorization': f'Bearer {self.api_key}',
            'OpenAI-Beta': 'realtime=v1'
        })
        self.log(f"Connected to {self.url}")
        asyncio.create_task(self._receive_messages())

    async def _receive_messages(self):
        async for message in self.ws:
            event = json.loads(message)
            if event['type'] == "error":
                logger.error("ERROR", message)
            self.log("received:", event)
            self.dispatch(f"server.{event['type']}", event)
            self.dispatch("server.*", event)

    async def send(self, event_name, data=None):
        if not self.is_connected():
            raise Exception("RealtimeAPI is not connected")
        data = data or {}
        if not isinstance(data, dict):
            raise Exception("data must be a dictionary")
        event = {
            "event_id": self._generate_id("evt_"),
            "type": event_name,
            **data
        }
        self.dispatch(f"client.{event_name}", event)
        self.dispatch("client.*", event)
        self.log("sent:", event)
        await self.ws.send(json.dumps(event))

    def _generate_id(self, prefix):
        return f"{prefix}{int(datetime.utcnow().timestamp() * 1000)}"

    async def disconnect(self):
        if self.ws:
            await self.ws.close()
            self.ws = None
            self.log(f"Disconnected from {self.url}")
RealtimeConversation class
  • Manages conversation state.
  • Handles event types such as item creation, transcription completion, and audio streaming.
  • Maintains queues of audio and text data to enable seamless interaction.
class RealtimeConversation:
    default_frequency = config.features.audio.sample_rate
    
    EventProcessors = {
        'conversation.item.created': 
        lambda self, event: self._process_item_created(event),
        'conversation.item.truncated': 
        lambda self, event: self._process_item_truncated(event),
        'conversation.item.deleted': 
        lambda self, event: self._process_item_deleted(event),
        'conversation.item.input_audio_transcription.completed': 
        lambda self, event: self._process_input_audio_transcription_completed(event),
        'input_audio_buffer.speech_started': 
        lambda self, event: self._process_speech_started(event),
        'input_audio_buffer.speech_stopped': 
        lambda self, event, input_audio_buffer: 
            self._process_speech_stopped(event, input_audio_buffer),
        'response.created': 
        lambda self, event: self._process_response_created(event),
        'response.output_item.added': 
        lambda self, event: self._process_output_item_added(event),
        'response.output_item.done': 
        lambda self, event: self._process_output_item_done(event),
        'response.content_part.added': 
        lambda self, event: self._process_content_part_added(event),
        'response.audio_transcript.delta': 
        lambda self, event: self._process_audio_transcript_delta(event),
        'response.audio.delta': 
        lambda self, event: self._process_audio_delta(event),
        'response.text.delta': 
        lambda self, event: self._process_text_delta(event),
        'response.function_call_arguments.delta': 
        lambda self, event: self._process_function_call_arguments_delta(event),
    }
    
    def __init__(self):
        self.clear()

    def clear(self):
        self.item_lookup = {}
        self.items = []
        self.response_lookup = {}
        self.responses = []
        self.queued_speech_items = {}
        self.queued_transcript_items = {}
        self.queued_input_audio = None

    def queue_input_audio(self, input_audio):
        self.queued_input_audio = input_audio

    def process_event(self, event, *args):
        event_processor = self.EventProcessors.get(event['type'])
        if not event_processor:
            raise Exception(f"Missing conversation event processor for {event['type']}")
        return event_processor(self, event, *args)

    def get_item(self, id):
        return self.item_lookup.get(id)

    def get_items(self):
        return self.items[:]

    def _process_item_created(self, event):
        item = event['item']
        new_item = item.copy()
        if new_item['id'] not in self.item_lookup:
            self.item_lookup[new_item['id']] = new_item
            self.items.append(new_item)
        new_item['formatted'] = {
            'audio': [],
            'text': '',
            'transcript': ''
        }
        if new_item['id'] in self.queued_speech_items:
            new_item['formatted']['audio'] = self.queued_speech_items[new_item['id']]['audio']
            del self.queued_speech_items[new_item['id']]
        if 'content' in new_item:
            text_content = [c for c in new_item['content'] if c['type'] in ['text', 'input_text']]
            for content in text_content:
                new_item['formatted']['text'] += content['text']
        if new_item['id'] in self.queued_transcript_items:
            new_item['formatted']['transcript'] = self.queued_transcript_items[new_item['id']]['transcript']
            del self.queued_transcript_items[new_item['id']]
        if new_item['type'] == 'message':
            if new_item['role'] == 'user':
                new_item['status'] = 'completed'
                if self.queued_input_audio:
                    new_item['formatted']['audio'] = self.queued_input_audio
                    self.queued_input_audio = None
            else:
                new_item['status'] = 'in_progress'
        elif new_item['type'] == 'function_call':
            new_item['formatted']['tool'] = {
                'type': 'function',
                'name': new_item['name'],
                'call_id': new_item['call_id'],
                'arguments': ''
            }
            new_item['status'] = 'in_progress'
        elif new_item['type'] == 'function_call_output':
            new_item['status'] = 'completed'
            new_item['formatted']['output'] = new_item['output']
        return new_item, None

    def _process_item_truncated(self, event):
        item_id = event['item_id']
        audio_end_ms = event['audio_end_ms']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'item.truncated: Item "{item_id}" not found')
        end_index = (audio_end_ms * self.default_frequency) // 1000
        item['formatted']['transcript'] = ''
        item['formatted']['audio'] = item['formatted']['audio'][:end_index]
        return item, None

    def _process_item_deleted(self, event):
        item_id = event['item_id']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'item.deleted: Item "{item_id}" not found')
        del self.item_lookup[item['id']]
        self.items.remove(item)
        return item, None

    def _process_input_audio_transcription_completed(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        transcript = event['transcript']
        formatted_transcript = transcript or ' '
        item = self.item_lookup.get(item_id)
        if not item:
            self.queued_transcript_items[item_id] = {'transcript': formatted_transcript}
            return None, None
        item['content'][content_index]['transcript'] = transcript
        item['formatted']['transcript'] = formatted_transcript
        return item, {'transcript': transcript}

    def _process_speech_started(self, event):
        item_id = event['item_id']
        audio_start_ms = event['audio_start_ms']
        self.queued_speech_items[item_id] = {'audio_start_ms': audio_start_ms}
        return None, None

    def _process_speech_stopped(self, event, input_audio_buffer):
        item_id = event['item_id']
        audio_end_ms = event['audio_end_ms']
        speech = self.queued_speech_items[item_id]
        speech['audio_end_ms'] = audio_end_ms
        if input_audio_buffer:
            start_index = (speech['audio_start_ms'] * self.default_frequency) // 1000
            end_index = (speech['audio_end_ms'] * self.default_frequency) // 1000
            speech['audio'] = input_audio_buffer[start_index:end_index]
        return None, None

    def _process_response_created(self, event):
        response = event['response']
        if response['id'] not in self.response_lookup:
            self.response_lookup[response['id']] = response
            self.responses.append(response)
        return None, None

    def _process_output_item_added(self, event):
        response_id = event['response_id']
        item = event['item']
        response = self.response_lookup.get(response_id)
        if not response:
            raise Exception(f'response.output_item.added: Response "{response_id}" not found')
        response['output'].append(item['id'])
        return None, None

    def _process_output_item_done(self, event):
        item = event['item']
        if not item:
            raise Exception('response.output_item.done: Missing "item"')
        found_item = self.item_lookup.get(item['id'])
        if not found_item:
            raise Exception(f'response.output_item.done: Item "{item["id"]}" not found')
        found_item['status'] = item['status']
        return found_item, None

    def _process_content_part_added(self, event):
        item_id = event['item_id']
        part = event['part']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.content_part.added: Item "{item_id}" not found')
        item['content'].append(part)
        return item, None

    def _process_audio_transcript_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.audio_transcript.delta: Item "{item_id}" not found')
        item['content'][content_index]['transcript'] += delta
        item['formatted']['transcript'] += delta
        return item, {'transcript': delta}

    def _process_audio_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            logger.debug(f'response.audio.delta: Item "{item_id}" not found')
            return None, None
        array_buffer = base64_to_array_buffer(delta)
        append_values = array_buffer.tobytes()
        # TODO: make it work
        # item['formatted']['audio'] = merge_int16_arrays(item['formatted']['audio'], append_values)
        return item, {'audio': append_values}

    def _process_text_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.text.delta: Item "{item_id}" not found')
        item['content'][content_index]['text'] += delta
        item['formatted']['text'] += delta
        return item, {'text': delta}

    def _process_function_call_arguments_delta(self, event):
        item_id = event['item_id']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.function_call_arguments.delta: Item "{item_id}" not found')
        item['arguments'] += delta
        item['formatted']['tool']['arguments'] += delta
        return item, {'arguments': delta}
RealtimeClient class
  • Initialization: sets up the system prompt and session configuration, and initializes RealtimeAPI and RealtimeConversation to manage the WebSocket connection and conversation events.
  • Connection management: connects to and disconnects from the server, waits for session creation, and updates session settings.
  • Event handling: listens for server and client events and dispatches them to the appropriate handlers.
  • Conversation management: manages creation, update, and deletion of conversation items, including input audio and speech events.
  • Tool and response management: supports adding/removing tools, invoking tools in response to events, sending user messages, creating responses, and managing audio content.
class RealtimeClient(RealtimeEventHandler):
    def __init__(self, system_prompt: str):
        super().__init__()
        self.system_prompt = system_prompt
        self.default_session_config = {
            "modalities": ["text", "audio"],
            "instructions": self.system_prompt,
            "voice": "shimmer",
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "input_audio_transcription": { "model": 'whisper-1' },
            "turn_detection": { "type": 'server_vad' },
            "tools": [],
            "tool_choice": "auto",
            "temperature": 0.8,
            "max_response_output_tokens": 4096,
        }
        self.session_config = {}
        self.transcription_models = [{"model": "whisper-1"}]
        self.default_server_vad_config = {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 200,
        }
        self.realtime = RealtimeAPI()
        self.conversation = RealtimeConversation()
        self._reset_config()
        self._add_api_event_handlers()
        
    def _reset_config(self):
        self.session_created = False
        self.tools = {}
        self.session_config = self.default_session_config.copy()
        self.input_audio_buffer = bytearray()
        return True

    def _add_api_event_handlers(self):
        self.realtime.on("client.*", self._log_event)
        self.realtime.on("server.*", self._log_event)
        self.realtime.on("server.session.created", self._on_session_created)
        self.realtime.on("server.response.created", self._process_event)
        self.realtime.on("server.response.output_item.added", self._process_event)
        self.realtime.on("server.response.content_part.added", self._process_event)
        self.realtime.on("server.input_audio_buffer.speech_started", self._on_speech_started)
        self.realtime.on("server.input_audio_buffer.speech_stopped", self._on_speech_stopped)
        self.realtime.on("server.conversation.item.created", self._on_item_created)
        self.realtime.on("server.conversation.item.truncated", self._process_event)
        self.realtime.on("server.conversation.item.deleted", self._process_event)
        self.realtime.on("server.conversation.item.input_audio_transcription.completed", self._process_event)
        self.realtime.on("server.response.audio_transcript.delta", self._process_event)
        self.realtime.on("server.response.audio.delta", self._process_event)
        self.realtime.on("server.response.text.delta", self._process_event)
        self.realtime.on("server.response.function_call_arguments.delta", self._process_event)
        self.realtime.on("server.response.output_item.done", self._on_output_item_done)

    def _log_event(self, event):
        realtime_event = {
            "time": datetime.utcnow().isoformat(),
            "source": "client" if event["type"].startswith("client.") else "server",
            "event": event,
        }
        self.dispatch("realtime.event", realtime_event)

    def _on_session_created(self, event):
        self.session_created = True

    def _process_event(self, event, *args):
        item, delta = self.conversation.process_event(event, *args)
        if item:
            self.dispatch("conversation.updated", {"item": item, "delta": delta})
        return item, delta

    def _on_speech_started(self, event):
        self._process_event(event)
        self.dispatch("conversation.interrupted", event)

    def _on_speech_stopped(self, event):
        self._process_event(event, self.input_audio_buffer)

    def _on_item_created(self, event):
        item, delta = self._process_event(event)
        self.dispatch("conversation.item.appended", {"item": item})
        if item and item["status"] == "completed":
            self.dispatch("conversation.item.completed", {"item": item})

    async def _on_output_item_done(self, event):
        item, delta = self._process_event(event)
        if item and item["status"] == "completed":
            self.dispatch("conversation.item.completed", {"item": item})
        if item and item.get("formatted", {}).get("tool"):
            await self._call_tool(item["formatted"]["tool"])

    async def _call_tool(self, tool):
        try:
            print(tool["arguments"])
            json_arguments = json.loads(tool["arguments"])
            tool_config = self.tools.get(tool["name"])
            if not tool_config:
                raise Exception(f'Tool "{tool["name"]}" has not been added')
            result = await tool_config["handler"](**json_arguments)
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "function_call_output",
                    "call_id": tool["call_id"],
                    "output": json.dumps(result),
                }
            })
        except Exception as e:
            logger.error(traceback.format_exc())
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "function_call_output",
                    "call_id": tool["call_id"],
                    "output": json.dumps({"error": str(e)}),
                }
            })
        await self.create_response()

    def is_connected(self):
        return self.realtime.is_connected()

    async def reset(self):
        # disconnect() is a coroutine, so reset must be async to await it
        await self.disconnect()
        self.realtime.clear_event_handlers()
        self._reset_config()
        self._add_api_event_handlers()
        return True

    async def connect(self):
        if self.is_connected():
            raise Exception("Already connected, use .disconnect() first")
        await self.realtime.connect()
        await self.update_session()
        return True

    async def wait_for_session_created(self):
        if not self.is_connected():
            raise Exception("Not connected, use .connect() first")
        while not self.session_created:
            await asyncio.sleep(0.001)
        return True

    async def disconnect(self):
        self.session_created = False
        self.conversation.clear()
        if self.realtime.is_connected():
            await self.realtime.disconnect()

    def get_turn_detection_type(self):
        return self.session_config.get("turn_detection", {}).get("type")

    async def add_tool(self, definition, handler):
        if not definition.get("name"):
            raise Exception("Missing tool name in definition")
        name = definition["name"]
        if name in self.tools:
            raise Exception(f'Tool "{name}" already added. Please use .removeTool("{name}") before trying to add again.')
        if not callable(handler):
            raise Exception(f'Tool "{name}" handler must be a function')
        self.tools[name] = {"definition": definition, "handler": handler}
        await self.update_session()
        return self.tools[name]

    def remove_tool(self, name):
        if name not in self.tools:
            raise Exception(f'Tool "{name}" does not exist, can not be removed.')
        del self.tools[name]
        return True

    async def delete_item(self, id):
        await self.realtime.send("conversation.item.delete", {"item_id": id})
        return True

    async def update_session(self, **kwargs):
        self.session_config.update(kwargs)
        use_tools = [
            {**tool_definition, "type": "function"}
            for tool_definition in self.session_config.get("tools", [])
        ] + [
            {**self.tools[key]["definition"], "type": "function"}
            for key in self.tools
        ]
        session = {**self.session_config, "tools": use_tools}
        if self.realtime.is_connected():
            await self.realtime.send("session.update", {"session": session})
        return True
    
    async def create_conversation_item(self, item):
        await self.realtime.send("conversation.item.create", {
            "item": item
        })

    async def send_user_message_content(self, content=[]):
        if content:
            for c in content:
                if c["type"] == "input_audio":
                    if isinstance(c["audio"], (bytes, bytearray)):
                        c["audio"] = array_buffer_to_base64(c["audio"])
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": content,
                }
            })
        await self.create_response()
        return True

    async def append_input_audio(self, array_buffer):
        if len(array_buffer) > 0:
            await self.realtime.send("input_audio_buffer.append", {
                "audio": array_buffer_to_base64(np.array(array_buffer)),
            })
            self.input_audio_buffer.extend(array_buffer)
        return True

    async def create_response(self):
        if self.get_turn_detection_type() is None and len(self.input_audio_buffer) > 0:
            await self.realtime.send("input_audio_buffer.commit")
            self.conversation.queue_input_audio(self.input_audio_buffer)
            self.input_audio_buffer = bytearray()
        await self.realtime.send("response.create")
        return True

    async def cancel_response(self, id=None, sample_count=0):
        if not id:
            await self.realtime.send("response.cancel")
            return {"item": None}
        else:
            item = self.conversation.get_item(id)
            if not item:
                raise Exception(f'Could not find item "{id}"')
            if item["type"] != "message":
                raise Exception('Can only cancelResponse messages with type "message"')
            if item["role"] != "assistant":
                raise Exception('Can only cancelResponse messages with role "assistant"')
            await self.realtime.send("response.cancel")
            audio_index = next((i for i, c in enumerate(item["content"]) if c["type"] == "audio"), -1)
            if audio_index == -1:
                raise Exception("Could not find audio on item to cancel")
            await self.realtime.send("conversation.item.truncate", {
                "item_id": id,
                "content_index": audio_index,
                "audio_end_ms": int((sample_count / self.conversation.default_frequency) * 1000),
            })
            return {"item": item}

    async def wait_for_next_item(self):
        event = await self.wait_for_next("conversation.item.appended")
        return {"item": event["item"]}

    async def wait_for_next_completed_item(self):
        event = await self.wait_for_next("conversation.item.completed")
        return {"item": event["item"]}

Adding Tools and Handlers

You can extend your voice bot's capabilities by integrating tools and handlers that let it perform specific actions based on user input.

  1. Define the tools:

    • In tool.py, define your bot's capabilities, such as checking order status, processing returns, or updating account information.
    • Each tool has a name, a description, and the parameters it requires.
  2. Implement the handlers:

    • Create an asynchronous handler function for each tool to perform the required action.
    • These handlers interact with backend systems or databases to fulfill user requests.
  3. Register the tools with the realtime client:

    • In app.py, register each tool and its handler with the RealtimeClient.
    • Make sure the bot can invoke these tools dynamically during a conversation.
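Conceptually, registration and invocation boil down to a name-to-handler mapping: the model streams JSON-encoded arguments for a named tool, and the client parses them and awaits the matching handler, exactly as the _call_tool method in the RealtimeClient code above does. A stripped-down, self-contained sketch of that dispatch (the registry and the get_greeting tool here are illustrative, not part of the project):

```python
import asyncio
import json

# Hypothetical registry mapping tool names to definitions and handlers,
# in the same shape that RealtimeClient.add_tool stores them.
tools = {}

def add_tool(definition, handler):
    tools[definition["name"]] = {"definition": definition, "handler": handler}

async def call_tool(name, raw_arguments):
    """Parse the model's JSON argument string and await the handler."""
    tool = tools[name]
    kwargs = json.loads(raw_arguments)
    return await tool["handler"](**kwargs)

# Example: a toy handler registered and invoked the way the model would.
async def get_greeting(customer_name):
    return f"Hello, {customer_name}!"

add_tool({"name": "get_greeting"}, get_greeting)
result = asyncio.run(call_tool("get_greeting", '{"customer_name": "Asha"}'))
```

In the real client the handler's return value is then sent back as a function_call_output conversation item, and a new response is requested.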

Key Components

Tool definitions

A structured description of each tool, including its required parameters and functionality.

# Function Definitions
check_order_status_def = {
    "name": "check_order_status",
    "description": "Check the status of a customer's order",
    "parameters": {
      "type": "object",
      "properties": {
        "customer_id": {
          "type": "string",
          "description": "The unique identifier for the customer"
        },
        "order_id": {
          "type": "string",
          "description": "The unique identifier for the order"
        }
      },
      "required": ["customer_id", "order_id"]
    }
}
Handler functions
  • Asynchronous functions that execute each tool's logic.
  • Interact with external systems and databases, or perform specific actions on behalf of the user.

Reference: tool.py

async def check_order_status_handler(customer_id, order_id):
    status = "In Transit"
    
    # Your Business Logic
    estimated_delivery, status, order_date =  fetch_order_details(order_id, customer_id)
    # Read the HTML template
    with open('order_status_template.html', 'r') as file:
        html_content = file.read()

    # Replace placeholders with actual data
    html_content = html_content.format(
        order_id=order_id,
        customer_id=customer_id,
        order_date=order_date.strftime("%B %d, %Y"),
        estimated_delivery=estimated_delivery.strftime("%B %d, %Y"),
        status=status
    )

    # Return the Chainlit message with HTML content
    await cl.Message(content=f"Here is the detail of your order \n {html_content}").send()
    return f"Order {order_id} status for customer {customer_id}: {status}"
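The handler above calls fetch_order_details, which the post does not show. A hypothetical stub illustrating the return shape the handler expects, a tuple of estimated delivery, status, and order date, with datetime values it can format via strftime, might be:

```python
from datetime import datetime, timedelta

def fetch_order_details(order_id, customer_id):
    """Hypothetical stand-in for the real order lookup.

    A production version would query your order database; this stub
    only demonstrates the (estimated_delivery, status, order_date)
    tuple the handler above unpacks.
    """
    order_date = datetime.now() - timedelta(days=2)
    estimated_delivery = datetime.now() + timedelta(days=3)
    return estimated_delivery, "In Transit", order_date
```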

Integrating with the Application

With the realtime client and tools in place, it is time to bring everything together in the application.

  1. Initialize the realtime client:

    • In app.py, set up the connection to the GPT-4o Realtime API with the system prompt and session configuration.
    • Manage user sessions seamlessly and keep track of interactions.
  2. Handle user interactions:

    • Implement event handlers for chat start, message receipt, audio processing, and session termination.
    • Make sure user input, whether text or voice, is processed and answered promptly.
  3. Manage the conversation flow:

    • Use the RealtimeConversation class to track conversation state, manage audio streams, and preserve context.
    • Implement logic to handle interruptions, cancellations, and dynamic responses driven by user actions.

Key Components

Initialization

Instantiate the OpenAI realtime client with the system prompt and configure its tools.

system_prompt = """Provide helpful and empathetic support responses to customer 
inquiries for ShopMe in Hindi language, addressing their requests, concerns, 
or feedback professionally.

Maintain a friendly and service-oriented tone throughout the interaction to 
ensure a positive customer experience.

# Steps

1. **Identify the Issue:** Carefully read the customer's inquiry to understand 
the problem or question they are presenting.
2. **Gather Relevant Information:** Check for any additional data needed, such as 
order numbers or account details, while ensuring the privacy and security of the customer's information.
3. **Formulate a Response:** Develop a solution or informative response based on 
the understanding of the issue. The response should be clear, concise, and address 
all parts of the customer's concern.
4. **Offer Further Assistance:** Invite the customer to reach out again if they 
need more help or have additional questions.
5. **Close Politely:** End the conversation with a polite closing statement that 
reinforces the service commitment of ShopMe.

# Output Format

Provide a clear and concise paragraph addressing the customer's inquiry, including:
- Acknowledgment of their concern
- Suggested solution or response
- Offer for further assistance
- Polite closing

# Notes
- Greet user with Welcome to ShopMe For the first time only
- always speak in Hindi
- Ensure all customer data is handled according to relevant privacy and data protection 
laws and ShopMe's privacy policy.
- In cases of high sensitivity or complexity, escalate the issue to a human customer support agent.
- Keep responses within a reasonable length to ensure they are easy to read and understand."""
Event Handlers

Manage chat start, message receipt, audio streaming, and session termination events.

First, we initialize the realtime client discussed above.

Reference: app.py

async def setup_openai_realtime(system_prompt: str):
    """Instantiate and configure the OpenAI Realtime Client"""
    openai_realtime = RealtimeClient(system_prompt = system_prompt)
    cl.user_session.set("track_id", str(uuid4()))
    async def handle_conversation_updated(event):
        item = event.get("item")
        delta = event.get("delta")
        """Currently used to stream audio back to the client."""
        if delta:
            # Only one of the following will be populated for any given event
            if 'audio' in delta:
                audio = delta['audio']  # Int16Array, audio added
                await cl.context.emitter.send_audio_chunk(
                    cl.OutputAudioChunk(mimeType="pcm16", data=audio, 
                    track=cl.user_session.get("track_id")))
            if 'transcript' in delta:
                transcript = delta['transcript']  # string, transcript added
                pass
            if 'arguments' in delta:
                arguments = delta['arguments']  # string, function arguments added
                pass
            
    async def handle_item_completed(item):
        """Used to populate the chat context with transcription once an item is completed."""
        # print(item) # TODO
        pass
    
    async def handle_conversation_interrupt(event):
        """Used to cancel the client previous audio playback."""
        cl.user_session.set("track_id", str(uuid4()))
        await cl.context.emitter.send_audio_interrupt()
        
    async def handle_error(event):
        logger.error(event)
Session Management

Maintain user sessions, handle conversation interruptions, and keep the interaction flowing smoothly. As the code below shows, each incoming audio chunk should be forwarded through the realtime client.

if openai_realtime:            
    if openai_realtime.is_connected():
        await openai_realtime.append_input_audio(chunk.data)
    else:
        logger.info("RealtimeClient is not connected")

Testing and Deployment

Once your voice bot is built, thorough testing is essential to ensure reliability and user satisfaction.

  1. Local testing:

    • Use the AI Studio real-time audio playground to interact with the deployed model.
    • Test the full range of features, including speech recognition, response generation, and tool execution.
  2. Integration testing:

    • Verify that the application communicates seamlessly with the Realtime API.
    • Test the event handlers and tool integrations to confirm correct behavior across scenarios.
  3. Deployment:

    • Deploy the application to production, using cloud services for scalability.
    • Monitor performance and tune as needed for real-world traffic.

Conclusion

Thanks to the GPT-4o Realtime API, building a real-time voice bot has never been easier. By consolidating speech-to-speech capabilities into a single, efficient interface, developers can create engaging, natural conversational experiences without juggling multiple models. Whether you are improving customer support, building educational tools, or creating interactive applications, the GPT-4o Realtime API provides a solid foundation for bringing voice bots to life.
