@@ -5,9 +5,16 @@ package ck
55
66import (
77 "context"
8+ "fmt"
9+ "strings"
810
911 "github.com/coze-dev/coze-loop/backend/infra/ck"
1012 "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model"
13+ obErrorx "github.com/coze-dev/coze-loop/backend/modules/observability/pkg/errno"
14+ "github.com/coze-dev/coze-loop/backend/pkg/errorx"
15+ "github.com/coze-dev/coze-loop/backend/pkg/logs"
16+ "gorm.io/gorm"
17+ "gorm.io/gorm/clause"
1118)
1219
1320type InsertAnnotationParam struct {
@@ -50,13 +57,147 @@ type AnnotationCkDaoImpl struct {
5057}
5158
5259func (a * AnnotationCkDaoImpl ) Insert (ctx context.Context , params * InsertAnnotationParam ) error {
53- return nil
60+ if params == nil || len (params .Annotations ) == 0 {
61+ return errorx .NewByCode (obErrorx .CommercialCommonInvalidParamCodeCode )
62+ }
63+ db := a .db .NewSession (ctx )
64+ retryTimes := 3
65+ var lastErr error
66+ for i := 0 ; i < retryTimes ; i ++ {
67+ if err := db .Table (params .Table ).Create (params .Annotations ).Error ; err != nil {
68+ lastErr = err
69+ } else {
70+ return nil
71+ }
72+ }
73+ return errorx .WrapByCode (lastErr , obErrorx .CommercialCommonInternalErrorCodeCode )
5474}
5575
5676func (a * AnnotationCkDaoImpl ) Get (ctx context.Context , params * GetAnnotationParam ) (* model.ObservabilityAnnotation , error ) {
57- return nil , nil
77+ if params == nil || params .ID == "" {
78+ return nil , errorx .NewByCode (obErrorx .CommercialCommonInvalidParamCodeCode )
79+ }
80+ db , err := a .buildSql (ctx , & annoSqlParam {
81+ Tables : params .Tables ,
82+ StartTime : params .StartTime ,
83+ EndTime : params .EndTime ,
84+ ID : params .ID ,
85+ Limit : 1 ,
86+ })
87+ if err != nil {
88+ return nil , err
89+ }
90+ logs .CtxInfo (ctx , "Get Annotation SQL: %s" , db .ToSQL (func (tx * gorm.DB ) * gorm.DB {
91+ return tx .Find (nil )
92+ }))
93+ var annotations []* model.ObservabilityAnnotation
94+ if err := db .Find (& annotations ).Error ; err != nil {
95+ return nil , err
96+ }
97+ if len (annotations ) == 0 {
98+ return nil , nil
99+ } else if len (annotations ) > 1 {
100+ logs .CtxWarn (ctx , "multiple annotations found" )
101+ }
102+ return annotations [0 ], nil
58103}
59104
60105func (a * AnnotationCkDaoImpl ) List (ctx context.Context , params * ListAnnotationsParam ) ([]* model.ObservabilityAnnotation , error ) {
61- return nil , nil
106+ if params == nil || len (params .SpanIDs ) == 0 {
107+ return nil , nil
108+ }
109+ db , err := a .buildSql (ctx , & annoSqlParam {
110+ Tables : params .Tables ,
111+ StartTime : params .StartTime ,
112+ EndTime : params .EndTime ,
113+ SpanIDs : params .SpanIDs ,
114+ DescByUpdatedAt : params .DescByUpdatedAt ,
115+ Limit : params .Limit ,
116+ })
117+ if err != nil {
118+ return nil , err
119+ }
120+ logs .CtxInfo (ctx , "List Annotations SQL: %s" , db .ToSQL (func (tx * gorm.DB ) * gorm.DB {
121+ return tx .Find (nil )
122+ }))
123+ var annotations []* model.ObservabilityAnnotation
124+ if err := db .Find (& annotations ).Error ; err != nil {
125+ return nil , err
126+ }
127+ return annotations , nil
128+ }
129+
130+ type annoSqlParam struct {
131+ Tables []string
132+ StartTime int64
133+ EndTime int64
134+ ID string
135+ SpanIDs []string
136+ DescByUpdatedAt bool
137+ Limit int32
138+ }
139+
140+ func (a * AnnotationCkDaoImpl ) buildSql (ctx context.Context , param * annoSqlParam ) (* gorm.DB , error ) {
141+ db := a .db .NewSession (ctx )
142+ var tableQueries []* gorm.DB
143+ for _ , table := range param .Tables {
144+ query , err := a .buildSingleSql (ctx , db , table , param )
145+ if err != nil {
146+ return nil , err
147+ }
148+ tableQueries = append (tableQueries , query )
149+ }
150+ if len (tableQueries ) == 0 {
151+ return nil , fmt .Errorf ("no table configured" )
152+ } else if len (tableQueries ) == 1 {
153+ query := tableQueries [0 ].ToSQL (func (tx * gorm.DB ) * gorm.DB {
154+ return tx .Find (nil )
155+ })
156+ query += " SETTINGS final = 1"
157+ return db .Raw (query ), nil
158+ } else {
159+ queries := make ([]string , 0 )
160+ for i := 0 ; i < len (tableQueries ); i ++ {
161+ query := tableQueries [i ].ToSQL (func (tx * gorm.DB ) * gorm.DB {
162+ return tx .Find (nil )
163+ })
164+ queries = append (queries , "(" + query + ")" )
165+ }
166+ sql := fmt .Sprintf ("SELECT * FROM (%s)" , strings .Join (queries , " UNION ALL " ))
167+ if param .DescByUpdatedAt {
168+ sql += " ORDER BY updated_at DESC"
169+ } else {
170+ sql += " ORDER BY created_at ASC"
171+ }
172+ sql += fmt .Sprintf (" LIMIT %d SETTINGS final = 1" , param .Limit )
173+ return db .Raw (sql ), nil
174+ }
175+ }
176+
177+ // buildSingleSql 构建单表查询SQL
178+ func (a * AnnotationCkDaoImpl ) buildSingleSql (ctx context.Context , db * gorm.DB , tableName string , param * annoSqlParam ) (* gorm.DB , error ) {
179+ sqlQuery := db .
180+ Table (tableName ).
181+ Where ("deleted_at = 0" )
182+
183+ if param .ID != "" {
184+ sqlQuery = sqlQuery .Where ("id = ?" , param .ID )
185+ }
186+ if len (param .SpanIDs ) > 0 {
187+ sqlQuery = sqlQuery .Where ("span_id IN (?)" , param .SpanIDs )
188+ }
189+ sqlQuery = sqlQuery .
190+ Where ("start_time >= ?" , param .StartTime ).
191+ Where ("start_time <= ?" , param .EndTime )
192+ if param .DescByUpdatedAt {
193+ sqlQuery = sqlQuery .Order (clause.OrderBy {Columns : []clause.OrderByColumn {
194+ {Column : clause.Column {Name : "updated_at" }, Desc : true },
195+ }})
196+ } else {
197+ sqlQuery = sqlQuery .Order (clause.OrderBy {Columns : []clause.OrderByColumn {
198+ {Column : clause.Column {Name : "created_at" }, Desc : false },
199+ }})
200+ }
201+ sqlQuery = sqlQuery .Limit (int (param .Limit ))
202+ return sqlQuery , nil
62203}
0 commit comments