Source file
src/net/http/netconn_test.go
1
2
3
4
5 package http_test
6
7 import (
8 "bytes"
9 "context"
10 "internal/synctest"
11 "io"
12 "math"
13 "net"
14 "net/netip"
15 "os"
16 "sync"
17 "time"
18 )
19
20 func fakeNetListen() *fakeNetListener {
21 li := &fakeNetListener{
22 setc: make(chan struct{}, 1),
23 unsetc: make(chan struct{}, 1),
24 addr: netip.MustParseAddrPort("127.0.0.1:8000"),
25 locPort: 10000,
26 }
27 li.unsetc <- struct{}{}
28 return li
29 }
30
31 type fakeNetListener struct {
32 setc, unsetc chan struct{}
33 queue []net.Conn
34 closed bool
35 addr netip.AddrPort
36 locPort uint16
37
38 onDial func()
39
40 trackConns bool
41 conns []*fakeNetConn
42 }
43
44 func (li *fakeNetListener) lock() {
45 select {
46 case <-li.setc:
47 case <-li.unsetc:
48 }
49 }
50
51 func (li *fakeNetListener) unlock() {
52 if li.closed || len(li.queue) > 0 {
53 li.setc <- struct{}{}
54 } else {
55 li.unsetc <- struct{}{}
56 }
57 }
58
59 func (li *fakeNetListener) connect() *fakeNetConn {
60 if li.onDial != nil {
61 li.onDial()
62 }
63 li.lock()
64 defer li.unlock()
65 locAddr := netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), li.locPort)
66 li.locPort++
67 c0, c1 := fakeNetPipe(li.addr, locAddr)
68 li.queue = append(li.queue, c0)
69 if li.trackConns {
70 li.conns = append(li.conns, c0)
71 }
72 return c1
73 }
74
75 func (li *fakeNetListener) Accept() (net.Conn, error) {
76 <-li.setc
77 defer li.unlock()
78 if li.closed {
79 return nil, net.ErrClosed
80 }
81 c := li.queue[0]
82 li.queue = li.queue[1:]
83 return c, nil
84 }
85
86 func (li *fakeNetListener) Close() error {
87 li.lock()
88 defer li.unlock()
89 li.closed = true
90 return nil
91 }
92
93 func (li *fakeNetListener) Addr() net.Addr {
94 return net.TCPAddrFromAddrPort(li.addr)
95 }
96
97
98
99
100
101
102 func fakeNetPipe(s1ap, s2ap netip.AddrPort) (r, w *fakeNetConn) {
103 s1addr := net.TCPAddrFromAddrPort(s1ap)
104 s2addr := net.TCPAddrFromAddrPort(s2ap)
105 s1 := newSynctestNetConnHalf(s1addr)
106 s2 := newSynctestNetConnHalf(s2addr)
107 c1 := &fakeNetConn{loc: s1, rem: s2}
108 c2 := &fakeNetConn{loc: s2, rem: s1}
109 c1.peer = c2
110 c2.peer = c1
111 return c1, c2
112 }
113
114
115 type fakeNetConn struct {
116
117
118
119 loc, rem *fakeNetConnHalf
120
121
122 autoWait bool
123
124
125 peer *fakeNetConn
126
127 onClose func()
128 }
129
130
131 func (c *fakeNetConn) Read(b []byte) (n int, err error) {
132 if c.autoWait {
133 synctest.Wait()
134 }
135 return c.loc.read(b)
136 }
137
138
139
140 func (c *fakeNetConn) Peek() []byte {
141 if c.autoWait {
142 synctest.Wait()
143 }
144 return c.loc.peek()
145 }
146
147
148 func (c *fakeNetConn) Write(b []byte) (n int, err error) {
149 if c.autoWait {
150 defer synctest.Wait()
151 }
152 return c.rem.write(b)
153 }
154
155
156 func (c *fakeNetConn) IsClosedByPeer() bool {
157 if c.autoWait {
158 synctest.Wait()
159 }
160 c.rem.lock()
161 defer c.rem.unlock()
162
163
164 return c.rem.readErr == net.ErrClosed
165 }
166
167
168 func (c *fakeNetConn) Close() error {
169 if c.onClose != nil {
170 c.onClose()
171 }
172
173 c.loc.lock()
174 c.loc.writeErr = net.ErrClosed
175 c.loc.readErr = net.ErrClosed
176 c.loc.buf.Reset()
177 c.loc.unlock()
178
179 c.rem.lock()
180 if c.rem.readErr != nil {
181 c.rem.readErr = io.EOF
182 }
183 c.rem.unlock()
184 if c.autoWait {
185 synctest.Wait()
186 }
187 return nil
188 }
189
190
191 func (c *fakeNetConn) LocalAddr() net.Addr {
192 return c.loc.addr
193 }
194
195
196 func (c *fakeNetConn) RemoteAddr() net.Addr {
197 return c.rem.addr
198 }
199
200
201 func (c *fakeNetConn) SetDeadline(t time.Time) error {
202 c.SetReadDeadline(t)
203 c.SetWriteDeadline(t)
204 return nil
205 }
206
207
208 func (c *fakeNetConn) SetReadDeadline(t time.Time) error {
209 c.loc.rctx.setDeadline(t)
210 return nil
211 }
212
213
214 func (c *fakeNetConn) SetWriteDeadline(t time.Time) error {
215 c.rem.wctx.setDeadline(t)
216 return nil
217 }
218
219
220
221 func (c *fakeNetConn) SetReadBufferSize(size int) {
222 c.loc.setReadBufferSize(size)
223 }
224
225
226
227 type fakeNetConnHalf struct {
228 addr net.Addr
229
230
231 rctx, wctx deadlineContext
232
233
234
235
236
237
238
239 lockr chan struct{}
240 lockw chan struct{}
241 lockrw chan struct{}
242 lockc chan struct{}
243
244 bufMax int
245 buf bytes.Buffer
246 readErr error
247 writeErr error
248 }
249
250 func newSynctestNetConnHalf(addr net.Addr) *fakeNetConnHalf {
251 h := &fakeNetConnHalf{
252 addr: addr,
253 lockw: make(chan struct{}, 1),
254 lockr: make(chan struct{}, 1),
255 lockrw: make(chan struct{}, 1),
256 lockc: make(chan struct{}, 1),
257 bufMax: math.MaxInt,
258 }
259 h.unlock()
260 return h
261 }
262
263
264 func (h *fakeNetConnHalf) lock() {
265 select {
266 case <-h.lockw:
267 case <-h.lockr:
268 case <-h.lockrw:
269 case <-h.lockc:
270 }
271 }
272
273
274 func (h *fakeNetConnHalf) unlock() {
275 canRead := h.readErr != nil || h.buf.Len() > 0
276 canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
277 switch {
278 case canRead && canWrite:
279 h.lockrw <- struct{}{}
280 case canRead:
281 h.lockr <- struct{}{}
282 case canWrite:
283 h.lockw <- struct{}{}
284 default:
285 h.lockc <- struct{}{}
286 }
287 }
288
289
290 func (h *fakeNetConnHalf) waitAndLockForRead() error {
291
292
293 select {
294 case <-h.lockr:
295 return nil
296 case <-h.lockrw:
297 return nil
298 default:
299 }
300 ctx := h.rctx.context()
301 select {
302 case <-h.lockr:
303 return nil
304 case <-h.lockrw:
305 return nil
306 case <-ctx.Done():
307 return context.Cause(ctx)
308 }
309 }
310
311
312 func (h *fakeNetConnHalf) waitAndLockForWrite() error {
313
314
315 select {
316 case <-h.lockw:
317 return nil
318 case <-h.lockrw:
319 return nil
320 default:
321 }
322 ctx := h.wctx.context()
323 select {
324 case <-h.lockw:
325 return nil
326 case <-h.lockrw:
327 return nil
328 case <-ctx.Done():
329 return context.Cause(ctx)
330 }
331 }
332
333 func (h *fakeNetConnHalf) peek() []byte {
334 h.lock()
335 defer h.unlock()
336 return h.buf.Bytes()
337 }
338
339 func (h *fakeNetConnHalf) read(b []byte) (n int, err error) {
340 if err := h.waitAndLockForRead(); err != nil {
341 return 0, err
342 }
343 defer h.unlock()
344 if h.buf.Len() == 0 && h.readErr != nil {
345 return 0, h.readErr
346 }
347 return h.buf.Read(b)
348 }
349
350 func (h *fakeNetConnHalf) setReadBufferSize(size int) {
351 h.lock()
352 defer h.unlock()
353 h.bufMax = size
354 }
355
356 func (h *fakeNetConnHalf) write(b []byte) (n int, err error) {
357 for n < len(b) {
358 nn, err := h.writePartial(b[n:])
359 n += nn
360 if err != nil {
361 return n, err
362 }
363 }
364 return n, nil
365 }
366
367 func (h *fakeNetConnHalf) writePartial(b []byte) (n int, err error) {
368 if err := h.waitAndLockForWrite(); err != nil {
369 return 0, err
370 }
371 defer h.unlock()
372 if h.writeErr != nil {
373 return 0, h.writeErr
374 }
375 writeMax := h.bufMax - h.buf.Len()
376 if writeMax < len(b) {
377 b = b[:writeMax]
378 }
379 return h.buf.Write(b)
380 }
381
382
// deadlineContext converts a changeable deadline (as set through
// net.Conn.SetDeadline) into a Context that is canceled with cause
// os.ErrDeadlineExceeded when the deadline passes.
type deadlineContext struct {
	mu     sync.Mutex
	ctx    context.Context         // current context; nil until first needed
	cancel context.CancelCauseFunc // cancels ctx; nil once ctx has been canceled
	timer  *time.Timer             // fires at the deadline; nil until a future deadline is set
}

// context returns the current Context, creating it on first use.
// The returned Context is canceled when the deadline expires.
func (t *deadlineContext) context() context.Context {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.ctx == nil {
		t.ctx, t.cancel = context.WithCancelCause(context.Background())
	}
	return t.ctx
}

// setDeadline sets the current deadline.
//
// A zero deadline means no deadline. A deadline that is not after the current
// time cancels the context immediately. Otherwise a timer is armed to cancel
// the context when the deadline is reached.
func (t *deadlineContext) setDeadline(deadline time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()

	// If t.ctx is non-nil and t.cancel is nil, t.ctx has been canceled and
	// a new one is needed.
	if t.ctx == nil || t.cancel == nil {
		t.ctx, t.cancel = context.WithCancelCause(context.Background())
	}

	// Stop any existing deadline from expiring.
	if t.timer != nil {
		t.timer.Stop()
	}
	if deadline.IsZero() {
		// No deadline.
		return
	}
	now := time.Now()
	if !deadline.After(now) {
		// Deadline has already expired.
		t.cancel(os.ErrDeadlineExceeded)
		t.cancel = nil
		return
	}
	if t.timer != nil {
		// Reuse the existing deadline timer.
		t.timer.Reset(deadline.Sub(now))
		return
	}

	// Create a new timer to cancel the context at the deadline.
	t.timer = time.AfterFunc(deadline.Sub(now), func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		// Timer.Stop does not wait for a callback that has already
		// started. Such a callback can get here after setDeadline has
		// expired the context itself and cleared t.cancel, so guard
		// against calling a nil func.
		if t.cancel == nil {
			return
		}
		t.cancel(os.ErrDeadlineExceeded)
		t.cancel = nil
	})
}
437
View as plain text