Skip to content

Commit d20397c

Browse files
authored
DispatchLink(): Fix user stats
Fixes #5076 (comment)
1 parent 19f8907 commit d20397c

File tree

8 files changed

+62
-7
lines changed

8 files changed

+62
-7
lines changed

app/dispatcher/default.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,47 @@ func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *tran
196196
return inboundLink, outboundLink
197197
}
198198

199+
func (d *DefaultDispatcher) WrapLink(ctx context.Context, link *transport.Link) *transport.Link {
200+
sessionInbound := session.InboundFromContext(ctx)
201+
var user *protocol.MemoryUser
202+
if sessionInbound != nil {
203+
user = sessionInbound.User
204+
}
205+
206+
link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}
207+
208+
if user != nil && len(user.Email) > 0 {
209+
p := d.policy.ForLevel(user.Level)
210+
if p.Stats.UserUplink {
211+
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
212+
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
213+
link.Reader.(*buf.TimeoutWrapperReader).Counter = c
214+
}
215+
}
216+
if p.Stats.UserDownlink {
217+
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
218+
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
219+
link.Writer = &SizeStatWriter{
220+
Counter: c,
221+
Writer: link.Writer,
222+
}
223+
}
224+
}
225+
if p.Stats.UserOnline {
226+
name := "user>>>" + user.Email + ">>>online"
227+
if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
228+
sessionInbounds := session.InboundFromContext(ctx)
229+
userIP := sessionInbounds.Source.Address.String()
230+
om.AddIP(userIP)
231+
// log Online user with ips
232+
// errors.LogDebug(ctx, "user>>>" + user.Email + ">>>online", om.Count(), om.List())
233+
}
234+
}
235+
}
236+
237+
return link
238+
}
239+
199240
func (d *DefaultDispatcher) shouldOverride(ctx context.Context, result SniffResult, request session.SniffingRequest, destination net.Destination) bool {
200241
domain := result.Domain()
201242
if domain == "" {
@@ -316,6 +357,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
316357
content = new(session.Content)
317358
ctx = session.ContextWithContent(ctx, content)
318359
}
360+
outbound = d.WrapLink(ctx, outbound)
319361
sniffingRequest := content.SniffingRequest
320362
if !sniffingRequest.Enabled {
321363
d.routedDispatch(ctx, outbound, destination)

app/reverse/bridge.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"time"
66

7+
"github.com/xtls/xray-core/app/dispatcher"
78
"github.com/xtls/xray-core/common/errors"
89
"github.com/xtls/xray-core/common/mux"
910
"github.com/xtls/xray-core/common/net"
@@ -200,6 +201,7 @@ func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, l
200201
return w.dispatcher.DispatchLink(ctx, dest, link)
201202
}
202203

204+
link = w.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link)
203205
w.handleInternalConn(link)
204206

205207
return nil

common/buf/io.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type TimeoutReader interface {
3030

3131
type TimeoutWrapperReader struct {
3232
Reader
33+
stats.Counter
3334
mb MultiBuffer
3435
err error
3536
done chan struct{}
@@ -39,11 +40,16 @@ func (r *TimeoutWrapperReader) ReadMultiBuffer() (MultiBuffer, error) {
3940
if r.done != nil {
4041
<-r.done
4142
r.done = nil
43+
if r.Counter != nil {
44+
r.Counter.Add(int64(r.mb.Len()))
45+
}
4246
return r.mb, r.err
4347
}
44-
r.mb = nil
45-
r.err = nil
46-
return r.Reader.ReadMultiBuffer()
48+
r.mb, r.err = r.Reader.ReadMultiBuffer()
49+
if r.Counter != nil {
50+
r.Counter.Add(int64(r.mb.Len()))
51+
}
52+
return r.mb, r.err
4753
}
4854

4955
func (r *TimeoutWrapperReader) ReadMultiBufferTimeout(duration time.Duration) (MultiBuffer, error) {
@@ -62,6 +68,9 @@ func (r *TimeoutWrapperReader) ReadMultiBufferTimeout(duration time.Duration) (M
6268
select {
6369
case <-r.done:
6470
r.done = nil
71+
if r.Counter != nil {
72+
r.Counter.Add(int64(r.mb.Len()))
73+
}
6574
return r.mb, r.err
6675
case <-timeout:
6776
return nil, nil

common/mux/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"io"
66

7+
"github.com/xtls/xray-core/app/dispatcher"
78
"github.com/xtls/xray-core/common"
89
"github.com/xtls/xray-core/common/buf"
910
"github.com/xtls/xray-core/common/errors"
@@ -61,6 +62,7 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t
6162
if dest.Address != muxCoolAddress {
6263
return s.dispatcher.DispatchLink(ctx, dest, link)
6364
}
65+
link = s.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link)
6466
_, err := NewServerWorker(ctx, s.dispatcher, link)
6567
return err
6668
}

proxy/dokodemo/dokodemo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
182182
}
183183

184184
if err := dispatcher.DispatchLink(ctx, dest, &transport.Link{
185-
Reader: &buf.TimeoutWrapperReader{Reader: reader},
185+
Reader: reader,
186186
Writer: writer},
187187
); err != nil {
188188
return errors.New("failed to dispatch request").Base(err)

proxy/http/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (s *Server) handleConnect(ctx context.Context, _ *http.Request, buffer *buf
193193
inbound.CanSpliceCopy = 1
194194
}
195195
if err := dispatcher.DispatchLink(ctx, dest, &transport.Link{
196-
Reader: &buf.TimeoutWrapperReader{Reader: reader},
196+
Reader: reader,
197197
Writer: buf.NewWriter(conn)},
198198
); err != nil {
199199
return errors.New("failed to dispatch request").Base(err)

proxy/socks/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (s *Server) processTCP(ctx context.Context, conn stat.Connection, dispatche
161161
inbound.CanSpliceCopy = 1
162162
}
163163
if err := dispatcher.DispatchLink(ctx, dest, &transport.Link{
164-
Reader: &buf.TimeoutWrapperReader{Reader: reader},
164+
Reader: reader,
165165
Writer: buf.NewWriter(conn)},
166166
); err != nil {
167167
return errors.New("failed to dispatch request").Base(err)

proxy/vless/inbound/inbound.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
563563
bufferWriter.SetFlushNext()
564564

565565
if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{
566-
Reader: &buf.TimeoutWrapperReader{Reader: clientReader},
566+
Reader: clientReader,
567567
Writer: clientWriter},
568568
); err != nil {
569569
return errors.New("failed to dispatch request").Base(err)

0 commit comments

Comments
 (0)