Source file src/runtime/profbuf.go

     1  // Copyright 2017 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package runtime
     6  
     7  import (
     8  	"internal/runtime/atomic"
     9  	"unsafe"
    10  )
    11  
    12  // A profBuf is a lock-free buffer for profiling events,
    13  // safe for concurrent use by one reader and one writer.
    14  // The writer may be a signal handler running without a user g.
    15  // The reader is assumed to be a user g.
    16  //
    17  // Each logged event corresponds to a fixed size header, a list of
    18  // uintptrs (typically a stack), and exactly one unsafe.Pointer tag.
    19  // The header and uintptrs are stored in the circular buffer data and the
    20  // tag is stored in a circular buffer tags, running in parallel.
    21  // In the circular buffer data, each event takes 2+hdrsize+len(stk)
    22  // words: the value 2+hdrsize+len(stk), then the time of the event, then
    23  // hdrsize words giving the fixed-size header, and then len(stk) words
    24  // for the stack.
    25  //
// The current effective offsets into the tags and data circular buffers
// for reading and writing are stored in the high 30 and low 32 bits of r and w,
// respectively. The bottom two bits of the high 32 are additional flag bits
// in w, unused in r.
// "Effective" offsets means the total number of reads or writes, mod 2^30
// for tags and mod 2^32 for data.
// The offset in the buffer is the effective offset mod the length of the buffer.
// To make wraparound mod 2^30 or 2^32 match wraparound mod length of the buffer,
// the length of the buffer must be a power of two.
    33  //
    34  // If the reader catches up to the writer, a flag passed to read controls
    35  // whether the read blocks until more data is available. A read returns a
    36  // pointer to the buffer data itself; the caller is assumed to be done with
    37  // that data at the next read. The read offset rNext tracks the next offset to
    38  // be returned by read. By definition, r ≤ rNext ≤ w (before wraparound),
    39  // and rNext is only used by the reader, so it can be accessed without atomics.
    40  //
    41  // If the reader is blocked waiting for more data, the writer will wake it up if
    42  // either the buffer is more than half full, or when the writer sets the eof
    43  // marker or writes overflow entries (described below.)
    44  //
    45  // If the writer gets ahead of the reader, so that the buffer fills,
    46  // future writes are discarded and replaced in the output stream by an
    47  // overflow entry, which has size 2+hdrsize+1, time set to the time of
    48  // the first discarded write, a header of all zeroed words, and a "stack"
    49  // containing one word, the number of discarded writes.
    50  //
    51  // Between the time the buffer fills and the buffer becomes empty enough
    52  // to hold more data, the overflow entry is stored as a pending overflow
    53  // entry in the fields overflow and overflowTime. The pending overflow
    54  // entry can be turned into a real record by either the writer or the
    55  // reader. If the writer is called to write a new record and finds that
    56  // the output buffer has room for both the pending overflow entry and the
    57  // new record, the writer emits the pending overflow entry and the new
    58  // record into the buffer. If the reader is called to read data and finds
    59  // that the output buffer is empty but that there is a pending overflow
    60  // entry, the reader will return a synthesized record for the pending
    61  // overflow entry.
    62  //
    63  // Only the writer can create or add to a pending overflow entry, but
    64  // either the reader or the writer can clear the pending overflow entry.
    65  // A pending overflow entry is indicated by the low 32 bits of 'overflow'
    66  // holding the number of discarded writes, and overflowTime holding the
    67  // time of the first discarded write. The high 32 bits of 'overflow'
    68  // increment each time the low 32 bits transition from zero to non-zero
    69  // or vice versa. This sequence number avoids ABA problems in the use of
    70  // compare-and-swap to coordinate between reader and writer.
    71  // The overflowTime is only written when the low 32 bits of overflow are
    72  // zero, that is, only when there is no pending overflow entry, in
    73  // preparation for creating a new one. The reader can therefore fetch and
    74  // clear the entry atomically using
    75  //
    76  //	for {
    77  //		overflow = load(&b.overflow)
    78  //		if uint32(overflow) == 0 {
    79  //			// no pending entry
    80  //			break
    81  //		}
    82  //		time = load(&b.overflowTime)
    83  //		if cas(&b.overflow, overflow, ((overflow>>32)+1)<<32) {
    84  //			// pending entry cleared
    85  //			break
    86  //		}
    87  //	}
    88  //	if uint32(overflow) > 0 {
    89  //		emit entry for uint32(overflow), time
    90  //	}
type profBuf struct {
	// accessed atomically
	//
	// r and w hold the packed read and write indices (see profIndex);
	// w additionally carries the profReaderSleeping and profWriteExtra flags.
	// overflow holds the pending discarded-write count in its low 32 bits
	// and a generation counter in its high 32 bits; overflowTime holds the
	// time of the first discarded write of the pending overflow entry.
	// eof is set to a nonzero value by close.
	r, w         profAtomic
	overflow     atomic.Uint64
	overflowTime atomic.Uint64
	eof          atomic.Uint32

	// immutable (excluding slice content)
	//
	// hdrsize is the number of header words in every record.
	// data and tags are the two parallel circular buffers; newProfBuf
	// rounds both lengths up to a power of two.
	hdrsize uintptr
	data    []uint64
	tags    []unsafe.Pointer

	// owned by reader
	//
	// rNext is the index that the next call to read commits to r.
	// wait is the note the reader sleeps on in read and that write and
	// wakeupExtra wake.
	rNext       profIndex
	overflowBuf []uint64 // for use by reader to return overflow record
	wait        note
}
   108  
// A profAtomic is the atomically-accessed word holding a profIndex.
// Its load, store, and cas methods wrap the corresponding 64-bit atomic
// operations and convert to and from profIndex.
type profAtomic uint64
   111  
// A profIndex is the packed tag and data counts and flags bits, described above.
// The low 32 bits are the data count (see dataCount), bits 32 and 33 are the
// flags profReaderSleeping and profWriteExtra, and the remaining high 30 bits
// are the tag count (see tagCount).
type profIndex uint64
   114  
// Flag bits of a profIndex. They occupy bits 32 and 33, between the data
// count and the tag count, and are removed by addCountsAndClearFlags.
const (
	profReaderSleeping profIndex = 1 << 32 // reader is sleeping and must be woken up
	profWriteExtra     profIndex = 1 << 33 // overflow or eof waiting
)
   119  
   120  func (x *profAtomic) load() profIndex {
   121  	return profIndex(atomic.Load64((*uint64)(x)))
   122  }
   123  
   124  func (x *profAtomic) store(new profIndex) {
   125  	atomic.Store64((*uint64)(x), uint64(new))
   126  }
   127  
   128  func (x *profAtomic) cas(old, new profIndex) bool {
   129  	return atomic.Cas64((*uint64)(x), uint64(old), uint64(new))
   130  }
   131  
   132  func (x profIndex) dataCount() uint32 {
   133  	return uint32(x)
   134  }
   135  
   136  func (x profIndex) tagCount() uint32 {
   137  	return uint32(x >> 34)
   138  }
   139  
   140  // countSub subtracts two counts obtained from profIndex.dataCount or profIndex.tagCount,
   141  // assuming that they are no more than 2^29 apart (guaranteed since they are never more than
   142  // len(data) or len(tags) apart, respectively).
   143  // tagCount wraps at 2^30, while dataCount wraps at 2^32.
   144  // This function works for both.
   145  func countSub(x, y uint32) int {
   146  	// x-y is 32-bit signed or 30-bit signed; sign-extend to 32 bits and convert to int.
   147  	return int(int32(x-y) << 2 >> 2)
   148  }
   149  
   150  // addCountsAndClearFlags returns the packed form of "x + (data, tag) - all flags".
   151  func (x profIndex) addCountsAndClearFlags(data, tag int) profIndex {
   152  	return profIndex((uint64(x)>>34+uint64(uint32(tag)<<2>>2))<<34 | uint64(uint32(x)+uint32(data)))
   153  }
   154  
   155  // hasOverflow reports whether b has any overflow records pending.
   156  func (b *profBuf) hasOverflow() bool {
   157  	return uint32(b.overflow.Load()) > 0
   158  }
   159  
   160  // takeOverflow consumes the pending overflow records, returning the overflow count
   161  // and the time of the first overflow.
   162  // When called by the reader, it is racing against incrementOverflow.
   163  func (b *profBuf) takeOverflow() (count uint32, time uint64) {
   164  	overflow := b.overflow.Load()
   165  	time = b.overflowTime.Load()
   166  	for {
   167  		count = uint32(overflow)
   168  		if count == 0 {
   169  			time = 0
   170  			break
   171  		}
   172  		// Increment generation, clear overflow count in low bits.
   173  		if b.overflow.CompareAndSwap(overflow, ((overflow>>32)+1)<<32) {
   174  			break
   175  		}
   176  		overflow = b.overflow.Load()
   177  		time = b.overflowTime.Load()
   178  	}
   179  	return uint32(overflow), time
   180  }
   181  
   182  // incrementOverflow records a single overflow at time now.
   183  // It is racing against a possible takeOverflow in the reader.
   184  func (b *profBuf) incrementOverflow(now int64) {
   185  	for {
   186  		overflow := b.overflow.Load()
   187  
   188  		// Once we see b.overflow reach 0, it's stable: no one else is changing it underfoot.
   189  		// We need to set overflowTime if we're incrementing b.overflow from 0.
   190  		if uint32(overflow) == 0 {
   191  			// Store overflowTime first so it's always available when overflow != 0.
   192  			b.overflowTime.Store(uint64(now))
   193  			b.overflow.Store((((overflow >> 32) + 1) << 32) + 1)
   194  			break
   195  		}
   196  		// Otherwise we're racing to increment against reader
   197  		// who wants to set b.overflow to 0.
   198  		// Out of paranoia, leave 2³²-1 a sticky overflow value,
   199  		// to avoid wrapping around. Extremely unlikely.
   200  		if int32(overflow) == -1 {
   201  			break
   202  		}
   203  		if b.overflow.CompareAndSwap(overflow, overflow+1) {
   204  			break
   205  		}
   206  	}
   207  }
   208  
   209  // newProfBuf returns a new profiling buffer with room for
   210  // a header of hdrsize words and a buffer of at least bufwords words.
   211  func newProfBuf(hdrsize, bufwords, tags int) *profBuf {
   212  	if min := 2 + hdrsize + 1; bufwords < min {
   213  		bufwords = min
   214  	}
   215  
   216  	// Buffer sizes must be power of two, so that we don't have to
   217  	// worry about uint32 wraparound changing the effective position
   218  	// within the buffers. We store 30 bits of count; limiting to 28
   219  	// gives us some room for intermediate calculations.
   220  	if bufwords >= 1<<28 || tags >= 1<<28 {
   221  		throw("newProfBuf: buffer too large")
   222  	}
   223  	var i int
   224  	for i = 1; i < bufwords; i <<= 1 {
   225  	}
   226  	bufwords = i
   227  	for i = 1; i < tags; i <<= 1 {
   228  	}
   229  	tags = i
   230  
   231  	b := new(profBuf)
   232  	b.hdrsize = uintptr(hdrsize)
   233  	b.data = make([]uint64, bufwords)
   234  	b.tags = make([]unsafe.Pointer, tags)
   235  	b.overflowBuf = make([]uint64, 2+b.hdrsize+1)
   236  	return b
   237  }
   238  
   239  // canWriteRecord reports whether the buffer has room
   240  // for a single contiguous record with a stack of length nstk.
   241  func (b *profBuf) canWriteRecord(nstk int) bool {
   242  	br := b.r.load()
   243  	bw := b.w.load()
   244  
   245  	// room for tag?
   246  	if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 1 {
   247  		return false
   248  	}
   249  
   250  	// room for data?
   251  	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
   252  	want := 2 + int(b.hdrsize) + nstk
   253  	i := int(bw.dataCount() % uint32(len(b.data)))
   254  	if i+want > len(b.data) {
   255  		// Can't fit in trailing fragment of slice.
   256  		// Skip over that and start over at beginning of slice.
   257  		nd -= len(b.data) - i
   258  	}
   259  	return nd >= want
   260  }
   261  
   262  // canWriteTwoRecords reports whether the buffer has room
   263  // for two records with stack lengths nstk1, nstk2, in that order.
   264  // Each record must be contiguous on its own, but the two
   265  // records need not be contiguous (one can be at the end of the buffer
   266  // and the other can wrap around and start at the beginning of the buffer).
   267  func (b *profBuf) canWriteTwoRecords(nstk1, nstk2 int) bool {
   268  	br := b.r.load()
   269  	bw := b.w.load()
   270  
   271  	// room for tag?
   272  	if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 2 {
   273  		return false
   274  	}
   275  
   276  	// room for data?
   277  	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
   278  
   279  	// first record
   280  	want := 2 + int(b.hdrsize) + nstk1
   281  	i := int(bw.dataCount() % uint32(len(b.data)))
   282  	if i+want > len(b.data) {
   283  		// Can't fit in trailing fragment of slice.
   284  		// Skip over that and start over at beginning of slice.
   285  		nd -= len(b.data) - i
   286  		i = 0
   287  	}
   288  	i += want
   289  	nd -= want
   290  
   291  	// second record
   292  	want = 2 + int(b.hdrsize) + nstk2
   293  	if i+want > len(b.data) {
   294  		// Can't fit in trailing fragment of slice.
   295  		// Skip over that and start over at beginning of slice.
   296  		nd -= len(b.data) - i
   297  		i = 0
   298  	}
   299  	return nd >= want
   300  }
   301  
// write writes an entry to the profiling buffer b.
// The entry begins with a fixed hdr, which must have
// length b.hdrsize, followed by a variable-sized stack
// and a single tag pointer *tagPtr (or nil if tagPtr is nil).
// No write barriers allowed because this might be called from a signal handler.
//
// A nil b is a no-op. A hdr longer than b.hdrsize is a fatal error; a shorter
// hdr is zero-padded. now is stored as the record's time stamp.
// If there is no room for the record, or an overflow entry is pending and
// there is no room for both it and the record, the record is dropped and
// counted in the pending overflow entry instead.
func (b *profBuf) write(tagPtr *unsafe.Pointer, now int64, hdr []uint64, stk []uintptr) {
	if b == nil {
		return
	}
	if len(hdr) > int(b.hdrsize) {
		throw("misuse of profBuf.write")
	}

	if hasOverflow := b.hasOverflow(); hasOverflow && b.canWriteTwoRecords(1, len(stk)) {
		// Room for both an overflow record and the one being written.
		// Write the overflow record if the reader hasn't gotten to it yet.
		// Only racing against reader, not other writers.
		count, time := b.takeOverflow()
		if count > 0 {
			// The overflow record has a nil tag, no header words, and a
			// one-word "stack" holding the number of discarded writes.
			var stk [1]uintptr
			stk[0] = uintptr(count)
			b.write(nil, int64(time), nil, stk[:])
		}
	} else if hasOverflow || !b.canWriteRecord(len(stk)) {
		// Pending overflow without room to write overflow and new records
		// or no overflow but also no room for new record.
		b.incrementOverflow(now)
		b.wakeupExtra()
		return
	}

	// There's room: write the record.
	br := b.r.load()
	bw := b.w.load()

	// Profiling tag
	//
	// The tag is a pointer, but we can't run a write barrier here.
	// We have interrupted the OS-level execution of gp, but the
	// runtime still sees gp as executing. In effect, we are running
	// in place of the real gp. Since gp is the only goroutine that
	// can overwrite gp.labels, the value of gp.labels is stable during
	// this signal handler: it will still be reachable from gp when
	// we finish executing. If a GC is in progress right now, it must
	// keep gp.labels alive, because gp.labels is reachable from gp.
	// If gp were to overwrite gp.labels, the deletion barrier would
	// still shade that pointer, which would preserve it for the
	// in-progress GC, so all is well. Any future GC will see the
	// value we copied when scanning b.tags (heap-allocated).
	// We arrange that the store here is always overwriting a nil,
	// so there is no need for a deletion barrier on b.tags[wt].
	wt := int(bw.tagCount() % uint32(len(b.tags)))
	if tagPtr != nil {
		// Store through a uintptr so that no write barrier is emitted.
		*(*uintptr)(unsafe.Pointer(&b.tags[wt])) = uintptr(*tagPtr)
	}

	// Main record.
	// It has to fit in a contiguous section of the slice, so if it doesn't fit at the end,
	// leave a rewind marker (0) and start over at the beginning of the slice.
	wd := int(bw.dataCount() % uint32(len(b.data)))
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
	skip := 0
	if wd+2+int(b.hdrsize)+len(stk) > len(b.data) {
		b.data[wd] = 0
		skip = len(b.data) - wd
		nd -= skip
		wd = 0
	}
	data := b.data[wd:]
	data[0] = uint64(2 + b.hdrsize + uintptr(len(stk))) // length
	data[1] = uint64(now)                               // time stamp
	// header, zero-padded
	i := copy(data[2:2+b.hdrsize], hdr)
	clear(data[2+i : 2+b.hdrsize])
	for i, pc := range stk {
		data[2+b.hdrsize+uintptr(i)] = uint64(pc)
	}

	for {
		// Commit write.
		// Racing with reader setting flag bits in b.w, to avoid lost wakeups.
		old := b.w.load()
		// The data count advances by the skipped trailing fragment plus the
		// record itself; the tag count advances by one.
		new := old.addCountsAndClearFlags(skip+2+len(stk)+int(b.hdrsize), 1)
		// We re-load b.r here to reduce the likelihood of early wakeups
		// if the reader already consumed some data between the last
		// time we read b.r and now. This isn't strictly necessary.
		unread := countSub(new.dataCount(), b.r.load().dataCount())
		if unread < 0 {
			// The new count overflowed and wrapped around.
			unread += len(b.data)
		}
		wakeupThreshold := len(b.data) / 2
		if unread < wakeupThreshold {
			// Carry over the sleeping flag since we're not planning
			// to wake the reader yet
			new |= old & profReaderSleeping
		}
		if !b.w.cas(old, new) {
			continue
		}
		// If we've hit our high watermark for data in the buffer,
		// and there is a reader, wake it up.
		if unread >= wakeupThreshold && old&profReaderSleeping != 0 {
			// NB: if we reach this point, then the sleeping bit is
			// cleared in the new b.w value
			notewakeup(&b.wait)
		}
		break
	}
}
   412  
   413  // close signals that there will be no more writes on the buffer.
   414  // Once all the data has been read from the buffer, reads will return eof=true.
   415  func (b *profBuf) close() {
   416  	if b.eof.Load() > 0 {
   417  		throw("runtime: profBuf already closed")
   418  	}
   419  	b.eof.Store(1)
   420  	b.wakeupExtra()
   421  }
   422  
   423  // wakeupExtra must be called after setting one of the "extra"
   424  // atomic fields b.overflow or b.eof.
   425  // It records the change in b.w and wakes up the reader if needed.
   426  func (b *profBuf) wakeupExtra() {
   427  	for {
   428  		old := b.w.load()
   429  		new := old | profWriteExtra
   430  		// Clear profReaderSleeping. We're going to wake up the reader
   431  		// if it was sleeping and we don't want double wakeups in case
   432  		// we, for example, attempt to write into a full buffer multiple
   433  		// times before the reader wakes up.
   434  		new &^= profReaderSleeping
   435  		if !b.w.cas(old, new) {
   436  			continue
   437  		}
   438  		if old&profReaderSleeping != 0 {
   439  			notewakeup(&b.wait)
   440  		}
   441  		break
   442  	}
   443  }
   444  
// profBufReadMode specifies whether to block when no data is available to read.
type profBufReadMode int

const (
	// profBufBlocking makes read sleep until the writer wakes it up.
	profBufBlocking profBufReadMode = iota
	// profBufNonBlocking makes read return immediately, with no data and
	// eof=false, when there is nothing to read.
	profBufNonBlocking
)
   452  
// overflowTag is the tag slice that read returns together with a synthesized
// overflow record. No code visible in this file stores to it.
var overflowTag [1]unsafe.Pointer // always nil
   454  
// read returns the next batch of whole records from the profiling buffer,
// together with one tag per record.
//
// Calling read first commits the previous read: the tags handed out last
// time are cleared and that part of the ring is returned to the writer.
// The returned slices point into the buffer itself (or, for a synthesized
// overflow record, into b.overflowBuf and overflowTag), so the caller must
// be done with them before the next call.
//
// If there is no data, read reports a pending overflow entry as a
// synthesized record, or returns eof=true once the buffer has been closed.
// Otherwise it returns nil, nil, false when mode is profBufNonBlocking, or
// sleeps until woken by the writer when mode is profBufBlocking.
// A nil b reads as nil, nil, true.
func (b *profBuf) read(mode profBufReadMode) (data []uint64, tags []unsafe.Pointer, eof bool) {
	if b == nil {
		return nil, nil, true
	}

	br := b.rNext

	// Commit previous read, returning that part of the ring to the writer.
	// First clear tags that have now been read, both to avoid holding
	// up the memory they point at for longer than necessary
	// and so that b.write can assume it is always overwriting
	// nil tag entries (see comment in b.write).
	rPrev := b.r.load()
	if rPrev != br {
		ntag := countSub(br.tagCount(), rPrev.tagCount())
		ti := int(rPrev.tagCount() % uint32(len(b.tags)))
		for i := 0; i < ntag; i++ {
			b.tags[ti] = nil
			if ti++; ti == len(b.tags) {
				ti = 0
			}
		}
		b.r.store(br)
	}

Read:
	bw := b.w.load()
	numData := countSub(bw.dataCount(), br.dataCount())
	if numData == 0 {
		if b.hasOverflow() {
			// No data to read, but there is overflow to report.
			// Racing with writer flushing b.overflow into a real record.
			count, time := b.takeOverflow()
			if count == 0 {
				// Lost the race, go around again.
				goto Read
			}
			// Won the race, report overflow.
			// The synthesized record has the same layout as a real one:
			// length, time, zeroed header, and the count as a one-word stack.
			dst := b.overflowBuf
			dst[0] = uint64(2 + b.hdrsize + 1)
			dst[1] = time
			clear(dst[2 : 2+b.hdrsize])
			dst[2+b.hdrsize] = uint64(count)
			return dst[:2+b.hdrsize+1], overflowTag[:1], false
		}
		if b.eof.Load() > 0 {
			// No data, no overflow, EOF set: done.
			return nil, nil, true
		}
		if bw&profWriteExtra != 0 {
			// Writer claims to have published extra information (overflow or eof).
			// Attempt to clear notification and then check again.
			// If we fail to clear the notification it means b.w changed,
			// so we still need to check again.
			b.w.cas(bw, bw&^profWriteExtra)
			goto Read
		}

		// Nothing to read right now.
		// Return or sleep according to mode.
		if mode == profBufNonBlocking {
			// Necessary on Darwin, notetsleepg below does not work in signal handler, root cause of #61768.
			return nil, nil, false
		}
		// Announce the sleep in b.w; if b.w changed in the meantime,
		// there may be something new to read, so look again instead.
		if !b.w.cas(bw, bw|profReaderSleeping) {
			goto Read
		}
		// Committed to sleeping.
		notetsleepg(&b.wait, -1)
		noteclear(&b.wait)
		goto Read
	}
	// Start with the unread words between the read position and the end
	// of the slice.
	data = b.data[br.dataCount()%uint32(len(b.data)):]
	if len(data) > numData {
		data = data[:numData]
	} else {
		numData -= len(data) // available in case of wraparound
	}
	skip := 0
	if data[0] == 0 {
		// Wraparound record. Go back to the beginning of the ring.
		skip = len(data)
		data = b.data
		if len(data) > numData {
			data = data[:numData]
		}
	}

	ntag := countSub(bw.tagCount(), br.tagCount())
	if ntag == 0 {
		throw("runtime: malformed profBuf buffer - tag and data out of sync")
	}
	tags = b.tags[br.tagCount()%uint32(len(b.tags)):]
	if len(tags) > ntag {
		tags = tags[:ntag]
	}

	// Count out whole data records until either data or tags is done.
	// They are always in sync in the buffer, but due to an end-of-slice
	// wraparound we might need to stop early and return the rest
	// in the next call.
	di := 0
	ti := 0
	for di < len(data) && data[di] != 0 && ti < len(tags) {
		// data[di] is the record's length word; the record must lie
		// entirely within the unread data.
		if uintptr(di)+uintptr(data[di]) > uintptr(len(data)) {
			throw("runtime: malformed profBuf buffer - invalid size")
		}
		di += int(data[di])
		ti++
	}

	// Remember how much we returned, to commit read on next call.
	b.rNext = br.addCountsAndClearFlags(skip+di, ti)

	if raceenabled {
		// Match racereleasemerge in runtime_setProfLabel,
		// so that the setting of the labels in runtime_setProfLabel
		// is treated as happening before any use of the labels
		// by our caller. The synchronization on labelSync itself is a fiction
		// for the race detector. The actual synchronization is handled
		// by the fact that the signal handler only reads from the current
		// goroutine and uses atomics to write the updated queue indices,
		// and then the read-out from the signal handler buffer uses
		// atomics to read those queue indices.
		raceacquire(unsafe.Pointer(&labelSync))
	}

	return data[:di], tags[:ti], false
}
   584  

View as plain text