Source file src/net/http/netconn_test.go

     1  // Copyright 2024 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http_test
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"internal/synctest"
    11  	"io"
    12  	"math"
    13  	"net"
    14  	"net/netip"
    15  	"os"
    16  	"sync"
    17  	"time"
    18  )
    19  
    20  func fakeNetListen() *fakeNetListener {
    21  	li := &fakeNetListener{
    22  		setc:    make(chan struct{}, 1),
    23  		unsetc:  make(chan struct{}, 1),
    24  		addr:    netip.MustParseAddrPort("127.0.0.1:8000"),
    25  		locPort: 10000,
    26  	}
    27  	li.unsetc <- struct{}{}
    28  	return li
    29  }
    30  
    31  type fakeNetListener struct {
    32  	setc, unsetc chan struct{}
    33  	queue        []net.Conn
    34  	closed       bool
    35  	addr         netip.AddrPort
    36  	locPort      uint16
    37  
    38  	onDial func() // called when making a new connection
    39  
    40  	trackConns bool // set this to record all created conns
    41  	conns      []*fakeNetConn
    42  }
    43  
    44  func (li *fakeNetListener) lock() {
    45  	select {
    46  	case <-li.setc:
    47  	case <-li.unsetc:
    48  	}
    49  }
    50  
    51  func (li *fakeNetListener) unlock() {
    52  	if li.closed || len(li.queue) > 0 {
    53  		li.setc <- struct{}{}
    54  	} else {
    55  		li.unsetc <- struct{}{}
    56  	}
    57  }
    58  
    59  func (li *fakeNetListener) connect() *fakeNetConn {
    60  	if li.onDial != nil {
    61  		li.onDial()
    62  	}
    63  	li.lock()
    64  	defer li.unlock()
    65  	locAddr := netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), li.locPort)
    66  	li.locPort++
    67  	c0, c1 := fakeNetPipe(li.addr, locAddr)
    68  	li.queue = append(li.queue, c0)
    69  	if li.trackConns {
    70  		li.conns = append(li.conns, c0)
    71  	}
    72  	return c1
    73  }
    74  
    75  func (li *fakeNetListener) Accept() (net.Conn, error) {
    76  	<-li.setc
    77  	defer li.unlock()
    78  	if li.closed {
    79  		return nil, net.ErrClosed
    80  	}
    81  	c := li.queue[0]
    82  	li.queue = li.queue[1:]
    83  	return c, nil
    84  }
    85  
    86  func (li *fakeNetListener) Close() error {
    87  	li.lock()
    88  	defer li.unlock()
    89  	li.closed = true
    90  	return nil
    91  }
    92  
    93  func (li *fakeNetListener) Addr() net.Addr {
    94  	return net.TCPAddrFromAddrPort(li.addr)
    95  }
    96  
    97  // fakeNetPipe creates an in-memory, full duplex network connection.
    98  //
    99  // Unlike net.Pipe, the connection is not synchronous.
   100  // Writes are made to a buffer, and return immediately.
   101  // By default, the buffer size is unlimited.
   102  func fakeNetPipe(s1ap, s2ap netip.AddrPort) (r, w *fakeNetConn) {
   103  	s1addr := net.TCPAddrFromAddrPort(s1ap)
   104  	s2addr := net.TCPAddrFromAddrPort(s2ap)
   105  	s1 := newSynctestNetConnHalf(s1addr)
   106  	s2 := newSynctestNetConnHalf(s2addr)
   107  	c1 := &fakeNetConn{loc: s1, rem: s2}
   108  	c2 := &fakeNetConn{loc: s2, rem: s1}
   109  	c1.peer = c2
   110  	c2.peer = c1
   111  	return c1, c2
   112  }
   113  
   114  // A fakeNetConn is one endpoint of the connection created by fakeNetPipe.
   115  type fakeNetConn struct {
   116  	// local and remote connection halves.
   117  	// Each half contains a buffer.
   118  	// Reads pull from the local buffer, and writes push to the remote buffer.
   119  	loc, rem *fakeNetConnHalf
   120  
   121  	// When set, synctest.Wait is automatically called before reads and after writes.
   122  	autoWait bool
   123  
   124  	// peer is the other endpoint.
   125  	peer *fakeNetConn
   126  
   127  	onClose func() // called when closing
   128  }
   129  
   130  // Read reads data from the connection.
   131  func (c *fakeNetConn) Read(b []byte) (n int, err error) {
   132  	if c.autoWait {
   133  		synctest.Wait()
   134  	}
   135  	return c.loc.read(b)
   136  }
   137  
   138  // Peek returns the available unread read buffer,
   139  // without consuming its contents.
   140  func (c *fakeNetConn) Peek() []byte {
   141  	if c.autoWait {
   142  		synctest.Wait()
   143  	}
   144  	return c.loc.peek()
   145  }
   146  
   147  // Write writes data to the connection.
   148  func (c *fakeNetConn) Write(b []byte) (n int, err error) {
   149  	if c.autoWait {
   150  		defer synctest.Wait()
   151  	}
   152  	return c.rem.write(b)
   153  }
   154  
   155  // IsClosed reports whether the peer has closed its end of the connection.
   156  func (c *fakeNetConn) IsClosedByPeer() bool {
   157  	if c.autoWait {
   158  		synctest.Wait()
   159  	}
   160  	c.rem.lock()
   161  	defer c.rem.unlock()
   162  	// If the remote half of the conn is returning ErrClosed,
   163  	// the peer has closed the connection.
   164  	return c.rem.readErr == net.ErrClosed
   165  }
   166  
   167  // Close closes the connection.
   168  func (c *fakeNetConn) Close() error {
   169  	if c.onClose != nil {
   170  		c.onClose()
   171  	}
   172  	// Local half of the conn is now closed.
   173  	c.loc.lock()
   174  	c.loc.writeErr = net.ErrClosed
   175  	c.loc.readErr = net.ErrClosed
   176  	c.loc.buf.Reset()
   177  	c.loc.unlock()
   178  	// Remote half of the connection reads EOF after reading any remaining data.
   179  	c.rem.lock()
   180  	if c.rem.readErr != nil {
   181  		c.rem.readErr = io.EOF
   182  	}
   183  	c.rem.unlock()
   184  	if c.autoWait {
   185  		synctest.Wait()
   186  	}
   187  	return nil
   188  }
   189  
   190  // LocalAddr returns the (fake) local network address.
   191  func (c *fakeNetConn) LocalAddr() net.Addr {
   192  	return c.loc.addr
   193  }
   194  
   195  // LocalAddr returns the (fake) remote network address.
   196  func (c *fakeNetConn) RemoteAddr() net.Addr {
   197  	return c.rem.addr
   198  }
   199  
   200  // SetDeadline sets the read and write deadlines for the connection.
   201  func (c *fakeNetConn) SetDeadline(t time.Time) error {
   202  	c.SetReadDeadline(t)
   203  	c.SetWriteDeadline(t)
   204  	return nil
   205  }
   206  
   207  // SetReadDeadline sets the read deadline for the connection.
   208  func (c *fakeNetConn) SetReadDeadline(t time.Time) error {
   209  	c.loc.rctx.setDeadline(t)
   210  	return nil
   211  }
   212  
   213  // SetWriteDeadline sets the write deadline for the connection.
   214  func (c *fakeNetConn) SetWriteDeadline(t time.Time) error {
   215  	c.rem.wctx.setDeadline(t)
   216  	return nil
   217  }
   218  
   219  // SetReadBufferSize sets the read buffer limit for the connection.
   220  // Writes by the peer will block so long as the buffer is full.
   221  func (c *fakeNetConn) SetReadBufferSize(size int) {
   222  	c.loc.setReadBufferSize(size)
   223  }
   224  
   225  // fakeNetConnHalf is one data flow in the connection created by fakeNetPipe.
   226  // Each half contains a buffer. Writes to the half push to the buffer, and reads pull from it.
   227  type fakeNetConnHalf struct {
   228  	addr net.Addr
   229  
   230  	// Read and write timeouts.
   231  	rctx, wctx deadlineContext
   232  
   233  	// A half can be readable and/or writable.
   234  	//
   235  	// These four channels act as a lock,
   236  	// and allow waiting for readability/writability.
   237  	// When the half is unlocked, exactly one channel contains a value.
   238  	// When the half is locked, all channels are empty.
   239  	lockr  chan struct{} // readable
   240  	lockw  chan struct{} // writable
   241  	lockrw chan struct{} // readable and writable
   242  	lockc  chan struct{} // neither readable nor writable
   243  
   244  	bufMax   int // maximum buffer size
   245  	buf      bytes.Buffer
   246  	readErr  error // error returned by reads
   247  	writeErr error // error returned by writes
   248  }
   249  
   250  func newSynctestNetConnHalf(addr net.Addr) *fakeNetConnHalf {
   251  	h := &fakeNetConnHalf{
   252  		addr:   addr,
   253  		lockw:  make(chan struct{}, 1),
   254  		lockr:  make(chan struct{}, 1),
   255  		lockrw: make(chan struct{}, 1),
   256  		lockc:  make(chan struct{}, 1),
   257  		bufMax: math.MaxInt, // unlimited
   258  	}
   259  	h.unlock()
   260  	return h
   261  }
   262  
   263  // lock locks h.
   264  func (h *fakeNetConnHalf) lock() {
   265  	select {
   266  	case <-h.lockw: // writable
   267  	case <-h.lockr: // readable
   268  	case <-h.lockrw: // readable and writable
   269  	case <-h.lockc: // neither readable nor writable
   270  	}
   271  }
   272  
   273  // h unlocks h.
   274  func (h *fakeNetConnHalf) unlock() {
   275  	canRead := h.readErr != nil || h.buf.Len() > 0
   276  	canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
   277  	switch {
   278  	case canRead && canWrite:
   279  		h.lockrw <- struct{}{} // readable and writable
   280  	case canRead:
   281  		h.lockr <- struct{}{} // readable
   282  	case canWrite:
   283  		h.lockw <- struct{}{} // writable
   284  	default:
   285  		h.lockc <- struct{}{} // neither readable nor writable
   286  	}
   287  }
   288  
   289  // waitAndLockForRead waits until h is readable and locks it.
   290  func (h *fakeNetConnHalf) waitAndLockForRead() error {
   291  	// First a non-blocking select to see if we can make immediate progress.
   292  	// This permits using a canceled context for a non-blocking operation.
   293  	select {
   294  	case <-h.lockr:
   295  		return nil // readable
   296  	case <-h.lockrw:
   297  		return nil // readable and writable
   298  	default:
   299  	}
   300  	ctx := h.rctx.context()
   301  	select {
   302  	case <-h.lockr:
   303  		return nil // readable
   304  	case <-h.lockrw:
   305  		return nil // readable and writable
   306  	case <-ctx.Done():
   307  		return context.Cause(ctx)
   308  	}
   309  }
   310  
   311  // waitAndLockForWrite waits until h is writable and locks it.
   312  func (h *fakeNetConnHalf) waitAndLockForWrite() error {
   313  	// First a non-blocking select to see if we can make immediate progress.
   314  	// This permits using a canceled context for a non-blocking operation.
   315  	select {
   316  	case <-h.lockw:
   317  		return nil // writable
   318  	case <-h.lockrw:
   319  		return nil // readable and writable
   320  	default:
   321  	}
   322  	ctx := h.wctx.context()
   323  	select {
   324  	case <-h.lockw:
   325  		return nil // writable
   326  	case <-h.lockrw:
   327  		return nil // readable and writable
   328  	case <-ctx.Done():
   329  		return context.Cause(ctx)
   330  	}
   331  }
   332  
   333  func (h *fakeNetConnHalf) peek() []byte {
   334  	h.lock()
   335  	defer h.unlock()
   336  	return h.buf.Bytes()
   337  }
   338  
   339  func (h *fakeNetConnHalf) read(b []byte) (n int, err error) {
   340  	if err := h.waitAndLockForRead(); err != nil {
   341  		return 0, err
   342  	}
   343  	defer h.unlock()
   344  	if h.buf.Len() == 0 && h.readErr != nil {
   345  		return 0, h.readErr
   346  	}
   347  	return h.buf.Read(b)
   348  }
   349  
   350  func (h *fakeNetConnHalf) setReadBufferSize(size int) {
   351  	h.lock()
   352  	defer h.unlock()
   353  	h.bufMax = size
   354  }
   355  
   356  func (h *fakeNetConnHalf) write(b []byte) (n int, err error) {
   357  	for n < len(b) {
   358  		nn, err := h.writePartial(b[n:])
   359  		n += nn
   360  		if err != nil {
   361  			return n, err
   362  		}
   363  	}
   364  	return n, nil
   365  }
   366  
   367  func (h *fakeNetConnHalf) writePartial(b []byte) (n int, err error) {
   368  	if err := h.waitAndLockForWrite(); err != nil {
   369  		return 0, err
   370  	}
   371  	defer h.unlock()
   372  	if h.writeErr != nil {
   373  		return 0, h.writeErr
   374  	}
   375  	writeMax := h.bufMax - h.buf.Len()
   376  	if writeMax < len(b) {
   377  		b = b[:writeMax]
   378  	}
   379  	return h.buf.Write(b)
   380  }
   381  
   382  // deadlineContext converts a changable deadline (as in net.Conn.SetDeadline) into a Context.
   383  type deadlineContext struct {
   384  	mu     sync.Mutex
   385  	ctx    context.Context
   386  	cancel context.CancelCauseFunc
   387  	timer  *time.Timer
   388  }
   389  
   390  // context returns a Context which expires when the deadline does.
   391  func (t *deadlineContext) context() context.Context {
   392  	t.mu.Lock()
   393  	defer t.mu.Unlock()
   394  	if t.ctx == nil {
   395  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   396  	}
   397  	return t.ctx
   398  }
   399  
   400  // setDeadline sets the current deadline.
   401  func (t *deadlineContext) setDeadline(deadline time.Time) {
   402  	t.mu.Lock()
   403  	defer t.mu.Unlock()
   404  	// If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled
   405  	// and we should create a new one.
   406  	if t.ctx == nil || t.cancel == nil {
   407  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   408  	}
   409  	// Stop any existing deadline from expiring.
   410  	if t.timer != nil {
   411  		t.timer.Stop()
   412  	}
   413  	if deadline.IsZero() {
   414  		// No deadline.
   415  		return
   416  	}
   417  	now := time.Now()
   418  	if !deadline.After(now) {
   419  		// Deadline has already expired.
   420  		t.cancel(os.ErrDeadlineExceeded)
   421  		t.cancel = nil
   422  		return
   423  	}
   424  	if t.timer != nil {
   425  		// Reuse existing deadline timer.
   426  		t.timer.Reset(deadline.Sub(now))
   427  		return
   428  	}
   429  	// Create a new timer to cancel the context at the deadline.
   430  	t.timer = time.AfterFunc(deadline.Sub(now), func() {
   431  		t.mu.Lock()
   432  		defer t.mu.Unlock()
   433  		t.cancel(os.ErrDeadlineExceeded)
   434  		t.cancel = nil
   435  	})
   436  }
   437  

View as plain text