Skip to content

Commit 3c92a92

Browse files
committed
Revert log changes
1 parent 19862e9 commit 3c92a92

File tree

4 files changed

+52
-38
lines changed

4 files changed

+52
-38
lines changed

kafkajobs/listener.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,16 @@ func (d *Driver) listen() error {
142142
case SerialPipelining:
143143
fetchWg := &sync.WaitGroup{}
144144
fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
145-
fetchWg.Add(1)
146145
itemWg := &sync.WaitGroup{}
147146

148-
go func() {
149-
defer fetchWg.Done()
147+
fetchWg.Go(func() {
150148
partition.EachRecord(func(r *kgo.Record) {
151149
itemWg.Add(1)
152150
item := fromConsumer(r, d.requeueCh, d.recordsCh, itemWg, &d.stopped)
153151
d.insertTracedItem(item)
154152
itemWg.Wait()
155153
})
156-
}()
154+
})
157155
})
158156
fetchWg.Wait()
159157
case FanOutPipelining:

tests/jobs_kafka_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestKafkaInitCG(t *testing.T) {
147147
}
148148

149149
func TestKafkaPQCG(t *testing.T) {
150-
cont := endure.New(slog.LevelDebug, endure.GracefulShutdownTimeout(time.Second))
150+
cont := endure.New(slog.LevelDebug)
151151

152152
cfg := &config.Plugin{
153153
Version: "v2023.2.0",
@@ -365,7 +365,6 @@ func TestKafkaPipeliningStrategy(t *testing.T) {
365365
stopCh <- struct{}{}
366366
wg.Wait()
367367

368-
assert.Equal(t, 60, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
369368
assert.Equal(t, 60, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
370369

371370
logs := oLogger.FilterMessageSnippet("php consumed:1:").All()

tests/mock/logger.go

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package mocklogger
22

33
import (
4-
"encoding/json"
54
"github.com/roadrunner-server/endure/v2/dep"
65
"go.uber.org/zap"
76
"go.uber.org/zap/zapcore"
8-
"os"
9-
"strings"
107
)
118

129
type ZapLoggerMock struct {
@@ -18,36 +15,12 @@ type Logger interface {
1815
}
1916

2017
func ZapTestLogger(enab zapcore.LevelEnabler) (*ZapLoggerMock, *ObservedLogs) {
21-
logs := &ObservedLogs{}
22-
core := zapcore.NewCore(
23-
zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), // or zapcore.NewConsoleEncoder for plain text
24-
zapcore.AddSync(os.Stdout),
25-
enab,
26-
)
18+
core, logs := New(enab)
19+
obsLog := zap.New(core, zap.Development())
2720

28-
logger := zap.New(
29-
core,
30-
zap.Development(),
31-
zap.Hooks(func(entry zapcore.Entry) error {
32-
line := strings.TrimSpace(entry.Message)
33-
var rawFields map[string]interface{}
34-
var c []zapcore.Field
35-
jsonStartIndex := strings.Index(line, "{")
36-
if jsonStartIndex > 0 {
37-
jsonStr := line[jsonStartIndex:]
38-
_ = json.Unmarshal([]byte(jsonStr), &rawFields)
39-
}
40-
41-
for field, value := range rawFields {
42-
c = append(c, zap.Any(field, value))
43-
}
44-
45-
logs.add(LoggedEntry{entry, c})
46-
return nil
47-
}),
48-
)
49-
50-
return &ZapLoggerMock{logger}, logs
21+
return &ZapLoggerMock{
22+
l: obsLog,
23+
}, logs
5124
}
5225

5326
func (z *ZapLoggerMock) Init() error {

tests/mock/observer.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,47 @@ func (o *ObservedLogs) add(log LoggedEntry) {
153153
o.logs = append(o.logs, log)
154154
o.mu.Unlock()
155155
}
156+
157+
// New creates a new Core that buffers logs in memory (without any encoding).
158+
// It's particularly useful in tests.
159+
func New(enab zapcore.LevelEnabler) (zapcore.Core, *ObservedLogs) {
160+
ol := &ObservedLogs{}
161+
return &contextObserver{
162+
LevelEnabler: enab,
163+
logs: ol,
164+
}, ol
165+
}
166+
167+
type contextObserver struct {
168+
zapcore.LevelEnabler
169+
logs *ObservedLogs
170+
context []zapcore.Field
171+
}
172+
173+
func (co *contextObserver) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
174+
if co.Enabled(ent.Level) {
175+
return ce.AddCore(ent, co)
176+
}
177+
return ce
178+
}
179+
180+
func (co *contextObserver) With(fields []zapcore.Field) zapcore.Core {
181+
return &contextObserver{
182+
LevelEnabler: co.LevelEnabler,
183+
logs: co.logs,
184+
context: append(co.context[:len(co.context):len(co.context)], fields...),
185+
}
186+
}
187+
188+
func (co *contextObserver) Write(ent zapcore.Entry, fields []zapcore.Field) error {
189+
all := make([]zapcore.Field, 0, len(fields)+len(co.context))
190+
all = append(all, co.context...)
191+
all = append(all, fields...)
192+
co.logs.add(LoggedEntry{ent, all})
193+
194+
return nil
195+
}
196+
197+
func (co *contextObserver) Sync() error {
198+
return nil
199+
}

0 commit comments

Comments
 (0)