Source file src/net/http/netconn_test.go

     1  // Copyright 2024 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http_test
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"internal/synctest"
    11  	"io"
    12  	"math"
    13  	"net"
    14  	"net/netip"
    15  	"os"
    16  	"sync"
    17  	"time"
    18  )
    19  
    20  func fakeNetListen() *fakeNetListener {
    21  	li := &fakeNetListener{
    22  		setc:    make(chan struct{}, 1),
    23  		unsetc:  make(chan struct{}, 1),
    24  		addr:    netip.MustParseAddrPort("127.0.0.1:8000"),
    25  		locPort: 10000,
    26  	}
    27  	li.unsetc <- struct{}{}
    28  	return li
    29  }
    30  
    31  type fakeNetListener struct {
    32  	setc, unsetc chan struct{}
    33  	queue        []net.Conn
    34  	closed       bool
    35  	addr         netip.AddrPort
    36  	locPort      uint16
    37  
    38  	onDial  func()             // called when making a new connection
    39  	onClose func(*fakeNetConn) // called when closing a connection
    40  
    41  	trackConns bool // set this to record all created conns
    42  	conns      []*fakeNetConn
    43  }
    44  
    45  func (li *fakeNetListener) lock() {
    46  	select {
    47  	case <-li.setc:
    48  	case <-li.unsetc:
    49  	}
    50  }
    51  
    52  func (li *fakeNetListener) unlock() {
    53  	if li.closed || len(li.queue) > 0 {
    54  		li.setc <- struct{}{}
    55  	} else {
    56  		li.unsetc <- struct{}{}
    57  	}
    58  }
    59  
    60  func (li *fakeNetListener) connect() *fakeNetConn {
    61  	if li.onDial != nil {
    62  		li.onDial()
    63  	}
    64  	li.lock()
    65  	defer li.unlock()
    66  	locAddr := netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), li.locPort)
    67  	li.locPort++
    68  	c0, c1 := fakeNetPipe(li.addr, locAddr)
    69  	c0.onClose = li.onClose
    70  	c1.onClose = li.onClose
    71  	li.queue = append(li.queue, c0)
    72  	if li.trackConns {
    73  		li.conns = append(li.conns, c0)
    74  	}
    75  	return c1
    76  }
    77  
    78  func (li *fakeNetListener) Accept() (net.Conn, error) {
    79  	<-li.setc
    80  	defer li.unlock()
    81  	if li.closed {
    82  		return nil, net.ErrClosed
    83  	}
    84  	c := li.queue[0]
    85  	li.queue = li.queue[1:]
    86  	return c, nil
    87  }
    88  
    89  func (li *fakeNetListener) Close() error {
    90  	li.lock()
    91  	defer li.unlock()
    92  	li.closed = true
    93  	return nil
    94  }
    95  
    96  func (li *fakeNetListener) Addr() net.Addr {
    97  	return net.TCPAddrFromAddrPort(li.addr)
    98  }
    99  
   100  // fakeNetPipe creates an in-memory, full duplex network connection.
   101  //
   102  // Unlike net.Pipe, the connection is not synchronous.
   103  // Writes are made to a buffer, and return immediately.
   104  // By default, the buffer size is unlimited.
   105  func fakeNetPipe(s1ap, s2ap netip.AddrPort) (r, w *fakeNetConn) {
   106  	s1addr := net.TCPAddrFromAddrPort(s1ap)
   107  	s2addr := net.TCPAddrFromAddrPort(s2ap)
   108  	s1 := newSynctestNetConnHalf(s1addr)
   109  	s2 := newSynctestNetConnHalf(s2addr)
   110  	c1 := &fakeNetConn{loc: s1, rem: s2}
   111  	c2 := &fakeNetConn{loc: s2, rem: s1}
   112  	c1.peer = c2
   113  	c2.peer = c1
   114  	return c1, c2
   115  }
   116  
   117  // A fakeNetConn is one endpoint of the connection created by fakeNetPipe.
   118  type fakeNetConn struct {
   119  	// local and remote connection halves.
   120  	// Each half contains a buffer.
   121  	// Reads pull from the local buffer, and writes push to the remote buffer.
   122  	loc, rem *fakeNetConnHalf
   123  
   124  	// When set, synctest.Wait is automatically called before reads and after writes.
   125  	autoWait bool
   126  
   127  	// peer is the other endpoint.
   128  	peer *fakeNetConn
   129  
   130  	onClose func(*fakeNetConn) // called when closing
   131  }
   132  
   133  // Read reads data from the connection.
   134  func (c *fakeNetConn) Read(b []byte) (n int, err error) {
   135  	if c.autoWait {
   136  		synctest.Wait()
   137  	}
   138  	return c.loc.read(b)
   139  }
   140  
   141  // Peek returns the available unread read buffer,
   142  // without consuming its contents.
   143  func (c *fakeNetConn) Peek() []byte {
   144  	if c.autoWait {
   145  		synctest.Wait()
   146  	}
   147  	return c.loc.peek()
   148  }
   149  
   150  // Write writes data to the connection.
   151  func (c *fakeNetConn) Write(b []byte) (n int, err error) {
   152  	if c.autoWait {
   153  		defer synctest.Wait()
   154  	}
   155  	return c.rem.write(b)
   156  }
   157  
   158  // IsClosed reports whether the peer has closed its end of the connection.
   159  func (c *fakeNetConn) IsClosedByPeer() bool {
   160  	if c.autoWait {
   161  		synctest.Wait()
   162  	}
   163  	c.rem.lock()
   164  	defer c.rem.unlock()
   165  	// If the remote half of the conn is returning ErrClosed,
   166  	// the peer has closed the connection.
   167  	return c.rem.readErr == net.ErrClosed
   168  }
   169  
   170  // Close closes the connection.
   171  func (c *fakeNetConn) Close() error {
   172  	if c.onClose != nil {
   173  		c.onClose(c)
   174  	}
   175  	// Local half of the conn is now closed.
   176  	c.loc.lock()
   177  	c.loc.writeErr = net.ErrClosed
   178  	c.loc.readErr = net.ErrClosed
   179  	c.loc.buf.Reset()
   180  	c.loc.unlock()
   181  	// Remote half of the connection reads EOF after reading any remaining data.
   182  	c.rem.lock()
   183  	if c.rem.readErr == nil {
   184  		c.rem.readErr = io.EOF
   185  	}
   186  	c.rem.writeErr = net.ErrClosed
   187  	c.rem.unlock()
   188  	if c.autoWait {
   189  		synctest.Wait()
   190  	}
   191  	return nil
   192  }
   193  
   194  // LocalAddr returns the (fake) local network address.
   195  func (c *fakeNetConn) LocalAddr() net.Addr {
   196  	return c.loc.addr
   197  }
   198  
   199  // LocalAddr returns the (fake) remote network address.
   200  func (c *fakeNetConn) RemoteAddr() net.Addr {
   201  	return c.rem.addr
   202  }
   203  
   204  // SetDeadline sets the read and write deadlines for the connection.
   205  func (c *fakeNetConn) SetDeadline(t time.Time) error {
   206  	c.SetReadDeadline(t)
   207  	c.SetWriteDeadline(t)
   208  	return nil
   209  }
   210  
   211  // SetReadDeadline sets the read deadline for the connection.
   212  func (c *fakeNetConn) SetReadDeadline(t time.Time) error {
   213  	c.loc.rctx.setDeadline(t)
   214  	return nil
   215  }
   216  
   217  // SetWriteDeadline sets the write deadline for the connection.
   218  func (c *fakeNetConn) SetWriteDeadline(t time.Time) error {
   219  	c.rem.wctx.setDeadline(t)
   220  	return nil
   221  }
   222  
   223  // SetReadBufferSize sets the read buffer limit for the connection.
   224  // Writes by the peer will block so long as the buffer is full.
   225  func (c *fakeNetConn) SetReadBufferSize(size int) {
   226  	c.loc.setReadBufferSize(size)
   227  }
   228  
   229  // fakeNetConnHalf is one data flow in the connection created by fakeNetPipe.
   230  // Each half contains a buffer. Writes to the half push to the buffer, and reads pull from it.
   231  type fakeNetConnHalf struct {
   232  	addr net.Addr
   233  
   234  	// Read and write timeouts.
   235  	rctx, wctx deadlineContext
   236  
   237  	// A half can be readable and/or writable.
   238  	//
   239  	// These four channels act as a lock,
   240  	// and allow waiting for readability/writability.
   241  	// When the half is unlocked, exactly one channel contains a value.
   242  	// When the half is locked, all channels are empty.
   243  	lockr  chan struct{} // readable
   244  	lockw  chan struct{} // writable
   245  	lockrw chan struct{} // readable and writable
   246  	lockc  chan struct{} // neither readable nor writable
   247  
   248  	bufMax   int // maximum buffer size
   249  	buf      bytes.Buffer
   250  	readErr  error // error returned by reads
   251  	writeErr error // error returned by writes
   252  }
   253  
   254  func newSynctestNetConnHalf(addr net.Addr) *fakeNetConnHalf {
   255  	h := &fakeNetConnHalf{
   256  		addr:   addr,
   257  		lockw:  make(chan struct{}, 1),
   258  		lockr:  make(chan struct{}, 1),
   259  		lockrw: make(chan struct{}, 1),
   260  		lockc:  make(chan struct{}, 1),
   261  		bufMax: math.MaxInt, // unlimited
   262  	}
   263  	h.unlock()
   264  	return h
   265  }
   266  
   267  // lock locks h.
   268  func (h *fakeNetConnHalf) lock() {
   269  	select {
   270  	case <-h.lockw: // writable
   271  	case <-h.lockr: // readable
   272  	case <-h.lockrw: // readable and writable
   273  	case <-h.lockc: // neither readable nor writable
   274  	}
   275  }
   276  
   277  // h unlocks h.
   278  func (h *fakeNetConnHalf) unlock() {
   279  	canRead := h.readErr != nil || h.buf.Len() > 0
   280  	canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
   281  	switch {
   282  	case canRead && canWrite:
   283  		h.lockrw <- struct{}{} // readable and writable
   284  	case canRead:
   285  		h.lockr <- struct{}{} // readable
   286  	case canWrite:
   287  		h.lockw <- struct{}{} // writable
   288  	default:
   289  		h.lockc <- struct{}{} // neither readable nor writable
   290  	}
   291  }
   292  
   293  // waitAndLockForRead waits until h is readable and locks it.
   294  func (h *fakeNetConnHalf) waitAndLockForRead() error {
   295  	// First a non-blocking select to see if we can make immediate progress.
   296  	// This permits using a canceled context for a non-blocking operation.
   297  	select {
   298  	case <-h.lockr:
   299  		return nil // readable
   300  	case <-h.lockrw:
   301  		return nil // readable and writable
   302  	default:
   303  	}
   304  	ctx := h.rctx.context()
   305  	select {
   306  	case <-h.lockr:
   307  		return nil // readable
   308  	case <-h.lockrw:
   309  		return nil // readable and writable
   310  	case <-ctx.Done():
   311  		return context.Cause(ctx)
   312  	}
   313  }
   314  
   315  // waitAndLockForWrite waits until h is writable and locks it.
   316  func (h *fakeNetConnHalf) waitAndLockForWrite() error {
   317  	// First a non-blocking select to see if we can make immediate progress.
   318  	// This permits using a canceled context for a non-blocking operation.
   319  	select {
   320  	case <-h.lockw:
   321  		return nil // writable
   322  	case <-h.lockrw:
   323  		return nil // readable and writable
   324  	default:
   325  	}
   326  	ctx := h.wctx.context()
   327  	select {
   328  	case <-h.lockw:
   329  		return nil // writable
   330  	case <-h.lockrw:
   331  		return nil // readable and writable
   332  	case <-ctx.Done():
   333  		return context.Cause(ctx)
   334  	}
   335  }
   336  
   337  func (h *fakeNetConnHalf) peek() []byte {
   338  	h.lock()
   339  	defer h.unlock()
   340  	return h.buf.Bytes()
   341  }
   342  
   343  func (h *fakeNetConnHalf) read(b []byte) (n int, err error) {
   344  	if err := h.waitAndLockForRead(); err != nil {
   345  		return 0, err
   346  	}
   347  	defer h.unlock()
   348  	if h.buf.Len() == 0 && h.readErr != nil {
   349  		return 0, h.readErr
   350  	}
   351  	return h.buf.Read(b)
   352  }
   353  
   354  func (h *fakeNetConnHalf) setReadBufferSize(size int) {
   355  	h.lock()
   356  	defer h.unlock()
   357  	h.bufMax = size
   358  }
   359  
   360  func (h *fakeNetConnHalf) write(b []byte) (n int, err error) {
   361  	for n < len(b) {
   362  		nn, err := h.writePartial(b[n:])
   363  		n += nn
   364  		if err != nil {
   365  			return n, err
   366  		}
   367  	}
   368  	return n, nil
   369  }
   370  
   371  func (h *fakeNetConnHalf) writePartial(b []byte) (n int, err error) {
   372  	if err := h.waitAndLockForWrite(); err != nil {
   373  		return 0, err
   374  	}
   375  	defer h.unlock()
   376  	if h.writeErr != nil {
   377  		return 0, h.writeErr
   378  	}
   379  	writeMax := h.bufMax - h.buf.Len()
   380  	if writeMax < len(b) {
   381  		b = b[:writeMax]
   382  	}
   383  	return h.buf.Write(b)
   384  }
   385  
   386  // deadlineContext converts a changable deadline (as in net.Conn.SetDeadline) into a Context.
   387  type deadlineContext struct {
   388  	mu     sync.Mutex
   389  	ctx    context.Context
   390  	cancel context.CancelCauseFunc
   391  	timer  *time.Timer
   392  }
   393  
   394  // context returns a Context which expires when the deadline does.
   395  func (t *deadlineContext) context() context.Context {
   396  	t.mu.Lock()
   397  	defer t.mu.Unlock()
   398  	if t.ctx == nil {
   399  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   400  	}
   401  	return t.ctx
   402  }
   403  
   404  // setDeadline sets the current deadline.
   405  func (t *deadlineContext) setDeadline(deadline time.Time) {
   406  	t.mu.Lock()
   407  	defer t.mu.Unlock()
   408  	// If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled
   409  	// and we should create a new one.
   410  	if t.ctx == nil || t.cancel == nil {
   411  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   412  	}
   413  	// Stop any existing deadline from expiring.
   414  	if t.timer != nil {
   415  		t.timer.Stop()
   416  	}
   417  	if deadline.IsZero() {
   418  		// No deadline.
   419  		return
   420  	}
   421  	now := time.Now()
   422  	if !deadline.After(now) {
   423  		// Deadline has already expired.
   424  		t.cancel(os.ErrDeadlineExceeded)
   425  		t.cancel = nil
   426  		return
   427  	}
   428  	if t.timer != nil {
   429  		// Reuse existing deadline timer.
   430  		t.timer.Reset(deadline.Sub(now))
   431  		return
   432  	}
   433  	// Create a new timer to cancel the context at the deadline.
   434  	t.timer = time.AfterFunc(deadline.Sub(now), func() {
   435  		t.mu.Lock()
   436  		defer t.mu.Unlock()
   437  		t.cancel(os.ErrDeadlineExceeded)
   438  		t.cancel = nil
   439  	})
   440  }
   441  

View as plain text