Source file src/runtime/coro.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package runtime
     6  
     7  import "unsafe"
     8  
// A coro represents extra concurrency without extra parallelism,
// as would be needed for a coroutine implementation.
// The coro does not represent a specific coroutine, only the ability
// to do coroutine-style control transfers.
// It can be thought of as like a special channel that always has
// a goroutine blocked on it. If another goroutine calls coroswitch(c),
// the caller becomes the goroutine blocked in c, and the goroutine
// formerly blocked in c starts running.
// These switches continue until a call to coroexit(c),
// which ends the use of the coro by releasing the blocked
// goroutine in c and exiting the current goroutine.
//
// Coros are heap allocated and garbage collected, so that user code
// can hold a pointer to a coro without causing potential dangling
// pointer errors.
type coro struct {
	// gp is the goroutine currently blocked in this coro.
	// It is set in newcoro and swapped with the caller in
	// coroswitch_m; nil after coroexit (checked there with a throw).
	gp guintptr
	// f is the user function the coro's goroutine runs;
	// invoked once by corostart.
	f func(*coro)
}
    28  
//go:linkname newcoro

// newcoro creates a new coro containing a
// goroutine blocked waiting to run f
// and returns that coro.
func newcoro(f func(*coro)) *coro {
	c := new(coro)
	c.f = f
	// Capture caller PC and current g before switching to the
	// system stack; presumably getcallerpc must run in this frame
	// to identify newcoro's caller — confirm against its contract.
	pc := getcallerpc()
	gp := getg()
	systemstack(func() {
		// Reinterpret the func value for corostart as a *funcval
		// so it can serve as the new goroutine's entry point.
		start := corostart
		startfv := *(**funcval)(unsafe.Pointer(&start))
		// newproc1 yields the new goroutine, created blocked with
		// waitReasonCoroutine; gp is rebound to it here.
		gp = newproc1(startfv, gp, pc, true, waitReasonCoroutine)
	})
	// Hand c to the new goroutine (read back in corostart) and
	// record that goroutine as the one blocked in c.
	gp.coroarg = c
	c.gp.set(gp)
	return c
}
    48  
    49  //go:linkname corostart
    50  
    51  // corostart is the entry func for a new coroutine.
    52  // It runs the coroutine user function f passed to corostart
    53  // and then calls coroexit to remove the extra concurrency.
    54  func corostart() {
    55  	gp := getg()
    56  	c := gp.coroarg
    57  	gp.coroarg = nil
    58  
    59  	c.f(c)
    60  	coroexit(c)
    61  }
    62  
    63  // coroexit is like coroswitch but closes the coro
    64  // and exits the current goroutine
    65  func coroexit(c *coro) {
    66  	gp := getg()
    67  	gp.coroarg = c
    68  	gp.coroexit = true
    69  	mcall(coroswitch_m)
    70  }
    71  
    72  //go:linkname coroswitch
    73  
    74  // coroswitch switches to the goroutine blocked on c
    75  // and then blocks the current goroutine on c.
    76  func coroswitch(c *coro) {
    77  	gp := getg()
    78  	gp.coroarg = c
    79  	mcall(coroswitch_m)
    80  }
    81  
// coroswitch_m is the implementation of coroswitch
// that runs on the m stack.
//
// Note: Coroutine switches are expected to happen at
// an order of magnitude (or more) higher frequency
// than regular goroutine switches, so this path is heavily
// optimized to remove unnecessary work.
// The fast path here is three CAS: the one at the top on gp.atomicstatus,
// the one in the middle to choose the next g,
// and the one at the bottom on gnext.atomicstatus.
// It is important not to add more atomic operations or other
// expensive operations to the fast path.
func coroswitch_m(gp *g) {
	// TODO(go.dev/issue/65889): Something really nasty will happen if either
	// goroutine in this handoff tries to lock itself to an OS thread.
	// There's an explicit multiplexing going on here that needs to be
	// disabled if either the consumer or the iterator ends up in such
	// a state.

	// Capture and clear the per-g arguments set by coroswitch/coroexit
	// so they don't leak into gp's next use.
	c := gp.coroarg
	gp.coroarg = nil
	exit := gp.coroexit
	gp.coroexit = false
	mp := gp.m

	// Acquire tracer for writing for the duration of this call.
	//
	// There's a lot of state manipulation performed with shortcuts
	// but we need to make sure the tracer can only observe the
	// start and end states to maintain a coherent model and avoid
	// emitting an event for every single transition.
	trace := traceAcquire()

	if exit {
		// coroexit path: destroy the current goroutine instead of
		// parking it.
		// TODO(65889): If we're locked to the current OS thread and
		// we exit here while tracing is enabled, we're going to end up
		// in a really bad place (traceAcquire also calls acquirem; there's
		// no releasem before the thread exits).
		gdestroy(gp)
		gp = nil
	} else {
		// If we can CAS ourselves directly from running to waiting, so do,
		// keeping the control transfer as lightweight as possible.
		gp.waitreason = waitReasonCoroutine
		if !gp.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) {
			// The CAS failed: use casgstatus, which will take care of
			// coordinating with the garbage collector about the state change.
			casgstatus(gp, _Grunning, _Gwaiting)
		}

		// Clear gp.m.
		setMNoWB(&gp.m, nil)
	}

	// The goroutine stored in c is the one to run next.
	// Swap it with ourselves.
	var gnext *g
	for {
		// Note: this is a racy load, but it will eventually
		// get the right value, and if it gets the wrong value,
		// the c.gp.cas will fail, so no harm done other than
		// a wasted loop iteration.
		// The cas will also sync c.gp's
		// memory enough that the next iteration of the racy load
		// should see the correct value.
		// We are avoiding the atomic load to keep this path
		// as lightweight as absolutely possible.
		// (The atomic load is free on x86 but not free elsewhere.)
		next := c.gp
		if next.ptr() == nil {
			// A nil c.gp means coroexit already ran on this coro.
			throw("coroswitch on exited coro")
		}
		// On the exit path gp is nil here, so c.gp is swapped to nil,
		// which is what the throw above detects on later switches.
		var self guintptr
		self.set(gp)
		if c.gp.cas(next, self) {
			gnext = next.ptr()
			break
		}
	}

	// Emit the trace event after getting gnext but before changing curg.
	// GoSwitch expects that the current G is running and that we haven't
	// switched yet for correct status emission.
	if trace.ok() {
		trace.GoSwitch(gnext, exit)
	}

	// Start running next, without heavy scheduling machinery.
	// Set mp.curg and gnext.m and then update scheduling state
	// directly if possible.
	setGNoWB(&mp.curg, gnext)
	setMNoWB(&gnext.m, mp)
	if !gnext.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) {
		// The CAS failed: use casgstatus, which will take care of
		// coordinating with the garbage collector about the state change.
		// NOTE(review): this steps through _Grunnable — presumably
		// casgstatus doesn't allow a direct waiting->running
		// transition; confirm against casgstatus's accepted pairs.
		casgstatus(gnext, _Gwaiting, _Grunnable)
		casgstatus(gnext, _Grunnable, _Grunning)
	}

	// Release the trace locker. We've completed all the necessary transitions.
	if trace.ok() {
		traceRelease(trace)
	}

	// Switch to gnext. Does not return.
	gogo(&gnext.sched)
}
   188  

View as plain text