-
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathconn.go
183 lines (163 loc) · 4.49 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package masque
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"sync"
"sync/atomic"
"time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/quic-go/quic-go/quicvarint"
)
type masqueAddr struct{ net.Addr }
func (m masqueAddr) Network() string { return "connect-udp" }
func (m masqueAddr) String() string { return m.Addr.String() }
var _ net.Addr = &masqueAddr{}
type proxiedConn struct {
str http3.Stream
localAddr net.Addr
remoteAddr net.Addr
closed atomic.Bool // set when Close is called
readDone chan struct{}
deadlineMx sync.Mutex
readCtx context.Context
readCtxCancel context.CancelFunc
deadline time.Time
readDeadlineTimer *time.Timer
}
var _ net.PacketConn = &proxiedConn{}
func newProxiedConn(str http3.Stream, local net.Addr) *proxiedConn {
c := &proxiedConn{
str: str,
localAddr: local,
readDone: make(chan struct{}),
}
c.readCtx, c.readCtxCancel = context.WithCancel(context.Background())
go func() {
defer close(c.readDone)
if err := skipCapsules(quicvarint.NewReader(str)); err != io.EOF && !c.closed.Load() {
log.Printf("reading from request stream failed: %v", err)
}
str.Close()
}()
return c
}
func (c *proxiedConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
start:
c.deadlineMx.Lock()
ctx := c.readCtx
c.deadlineMx.Unlock()
data, err := c.str.ReceiveDatagram(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
return 0, nil, err
}
// The context is cancelled asynchronously (in a Go routine spawned from time.AfterFunc).
// We need to check if a new deadline has already been set.
c.deadlineMx.Lock()
restart := time.Now().Before(c.deadline)
c.deadlineMx.Unlock()
if restart {
goto start
}
return 0, nil, os.ErrDeadlineExceeded
}
contextID, n, err := quicvarint.Parse(data)
if err != nil {
return 0, nil, fmt.Errorf("masque: malformed datagram: %w", err)
}
if contextID != 0 {
// Drop this datagram. We currently only support proxying of UDP payloads.
goto start
}
// If b is too small, additional bytes are discarded.
// This mirrors the behavior of large UDP datagrams received on a UDP socket (on Linux).
return copy(b, data[n:]), c.remoteAddr, nil
}
// WriteTo sends a UDP datagram to the target.
// The net.Addr parameter is ignored.
func (c *proxiedConn) WriteTo(p []byte, _ net.Addr) (n int, err error) {
data := make([]byte, 0, len(contextIDZero)+len(p))
data = append(data, contextIDZero...)
data = append(data, p...)
return len(p), c.str.SendDatagram(data)
}
func (c *proxiedConn) Close() error {
c.closed.Store(true)
c.str.CancelRead(quic.StreamErrorCode(http3.ErrCodeNoError))
err := c.str.Close()
<-c.readDone
c.readCtxCancel()
c.deadlineMx.Lock()
if c.readDeadlineTimer != nil {
c.readDeadlineTimer.Stop()
}
c.deadlineMx.Unlock()
return err
}
func (c *proxiedConn) LocalAddr() net.Addr {
return &masqueAddr{c.localAddr}
}
func (c *proxiedConn) SetDeadline(t time.Time) error {
_ = c.SetWriteDeadline(t)
return c.SetReadDeadline(t)
}
func (c *proxiedConn) SetReadDeadline(t time.Time) error {
c.deadlineMx.Lock()
defer c.deadlineMx.Unlock()
oldDeadline := c.deadline
c.deadline = t
now := time.Now()
// Stop the timer.
if t.IsZero() {
if c.readDeadlineTimer != nil && !c.readDeadlineTimer.Stop() {
<-c.readDeadlineTimer.C
}
return nil
}
// If the deadline already expired, cancel immediately.
if !t.After(now) {
c.readCtxCancel()
return nil
}
deadline := t.Sub(now)
// if we already have a timer, reset it
if c.readDeadlineTimer != nil {
// if that timer expired, create a new one
if now.Before(oldDeadline) {
c.readCtxCancel() // the old context might already have been cancelled, but that's not guaranteed
c.readCtx, c.readCtxCancel = context.WithCancel(context.Background())
}
c.readDeadlineTimer.Reset(deadline)
} else { // this is the first time the timer is set
c.readDeadlineTimer = time.AfterFunc(deadline, func() {
c.deadlineMx.Lock()
defer c.deadlineMx.Unlock()
if !c.deadline.IsZero() && c.deadline.Before(time.Now()) {
c.readCtxCancel()
}
})
}
return nil
}
func (c *proxiedConn) SetWriteDeadline(time.Time) error {
// TODO(#22): This is currently blocked on a change in quic-go's API.
return nil
}
func skipCapsules(str quicvarint.Reader) error {
for {
ct, r, err := http3.ParseCapsule(str)
if err != nil {
return err
}
log.Printf("skipping capsule of type %d", ct)
if _, err := io.Copy(io.Discard, r); err != nil {
return err
}
}
}