Source file src/runtime/trace/subscribe.go

     1  // Copyright 2025 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"fmt"
     9  	"internal/trace/tracev2"
    10  	"io"
    11  	"runtime"
    12  	"sync"
    13  	"sync/atomic"
    14  	_ "unsafe"
    15  )
    16  
    17  var tracing traceMultiplexer
    18  
    19  type traceMultiplexer struct {
    20  	sync.Mutex
    21  	enabled     atomic.Bool
    22  	subscribers int
    23  
    24  	subscribersMu    sync.Mutex
    25  	traceStartWriter io.Writer
    26  	flightRecorder   *recorder
    27  }
    28  
    29  func (t *traceMultiplexer) subscribeFlightRecorder(r *recorder) error {
    30  	t.Lock()
    31  	defer t.Unlock()
    32  
    33  	t.subscribersMu.Lock()
    34  	if t.flightRecorder != nil {
    35  		t.subscribersMu.Unlock()
    36  		return fmt.Errorf("flight recorder already enabled")
    37  	}
    38  	t.flightRecorder = r
    39  	t.subscribersMu.Unlock()
    40  
    41  	if err := t.addedSubscriber(); err != nil {
    42  		t.subscribersMu.Lock()
    43  		t.flightRecorder = nil
    44  		t.subscribersMu.Unlock()
    45  		return err
    46  	}
    47  	return nil
    48  }
    49  
    50  func (t *traceMultiplexer) unsubscribeFlightRecorder() error {
    51  	t.Lock()
    52  	defer t.Unlock()
    53  
    54  	t.removingSubscriber()
    55  
    56  	t.subscribersMu.Lock()
    57  	if t.flightRecorder == nil {
    58  		t.subscribersMu.Unlock()
    59  		return fmt.Errorf("attempt to unsubscribe missing flight recorder")
    60  	}
    61  	t.flightRecorder = nil
    62  	t.subscribersMu.Unlock()
    63  
    64  	t.removedSubscriber()
    65  	return nil
    66  }
    67  
    68  func (t *traceMultiplexer) subscribeTraceStartWriter(w io.Writer) error {
    69  	t.Lock()
    70  	defer t.Unlock()
    71  
    72  	t.subscribersMu.Lock()
    73  	if t.traceStartWriter != nil {
    74  		t.subscribersMu.Unlock()
    75  		return fmt.Errorf("execution tracer already enabled")
    76  	}
    77  	t.traceStartWriter = w
    78  	t.subscribersMu.Unlock()
    79  
    80  	if err := t.addedSubscriber(); err != nil {
    81  		t.subscribersMu.Lock()
    82  		t.traceStartWriter = nil
    83  		t.subscribersMu.Unlock()
    84  		return err
    85  	}
    86  	return nil
    87  }
    88  
    89  func (t *traceMultiplexer) unsubscribeTraceStartWriter() {
    90  	t.Lock()
    91  	defer t.Unlock()
    92  
    93  	t.removingSubscriber()
    94  
    95  	t.subscribersMu.Lock()
    96  	if t.traceStartWriter == nil {
    97  		t.subscribersMu.Unlock()
    98  		return
    99  	}
   100  	t.traceStartWriter = nil
   101  	t.subscribersMu.Unlock()
   102  
   103  	t.removedSubscriber()
   104  	return
   105  }
   106  
   107  func (t *traceMultiplexer) addedSubscriber() error {
   108  	if t.enabled.Load() {
   109  		// This is necessary for the trace reader goroutine to pick up on the new subscriber.
   110  		runtime_traceAdvance(false)
   111  	} else {
   112  		if err := t.startLocked(); err != nil {
   113  			return err
   114  		}
   115  	}
   116  	t.subscribers++
   117  	return nil
   118  }
   119  
   120  func (t *traceMultiplexer) removingSubscriber() {
   121  	if t.subscribers == 0 {
   122  		return
   123  	}
   124  	t.subscribers--
   125  	if t.subscribers == 0 {
   126  		runtime.StopTrace()
   127  		t.enabled.Store(false)
   128  	} else {
   129  		// This is necessary to avoid missing trace data when the system is under high load.
   130  		runtime_traceAdvance(false)
   131  	}
   132  }
   133  
   134  func (t *traceMultiplexer) removedSubscriber() {
   135  	if t.subscribers > 0 {
   136  		// This is necessary for the trace reader goroutine to pick up on the new subscriber.
   137  		runtime_traceAdvance(false)
   138  	}
   139  }
   140  
   141  func (t *traceMultiplexer) startLocked() error {
   142  	if err := runtime.StartTrace(); err != nil {
   143  		return err
   144  	}
   145  
   146  	// Grab the trace reader goroutine's subscribers.
   147  	//
   148  	// We only update our subscribers if we see an end-of-generation
   149  	// signal from the runtime after this, so any new subscriptions
   150  	// or unsubscriptions must call traceAdvance to ensure the reader
   151  	// goroutine sees an end-of-generation signal.
   152  	t.subscribersMu.Lock()
   153  	flightRecorder := t.flightRecorder
   154  	traceStartWriter := t.traceStartWriter
   155  	t.subscribersMu.Unlock()
   156  
   157  	go func() {
   158  		header := runtime_readTrace()
   159  		if traceStartWriter != nil {
   160  			traceStartWriter.Write(header)
   161  		}
   162  		if flightRecorder != nil {
   163  			flightRecorder.Write(header)
   164  		}
   165  
   166  		for {
   167  			data := runtime_readTrace()
   168  			if data == nil {
   169  				break
   170  			}
   171  			if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration {
   172  				if flightRecorder != nil {
   173  					flightRecorder.endGeneration()
   174  				}
   175  
   176  				// Pick up any changes.
   177  				t.subscribersMu.Lock()
   178  				frIsNew := flightRecorder != t.flightRecorder && t.flightRecorder != nil
   179  				trIsNew := traceStartWriter != t.traceStartWriter && t.traceStartWriter != nil
   180  				flightRecorder = t.flightRecorder
   181  				traceStartWriter = t.traceStartWriter
   182  				t.subscribersMu.Unlock()
   183  
   184  				if trIsNew {
   185  					traceStartWriter.Write(header)
   186  				}
   187  				if frIsNew {
   188  					flightRecorder.Write(header)
   189  				}
   190  			} else {
   191  				if traceStartWriter != nil {
   192  					traceStartWriter.Write(data)
   193  				}
   194  				if flightRecorder != nil {
   195  					flightRecorder.Write(data)
   196  				}
   197  			}
   198  		}
   199  	}()
   200  	t.enabled.Store(true)
   201  	return nil
   202  }
   203  
// runtime_readTrace is declared without a body and bound through the
// linkname directive below; its implementation is not in this file.
//
// As used by startLocked in this file: the first result is treated as the
// trace header, a one-byte result holding tracev2.EvEndOfGeneration marks
// the end of a generation, any other non-nil result is forwarded as trace
// data, and a nil result ends the read loop.
//
//go:linkname runtime_readTrace
func runtime_readTrace() (buf []byte)
   206  

View as plain text