diff --git a/document/content/docs/self-host/upgrading/4-14/4149.mdx b/document/content/docs/self-host/upgrading/4-14/4149.mdx index 3eba454701ee..923d6fb2ef44 100644 --- a/document/content/docs/self-host/upgrading/4-14/4149.mdx +++ b/document/content/docs/self-host/upgrading/4-14/4149.mdx @@ -63,6 +63,7 @@ CODE_SANDBOX_TOKEN=代码运行沙盒的凭证 8. 修复工作流预览模式下,重新打开预览弹窗,会丢失表单输入内容。 9. 修复订阅套餐自定义字段未生效 10. login接口,存在异步 session 问题,会出现报错日志。 +11. 系统工具集不显示版本 12. 修复判断器 arrayAny 类型无判断条件可选 13. 修复视频音频自定义文件类型流程开始无文件链接变量 14. 用户输入框消息不转义成 markdown 格式 @@ -70,4 +71,5 @@ CODE_SANDBOX_TOKEN=代码运行沙盒的凭证 ## 代码优化 -1. 商业版开发时,monorepo 指向不同 react 导致需重装包。 \ No newline at end of file +1. 商业版开发时,monorepo 指向不同 react 导致需重装包。 + diff --git a/packages/global/core/workflow/constants.ts b/packages/global/core/workflow/constants.ts index 0bbd2d4e9c61..8dec5b5e20c7 100644 --- a/packages/global/core/workflow/constants.ts +++ b/packages/global/core/workflow/constants.ts @@ -239,6 +239,8 @@ export enum NodeInputKeyEnum { // loop loopInputArray = 'loopInputArray', + batchParallelConcurrency = 'batchParallelConcurrency', + batchParallelRetryTimes = 'batchParallelRetryTimes', childrenNodeIdList = 'childrenNodeIdList', nodeWidth = 'nodeWidth', nodeHeight = 'nodeHeight', @@ -307,6 +309,8 @@ export enum NodeOutputKeyEnum { // loop loopArray = 'loopArray', + batchRawResult = 'batchRawResult', + batchStatus = 'batchStatus', // loop start loopStartInput = 'loopStartInput', loopStartIndex = 'loopStartIndex', diff --git a/packages/global/core/workflow/node/constant.ts b/packages/global/core/workflow/node/constant.ts index 840ef84f3b02..5550c178d343 100644 --- a/packages/global/core/workflow/node/constant.ts +++ b/packages/global/core/workflow/node/constant.ts @@ -148,6 +148,7 @@ export enum FlowNodeTypeEnum { readFiles = 'readFiles', userSelect = 'userSelect', loop = 'loop', + batch = 'batch', loopStart = 'loopStart', loopEnd = 'loopEnd', formInput = 'formInput', diff --git a/packages/global/core/workflow/runtime/type.ts b/packages/global/core/workflow/runtime/type.ts index 0fc55a33755e..95874701cfe6 100644 --- a/packages/global/core/workflow/runtime/type.ts +++ b/packages/global/core/workflow/runtime/type.ts @@ -393,6 +393,12 @@ export type DispatchNodeResponseType = { loopResult?: any[]; loopInput?: any[]; loopDetail?: ChatHistoryItemResType[]; + // batch + batchInput?: any[]; + batchResult?: any[]; + batchRawResult?: any[]; + batchStatus?: 'success' | 'failed' | 'partial_success'; + batchDetail?: ChatHistoryItemResType[]; // loop start loopInputValue?: any; // loop end diff --git a/packages/global/core/workflow/template/constants.ts b/packages/global/core/workflow/template/constants.ts index 470779c9a4a7..6e3176f5b728 100644 --- a/packages/global/core/workflow/template/constants.ts +++ b/packages/global/core/workflow/template/constants.ts @@ -27,6 +27,7 @@ import { FormInputNode } from './system/interactive/formInput'; import { UserSelectNode } from './system/interactive/userSelect'; import { LafModule } from './system/laf'; import { LoopNode } from './system/loop/loop'; +import { BatchNode } from './system/loop/batch'; import { LoopEndNode } from './system/loop/loopEnd'; import { LoopStartNode } from './system/loop/loopStart'; import { ReadFilesNode } from './system/readFiles'; @@ -55,7 +56,8 @@ const systemNodes: FlowNodeTemplateType[] = [ IfElseNode, VariableUpdateNode, CodeNode, - LoopNode + LoopNode, + BatchNode ]; /* app flow module templates */ export const appSystemModuleTemplates: FlowNodeTemplateType[] = [ diff --git a/packages/global/core/workflow/template/system/loop/batch.ts b/packages/global/core/workflow/template/system/loop/batch.ts new file mode 100644 index 000000000000..d8889b8b9c5f --- /dev/null +++ b/packages/global/core/workflow/template/system/loop/batch.ts @@ -0,0 +1,96 @@ +import { + FlowNodeInputTypeEnum, + FlowNodeOutputTypeEnum, + FlowNodeTypeEnum +} from '../../../node/constant'; +import { type FlowNodeTemplateType } from '../../../type/node'; +import { + FlowNodeTemplateTypeEnum, + NodeInputKeyEnum, + NodeOutputKeyEnum, + WorkflowIOValueTypeEnum +} from '../../../constants'; +import { i18nT } from '../../../../../../web/i18n/utils'; +import { + Input_Template_Children_Node_List, + Input_Template_LOOP_NODE_OFFSET, + Input_Template_Node_Height, + Input_Template_Node_Width +} from '../../input'; + +export const BatchNode: FlowNodeTemplateType = { + id: FlowNodeTypeEnum.batch, + templateType: FlowNodeTemplateTypeEnum.tools, + flowNodeType: FlowNodeTypeEnum.batch, + showSourceHandle: true, + showTargetHandle: true, + avatar: 'core/workflow/template/loop', + avatarLinear: 'core/workflow/template/loopLinear', + colorSchema: 'violetDeep', + name: i18nT('workflow:batch'), + intro: i18nT('workflow:intro_batch'), + showStatus: true, + inputs: [ + { + key: NodeInputKeyEnum.loopInputArray, + renderTypeList: [FlowNodeInputTypeEnum.reference], + valueType: WorkflowIOValueTypeEnum.arrayAny, + required: true, + label: i18nT('workflow:loop_input_array'), + value: [] + }, + { + key: NodeInputKeyEnum.batchParallelConcurrency, + renderTypeList: [FlowNodeInputTypeEnum.numberInput, FlowNodeInputTypeEnum.reference], + selectedTypeIndex: 0, + valueType: WorkflowIOValueTypeEnum.number, + required: true, + label: i18nT('workflow:batch_parallel_concurrency'), + min: 1, + max: 10, + value: 5 + }, + { + key: NodeInputKeyEnum.batchParallelRetryTimes, + renderTypeList: [FlowNodeInputTypeEnum.numberInput], + valueType: WorkflowIOValueTypeEnum.number, + required: true, + label: i18nT('workflow:batch_parallel_retry_times'), + min: 0, + max: 5, + value: 3 + }, + Input_Template_Children_Node_List, + Input_Template_Node_Width, + Input_Template_Node_Height, + Input_Template_LOOP_NODE_OFFSET + ], + outputs: [ + { + id: NodeOutputKeyEnum.loopArray, + key: NodeOutputKeyEnum.loopArray, + label: i18nT('workflow:batch_result_success'), + description: i18nT('workflow:batch_result_success_tip'), + type: FlowNodeOutputTypeEnum.static, + valueType: WorkflowIOValueTypeEnum.arrayAny + }, + { + id: NodeOutputKeyEnum.batchRawResult, + key: NodeOutputKeyEnum.batchRawResult, + label: i18nT('workflow:batch_result_raw'), + description: i18nT('workflow:batch_result_raw_tip'), + required: true, + type: FlowNodeOutputTypeEnum.static, + valueType: WorkflowIOValueTypeEnum.arrayObject + }, + { + id: NodeOutputKeyEnum.batchStatus, + key: NodeOutputKeyEnum.batchStatus, + label: i18nT('workflow:batch_status'), + description: i18nT('workflow:batch_status_tip'), + required: true, + type: FlowNodeOutputTypeEnum.static, + valueType: WorkflowIOValueTypeEnum.string + } + ] +}; diff --git a/packages/global/core/workflow/template/system/loop/loop.ts b/packages/global/core/workflow/template/system/loop/loop.ts index a774da54e4fc..f6ea6cf05b19 100644 --- a/packages/global/core/workflow/template/system/loop/loop.ts +++ b/packages/global/core/workflow/template/system/loop/loop.ts @@ -22,12 +22,13 @@ export const LoopNode: FlowNodeTemplateType = { id: FlowNodeTypeEnum.loop, templateType: FlowNodeTemplateTypeEnum.tools, flowNodeType: FlowNodeTypeEnum.loop, + abandon: true, showSourceHandle: true, showTargetHandle: true, avatar: 'core/workflow/template/loop', avatarLinear: 'core/workflow/template/loopLinear', colorSchema: 'violetDeep', - name: i18nT('workflow:loop'), + name: i18nT('workflow:loop_deprecated'), intro: i18nT('workflow:intro_loop'), showStatus: true, courseUrl: '/docs/introduction/guide/dashboard/workflow/loop/', diff --git a/packages/global/core/workflow/template/system/loop/loopEnd.ts b/packages/global/core/workflow/template/system/loop/loopEnd.ts index 5aae082f9eb0..46d6b717d00a 100644 --- a/packages/global/core/workflow/template/system/loop/loopEnd.ts +++ b/packages/global/core/workflow/template/system/loop/loopEnd.ts @@ -19,13 +19,14 @@ export const LoopEndNode: FlowNodeTemplateType = { avatarLinear: 'core/workflow/template/loopEndLinear', colorSchema: 'violetDeep', name: i18nT('workflow:loop_end'), + intro: i18nT('workflow:loop_end_intro'), showStatus: false, inputs: [ { key: NodeInputKeyEnum.loopEndInput, renderTypeList: [FlowNodeInputTypeEnum.reference], valueType: WorkflowIOValueTypeEnum.any, - label: '', + label: i18nT('workflow:loop_end_intro'), required: true, value: [] } diff --git a/packages/service/core/workflow/dispatch/batch/runBatch.ts b/packages/service/core/workflow/dispatch/batch/runBatch.ts new file mode 100644 index 000000000000..9408c4dc176e --- /dev/null +++ b/packages/service/core/workflow/dispatch/batch/runBatch.ts @@ -0,0 +1,239 @@ +import { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants'; +import { FlowNodeTypeEnum } from '@fastgpt/global/core/workflow/node/constant'; +import { + type DispatchNodeResultType, + type ModuleDispatchProps +} from '@fastgpt/global/core/workflow/runtime/type'; +import { runWorkflow } from '..'; +import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants'; +import { cloneDeep } from 'lodash'; +import { storeEdges2RuntimeEdges } from '@fastgpt/global/core/workflow/runtime/utils'; +import { getErrText } from '@fastgpt/global/common/error/utils'; + +type BatchRawResultItem = { + success: boolean; + message?: string; + data?: any; +}; + +type Props = ModuleDispatchProps<{ + [NodeInputKeyEnum.loopInputArray]: Array; + [NodeInputKeyEnum.childrenNodeIdList]: string[]; + [NodeInputKeyEnum.batchParallelConcurrency]?: number; + [NodeInputKeyEnum.batchParallelRetryTimes]?: number; +}>; +type Response = DispatchNodeResultType<{ + [NodeOutputKeyEnum.loopArray]: Array; + [NodeOutputKeyEnum.batchRawResult]: BatchRawResultItem[]; + [NodeOutputKeyEnum.batchStatus]: 'success' | 'failed' | 'partial_success'; +}>; + +const getMaxConcurrency = () => { + const val = Number(process.env.WORKFLOW_BATCH_MAX_CONCURRENCY); + return Number.isInteger(val) && val > 0 ? val : 10; +}; +const getMaxRetry = () => { + const val = Number(process.env.WORKFLOW_BATCH_MAX_RETRY); + return Number.isInteger(val) && val >= 0 ? val : 5; +}; +const getRuntimeConcurrency = (raw: any) => { + const num = Math.floor(Number(raw)); + if (!Number.isFinite(num)) return 5; + return Math.max(1, Math.min(getMaxConcurrency(), num)); +}; +const getRuntimeRetry = (raw: any) => { + const num = Math.floor(Number(raw)); + if (!Number.isFinite(num)) return 3; + return Math.max(0, Math.min(getMaxRetry(), num)); +}; + +const assertBatchChildNodes = ({ + childrenNodeIdList, + runtimeNodes +}: { + childrenNodeIdList: string[]; + runtimeNodes: Props['runtimeNodes']; +}) => { + const forbiddenTypes = new Set([ + FlowNodeTypeEnum.loop, + FlowNodeTypeEnum.batch, + FlowNodeTypeEnum.userSelect, + FlowNodeTypeEnum.formInput, + FlowNodeTypeEnum.variableUpdate + ]); + + const hasForbidden = runtimeNodes.some( + (node) => childrenNodeIdList.includes(node.nodeId) && forbiddenTypes.has(node.flowNodeType) + ); + if (hasForbidden) { + return Promise.reject( + 'Batch child workflow does not allow loop/batch/interactive/variable-update nodes' + ); + } +}; + +export const dispatchBatch = async (props: Props): Promise => { + const { + params, + runtimeEdges, + runtimeNodes, + node: { name } + } = props; + const { + loopInputArray = [], + childrenNodeIdList = [], + batchParallelConcurrency = 5, + batchParallelRetryTimes = 3 + } = params; + + if (!Array.isArray(loopInputArray)) { + return Promise.reject('Input value is not an array'); + } + + await assertBatchChildNodes({ + childrenNodeIdList, + runtimeNodes + }); + + const maxLength = process.env.WORKFLOW_MAX_LOOP_TIMES + ? Number(process.env.WORKFLOW_MAX_LOOP_TIMES) + : 50; + if (loopInputArray.length > maxLength) { + return Promise.reject(`Input array length cannot be greater than ${maxLength}`); + } + + if (loopInputArray.length === 0) { + return { + data: { + [NodeOutputKeyEnum.loopArray]: [], + [NodeOutputKeyEnum.batchRawResult]: [], + [NodeOutputKeyEnum.batchStatus]: 'success' + }, + [DispatchNodeResponseKeyEnum.nodeResponse]: { + totalPoints: 0, + batchInput: loopInputArray, + batchResult: [], + batchRawResult: [], + batchStatus: 'success', + mergeSignId: props.node.nodeId + } + }; + } + + const concurrency = getRuntimeConcurrency(batchParallelConcurrency); + const retryTimes = getRuntimeRetry(batchParallelRetryTimes); + + const orderedRawResult: BatchRawResultItem[] = new Array(loopInputArray.length); + const orderedSuccessResult: any[] = []; + const detailResponses: any[] = []; + let totalPoints = 0; + const customFeedbacks: string[] = []; + + let cursor = 0; + const runOne = async (item: any, index: number) => { + let attempt = 0; + while (attempt <= retryTimes) { + try { + const taskRuntimeNodes = cloneDeep(runtimeNodes); + taskRuntimeNodes.forEach((node) => { + if (!childrenNodeIdList.includes(node.nodeId)) return; + if (node.flowNodeType !== FlowNodeTypeEnum.loopStart) return; + + node.isEntry = true; + node.inputs.forEach((input) => { + if (input.key === NodeInputKeyEnum.loopStartInput) { + input.value = item; + } else if (input.key === NodeInputKeyEnum.loopStartIndex) { + input.value = index + 1; + } + }); + }); + + const response = await runWorkflow({ + ...props, + usageId: undefined, + lastInteractive: undefined, + runtimeNodes: taskRuntimeNodes, + runtimeEdges: cloneDeep(storeEdges2RuntimeEdges(runtimeEdges, undefined)) + }); + + if (response.workflowInteractiveResponse) { + throw new Error('Batch child workflow does not allow interactive nodes'); + } + + const loopOutputValue = response.flowResponses.find( + (res) => res.moduleType === FlowNodeTypeEnum.loopEnd + )?.loopOutputValue; + + orderedRawResult[index] = { + success: true, + data: loopOutputValue + }; + orderedSuccessResult.push({ + index, + data: loopOutputValue + }); + detailResponses.push(...response.flowResponses); + totalPoints += response.flowUsages.reduce((acc, usage) => acc + usage.totalPoints, 0); + if (response[DispatchNodeResponseKeyEnum.customFeedbacks]) { + customFeedbacks.push(...response[DispatchNodeResponseKeyEnum.customFeedbacks]); + } + return; + } catch (error) { + attempt++; + if (attempt > retryTimes) { + orderedRawResult[index] = { + success: false, + message: getErrText(error) + }; + } + } + } + }; + + const workers = Array.from({ length: Math.min(concurrency, loopInputArray.length) }).map( + async () => { + while (cursor < loopInputArray.length) { + const index = cursor++; + await runOne(loopInputArray[index], index); + } + } + ); + await Promise.all(workers); + + const successCount = orderedRawResult.filter((item) => item?.success).length; + const failedCount = orderedRawResult.length - successCount; + const status: 'success' | 'failed' | 'partial_success' = + failedCount === 0 ? 'success' : successCount === 0 ? 'failed' : 'partial_success'; + + const sortedSuccessResult = orderedSuccessResult + .sort((a, b) => a.index - b.index) + .map((item) => item.data); + + return { + data: { + [NodeOutputKeyEnum.loopArray]: sortedSuccessResult, + [NodeOutputKeyEnum.batchRawResult]: orderedRawResult, + [NodeOutputKeyEnum.batchStatus]: status + }, + [DispatchNodeResponseKeyEnum.nodeResponse]: { + totalPoints, + batchInput: loopInputArray, + batchResult: sortedSuccessResult, + batchRawResult: orderedRawResult, + batchStatus: status, + batchDetail: detailResponses, + mergeSignId: props.node.nodeId + }, + [DispatchNodeResponseKeyEnum.nodeDispatchUsages]: totalPoints + ? [ + { + totalPoints, + moduleName: name + } + ] + : [], + [DispatchNodeResponseKeyEnum.customFeedbacks]: + customFeedbacks.length > 0 ? customFeedbacks : undefined + }; +}; diff --git a/packages/service/core/workflow/dispatch/constants.ts b/packages/service/core/workflow/dispatch/constants.ts index 37a07d4b7b3d..d468929be423 100644 --- a/packages/service/core/workflow/dispatch/constants.ts +++ b/packages/service/core/workflow/dispatch/constants.ts @@ -14,6 +14,7 @@ import { dispatchWorkflowStart } from './init/workflowStart'; import { dispatchFormInput } from './interactive/formInput'; import { dispatchUserSelect } from './interactive/userSelect'; import { dispatchLoop } from './loop/runLoop'; +import { dispatchBatch } from './batch/runBatch'; import { dispatchLoopEnd } from './loop/runLoopEnd'; import { dispatchLoopStart } from './loop/runLoopStart'; import { dispatchRunPlugin } from './plugin/run'; @@ -65,6 +66,7 @@ export const callbackMap: Record = { [FlowNodeTypeEnum.readFiles]: dispatchReadFiles, [FlowNodeTypeEnum.userSelect]: dispatchUserSelect, [FlowNodeTypeEnum.loop]: dispatchLoop, + [FlowNodeTypeEnum.batch]: dispatchBatch, [FlowNodeTypeEnum.loopStart]: dispatchLoopStart, [FlowNodeTypeEnum.loopEnd]: dispatchLoopEnd, [FlowNodeTypeEnum.formInput]: dispatchFormInput, diff --git a/packages/web/i18n/en/workflow.json b/packages/web/i18n/en/workflow.json index c5628b3cba41..f6e0b8c08d28 100644 --- a/packages/web/i18n/en/workflow.json +++ b/packages/web/i18n/en/workflow.json @@ -128,11 +128,24 @@ "less_than": "Less Than", "less_than_or_equal_to": "Less Than or Equal To", "loop": "Batch Run", + "loop_deprecated": "Batch Run (Serial, Deprecated)", "loop_body": "loop body", "loop_end": "End", + "loop_end_intro": "Select a variable as the output of batch execution", "loop_input_array": "array", "loop_result": "Array execution results", "loop_start": "Start", + "batch": "Batch Run", + "intro_batch": "Input an array, run multiple independent tasks in parallel (random execution order), and aggregate results by input order.", + "batch_parallel_concurrency": "Concurrency Limit", + "batch_parallel_retry_times": "Retry Times Per Item", + "batch_result_success": "Batch Result (Success)", + "batch_result_raw": "Batch Result (Raw)", + "batch_status": "Overall Status", + "execution_logic": "Execution logic", + "batch_result_success_tip": "Filter out failed executions and output only successful results.\n\nOutput format:\n[\nresult1,\nresult2\n]", + "batch_result_raw_tip": "Outputs the raw execution results of batch run.\n\nOutput format:\n[{\nsuccess: boolean,\nmessage: error message,\ndata: execution result\n}, {}, ...]\n\nWhen successful: success=true, data has value, and message is empty; when failed: success=false, message has value, and data is empty.", + "batch_status_tip": "All success: success; all failed: failed; partial: partial_success (downstream uses English values).", "max_dialog_rounds": "Maximum Number of Dialog Rounds", "max_tokens": "Maximum Tokens", "mouse_priority": "Mouse first\n- Press the left button to drag the canvas\n- Hold down shift and left click to select batches", @@ -232,5 +245,7 @@ "workflow.My edit": "My Edit", "workflow.Switch_success": "Switch Successful", "workflow.Team cloud": "Team Cloud", - "workflow.exit_tips": "Your changes have not been saved. 'Exit directly' will not save your edits." + "workflow.exit_tips": "Your changes have not been saved. 'Exit directly' will not save your edits.", + "batch_end_output_label": "Select a variable as the batch execution output", + "batch_no_interactive_node": "Interactive nodes are not allowed inside batch execution nodes" } diff --git a/packages/web/i18n/zh-CN/workflow.json b/packages/web/i18n/zh-CN/workflow.json index 7111715d39bd..a5187625d668 100644 --- a/packages/web/i18n/zh-CN/workflow.json +++ b/packages/web/i18n/zh-CN/workflow.json @@ -128,11 +128,24 @@ "less_than": "小于", "less_than_or_equal_to": "小于等于", "loop": "批量执行", + "loop_deprecated": "批量执行(串行,Deprecated)", "loop_body": "循环体", "loop_end": "结束", + "loop_end_intro": "选择变量,作为批量执行的结果输出", "loop_input_array": "数组", "loop_result": "数组执行结果", "loop_start": "开始", + "batch": "批量执行", + "intro_batch": "输入一个数组,批量执行多个独立任务(任务顺序随机),并按数组顺序聚合结果", + "batch_parallel_concurrency": "并发上限", + "batch_parallel_retry_times": "单轮报错重试次数", + "batch_result_success": "批量执行结果(成功)", + "batch_result_raw": "批量执行结果(原始)", + "batch_status": "整体运行状态", + "execution_logic": "执行逻辑", + "batch_result_success_tip": "过滤运行失败结果,仅输出运行成功的执行结果。\n\n输出格式:\n[\n运行结果1,\n运行结果2\n]", + "batch_result_raw_tip": "输出批量执行的原始执行结果。\n\n输出格式:\n[{\nsuccess: boolean, \nmessage: 报错信息, \ndata: 运行结果\n}, {}, ...]\n\n成功时 success=true , data 有值,message为空;失败时 success=false,message 有值,data为空。", + "batch_status_tip": "全部成功:success;\n全部失败:failed;\n部分成功:partial_success", "max_dialog_rounds": "最多携带多少轮对话记录", "max_tokens": "最大 Tokens", "mouse_priority": "鼠标优先\n- 左键按下后可拖动画布\n- 按住 shift 后左键可批量选择", @@ -232,5 +245,7 @@ "workflow.My edit": "我的编辑", "workflow.Switch_success": "切换成功", "workflow.Team cloud": "团队云端", - "workflow.exit_tips": "您的更改尚未保存,「直接退出」将不会保存您的编辑记录。" + "workflow.exit_tips": "您的更改尚未保存,「直接退出」将不会保存您的编辑记录。", + "batch_end_output_label": "选择变量,作为批量执行的结果输出", + "batch_no_interactive_node": "批量执行节点内不允许存在交互节点" } diff --git a/packages/web/i18n/zh-Hant/workflow.json b/packages/web/i18n/zh-Hant/workflow.json index 2e7e28be1cf9..b5cfbaf1d0db 100644 --- a/packages/web/i18n/zh-Hant/workflow.json +++ b/packages/web/i18n/zh-Hant/workflow.json @@ -128,11 +128,24 @@ "less_than": "小於", "less_than_or_equal_to": "小於或等於", "loop": "大量執行", + "loop_deprecated": "大量執行(串行,Deprecated)", "loop_body": "迴圈主體", "loop_end": "結束", + "loop_end_intro": "選擇變數,作為批量執行的結果輸出", "loop_input_array": "陣列", "loop_result": "陣列執行結果", "loop_start": "開始", + "batch": "大量執行", + "intro_batch": "輸入一個陣列,批量執行多個獨立任務(任務順序隨機),並按陣列順序聚合結果", + "batch_parallel_concurrency": "並發上限", + "batch_parallel_retry_times": "單輪報錯重試次數", + "batch_result_success": "批量執行結果(成功)", + "batch_result_raw": "批量執行結果(原始)", + "batch_status": "整體運行狀態", + "execution_logic": "執行邏輯", + "batch_result_success_tip": "過濾執行失敗結果,僅輸出執行成功的結果。\n\n輸出格式:\n[\n執行結果1,\n執行結果2\n]", + "batch_result_raw_tip": "輸出批量執行的原始執行結果。\n\n輸出格式:\n[{\nsuccess: boolean, \nmessage: 錯誤訊息, \ndata: 執行結果\n}, {}, ...]\n\n成功時 success=true,data 有值,message 為空;失敗時 success=false,message 有值,data 為空。", + "batch_status_tip": "全部成功:success;\n全部失敗:failed;\n部分成功:partial_success", "max_dialog_rounds": "最多攜帶幾輪對話紀錄", "max_tokens": "最大 Token 數", "mouse_priority": "滑鼠優先\n- 按下左鍵拖曳畫布\n- 按住 Shift 鍵並點選左鍵可批次選取", @@ -232,5 +245,7 @@ "workflow.My edit": "我的編輯", "workflow.Switch_success": "切換成功", "workflow.Team cloud": "團隊雲端", - "workflow.exit_tips": "您的變更尚未儲存,「直接結束」將不會儲存您的編輯紀錄。" + "workflow.exit_tips": "您的變更尚未儲存,「直接結束」將不會儲存您的編輯紀錄。", + "batch_end_output_label": "選擇變數,作為批量執行的結果輸出", + "batch_no_interactive_node": "大量執行節點內不允許存在互動節點" } diff --git a/projects/app/src/components/core/chat/components/WholeResponseModal.tsx b/projects/app/src/components/core/chat/components/WholeResponseModal.tsx index 9e738ad3e819..ac47f6f436d3 100644 --- a/projects/app/src/components/core/chat/components/WholeResponseModal.tsx +++ b/projects/app/src/components/core/chat/components/WholeResponseModal.tsx @@ -152,14 +152,16 @@ export const WholeResponseContent = ({ )} {(activeModule?.childrenResponses || activeModule.toolDetail || - activeModule.pluginDetail) && ( + activeModule.pluginDetail || + activeModule.batchDetail) && ( sum + (item.totalPoints || 0), 0) || 0 )} /> @@ -496,6 +498,10 @@ export const WholeResponseContent = ({ {/* loop */} + + + + {/* loopStart */} { let children: sideTabItemType[] = []; if ( - !!(item?.toolDetail || item?.pluginDetail || item?.loopDetail || item?.childrenResponses) + !!( + item?.toolDetail || + item?.pluginDetail || + item?.loopDetail || + item?.batchDetail || + item?.childrenResponses + ) ) { if (item?.toolDetail) children.push(...pretreatmentResponse(item?.toolDetail)); if (item?.pluginDetail) children.push(...pretreatmentResponse(item?.pluginDetail)); if (item?.loopDetail) children.push(...pretreatmentResponse(item?.loopDetail)); + if (item?.batchDetail) children.push(...pretreatmentResponse(item?.batchDetail)); if (item?.childrenResponses) children.push(...pretreatmentResponse(item?.childrenResponses)); } diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/list.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/list.tsx index 3411f23d875d..538e05e3fe81 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/list.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/list.tsx @@ -273,7 +273,10 @@ const NodeTemplateList = ({ }); const currentNode = getNodeById(handleParams?.nodeId); - if (templateNode.flowNodeType === FlowNodeTypeEnum.loop && !!currentNode?.parentNodeId) { + if ( + [FlowNodeTypeEnum.loop, FlowNodeTypeEnum.batch].includes(templateNode.flowNodeType) && + !!currentNode?.parentNodeId + ) { toast({ status: 'warning', title: t('workflow:can_not_loop') @@ -319,7 +322,7 @@ const NodeTemplateList = ({ const newNodes = [newNode]; - if (templateNode.flowNodeType === FlowNodeTypeEnum.loop) { + if ([FlowNodeTypeEnum.loop, FlowNodeTypeEnum.batch].includes(templateNode.flowNodeType)) { const startNode = nodeTemplate2FlowNode({ template: LoopStartNode, position: { x: position.x + 60, y: position.y + 280 }, diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/useNodeTemplates.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/useNodeTemplates.tsx index 0c4c7daa3be2..d17f1cf9a980 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/useNodeTemplates.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/components/NodeTemplates/useNodeTemplates.tsx @@ -38,6 +38,10 @@ export const useNodeTemplates = () => { if (templateType === TemplateTypeEnum.basic) { return basicNodeTemplates .filter((item) => { + // hide deprecated templates from add panel + if (item.abandon) { + return false; + } // unique node filter if (item.unique) { const nodeExist = getNodeList().some( diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useKeyboard.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useKeyboard.tsx index 18211f898347..236c076fd8dd 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useKeyboard.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useKeyboard.tsx @@ -68,7 +68,10 @@ export const useKeyboard = () => { if (!Array.isArray(parseData)) return; // filter workflow data const filteredData = parseData.filter( - (item) => !!item.type && item.data?.unique !== true && item.type !== FlowNodeTypeEnum.loop + (item) => + !!item.type && + item.data?.unique !== true && + ![FlowNodeTypeEnum.loop, FlowNodeTypeEnum.batch].includes(item.type as FlowNodeTypeEnum) ); if (filteredData.length === 0) return; diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useWorkflow.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useWorkflow.tsx index 4b285330260f..1c860d6b8763 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useWorkflow.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useWorkflow.tsx @@ -392,7 +392,7 @@ const useRAF = () => { export const popoverWidth = 400; export const popoverHeight = 600; // Loop 类型的父节点类型集合 -const PARENT_NODE_TYPES = new Set([FlowNodeTypeEnum.loop]); +const PARENT_NODE_TYPES = new Set([FlowNodeTypeEnum.loop, FlowNodeTypeEnum.batch]); export const useWorkflow = () => { const { toast } = useToast(); @@ -432,10 +432,17 @@ export const useWorkflow = () => { const unSupportedTypes = [ FlowNodeTypeEnum.workflowStart, FlowNodeTypeEnum.loop, + FlowNodeTypeEnum.batch, FlowNodeTypeEnum.pluginInput, FlowNodeTypeEnum.pluginOutput, FlowNodeTypeEnum.systemConfig ]; + const batchOnlyUnSupportedTypes = [ + FlowNodeTypeEnum.userSelect, + FlowNodeTypeEnum.formInput, + FlowNodeTypeEnum.variableUpdate + ]; + const batchInteractiveNodeTypes = [FlowNodeTypeEnum.userSelect, FlowNodeTypeEnum.formInput]; if (!node || node.data.parentNodeId) return; @@ -443,14 +450,28 @@ export const useWorkflow = () => { const intersections = getIntersectingNodes(node); // 获取所有与当前节点相交的节点中,类型为 loop 的节点且它不能是折叠状态 const parentNode = intersections.find( - (item) => !item.data.isFolded && item.type === FlowNodeTypeEnum.loop + (item) => + !item.data.isFolded && + [FlowNodeTypeEnum.loop, FlowNodeTypeEnum.batch].includes(item.type as FlowNodeTypeEnum) ); if (parentNode) { - if (unSupportedTypes.includes(node.type as FlowNodeTypeEnum)) { + const parentType = parentNode.type as FlowNodeTypeEnum; + const currentNodeType = node.type as FlowNodeTypeEnum; + const isUnsupportedForCommonParent = unSupportedTypes.includes(currentNodeType); + const isUnsupportedForBatchParent = + parentType === FlowNodeTypeEnum.batch && + batchOnlyUnSupportedTypes.includes(currentNodeType); + const isInteractiveInBatch = + parentType === FlowNodeTypeEnum.batch && + batchInteractiveNodeTypes.includes(currentNodeType); + + if (isUnsupportedForCommonParent || isUnsupportedForBatchParent) { return toast({ status: 'warning', - title: t('workflow:can_not_loop') + title: isInteractiveInBatch + ? t('workflow:batch_no_interactive_node') + : t('workflow:can_not_loop') }); } diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/index.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/index.tsx index 0d94f2037192..898b74f7a2bf 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/index.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/index.tsx @@ -61,6 +61,7 @@ const nodeTypes: Record = { [FlowNodeTypeEnum.code]: dynamic(() => import('./nodes/NodeCode')), [FlowNodeTypeEnum.userSelect]: dynamic(() => import('./nodes/NodeUserSelect')), [FlowNodeTypeEnum.loop]: dynamic(() => import('./nodes/Loop/NodeLoop')), + [FlowNodeTypeEnum.batch]: dynamic(() => import('./nodes/Loop/NodeLoop')), [FlowNodeTypeEnum.loopStart]: dynamic(() => import('./nodes/Loop/NodeLoopStart')), [FlowNodeTypeEnum.loopEnd]: dynamic(() => import('./nodes/Loop/NodeLoopEnd')), [FlowNodeTypeEnum.formInput]: dynamic(() => import('./nodes/NodeFormInput')), diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/Loop/NodeLoop.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/Loop/NodeLoop.tsx index f8b2183588aa..cbee2288ba99 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/Loop/NodeLoop.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/Loop/NodeLoop.tsx @@ -20,6 +20,10 @@ import { VARIABLE_NODE_ID, WorkflowIOValueTypeEnum } from '@fastgpt/global/core/workflow/constants'; +import { + FlowNodeInputTypeEnum, + FlowNodeTypeEnum +} from '@fastgpt/global/core/workflow/node/constant'; import { Input_Template_Children_Node_List, Input_Template_LOOP_NODE_OFFSET @@ -37,7 +41,7 @@ import { useMemoEnhance } from '@fastgpt/web/hooks/useMemoEnhance'; const NodeLoop = ({ data, selected }: NodeProps) => { const { t } = useTranslation(); - const { nodeId, inputs, outputs, isFolded } = data; + const { nodeId, inputs, outputs, isFolded, flowNodeType } = data; const { getNodeById, nodeIds, nodeAmount, getNodeList, systemConfigNode } = useContextSelector( WorkflowBufferDataContext, (v) => v @@ -108,6 +112,106 @@ const NodeLoop = ({ data, selected }: NodeProps) => { }); }, [loopInputArray, newValueType, nodeId, onChangeNode]); + // Normalize batch numeric inputs to keep UI and backend behavior consistent. + useEffect(() => { + if (flowNodeType !== FlowNodeTypeEnum.batch) return; + + const normalizeInput = ({ + key, + min, + max, + defaultValue + }: { + key: NodeInputKeyEnum; + min: number; + max: number; + defaultValue: number; + }) => { + const input = inputs.find((item) => item.key === key); + if (!input) return; + + const num = Math.floor(Number(input.value)); + const nextValue = Number.isFinite(num) ? Math.max(min, Math.min(max, num)) : defaultValue; + + if (input.value === nextValue) return; + + onChangeNode({ + nodeId, + type: 'updateInput', + key, + value: { + ...input, + value: nextValue + } + }); + }; + + normalizeInput({ + key: NodeInputKeyEnum.batchParallelConcurrency, + min: 1, + max: 10, + defaultValue: 5 + }); + normalizeInput({ + key: NodeInputKeyEnum.batchParallelRetryTimes, + min: 0, + max: 5, + defaultValue: 3 + }); + }, [flowNodeType, inputs, nodeId, onChangeNode]); + + useEffect(() => { + if (flowNodeType !== FlowNodeTypeEnum.batch) return; + + const expectedConcurrency = [ + FlowNodeInputTypeEnum.numberInput, + FlowNodeInputTypeEnum.reference + ] as const; + const concurrencyInput = inputs.find( + (i) => i.key === NodeInputKeyEnum.batchParallelConcurrency + ); + if (concurrencyInput) { + const list = concurrencyInput.renderTypeList || []; + const matches = + list.length === expectedConcurrency.length && + list.every((t, i) => t === expectedConcurrency[i]); + if (!matches) { + onChangeNode({ + nodeId, + type: 'updateInput', + key: NodeInputKeyEnum.batchParallelConcurrency, + value: { + ...concurrencyInput, + renderTypeList: [...expectedConcurrency], + selectedTypeIndex: Math.min(Math.max(concurrencyInput.selectedTypeIndex ?? 0, 0), 1) + } + }); + } + } + + const expectedRetry = [FlowNodeInputTypeEnum.numberInput] as const; + const retryInput = inputs.find((i) => i.key === NodeInputKeyEnum.batchParallelRetryTimes); + if (retryInput) { + const list = retryInput.renderTypeList || []; + const matches = list.length === expectedRetry.length && list[0] === expectedRetry[0]; + if (!matches) { + const num = Math.floor(Number(retryInput.value)); + const nextValue = Number.isFinite(num) ? Math.max(0, Math.min(5, num)) : 3; + onChangeNode({ + nodeId, + type: 'updateInput', + key: NodeInputKeyEnum.batchParallelRetryTimes, + value: { + ...retryInput, + renderTypeList: [...expectedRetry], + selectedTypeIndex: 0, + value: nextValue + } + }); + } + } + }, [flowNodeType, inputs, nodeId, onChangeNode]); + // Update childrenNodeIdList const childrenNodeIdList = useMemoEnhance(() => { return getNodeList() @@ -161,7 +265,9 @@ const NodeLoop = ({ data, selected }: NodeProps) => { <> - {t('workflow:loop_body')} + {flowNodeType === FlowNodeTypeEnum.batch + ? t('workflow:execution_logic') + : t('workflow:loop_body')} ) => { } }, [valueType, nodeId, onChangeNode, parentNodeId, getNodeById]); + const intro = data.intro && String(data.intro).trim() ? data.intro : 'workflow:loop_end_intro'; + return ( { }, [foldedNodesMap, getNodeById, nodeId]); const isAppNode = node && AppNodeFlowNodeTypeMap[node?.flowNodeType]; - const isLoopNode = node?.flowNodeType === FlowNodeTypeEnum.loop; + const isLoopNode = [FlowNodeTypeEnum.loop, FlowNodeTypeEnum.batch].includes( + node?.flowNodeType as FlowNodeTypeEnum + ); const showVersion = useMemo(() => { // 1. MCP tool, HTTP tool set and system tool set do not have version if ( isAppNode && - (node.toolConfig?.mcpToolSet || + ( + node.toolConfig?.mcpToolSet || node.toolConfig?.mcpTool || node?.toolConfig?.httpToolSet || - node?.toolConfig?.systemToolSet) + node?.toolConfig?.systemToolSet + ) ) return false; // 2. Team app/System commercial plugin diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/render/RenderInput/templates/Reference.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/render/RenderInput/templates/Reference.tsx index 595bfaac2dd7..230291df7054 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/render/RenderInput/templates/Reference.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/render/RenderInput/templates/Reference.tsx @@ -152,7 +152,9 @@ const Reference = ({ item, nodeId }: RenderInputProps) => { const popDirection = useMemo(() => { const node = getNodeById(nodeId); if (!node) return 'bottom'; - return node.flowNodeType === FlowNodeTypeEnum.loop ? 'top' : 'bottom'; + return [FlowNodeTypeEnum.loop, FlowNodeTypeEnum.batch].includes(node.flowNodeType) + ? 'top' + : 'bottom'; }, [nodeId, getNodeById]); return (