Skip to content

Commit 646a508

Browse files
committed
feat(v0.4.4): v0.4.4
1 parent 2b7df7f commit 646a508

File tree

6 files changed

+101
-11
lines changed

6 files changed

+101
-11
lines changed

adapter/kratos/transport/websocket/websocket.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"encoding/json"
66
http "net/http"
7+
"reflect"
78
"strings"
89

910
"github.com/ccheers/xpkg/sync/errgroup"
1011
"github.com/gorilla/websocket"
1112
"github.com/pkg/errors"
13+
"google.golang.org/protobuf/encoding/protojson"
14+
"google.golang.org/protobuf/proto"
1215
)
1316

1417
type WebSocket[T any, R any] struct {
@@ -21,6 +24,7 @@ type Encoding struct {
2124
errorEncodeFunc func(w http.ResponseWriter, err error)
2225
requestDecodeFunc func(r *http.Request, req interface{}) error
2326

27+
wsReqDecodeFunc func(bs []byte, req interface{}) error
2428
replyEncodeFunc func(ws *websocket.Conn, resp interface{})
2529
replyErrorEncodeFunc func(ws *websocket.Conn, err error)
2630
}
@@ -54,6 +58,9 @@ func defaultWSOptions() WSOptions {
5458
requestDecodeFunc: func(r *http.Request, req interface{}) error {
5559
return json.NewDecoder(r.Body).Decode(req)
5660
},
61+
wsReqDecodeFunc: func(bs []byte, req interface{}) error {
62+
return unmarshalJSON(bs, req)
63+
},
5764
replyEncodeFunc: func(ws *websocket.Conn, resp interface{}) {
5865
_ = ws.WriteJSON(map[string]interface{}{
5966
"code": 200,
@@ -99,6 +106,12 @@ func WithRequestDecodeFunc(fn func(r *http.Request, req interface{}) error) WSOp
99106
}
100107
}
101108

109+
func WithWsReqDecodeFunc(fn func(bs []byte, req interface{}) error) WSOptionFunc {
110+
return func(options *WSOptions) {
111+
options.encoding.wsReqDecodeFunc = fn
112+
}
113+
}
114+
102115
func WithReplyEncodeFunc(fn func(ws *websocket.Conn, resp interface{})) WSOptionFunc {
103116
return func(options *WSOptions) {
104117
options.encoding.replyEncodeFunc = fn
@@ -193,7 +206,7 @@ func (x *WebSocket[T, R]) readLoop(ctx context.Context, ws *websocket.Conn) {
193206
default:
194207
}
195208
var dst R
196-
err := ws.ReadJSON(&dst)
209+
_, bs, err := ws.ReadMessage()
197210
if err != nil {
198211
if strings.Contains(err.Error(), "connection reset by peer") {
199212
return
@@ -204,6 +217,12 @@ func (x *WebSocket[T, R]) readLoop(ctx context.Context, ws *websocket.Conn) {
204217
continue
205218
}
206219

220+
err = x.options.encoding.wsReqDecodeFunc(bs, &dst)
221+
if err != nil {
222+
x.options.encoding.replyErrorEncodeFunc(ws, err)
223+
continue
224+
}
225+
207226
if validate, ok := (interface{})(&dst).(interface{ Validate() error }); ok {
208227
err := validate.Validate()
209228
if err != nil {
@@ -237,3 +256,31 @@ func (x *WebSocket[T, R]) writeLoop(ctx context.Context, ws *websocket.Conn) {
237256
x.options.encoding.replyEncodeFunc(ws, resp)
238257
}
239258
}
259+
260+
var (
261+
// unmarshalOptions is a configurable JSON format parser.
262+
unmarshalOptions = protojson.UnmarshalOptions{
263+
DiscardUnknown: true,
264+
}
265+
)
266+
267+
func unmarshalJSON(data []byte, v interface{}) error {
268+
switch m := v.(type) {
269+
case json.Unmarshaler:
270+
return m.UnmarshalJSON(data)
271+
case proto.Message:
272+
return unmarshalOptions.Unmarshal(data, m)
273+
default:
274+
rv := reflect.ValueOf(v)
275+
for rv := rv; rv.Kind() == reflect.Ptr; {
276+
if rv.IsNil() {
277+
rv.Set(reflect.New(rv.Type().Elem()))
278+
}
279+
rv = rv.Elem()
280+
}
281+
if m, ok := reflect.Indirect(rv).Interface().(proto.Message); ok {
282+
return unmarshalOptions.Unmarshal(data, m)
283+
}
284+
return json.Unmarshal(data, m)
285+
}
286+
}

client/xvm/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ func TestVM(t *testing.T) {
4646

4747
datas := []string{
4848
//"server_name",
49-
"entity_count{server_name=\"xxxxx\"}",
50-
//"online_number",
49+
//"entity_count",
50+
"online_number",
5151
//"lock_entity_status",
5252
//"lock_lb_status",
5353
//"count(entity_count)",

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
go.etcd.io/etcd/client/v3 v3.5.7
2020
go.opentelemetry.io/otel v1.28.0
2121
go.opentelemetry.io/otel/metric v1.28.0
22+
go.opentelemetry.io/otel/sdk v1.28.0
2223
go.opentelemetry.io/otel/sdk/metric v1.28.0
2324
go.opentelemetry.io/otel/trace v1.28.0
2425
go.uber.org/zap v1.27.0
@@ -63,11 +64,11 @@ require (
6364
github.com/satori/go.uuid v1.2.0 // indirect
6465
go.etcd.io/etcd/api/v3 v3.5.7 // indirect
6566
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
66-
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
6767
go.uber.org/multierr v1.10.0 // indirect
6868
golang.org/x/crypto v0.26.0 // indirect
6969
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
7070
golang.org/x/net v0.28.0 // indirect
71+
golang.org/x/sync v0.8.0 // indirect
7172
golang.org/x/sys v0.23.0 // indirect
7273
golang.org/x/text v0.17.0 // indirect
7374
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect

mysql/mysql.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func NewMysql(opts ...DBConfigOption) (*sql.DB, func(), error) {
8080
opt(&cfg)
8181
}
8282
netAddr := fmt.Sprintf("tcp(%s:%d)", cfg.Host, cfg.Port)
83-
dsn := fmt.Sprintf("%s:%s@%s/%s?timeout=30s&charset=utf8mb4", cfg.User, cfg.Pass, netAddr, cfg.DBName)
83+
dsn := fmt.Sprintf("%s:%s@%s/%s?loc=Local&charset=utf8mb4&parseTime=True", cfg.User, cfg.Pass, netAddr, cfg.DBName)
8484

8585
driverName, err := otelsql.Register(
8686
"mysql",
@@ -89,6 +89,9 @@ func NewMysql(opts ...DBConfigOption) (*sql.DB, func(), error) {
8989
semconv.DBSystemMySQL,
9090
),
9191
otelsql.WithMeterProvider(cfg.MeterProvider),
92+
otelsql.WithSpanOptions(otelsql.SpanOptions{
93+
DisableErrSkip: true,
94+
}),
9295
)
9396
if err != nil {
9497
return nil, nil, err

redis/redis.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func NewRedis(opts ...RedisConfigOption) (*redis.Client, error) {
7272
redisClient := redis.NewClient(&redis.Options{
7373
Network: "tcp",
7474
Addr: addr,
75+
Password: cfg.Pass,
7576
DialTimeout: time.Duration(cfg.ReadTimeout) * time.Second,
7677
ReadTimeout: time.Duration(cfg.ReadTimeout) * time.Second,
7778
WriteTimeout: time.Duration(cfg.WriteTimeout) * time.Second,
@@ -104,6 +105,7 @@ func NewRedisV8(opts ...RedisConfigOption) (*redisv8.Client, error) {
104105
redisClient := redisv8.NewClient(&redisv8.Options{
105106
Network: "tcp",
106107
Addr: addr,
108+
Password: cfg.Pass,
107109
DialTimeout: time.Duration(cfg.ReadTimeout) * time.Second,
108110
ReadTimeout: time.Duration(cfg.ReadTimeout) * time.Second,
109111
WriteTimeout: time.Duration(cfg.WriteTimeout) * time.Second,

transport/chttp/http.go

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,39 @@ import (
77
"io"
88
nethttp "net/http"
99
"reflect"
10+
"strconv"
1011
"strings"
1112
"time"
1213

1314
"github.com/go-kratos/kratos/v2/transport/http"
1415
"github.com/opendevops-cn/codo-golang-sdk/cerr"
16+
"go.opentelemetry.io/otel/propagation"
17+
"go.opentelemetry.io/otel/trace"
1518
"google.golang.org/protobuf/encoding/protojson"
1619
"google.golang.org/protobuf/proto"
1720
)
1821

22+
type options struct {
23+
propagator propagation.TextMapPropagator
24+
}
25+
26+
var optionsDefault = options{
27+
propagator: propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}),
28+
}
29+
1930
type Resp struct {
2031
// 业务 code
2132
Code cerr.ErrCode `json:"code"`
22-
// 开发看
23-
Msg string `json:"msg"`
2433
// 用户看
34+
Msg string `json:"msg"`
35+
// 开发看
2536
Reason string `json:"reason"`
26-
// 服务器时间戳
27-
Timestamp uint32 `json:"timestamp"`
37+
// 服务器毫秒时间戳
38+
Timestamp string `json:"timestamp"`
2839
// 结构化数据
2940
Result json.RawMessage `json:"result"`
41+
// TraceID
42+
TraceID string `json:"trace_id"`
3043
}
3144

3245
var (
@@ -47,27 +60,46 @@ func ResponseEncoder(writer nethttp.ResponseWriter, request *nethttp.Request, i
4760
if err != nil {
4861
return err
4962
}
63+
64+
ctx := optionsDefault.propagator.Extract(request.Context(), propagation.HeaderCarrier(request.Header))
65+
sp := trace.SpanContextFromContext(ctx)
66+
milliSecondsStr := strconv.Itoa(int(time.Now().UnixMilli()))
67+
68+
// 写入
69+
writer.WriteHeader(nethttp.StatusOK)
5070
return json.NewEncoder(writer).Encode(&Resp{
5171
Code: cerr.SCode,
5272
Msg: "success",
53-
Timestamp: uint32(time.Now().Unix()),
73+
Reason: "success",
74+
Timestamp: milliSecondsStr,
5475
Result: bs,
76+
TraceID: sp.TraceID().String(),
5577
})
5678
}
5779

5880
func RequestBodyDecoder(r *nethttp.Request, i interface{}) error {
81+
const megaBytes4 = 4 << 20
82+
if r.ContentLength == 0 || r.ContentLength > megaBytes4 {
83+
return nil
84+
}
85+
5986
data, err := io.ReadAll(r.Body)
6087
if err != nil {
6188
return cerr.New(cerr.EParamUnparsedCode, err)
6289
}
6390

91+
if len(data) == 0 {
92+
return nil
93+
}
94+
6495
// reset body.
6596
r.Body = io.NopCloser(bytes.NewBuffer(data))
6697

6798
err = unmarshalJSON(data, i)
6899
if err != nil {
69100
return cerr.New(cerr.EParamUnparsedCode, err)
70101
}
102+
71103
return nil
72104
}
73105

@@ -116,13 +148,18 @@ func ErrorEncoder(writer http.ResponseWriter, request *http.Request, err error)
116148
statusCode := codeError.Code.AsHTTPCode()
117149
msg := codeError.Code.String()
118150

151+
ctx := optionsDefault.propagator.Extract(request.Context(), propagation.HeaderCarrier(request.Header))
152+
sp := trace.SpanContextFromContext(ctx)
153+
milliSecondsStr := strconv.Itoa(int(time.Now().UnixMilli()))
154+
119155
// 写入
120156
writer.WriteHeader(statusCode)
121157
_ = json.NewEncoder(writer).Encode(&Resp{
122158
Code: errCode,
123159
Msg: msg,
124160
Reason: err.Error(),
125-
Timestamp: uint32(time.Now().Unix()),
161+
Timestamp: milliSecondsStr,
162+
TraceID: sp.TraceID().String(),
126163
})
127164
}
128165

0 commit comments

Comments
 (0)