Skip to content

Commit d3b4d2c

Browse files
author
Gleez Technologies
committed
feature: Publish Minimal nats event early and several fixes
1 parent 8952129 commit d3b4d2c

File tree

7 files changed

+105
-90
lines changed

7 files changed

+105
-90
lines changed

Dockerfile

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +0,0 @@
1-
############################
2-
# STEP 1 build executable binary
3-
############################
4-
FROM golang:1.14 AS builder
5-
6-
ADD . /go/src/github.com/sandeepone/mysql-manticore
7-
8-
RUN cd /go/src/github.com/sandeepone/mysql-manticore \
9-
&& COMMIT_SHA=$(git rev-parse --short HEAD) \
10-
&& CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "-s -w \
11-
-X main.version=0.8.0 \
12-
-X main.revision=${COMMIT_SHA}" \
13-
-a -tags netgo -installsuffix netgo -o mysql-manticore cmd/mysql-manticore/main.go
14-
15-
############################
16-
# STEP 2 build a certs image
17-
############################
18-
19-
# Alpine certs
20-
FROM alpine:3.11 as alpine
21-
22-
RUN apk update && apk add --no-cache ca-certificates tzdata && update-ca-certificates
23-
24-
# Create appuser
25-
ENV USER=appuser
26-
ENV UID=10001
27-
28-
# See https://stackoverflow.com/a/55757473/12429735
29-
RUN adduser \
30-
--disabled-password \
31-
--gecos "" \
32-
--home "/nonexistent" \
33-
--shell "/sbin/nologin" \
34-
--no-create-home \
35-
--uid "${UID}" \
36-
"${USER}"
37-
38-
############################
39-
# STEP 3 build a release image
40-
############################
41-
FROM scratch
42-
MAINTAINER Sandeep Sangamreddi <[email protected]>
43-
44-
# Import from builder.
45-
COPY --from=alpine /usr/share/zoneinfo /usr/share/zoneinfo
46-
COPY --from=alpine /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
47-
COPY --from=alpine /etc/passwd /etc/passwd
48-
COPY --from=alpine /etc/group /etc/group
49-
50-
COPY --from=builder /go/src/github.com/sandeepone/mysql-manticore/mysql-manticore /usr/bin/
51-
COPY etc/river.toml /etc/mysql-manticore/river.toml
52-
COPY etc/dict /etc/mysql-manticore/dict
53-
54-
EXPOSE 8080
55-
56-
# Use an unprivileged user.
57-
USER appuser:appuser
58-
59-
ENTRYPOINT ["mysql-manticore", "-config", "/etc/mysql-manticore/river.toml"]

river/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ type Config struct {
104104

105105
StatAddr string `toml:"stat_addr"`
106106

107-
NatsEnabled bool `toml:"nats_enabled"`
108107
NatsAddr string `toml:"nats_addr"`
108+
NatsEnabled bool `toml:"nats_enabled"`
109109

110110
ServerID uint32 `toml:"server_id"`
111111
Flavor string `toml:"flavor"`
@@ -135,6 +135,8 @@ type Config struct {
135135
SkipRebuild bool `toml:"skip_rebuild"`
136136
SkipUploadIndex bool `toml:"skip_upload_index"`
137137
SkipReloadRtIndex bool `toml:"skip_reload_rt_index"`
138+
139+
TaoMap map[string]string `toml:"tao_type"`
138140
}
139141

140142
// IndexConfigField field of an index as it's seen in the indexer config

river/cron_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (s *CronService) Serve() {
3333

3434
s.cron = cron.New(
3535
cron.WithSeconds(),
36-
cron.WithLogger(cron.VerbosePrintfLogger(stdLogger)),
36+
cron.WithLogger(cron.PrintfLogger(stdLogger)),
3737
)
3838

3939
cfg := s.riverInstance.c.MaintenanceConfig

river/nats_service.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package river
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67
"sync"
78
"time"
89

@@ -17,6 +18,9 @@ type Message struct {
1718
// Id is an unique identifier of action.
1819
Id uint64
1920

21+
// Type id of the event
22+
Type uint32
23+
2024
// event name: users/posts/comments etc
2125
Name string
2226

@@ -44,6 +48,8 @@ type NatsService struct {
4448
sphm sync.Mutex
4549
wg sync.WaitGroup
4650

51+
seq uint64
52+
4753
// NATS connection types
4854
nc *nats.Conn
4955
ec *nats.EncodedConn
@@ -83,8 +89,9 @@ func (s *NatsService) SyncLoop(ctx context.Context) {
8389

8490
for {
8591
if s.ec == nil {
92+
s.log.Debug("SyncLoop Nats not connected, so drain channel")
8693
<-sync2Nats
87-
break
94+
continue
8895
}
8996

9097
select {
@@ -93,20 +100,33 @@ func (s *NatsService) SyncLoop(ctx context.Context) {
93100
case <-s.ctx.Done():
94101
s.log.Info("SyncLoop Closed")
95102
break
103+
default:
104+
s.log.Debug("SyncLoop No activity")
96105
}
97106
}
107+
108+
close(sync2Nats)
109+
s.log.Info("SyncLoop exited")
98110
}
99111

100112
func (s *NatsService) Publish(v *Message) {
101113
if s.ec == nil {
102114
return
103115
}
104116

105-
sub := fmt.Sprintf("binlog.%s.%s", v.Name, v.Action)
106-
s.log.Debugf("Publishing event %s: %d", sub, v.Id)
117+
cfg := s.riverInstance.c
118+
typ := strconv.FormatUint(uint64(v.Type), 10)
119+
120+
// try to get name from tao map
121+
if name, ok := cfg.TaoMap[typ]; ok {
122+
v.Name = name
123+
}
124+
125+
sub := fmt.Sprintf("binlog.%s.%s.%d", v.Name, v.Action, s.nextSeqNo())
126+
s.log.Infof("Publishing event [%s] for [%d]", sub, v.Id)
107127

108128
if err := s.ec.Publish(sub, v); err != nil {
109-
s.log.Errorf("Error publishing event %d: %v", v.Id, err)
129+
s.log.Errorf("Error publishing event [%s] for [%d]: %v", sub, v.Id, err)
110130
}
111131

112132
// // Sends a PING and wait for a PONG from the server, up to the given timeout.
@@ -123,6 +143,12 @@ func NewNatsService(r *River) *NatsService {
123143
return &s
124144
}
125145

146+
func (s *NatsService) nextSeqNo() uint64 {
147+
s.seq++
148+
149+
return s.seq
150+
}
151+
126152
func (s *NatsService) connect() {
127153
s.sphm.Lock()
128154
defer s.sphm.Unlock()
@@ -192,11 +218,23 @@ func NewMessage(id uint64) *Message {
192218
}
193219
}
194220

195-
func PublishRowToNats(doc TableRowChange, rule IngestRule) {
221+
func PublishRowToNats(id uint64, typ uint32, action string, table string) {
222+
ev := NewMessage(id)
223+
ev.Type = typ
224+
ev.Name = table
225+
ev.Action = action
226+
ev.Metadata["tableName"] = table
227+
228+
sync2Nats <- ev
229+
}
230+
231+
func PublishDocToNats(doc TableRowChange, rule IngestRule) {
196232
ev := NewMessage(doc.DocID)
197-
ev.Name = doc.Index
233+
ev.Type = uint32(rule.JsonTypeValue)
234+
ev.Name = doc.TableName
198235
ev.Action = doc.Action
199236

237+
ev.Metadata["index"] = doc.Index
200238
ev.Metadata["tableName"] = doc.TableName
201239
ev.Metadata["timeStamp"] = doc.TS.String()
202240

river/river.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (r *River) run() error {
226226
r.cronToken = &t
227227
}
228228

229-
if r.natsToken == nil && r.c.NatsEnabled {
229+
if r.natsToken == nil {
230230
t := r.sup.Add(NewNatsService(r))
231231
r.natsToken = &t
232232
}

river/rule.go

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ type IngestRule struct {
3030
Index string `toml:"index"`
3131
ColumnMap map[string][]string `toml:"column_map"`
3232
JsonColumnName string `toml:"json_column_name"`
33-
JsonTypeName string `toml:"json_type_name"` // optional used for tao_objects type column ex: user or post or comment
34-
JsonTypeValue int `toml:"json_type_value"` // optional used for tao_objects type column ex: 1 or 3 or 4
33+
JsonTypeName string `toml:"json_type_name"` // optional used for tao_objects type column name ex: type
34+
JsonTypeValue int `toml:"json_type_value"` // optional used for tao_objects type column value ex: 1 or 3 or 4
3535
timeProvider func() time.Time
3636

3737
// MySQL table information
@@ -158,6 +158,7 @@ func (r *IngestRule) Apply(e *canal.RowsEvent) ([]TableRowChange, error) {
158158
if err != nil {
159159
return nil, errors.Trace(err)
160160
}
161+
161162
switch e.Action {
162163
case canal.InsertAction:
163164
return r.makeInsertChangeSet(e, idColNo)
@@ -172,7 +173,9 @@ func (r *IngestRule) Apply(e *canal.RowsEvent) ([]TableRowChange, error) {
172173
// ApplyRuleSet converts row event to document changeset and sends to the channel c
173174
func ApplyRuleSet(ruleSet []IngestRule, e *canal.RowsEvent, c chan interface{}) (uint64, error) {
174175
var docCount uint64
175-
var ts = time.Now().UTC()
176+
177+
// Publish Minimal Row Event - without payload
178+
processRowEventForNats(e)
176179

177180
for ruleID := range ruleSet {
178181
rule := ruleSet[ruleID]
@@ -181,37 +184,22 @@ func ApplyRuleSet(ruleSet []IngestRule, e *canal.RowsEvent, c chan interface{})
181184
return docCount, errors.Trace(err)
182185
}
183186

184-
// Log skipped event
185-
if docs == nil && rule.JsonTypeValue > 0 {
186-
log.Infof(
187-
"[row event skipped] ruleId=%d index=%s table=%s type!=%d action=%s rows=%d timeStamp=%s",
188-
ruleID,
189-
rule.Index,
190-
e.Table,
191-
rule.JsonTypeValue,
192-
e.Action,
193-
rowCount(e),
194-
ts.String(),
195-
)
196-
}
197-
198187
if docs != nil {
199188
log.Infof(
200-
"[row event] ruleId=%d index=%s table=%s type=%d action=%s rows=%d docs=%v timeStamp=%s",
189+
"[rule event] ruleId=%d index=%s table=%s type=%d action=%s rows=%d docs=%v",
201190
ruleID,
202191
rule.Index,
203192
e.Table,
204193
rule.JsonTypeValue,
205194
e.Action,
206195
rowCount(e),
207196
DocIDList(docs),
208-
ts.String(),
209197
)
198+
210199
for _, doc := range docs {
211200
c <- doc
212201
docCount++
213-
214-
PublishRowToNats(doc, rule)
202+
// PublishDocToNats(doc, rule)
215203
}
216204
}
217205
}
@@ -427,3 +415,49 @@ func isJsonRowTypeValid(e *canal.RowsEvent, typColNo, typColVal int) (bool, erro
427415

428416
return false, nil
429417
}
418+
419+
func processRowEventForNats(e *canal.RowsEvent) {
420+
idColNo, _ := fieldIndex(e, "id")
421+
typColNo, _ := fieldIndex(e, "type")
422+
423+
if e.Action == "update" {
424+
for i := 0; i < len(e.Rows)/2; i++ {
425+
var id uint64
426+
var typ uint32
427+
newRow := e.Rows[i*2+1]
428+
429+
if idColNo >= 0 {
430+
id, _ = util.CoerceToUint64(newRow[idColNo])
431+
}
432+
433+
if typColNo >= 0 {
434+
typ, _ = util.CoerceToUint32(newRow[typColNo])
435+
}
436+
437+
if id > 0 && typ > 0 {
438+
PublishRowToNats(id, typ, e.Action, e.Table.String())
439+
log.Debugf("[row event] table=%s type=%d action=%s id=%d", e.Table, typ, e.Action, id)
440+
}
441+
}
442+
}
443+
444+
if e.Action == "insert" || e.Action == "delete" {
445+
for _, row := range e.Rows {
446+
var id uint64
447+
var typ uint32
448+
449+
if idColNo >= 0 {
450+
id, _ = util.CoerceToUint64(row[idColNo])
451+
}
452+
453+
if typColNo >= 0 {
454+
typ, _ = util.CoerceToUint32(row[typColNo])
455+
}
456+
457+
if id > 0 && typ > 0 {
458+
PublishRowToNats(id, typ, e.Action, e.Table.String())
459+
log.Debugf("[row event] table=%s type=%d action=%s id=%d", e.Table, typ, e.Action, id)
460+
}
461+
}
462+
}
463+
}

sphinx/sphinx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func (c *SphConn) reconnect() error {
300300
}
301301

302302
// addr := c.RemoteAddr().String()
303-
log.Infof("[sphinx-reconnect] %s", c.addr)
303+
log.Debugf("[sphinx-reconnect] %s", c.addr)
304304
conn, err := client.Connect(c.addr, "", "", "")
305305
if err != nil {
306306
return errors.Trace(err)

0 commit comments

Comments
 (0)