File tree 3 files changed +9
-11
lines changed
3 files changed +9
-11
lines changed Original file line number Diff line number Diff line change 17
17
ElasticSearchConf struct {
18
18
Hosts []string
19
19
Index string
20
- DocType string `json:",default=doc "`
20
+ DocType string `json:",default=_doc "`
21
21
TimeZone string `json:",optional"`
22
22
MaxChunkBytes int `json:",default=15728640"` // default 15M
23
23
Compress bool `json:",default=false"`
Original file line number Diff line number Diff line change @@ -2,6 +2,7 @@ package es
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
6
7
"github.com/kevwan/go-stash/stash/config"
7
8
"github.com/olivere/elastic/v7"
@@ -10,7 +11,7 @@ import (
10
11
"github.com/zeromicro/go-zero/core/logx"
11
12
)
12
13
13
- const es8Version = "8 .0.0"
14
+ const es8Version = "v8 .0.0"
14
15
15
16
type (
16
17
Writer struct {
22
23
23
24
valueWithIndex struct {
24
25
index string
25
- val string
26
+ val map [ string ] interface {}
26
27
}
27
28
)
28
29
@@ -45,13 +46,13 @@ func NewWriter(c config.ElasticSearchConf) (*Writer, error) {
45
46
writer := Writer {
46
47
docType : c .DocType ,
47
48
client : client ,
48
- esVersion : version ,
49
+ esVersion : fmt . Sprintf ( "v%s" , version ) ,
49
50
}
50
51
writer .inserter = executors .NewChunkExecutor (writer .execute , executors .WithChunkBytes (c .MaxChunkBytes ))
51
52
return & writer , nil
52
53
}
53
54
54
- func (w * Writer ) Write (index , val string ) error {
55
+ func (w * Writer ) Write (index string , val map [ string ] interface {} ) error {
55
56
return w .inserter .Add (valueWithIndex {
56
57
index : index ,
57
58
val : val ,
Original file line number Diff line number Diff line change 4
4
jsoniter "github.com/json-iterator/go"
5
5
"github.com/kevwan/go-stash/stash/es"
6
6
"github.com/kevwan/go-stash/stash/filter"
7
+ "time"
7
8
)
8
9
9
10
type MessageHandler struct {
@@ -37,11 +38,7 @@ func (mh *MessageHandler) Consume(_, val string) error {
37
38
return nil
38
39
}
39
40
}
41
+ m ["timestamp" ] = time .Now ()
40
42
41
- bs , err := jsoniter .Marshal (m )
42
- if err != nil {
43
- return err
44
- }
45
-
46
- return mh .writer .Write (index , string (bs ))
43
+ return mh .writer .Write (index , m )
47
44
}
You can’t perform that action at this time.
0 commit comments