diff --git a/libs/coze_server_api/__init__.py b/libs/coze_server_api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libs/coze_server_api/client.py b/libs/coze_server_api/client.py new file mode 100644 index 000000000..67f53736f --- /dev/null +++ b/libs/coze_server_api/client.py @@ -0,0 +1,192 @@ +import json +import asyncio +import aiohttp +import io +from typing import Dict, List, Any, AsyncGenerator +import os +from pathlib import Path + + + + +class AsyncCozeAPIClient: + def __init__(self, api_key: str, api_base: str = "https://api.coze.cn"): + self.api_key = api_key + self.api_base = api_base + self.session = None + + async def __aenter__(self): + """支持异步上下文管理器""" + await self.coze_session() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """退出时自动关闭会话""" + await self.close() + + + + async def coze_session(self): + """确保HTTP session存在""" + if self.session is None: + connector = aiohttp.TCPConnector( + ssl=False if self.api_base.startswith("http://") else True, + limit=100, + limit_per_host=30, + keepalive_timeout=30, + enable_cleanup_closed=True, + ) + timeout = aiohttp.ClientTimeout( + total=120, # 默认超时时间 + connect=30, + sock_read=120, + ) + headers = { + "Authorization": f"Bearer {self.api_key}", + "Accept": "text/event-stream", + } + self.session = aiohttp.ClientSession( + headers=headers, timeout=timeout, connector=connector + ) + return self.session + + async def close(self): + """显式关闭会话""" + if self.session and not self.session.closed: + await self.session.close() + self.session = None + + async def upload( + self, + file, + ) -> str: + # 处理 Path 对象 + if isinstance(file, Path): + if not file.exists(): + raise ValueError(f"File not found: {file}") + with open(file, "rb") as f: + file = f.read() + + # 处理文件路径字符串 + elif isinstance(file, str): + if not os.path.isfile(file): + raise ValueError(f"File not found: {file}") + with open(file, "rb") as f: + file = f.read() + + # 处理文件对象 + elif hasattr(file, 'read'): + file = file.read() + + session = await self.coze_session() + url = f"{self.api_base}/v1/files/upload" + + try: + file_io = io.BytesIO(file) + async with session.post( + url, + data={ + "file": file_io, + }, + timeout=aiohttp.ClientTimeout(total=60), + ) as response: + if response.status == 401: + raise Exception("Coze API 认证失败,请检查 API Key 是否正确") + + response_text = await response.text() + + + if response.status != 200: + raise Exception( + f"文件上传失败,状态码: {response.status}, 响应: {response_text}" + ) + try: + result = await response.json() + except json.JSONDecodeError: + raise Exception(f"文件上传响应解析失败: {response_text}") + + if result.get("code") != 0: + raise Exception(f"文件上传失败: {result.get('msg', '未知错误')}") + + file_id = result["data"]["id"] + return file_id + + except asyncio.TimeoutError: + raise Exception("文件上传超时") + except Exception as e: + raise Exception(f"文件上传失败: {str(e)}") + + + async def chat_messages( + self, + bot_id: str, + user_id: str, + additional_messages: List[Dict] | None = None, + conversation_id: str | None = None, + auto_save_history: bool = True, + stream: bool = True, + timeout: float = 120, + ) -> AsyncGenerator[Dict[str, Any], None]: + """发送聊天消息并返回流式响应 + + Args: + bot_id: Bot ID + user_id: 用户ID + additional_messages: 额外消息列表 + conversation_id: 会话ID + auto_save_history: 是否自动保存历史 + stream: 是否流式响应 + timeout: 超时时间 + """ + session = await self.coze_session() + url = f"{self.api_base}/v3/chat" + + payload = { + "bot_id": bot_id, + "user_id": user_id, + "stream": stream, + "auto_save_history": auto_save_history, + } + + if additional_messages: + payload["additional_messages"] = additional_messages + + params = {} + if conversation_id: + params["conversation_id"] = conversation_id + + + try: + async with session.post( + url, + json=payload, + params=params, + timeout=aiohttp.ClientTimeout(total=timeout), + ) as response: + if response.status == 401: + raise Exception("Coze API 认证失败,请检查 API Key 是否正确") + + if response.status != 200: + raise Exception(f"Coze API 流式请求失败,状态码: {response.status}") + + + async for chunk in response.content: + chunk = chunk.decode("utf-8") + if chunk != '\n': + if chunk.startswith("event:"): + chunk_type = chunk.replace("event:", "", 1).strip() + elif chunk.startswith("data:"): + chunk_data = chunk.replace("data:", "", 1).strip() + else: + yield {"event": chunk_type, "data": json.loads(chunk_data)} + + except asyncio.TimeoutError: + raise Exception(f"Coze API 流式请求超时 ({timeout}秒)") + except Exception as e: + raise Exception(f"Coze API 流式请求失败: {str(e)}") + + + + + + diff --git a/pkg/provider/runners/cozeapi.py b/pkg/provider/runners/cozeapi.py new file mode 100644 index 000000000..6d4f02a12 --- /dev/null +++ b/pkg/provider/runners/cozeapi.py @@ -0,0 +1,312 @@ +from __future__ import annotations + +import typing +import json +import uuid +import base64 + +from .. import runner +from ...core import app +import langbot_plugin.api.entities.builtin.provider.message as provider_message +from ...utils import image +import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query +from libs.coze_server_api.client import AsyncCozeAPIClient + +@runner.runner_class('coze-api') +class CozeAPIRunner(runner.RequestRunner): + """Coze API 对话请求器""" + + def __init__(self, ap: app.Application, pipeline_config: dict): + self.pipeline_config = pipeline_config + self.ap = ap + self.agent_token = pipeline_config["ai"]['coze-api']['api-key'] + self.bot_id = pipeline_config["ai"]['coze-api'].get('bot-id') + self.chat_timeout = pipeline_config["ai"]['coze-api'].get('timeout') + self.auto_save_history = pipeline_config["ai"]['coze-api'].get('auto_save_history') + self.api_base = pipeline_config["ai"]['coze-api'].get('api-base') + + self.coze = AsyncCozeAPIClient( + self.agent_token, + self.api_base + ) + + def _process_thinking_content( + self, + content: str, + ) -> tuple[str, str]: + """处理思维链内容 + + Args: + content: 原始内容 + Returns: + (处理后的内容, 提取的思维链内容) + """ + remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False) + thinking_content = '' + # 从 content 中提取 标签内容 + if content and '' in content and '' in content: + import re + + think_pattern = r'(.*?)' + think_matches = re.findall(think_pattern, content, re.DOTALL) + if think_matches: + thinking_content = '\n'.join(think_matches) + # 移除 content 中的 标签 + content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip() + + # 根据 remove_think 参数决定是否保留思维链 + if remove_think: + return content, '' + else: + # 如果有思维链内容,将其以 格式添加到 content 开头 + if thinking_content: + content = f'\n{thinking_content}\n\n{content}'.strip() + return content, thinking_content + + async def _preprocess_user_message(self, query: pipeline_query.Query) -> list[dict]: + """预处理用户消息,转换为Coze消息格式 + + Returns: + list[dict]: Coze消息列表 + """ + messages = [] + + if isinstance(query.user_message.content, list): + # 多模态消息处理 + content_parts = [] + + for ce in query.user_message.content: + if ce.type == 'text': + content_parts.append({"type": "text", "text": ce.text}) + elif ce.type == 'image_base64': + image_b64, image_format = await image.extract_b64_and_format(ce.image_base64) + file_bytes = base64.b64decode(image_b64) + file_id = await self._get_file_id(file_bytes) + content_parts.append({"type": "image", "file_id": file_id}) + elif ce.type == 'file': + # 处理文件,上传到Coze + file_id = await self._get_file_id(ce.file) + content_parts.append({"type": "file", "file_id": file_id}) + + # 创建多模态消息 + if content_parts: + messages.append({ + "role": "user", + "content": json.dumps(content_parts), + "content_type": "object_string", + "meta_data": None + }) + + elif isinstance(query.user_message.content, str): + # 纯文本消息 + messages.append({ + "role": "user", + "content": query.user_message.content, + "content_type": "text", + "meta_data": None + }) + + return messages + + async def _get_file_id(self, file) -> str: + """上传文件到Coze服务 + Args: + file: 文件 + Returns: + str: 文件ID + """ + file_id = await self.coze.upload(file=file) + return file_id + + async def _chat_messages( + self, query: pipeline_query.Query + ) -> typing.AsyncGenerator[provider_message.Message, None]: + """调用聊天助手(非流式) + + 注意:由于cozepy没有提供非流式API,这里使用流式API并在结束后一次性返回完整内容 + """ + user_id = f'{query.launcher_id}_{query.sender_id}' + + # 预处理用户消息 + additional_messages = await self._preprocess_user_message(query) + + # 获取会话ID + conversation_id = None + + # 收集完整内容 + full_content = '' + full_reasoning = '' + + try: + # 调用Coze API流式接口 + async for chunk in self.coze.chat_messages( + bot_id=self.bot_id, + user_id=user_id, + additional_messages=additional_messages, + conversation_id=conversation_id, + timeout=self.chat_timeout, + auto_save_history=self.auto_save_history, + stream=True + ): + self.ap.logger.debug(f'coze-chat-stream: {chunk}') + + event_type = chunk.get('event') + data = chunk.get('data', {}) + + if event_type == 'conversation.message.delta': + # 收集内容 + if 'content' in data: + full_content += data.get('content', '') + + # 收集推理内容(如果有) + if 'reasoning_content' in data: + full_reasoning += data.get('reasoning_content', '') + + elif event_type == 'done': + # 保存会话ID + if 'conversation_id' in data: + conversation_id = data.get('conversation_id') + + elif event_type == 'error': + # 处理错误 + error_msg = f"Coze API错误: {data.get('message', '未知错误')}" + yield provider_message.Message( + role='assistant', + content=error_msg, + ) + return + + # 处理思维链内容 + content, thinking_content = self._process_thinking_content(full_content) + if full_reasoning: + remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False) + if not remove_think: + content = f'\n{full_reasoning}\n\n{content}'.strip() + + # 一次性返回完整内容 + yield provider_message.Message( + role='assistant', + content=content, + ) + + # 保存会话ID + if conversation_id and query.session.using_conversation: + query.session.using_conversation.uuid = conversation_id + + except Exception as e: + self.ap.logger.error(f'Coze API错误: {str(e)}') + yield provider_message.Message( + role='assistant', + content=f'Coze API调用失败: {str(e)}', + ) + + + async def _chat_messages_chunk( + self, query: pipeline_query.Query + ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: + """调用聊天助手(流式)""" + user_id = f'{query.launcher_id}_{query.sender_id}' + + # 预处理用户消息 + additional_messages = await self._preprocess_user_message(query) + + # 获取会话ID + conversation_id = None + + start_reasoning = False + stop_reasoning = False + message_idx = 1 + is_final = False + full_content = '' + remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False) + + + + try: + # 调用Coze API流式接口 + async for chunk in self.coze.chat_messages( + bot_id=self.bot_id, + user_id=user_id, + additional_messages=additional_messages, + conversation_id=conversation_id, + timeout=self.chat_timeout, + auto_save_history=self.auto_save_history, + stream=True + ): + self.ap.logger.debug(f'coze-chat-stream-chunk: {chunk}') + + event_type = chunk.get('event') + data = chunk.get('data', {}) + content = "" + + if event_type == 'conversation.message.delta': + message_idx += 1 + # 处理内容增量 + if "reasoning_content" in data and not remove_think: + + reasoning_content = data.get('reasoning_content', '') + if reasoning_content and not start_reasoning: + content = f"\n" + start_reasoning = True + content += reasoning_content + + if 'content' in data: + if data.get('content', ''): + content += data.get('content', '') + if not stop_reasoning and start_reasoning: + content = f"\n{content}" + stop_reasoning = True + + + elif event_type == 'done': + # 保存会话ID + if 'conversation_id' in data: + conversation_id = data.get('conversation_id') + if query.session.using_conversation: + query.session.using_conversation.uuid = conversation_id + is_final = True + + + elif event_type == 'error': + # 处理错误 + error_msg = f"Coze API错误: {data.get('message', '未知错误')}" + yield provider_message.MessageChunk( + role='assistant', + content=error_msg, + finish_reason='error' + ) + return + full_content += content + if message_idx % 8 == 0 or is_final: + if full_content: + yield provider_message.MessageChunk( + role='assistant', + content=full_content, + is_final=is_final + ) + + except Exception as e: + self.ap.logger.error(f'Coze API流式调用错误: {str(e)}') + yield provider_message.MessageChunk( + role='assistant', + content=f'Coze API流式调用失败: {str(e)}', + finish_reason='error' + ) + + + async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: + """运行""" + msg_seq = 0 + if await query.adapter.is_stream_output_supported(): + async for msg in self._chat_messages_chunk(query): + if isinstance(msg, provider_message.MessageChunk): + msg_seq += 1 + msg.msg_sequence = msg_seq + yield msg + else: + async for msg in self._chat_messages(query): + yield msg + + + + diff --git a/templates/config.yaml b/templates/config.yaml index 13916d9ff..b81b04dc7 100644 --- a/templates/config.yaml +++ b/templates/config.yaml @@ -41,4 +41,4 @@ plugin: enable: true runtime_ws_url: 'ws://langbot_plugin_runtime:5400/control/ws' enable_marketplace: true - cloud_service_url: 'https://space.langbot.app' \ No newline at end of file + cloud_service_url: 'https://space.langbot.app' diff --git a/templates/metadata/pipeline/ai.yaml b/templates/metadata/pipeline/ai.yaml index 2b69806ca..e4d16a95d 100644 --- a/templates/metadata/pipeline/ai.yaml +++ b/templates/metadata/pipeline/ai.yaml @@ -43,6 +43,10 @@ stages: label: en_US: Langflow API zh_Hans: Langflow API + - name: coze-api + label: + en_US: Coze API + zh_Hans: 扣子 API - name: local-agent label: en_US: Local Agent @@ -380,4 +384,57 @@ stages: zh_Hans: 可选的流程调整参数 type: json required: false - default: '{}' \ No newline at end of file + default: '{}' + - name: coze-api + label: + en_US: coze API + zh_Hans: 扣子 API + description: + en_US: Configure the Coze API of the pipeline + zh_Hans: 配置Coze API + config: + - name: api-key + label: + en_US: API Key + zh_Hans: API 密钥 + description: + en_US: The API key for the Coze server + zh_Hans: Coze服务器的 API 密钥 + type: string + required: true + - name: bot-id + label: + en_US: Bot ID + zh_Hans: 机器人 ID + description: + en_US: The ID of the bot to run + zh_Hans: 要运行的机器人 ID + type: string + required: true + - name: api-base + label: + en_US: API Base URL + zh_Hans: API 基础 URL + description: + en_US: The base URL for the Coze API, please use https://api.coze.com for global Coze edition(coze.com). + zh_Hans: Coze API 的基础 URL,请使用 https://api.coze.com 用于全球 Coze 版(coze.com) + type: string + default: "https://api.coze.cn" + - name: auto-save-history + label: + en_US: Auto Save History + zh_Hans: 自动保存历史 + description: + en_US: Whether to automatically save conversation history + zh_Hans: 是否自动保存对话历史 + type: boolean + default: true + - name: timeout + label: + en_US: Request Timeout + zh_Hans: 请求超时 + description: + en_US: Timeout in seconds for API requests + zh_Hans: API 请求超时时间(秒) + type: number + default: 120 \ No newline at end of file