Skip to content

Commit c5273da

Browse files
author
Przemysław Stępień
authored
feat: Add handling of error messages to sdk (#2195)
#### Summary <!-- 🎉 Thank you for making CloudQuery awesome by submitting a PR 🎉 --> <!-- Explain what problem this PR addresses --> This PR requires changes to plugin-pb --- Use the following steps to ensure your PR is ready to be reviewed - [ ] Read the [contribution guidelines](../blob/main/CONTRIBUTING.md) 🧑‍🎓 - [ ] Run `go fmt` to format your code 🖊 - [ ] Lint your changes via `golangci-lint run` 🚨 (install golangci-lint [here](https://golangci-lint.run/usage/install/#local-installation)) - [ ] Update or add tests 🧪 - [ ] Ensure the status checks below are successful ✅
1 parent 140b6f3 commit c5273da

File tree

8 files changed

+48
-3
lines changed

8 files changed

+48
-3
lines changed

internal/servers/plugin/v3/plugin.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,16 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
259259
WhereClause: whereClause,
260260
},
261261
}
262+
case *message.SyncError:
263+
if !req.WithErrorMessages {
264+
continue
265+
}
266+
pbMsg.Message = &pb.Sync_Response_Error{
267+
Error: &pb.Sync_MessageError{
268+
TableName: m.TableName,
269+
Error: m.Error,
270+
},
271+
}
262272
default:
263273
return status.Errorf(codes.Internal, "unknown message type: %T", msg)
264274
}

message/sync_message.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,13 @@ type SyncDeleteRecord struct {
121121
func (m SyncDeleteRecord) GetTable() *schema.Table {
122122
return &schema.Table{Name: m.TableName}
123123
}
124+
125+
type SyncError struct {
126+
syncBaseMessage
127+
TableName string
128+
Error string
129+
}
130+
131+
func (e SyncError) GetTable() *schema.Table {
132+
return &schema.Table{Name: e.TableName}
133+
}

scheduler/queue/scheduler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/cloudquery/plugin-sdk/v4/caser"
7+
"github.com/cloudquery/plugin-sdk/v4/message"
78
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
89
"github.com/cloudquery/plugin-sdk/v4/schema"
910
"github.com/google/uuid"
@@ -77,7 +78,7 @@ func NewShuffleQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed in
7778
return scheduler
7879
}
7980

80-
func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource) {
81+
func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource, msgChan chan<- message.SyncMessage) {
8182
if len(tableClients) == 0 {
8283
return
8384
}
@@ -102,6 +103,7 @@ func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedR
102103
d.invocationID,
103104
d.deterministicCQID,
104105
d.metrics,
106+
msgChan,
105107
).work(ctx, activeWorkSignal)
106108
return nil
107109
})

scheduler/queue/scheduler_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"testing"
77

8+
"github.com/cloudquery/plugin-sdk/v4/message"
89
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
910
"github.com/cloudquery/plugin-sdk/v4/schema"
1011
"github.com/cloudquery/plugin-sdk/v4/transformers"
@@ -83,9 +84,10 @@ func TestScheduler(t *testing.T) {
8384
}
8485

8586
resolvedResources := make(chan *schema.Resource)
87+
msgs := make(chan message.SyncMessage, 10)
8688
go func() {
8789
defer close(resolvedResources)
88-
scheduler.Sync(context.Background(), tableClients, resolvedResources)
90+
scheduler.Sync(context.Background(), tableClients, resolvedResources, msgs)
8991
}()
9092

9193
gotResources := make([]*schema.Resource, 0)

scheduler/queue/worker.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/cloudquery/plugin-sdk/v4/caser"
1212
"github.com/cloudquery/plugin-sdk/v4/helpers"
13+
"github.com/cloudquery/plugin-sdk/v4/message"
1314
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1415
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
1516
"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -29,6 +30,8 @@ type worker struct {
2930
invocationID string
3031
deterministicCQID bool
3132
metrics *metrics.Metrics
33+
// message channel for sending SyncError messages
34+
msgChan chan<- message.SyncMessage
3235
}
3336

3437
func (w *worker) work(ctx context.Context, activeWorkSignal *activeWorkSignal) {
@@ -51,6 +54,7 @@ func newWorker(
5154
invocationID string,
5255
deterministicCQID bool,
5356
m *metrics.Metrics,
57+
msgChan chan<- message.SyncMessage,
5458
) *worker {
5559
return &worker{
5660
jobs: jobs,
@@ -61,6 +65,7 @@ func newWorker(
6165
deterministicCQID: deterministicCQID,
6266
invocationID: invocationID,
6367
metrics: m,
68+
msgChan: msgChan,
6469
}
6570
}
6671

@@ -105,6 +110,12 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s
105110
logger.Error().Err(err).Msg("table resolver finished with error")
106111
tableMetrics.OtelErrorsAdd(ctx, 1)
107112
atomic.AddUint64(&tableMetrics.Errors, 1)
113+
// Send SyncError message
114+
syncErrorMsg := &message.SyncError{
115+
TableName: table.Name,
116+
Error: err.Error(),
117+
}
118+
w.msgChan <- syncErrorMsg
108119
return
109120
}
110121
}()

scheduler/scheduler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ type syncClient struct {
139139
metrics *metrics.Metrics
140140
logger zerolog.Logger
141141
invocationID string
142+
// message channel for sending SyncError messages
143+
msgChan chan<- message.SyncMessage
142144

143145
shard *shard
144146
}
@@ -213,6 +215,7 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
213215
scheduler: s,
214216
logger: s.logger,
215217
invocationID: s.invocationID,
218+
msgChan: res,
216219
}
217220
for _, opt := range opts {
218221
opt(syncClient)

scheduler/scheduler_dfs.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/cloudquery/plugin-sdk/v4/helpers"
12+
"github.com/cloudquery/plugin-sdk/v4/message"
1213
"github.com/cloudquery/plugin-sdk/v4/scheduler/batchsender"
1314
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1415
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
@@ -119,6 +120,12 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
119120
logger.Error().Err(err).Msg("table resolver finished with error")
120121
tableMetrics.OtelErrorsAdd(ctx, 1)
121122
atomic.AddUint64(&tableMetrics.Errors, 1)
123+
// Send SyncError message
124+
syncErrorMsg := &message.SyncError{
125+
TableName: table.Name,
126+
Error: err.Error(),
127+
}
128+
s.msgChan <- syncErrorMsg
122129
return
123130
}
124131
}()

scheduler/scheduler_shuffle_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha
4545
Client: tc.client,
4646
})
4747
}
48-
scheduler.Sync(ctx, queueClients, resolvedResources)
48+
scheduler.Sync(ctx, queueClients, resolvedResources, s.msgChan)
4949
}

0 commit comments

Comments
 (0)