Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions backend/modules/observability/application/convertor/page.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2025 coze-dev Authors
// SPDX-License-Identifier: Apache-2.0

package convertor

import (
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/domain/common"
entity "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/common"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
)

func OrderByDTO2DO(orderBy *common.OrderBy) *entity.OrderBy {
if orderBy == nil {
return nil
}
return &entity.OrderBy{
Field: orderBy.GetField(),
IsAsc: orderBy.GetIsAsc(),
}
}

func OrderByDO2DTO(orderBy *entity.OrderBy) *common.OrderBy {
if orderBy == nil {
return nil
}
return &common.OrderBy{
Field: ptr.Of(orderBy.Field),
IsAsc: ptr.Of(orderBy.IsAsc),
}
}
103 changes: 103 additions & 0 deletions backend/modules/observability/application/convertor/task/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2025 coze-dev Authors
// SPDX-License-Identifier: Apache-2.0

package task

import (
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/domain/filter"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
)

func TaskFiltersDTO2DO(filters *filter.TaskFilterFields) *entity.TaskFilterFields {
if filters == nil {
return nil
}
result := &entity.TaskFilterFields{}
if filters.QueryAndOr != nil {
relation := entity.QueryRelation(*filters.QueryAndOr)
result.QueryAndOr = &relation
}
if len(filters.FilterFields) == 0 {
return result
}
result.FilterFields = make([]*entity.TaskFilterField, 0, len(filters.FilterFields))
for _, field := range filters.FilterFields {
if field == nil {
continue
}
result.FilterFields = append(result.FilterFields, taskFilterFieldDTO2DO(field))
}
return result
}

func taskFilterFieldDTO2DO(field *filter.TaskFilterField) *entity.TaskFilterField {
if field == nil {
return nil
}
result := &entity.TaskFilterField{
Values: append([]string(nil), field.Values...),
SubFilter: taskFilterFieldDTO2DO(field.SubFilter),
}
if field.FieldName != nil {
name := entity.TaskFieldName(*field.FieldName)
result.FieldName = &name
}
if field.FieldType != nil {
fieldType := entity.FieldType(*field.FieldType)
result.FieldType = &fieldType
}
if field.QueryType != nil {
queryType := entity.QueryType(*field.QueryType)
result.QueryType = &queryType
}
if field.QueryAndOr != nil {
relation := entity.QueryRelation(*field.QueryAndOr)
result.QueryAndOr = &relation
}
return result
}

func TaskFiltersDO2DTO(filters *entity.TaskFilterFields) *filter.TaskFilterFields {
if filters == nil {
return nil
}
result := &filter.TaskFilterFields{}
if filters.QueryAndOr != nil {
result.QueryAndOr = ptr.Of(filter.QueryRelation(*filters.QueryAndOr))
}
if len(filters.FilterFields) == 0 {
return result
}
result.FilterFields = make([]*filter.TaskFilterField, 0, len(filters.FilterFields))
for _, field := range filters.FilterFields {
if field == nil {
continue
}
result.FilterFields = append(result.FilterFields, taskFilterFieldDO2DTO(field))
}
return result
}

func taskFilterFieldDO2DTO(field *entity.TaskFilterField) *filter.TaskFilterField {
if field == nil {
return nil
}
result := &filter.TaskFilterField{
Values: append([]string(nil), field.Values...),
SubFilter: taskFilterFieldDO2DTO(field.SubFilter),
}
if field.FieldName != nil {
result.FieldName = ptr.Of(string(*field.FieldName))
}
if field.FieldType != nil {
result.FieldType = ptr.Of(filter.FieldType(*field.FieldType))
}
if field.QueryType != nil {
result.QueryType = ptr.Of(filter.QueryType(*field.QueryType))
}
if field.QueryAndOr != nil {
result.QueryAndOr = ptr.Of(filter.QueryRelation(*field.QueryAndOr))
}
return result
}
130 changes: 20 additions & 110 deletions backend/modules/observability/application/convertor/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"github.com/coze-dev/coze-loop/backend/modules/observability/application/convertor"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
entity_common "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/common"
obErrorx "github.com/coze-dev/coze-loop/backend/modules/observability/pkg/errno"
"github.com/coze-dev/coze-loop/backend/pkg/errorx"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-loop/backend/pkg/lang/slices"
"github.com/coze-dev/coze-loop/backend/pkg/logs"
"github.com/samber/lo"
)

func TaskDOs2DTOs(ctx context.Context, taskPOs []*entity.ObservabilityTask, userInfos map[string]*entity_common.UserInfo) []*task.Task {
Expand Down Expand Up @@ -61,8 +61,8 @@ func TaskDO2DTO(ctx context.Context, v *entity.ObservabilityTask, userMap map[st
Name: v.Name,
Description: v.Description,
WorkspaceID: ptr.Of(v.WorkspaceID),
TaskType: v.TaskType,
TaskStatus: ptr.Of(v.TaskStatus),
TaskType: task.TaskType(v.TaskType),
TaskStatus: ptr.Of(task.TaskStatus(v.TaskStatus)),
Rule: RuleDO2DTO(v.SpanFilter, v.EffectiveTime, v.Sampler, v.BackfillEffectiveTime),
TaskConfig: TaskConfigDO2DTO(v.TaskConfig),
TaskDetail: taskDetail,
Expand All @@ -84,8 +84,8 @@ func TaskRunDO2DTO(ctx context.Context, v *entity.TaskRun, userMap map[string]*e
ID: v.ID,
WorkspaceID: v.WorkspaceID,
TaskID: v.TaskID,
TaskType: v.TaskType,
RunStatus: v.RunStatus,
TaskType: task.TaskRunType(v.TaskType),
RunStatus: task.RunStatus(v.RunStatus),
RunDetail: RunDetailDO2DTO(v.RunDetail),
BackfillRunDetail: BackfillRunDetailDO2DTO(v.BackfillDetail),
RunStartAt: v.RunStartAt.UnixMilli(),
Expand Down Expand Up @@ -177,8 +177,8 @@ func SpanFilterDO2DTO(spanFilter *entity.SpanFilterFields) *filter.SpanFilterFie

return &filter.SpanFilterFields{
Filters: convertor.FilterFieldsDO2DTO(&spanFilter.Filters),
PlatformType: &spanFilter.PlatformType,
SpanListType: &spanFilter.SpanListType,
PlatformType: lo.ToPtr(common.PlatformType(spanFilter.PlatformType)),
SpanListType: lo.ToPtr(common.SpanListType(spanFilter.SpanListType)),
}
}

Expand All @@ -204,7 +204,7 @@ func SamplerDO2DTO(sampler *entity.Sampler) *task.Sampler {
IsCycle: ptr.Of(sampler.IsCycle),
CycleCount: ptr.Of(sampler.CycleCount),
CycleInterval: ptr.Of(sampler.CycleInterval),
CycleTimeUnit: ptr.Of(sampler.CycleTimeUnit),
CycleTimeUnit: ptr.Of(string(sampler.CycleTimeUnit)),
}
}

Expand Down Expand Up @@ -305,7 +305,7 @@ func UserInfoPO2DO(userInfo *entity_common.UserInfo, userID string) *common.User
}
}

func TaskDTO2DO(taskDTO *task.Task, userID string, spanFilters *entity.SpanFilterFields) *entity.ObservabilityTask {
func TaskDTO2DO(taskDTO *task.Task) *entity.ObservabilityTask {
if taskDTO == nil {
return nil
}
Expand All @@ -316,31 +316,16 @@ func TaskDTO2DO(taskDTO *task.Task, userID string, spanFilters *entity.SpanFilte
if taskDTO.GetBaseInfo().GetUpdatedBy() != nil {
updatedBy = taskDTO.GetBaseInfo().GetUpdatedBy().GetUserID()
}
if userID != "" {
createdBy = userID
updatedBy = userID
} else {
if taskDTO.GetBaseInfo().GetCreatedBy() != nil {
createdBy = taskDTO.GetBaseInfo().GetCreatedBy().GetUserID()
}
if taskDTO.GetBaseInfo().GetUpdatedBy() != nil {
updatedBy = taskDTO.GetBaseInfo().GetUpdatedBy().GetUserID()
}
}
var spanFilterDO *entity.SpanFilterFields
if spanFilters != nil {
spanFilterDO = spanFilters
} else {
spanFilterDO = SpanFilterDTO2DO(taskDTO.GetRule().GetSpanFilters())
}

spanFilterDO := SpanFilterDTO2DO(taskDTO.GetRule().GetSpanFilters())

return &entity.ObservabilityTask{
ID: taskDTO.GetID(),
WorkspaceID: taskDTO.GetWorkspaceID(),
Name: taskDTO.GetName(),
Description: ptr.Of(taskDTO.GetDescription()),
TaskType: taskDTO.GetTaskType(),
TaskStatus: taskDTO.GetTaskStatus(),
TaskType: entity.TaskType(taskDTO.GetTaskType()),
TaskStatus: entity.TaskStatus(taskDTO.GetTaskStatus()),
TaskDetail: RunDetailDTO2DO(taskDTO.GetTaskDetail()),
SpanFilter: spanFilterDO,
EffectiveTime: EffectiveTimeDTO2DO(taskDTO.GetRule().GetEffectiveTime()),
Expand All @@ -359,8 +344,8 @@ func SpanFilterDTO2DO(spanFilterFields *filter.SpanFilterFields) *entity.SpanFil
return nil
}
return &entity.SpanFilterFields{
PlatformType: *spanFilterFields.PlatformType,
SpanListType: *spanFilterFields.SpanListType,
PlatformType: loop_span.PlatformType(*spanFilterFields.PlatformType),
SpanListType: loop_span.SpanListType(*spanFilterFields.SpanListType),
Filters: *convertor.FilterFieldsDTO2DO(spanFilterFields.Filters),
}
}
Expand Down Expand Up @@ -396,7 +381,7 @@ func SamplerDTO2DO(sampler *task.Sampler) *entity.Sampler {
IsCycle: sampler.GetIsCycle(),
CycleCount: sampler.GetCycleCount(),
CycleInterval: sampler.GetCycleInterval(),
CycleTimeUnit: sampler.GetCycleTimeUnit(),
CycleTimeUnit: entity.TimeUnit(sampler.GetCycleTimeUnit()),
}
}

Expand All @@ -408,6 +393,7 @@ func TaskConfigDTO2DO(taskConfig *task.TaskConfig) *entity.TaskConfig {
for _, autoEvaluateConfig := range taskConfig.AutoEvaluateConfigs {
var fieldMappings []*entity.EvaluateFieldMapping
if len(autoEvaluateConfig.FieldMappings) > 0 {
// todo tyf 这段逻辑挪到service层
var evalSetNames []string
jspnPathMapping := make(map[string]string)
for _, config := range autoEvaluateConfig.FieldMappings {
Expand Down Expand Up @@ -471,8 +457,8 @@ func TaskRunDTO2DO(taskRun *task.TaskRun) *entity.TaskRun {
ID: taskRun.ID,
TaskID: taskRun.TaskID,
WorkspaceID: taskRun.WorkspaceID,
TaskType: taskRun.TaskType,
RunStatus: taskRun.RunStatus,
TaskType: entity.TaskRunType(taskRun.TaskType),
RunStatus: entity.TaskRunStatus(taskRun.RunStatus),
RunDetail: RunDetailDTO2DO(taskRun.RunDetail),
BackfillDetail: BackfillRunDetailDTO2DO(taskRun.BackfillRunDetail),
RunStartAt: time.UnixMilli(taskRun.RunStartAt),
Expand Down Expand Up @@ -531,82 +517,6 @@ func BackfillRunDetailDTO2DO(v *task.BackfillDetail) *entity.BackfillDetail {
}
}

func CheckEffectiveTime(ctx context.Context, effectiveTime *task.EffectiveTime, taskStatus task.TaskStatus, effectiveTimeDO *entity.EffectiveTime) (*entity.EffectiveTime, error) {
if effectiveTimeDO == nil {
logs.CtxError(ctx, "EffectiveTimePO2DO error")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("effective time is nil"))
}
var validEffectiveTime entity.EffectiveTime
// 开始时间不能大于结束时间
if effectiveTime.GetStartAt() >= effectiveTime.GetEndAt() {
logs.CtxError(ctx, "Start time must be less than end time")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("start time must be less than end time"))
}
// 开始、结束时间不能小于当前时间
if effectiveTimeDO.StartAt != effectiveTime.GetStartAt() && effectiveTime.GetStartAt() < time.Now().UnixMilli() {
logs.CtxError(ctx, "update time must be greater than current time")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("start time must be greater than current time"))
}
if effectiveTimeDO.EndAt != effectiveTime.GetEndAt() && effectiveTime.GetEndAt() < time.Now().UnixMilli() {
logs.CtxError(ctx, "update time must be greater than current time")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("start time must be greater than current time"))
}
validEffectiveTime.StartAt = effectiveTimeDO.StartAt
validEffectiveTime.EndAt = effectiveTimeDO.EndAt
switch taskStatus {
case task.TaskStatusUnstarted:
if validEffectiveTime.StartAt != 0 {
validEffectiveTime.StartAt = *effectiveTime.StartAt
}
if validEffectiveTime.EndAt != 0 {
validEffectiveTime.EndAt = *effectiveTime.EndAt
}
case task.TaskStatusRunning, task.TaskStatusPending:
if validEffectiveTime.EndAt != 0 {
validEffectiveTime.EndAt = *effectiveTime.EndAt
}
default:
logs.CtxError(ctx, "Invalid task status:%s", taskStatus)
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("invalid task status"))
}
return &validEffectiveTime, nil
}

func CheckTaskStatus(ctx context.Context, taskStatus task.TaskStatus, currentTaskStatus task.TaskStatus) (task.TaskStatus, error) {
var validTaskStatus task.TaskStatus
// [0530]todo: 任务状态校验
switch taskStatus {
case task.TaskStatusUnstarted:
if currentTaskStatus == task.TaskStatusUnstarted {
validTaskStatus = taskStatus
} else {
logs.CtxError(ctx, "Invalid task status:%s", taskStatus)
return "", errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("invalid task status"))
}
case task.TaskStatusRunning:
if currentTaskStatus == task.TaskStatusUnstarted || currentTaskStatus == task.TaskStatusPending {
validTaskStatus = taskStatus
} else {
logs.CtxError(ctx, "Invalid task status:%s,currentTaskStatus:%s", taskStatus, currentTaskStatus)
return "", errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("invalid task status"))
}
case task.TaskStatusPending:
if currentTaskStatus == task.TaskStatusRunning {
validTaskStatus = task.TaskStatusPending
}
case task.TaskStatusDisabled:
if currentTaskStatus == task.TaskStatusUnstarted || currentTaskStatus == task.TaskStatusPending {
validTaskStatus = task.TaskStatusDisabled
}
case task.TaskStatusSuccess:
if currentTaskStatus != task.TaskStatusSuccess {
validTaskStatus = task.TaskStatusSuccess
}
}

return validTaskStatus, nil
}

func getLastPartAfterDot(s string) string {
s = strings.TrimRight(s, ".")
lastDotIndex := strings.LastIndex(s, ".")
Expand Down
Loading
Loading