Source file src/runtime/chan.go

// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

// This file contains the implementation of Go channels.

// Invariants:
//  At least one of c.sendq and c.recvq is empty,
//  except for the case of an unbuffered channel with a single goroutine
//  blocked on it for both sending and receiving using a select statement,
//  in which case the length of c.sendq and c.recvq is limited only by the
//  size of the select statement.
//
// For buffered channels, also:
//  c.qcount > 0 implies that c.recvq is empty.
//  c.qcount < c.dataqsiz implies that c.sendq is empty.
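//
// For example (a hypothetical user-level illustration of the select
// exception above), a single goroutine that selects on both a send and a
// receive for the same unbuffered channel enqueues a sudog on both
// c.sendq and c.recvq before parking, so neither queue is empty:
//
//	c := make(chan int) // unbuffered
//	select {
//	case c <- 1: // sudog enqueued on c.sendq
//	case <-c: // sudog enqueued on c.recvq
//	}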

import (
	"internal/abi"
	"internal/runtime/atomic"
	"runtime/internal/math"
	"unsafe"
)

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}

func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {
	elem := t.Elem

	// compiler checks this but be safe.
	if elem.Size_ >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case !elem.Pointers():
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.Size_)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
	}
	return c
}
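
// For example (hypothetical user-level declarations), the three allocation
// cases in the switch above correspond to:
//
//	make(chan struct{}, 8) // mem == 0: a single hchanSize allocation
//	make(chan int, 8)      // no pointers: hchan and buf in one allocation
//	make(chan *int, 8)     // pointers: separate hchan and GC-scanned buf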

// chanbuf(c, i) returns a pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
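
// For example (hypothetical): for a chan int64 (elemsize == 8) with
// dataqsiz == 4, chanbuf(c, 2) returns c.buf + 16, the address of the
// third slot in the circular buffer.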

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  It is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
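
// For example (hypothetical user code), the ways a blocking send through
// chansend can complete:
//
//	c := make(chan int, 1)
//	c <- 1 // buffer has space: enqueued without blocking
//	go func() { <-c; <-c }()
//	c <- 2 // if the buffer is still full, the sender parks until a receive makes room
//	close(c)
//	c <- 3 // panics: send on closed channel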

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

// timerchandrain removes all elements in channel c's buffer.
// It reports whether any elements were removed.
// Because it is only intended for timers, it does not
// handle waiting senders at all (all timer channels
// use non-blocking sends to fill the buffer).
func timerchandrain(c *hchan) bool {
	// Note: Cannot use empty(c) because we are called
	// while holding c.timer.sendLock, and empty(c) will
	// call c.timer.maybeRunChan, which will deadlock.
	// We are emptying the channel, so we only care about
	// the count, not about potentially filling it up.
	if atomic.Loaduint(&c.qcount) == 0 {
		return false
	}
	lock(&c.lock)
	any := false
	for c.qcount > 0 {
		any = true
		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
	}
	unlock(&c.lock)
	return any
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
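
// For example (hypothetical user code), how the release loops above are
// observed: blocked receivers wake with the zero value and ok == false,
// while blocked senders wake and panic.
//
//	c := make(chan int)
//	go func() {
//		v, ok := <-c // released by closechan: v == 0, ok == false
//		_, _ = v, ok
//	}()
//	close(c)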

// empty reports whether a read from c would block (that is, the channel is
// empty).  It is atomically correct and sequentially consistent at the moment
// it returns, but since the channel is unlocked, the channel may become
// non-empty immediately afterward.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
	// All timer channels have dataqsiz > 0.
	if c.timer != nil {
		c.timer.maybeRunChan()
	}
	return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.timer != nil {
		c.timer.maybeRunChan()
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but its buffer still has data.
	} else {
		// The channel is not closed; look for a waiting sender.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
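
// For example (hypothetical user code), the results a blocking receive
// reports as a closed channel drains:
//
//	c := make(chan int, 1)
//	c <- 1
//	close(c)
//	v, ok := <-c // (true, true): a buffered value survives close
//	v, ok = <-c  // (true, false): closed and empty, v is the zero value
//	_, _ = v, ok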

// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
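
// For example (a hypothetical worked case of the buffered branch above):
// with dataqsiz == 2, a full buffer [A, B], recvx == 0, and a parked
// sender holding C, recv copies A to the receiver, stores C into slot 0,
// and advances recvx and sendx to 1, leaving the buffer logically [B, C].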

func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	gp.parkingOnChan.Store(false)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}

//go:linkname reflect_chansend reflect.chansend0
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(c, elem, !nb, getcallerpc())
}

//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}

func chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	async := debug.asynctimerchan.Load() != 0
	if c.timer != nil && async {
		c.timer.maybeRunChan()
	}
	if c.timer != nil && !async {
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.qcount)
}

func chancap(c *hchan) int {
	if c == nil {
		return 0
	}
	if c.timer != nil {
		async := debug.asynctimerchan.Load() != 0
		if async {
			return int(c.dataqsiz)
		}
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.dataqsiz)
}
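
// For example (a hypothetical user-level view), with the synchronous timer
// implementation (asynctimerchan=0) a timer's channel reports as unbuffered
// even though it has a one-element buffer internally:
//
//	t := time.NewTimer(time.Hour)
//	_ = len(t.C) // 0
//	_ = cap(t.C) // 0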

//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	return chanlen(c)
}

//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
	return chanlen(c)
}

//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	return chancap(c)
}

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}

func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}

		return sgp
	}
}
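
// For example (a hypothetical interleaving that the selectDone check above
// guards against): goroutine G selects on channels a and b, so one sudog
// for G sits on each channel's queue. If a close(a) wins the CAS on
// G.selectDone first, a concurrent dequeue on b's queue loses the CAS,
// skips the stale sudog, and moves on to the next waiter.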

func (c *hchan) raceaddr() unsafe.Pointer {
	// Treat read-like and write-like operations on the channel as
	// happening at this address. Avoid using the address of qcount
	// or dataqsiz, because the len() and cap() builtins read
	// those addresses, and we don't want them racing with
	// operations like close().
	return unsafe.Pointer(&c.buf)
}

func racesync(c *hchan, sg *sudog) {
	racerelease(chanbuf(c, 0))
	raceacquireg(sg.g, chanbuf(c, 0))
	racereleaseg(sg.g, chanbuf(c, 0))
	raceacquire(chanbuf(c, 0))
}

// Notify the race detector of a send or receive involving buffer entry idx
// and a channel c or its communicating partner sg.
// This function handles the special case of c.elemsize==0.
func racenotify(c *hchan, idx uint, sg *sudog) {
	// We could have passed the unsafe.Pointer corresponding to entry idx
	// instead of idx itself.  However, in a future version of this function,
	// we can use idx to better handle the case of elemsize==0.
	// A future improvement to the detector is to call TSan with c and idx:
	// this way, Go will continue to not allocate buffer entries for channels
	// of elemsize==0, yet the race detector can be made to handle multiple
	// sync objects underneath the hood (one sync object per idx).
	qp := chanbuf(c, idx)
	// When elemsize==0, we don't allocate a full buffer for the channel.
	// Instead of individual buffer entries, the race detector uses the
	// c.buf as the only buffer entry.  This simplification prevents us from
	// following the memory model's happens-before rules (rules that are
	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
	// information in the synchronization object associated with c.buf.
	if c.elemsize == 0 {
		if sg == nil {
			raceacquire(qp)
			racerelease(qp)
		} else {
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
	} else {
		if sg == nil {
			racereleaseacquire(qp)
		} else {
			racereleaseacquireg(sg.g, qp)
		}
	}
}
