Source file src/internal/trace/reader.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"bufio"
     9  	"errors"
    10  	"fmt"
    11  	"io"
    12  	"slices"
    13  	"strings"
    14  
    15  	"internal/trace/internal/tracev1"
    16  	"internal/trace/tracev2"
    17  	"internal/trace/version"
    18  )
    19  
    20  // Reader reads a byte stream, validates it, and produces trace events.
    21  //
    22  // Provided the trace is non-empty the Reader always produces a Sync
    23  // event as the first event, and a Sync event as the last event.
    24  // (There may also be any number of Sync events in the middle, too.)
    25  type Reader struct {
    26  	version    version.Version
    27  	r          *bufio.Reader
    28  	lastTs     Time
    29  	gen        *generation
    30  	frontier   []*batchCursor
    31  	cpuSamples []cpuSample
    32  	order      ordering
    33  	syncs      int
    34  	readGenErr error
    35  	done       bool
    36  
    37  	// Spill state.
    38  	//
    39  	// Traces before Go 1.26 had no explicit end-of-generation signal, and
    40  	// so the first batch of the next generation needed to be parsed to identify
    41  	// a new generation. This batch is the "spilled" so we don't lose track
    42  	// of it when parsing the next generation.
    43  	//
    44  	// This is unnecessary after Go 1.26 because of an explicit end-of-generation
    45  	// signal.
    46  	spill        *spilledBatch
    47  	spillErr     error // error from reading spill
    48  	spillErrSync bool  // whether we emitted a Sync before reporting spillErr
    49  
    50  	v1Events *traceV1Converter
    51  }
    52  
    53  // NewReader creates a new trace reader.
    54  func NewReader(r io.Reader) (*Reader, error) {
    55  	br := bufio.NewReader(r)
    56  	v, err := version.ReadHeader(br)
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  	switch v {
    61  	case version.Go111, version.Go119, version.Go121:
    62  		tr, err := tracev1.Parse(br, v)
    63  		if err != nil {
    64  			return nil, err
    65  		}
    66  		return &Reader{
    67  			v1Events: convertV1Trace(tr),
    68  		}, nil
    69  	case version.Go122, version.Go123, version.Go125, version.Go126:
    70  		return &Reader{
    71  			version: v,
    72  			r:       br,
    73  			order: ordering{
    74  				traceVer:    v,
    75  				mStates:     make(map[ThreadID]*mState),
    76  				pStates:     make(map[ProcID]*pState),
    77  				gStates:     make(map[GoID]*gState),
    78  				activeTasks: make(map[TaskID]taskState),
    79  			},
    80  		}, nil
    81  	default:
    82  		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
    83  	}
    84  }
    85  
    86  // ReadEvent reads a single event from the stream.
    87  //
    88  // If the stream has been exhausted, it returns an invalid event and io.EOF.
    89  func (r *Reader) ReadEvent() (e Event, err error) {
    90  	// Return only io.EOF if we're done.
    91  	if r.done {
    92  		return Event{}, io.EOF
    93  	}
    94  
    95  	// Handle v1 execution traces.
    96  	if r.v1Events != nil {
    97  		if r.syncs == 0 {
    98  			// Always emit a sync event first, if we have any events at all.
    99  			ev, ok := r.v1Events.events.Peek()
   100  			if ok {
   101  				r.syncs++
   102  				return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
   103  			}
   104  		}
   105  		ev, err := r.v1Events.next()
   106  		if err == io.EOF {
   107  			// Always emit a sync event at the end.
   108  			r.done = true
   109  			r.syncs++
   110  			return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
   111  		} else if err != nil {
   112  			return Event{}, err
   113  		}
   114  		return ev, nil
   115  	}
   116  
   117  	// Trace v2 parsing algorithm.
   118  	//
   119  	// (1) Read in all the batches for the next generation from the stream.
   120  	//   (a) Use the size field in the header to quickly find all batches.
   121  	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
   122  	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
   123  	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
   124  	// (5) Try to advance the next event for the M at the top of the min-heap.
   125  	//   (a) On success, select that M.
   126  	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
   127  	//   (c) If there's nothing left to advance, goto (1).
   128  	// (6) Select the latest event for the selected M and get it ready to be returned.
   129  	// (7) Read the next event for the selected M and update the min-heap.
   130  	// (8) Return the selected event, goto (5) on the next call.
   131  
   132  	// Set us up to track the last timestamp and fix up
   133  	// the timestamp of any event that comes through.
   134  	defer func() {
   135  		if err != nil {
   136  			return
   137  		}
   138  		if err = e.validateTableIDs(); err != nil {
   139  			return
   140  		}
   141  		if e.base.time <= r.lastTs {
   142  			e.base.time = r.lastTs + 1
   143  		}
   144  		r.lastTs = e.base.time
   145  	}()
   146  
   147  	// Consume any events in the ordering first.
   148  	if ev, ok := r.order.Next(); ok {
   149  		return ev, nil
   150  	}
   151  
   152  	// Check if we need to refresh the generation.
   153  	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
   154  		if r.version < version.Go126 {
   155  			return r.nextGenWithSpill()
   156  		}
   157  		if r.readGenErr != nil {
   158  			return Event{}, r.readGenErr
   159  		}
   160  		gen, err := readGeneration(r.r, r.version)
   161  		if err != nil {
   162  			// Before returning an error, emit the sync event
   163  			// for the current generation and queue up the error
   164  			// for the next call.
   165  			r.readGenErr = err
   166  			r.gen = nil
   167  			r.syncs++
   168  			return syncEvent(nil, r.lastTs, r.syncs), nil
   169  		}
   170  		return r.installGen(gen)
   171  	}
   172  	tryAdvance := func(i int) (bool, error) {
   173  		bc := r.frontier[i]
   174  
   175  		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
   176  			return ok, err
   177  		}
   178  
   179  		// Refresh the cursor's event.
   180  		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
   181  		if err != nil {
   182  			return false, err
   183  		}
   184  		if ok {
   185  			// If we successfully refreshed, update the heap.
   186  			heapUpdate(r.frontier, i)
   187  		} else {
   188  			// There's nothing else to read. Delete this cursor from the frontier.
   189  			r.frontier = heapRemove(r.frontier, i)
   190  		}
   191  		return true, nil
   192  	}
   193  	// Inject a CPU sample if it comes next.
   194  	if len(r.cpuSamples) != 0 {
   195  		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
   196  			e := r.cpuSamples[0].asEvent(r.gen.evTable)
   197  			r.cpuSamples = r.cpuSamples[1:]
   198  			return e, nil
   199  		}
   200  	}
   201  	// Try to advance the head of the frontier, which should have the minimum timestamp.
   202  	// This should be by far the most common case
   203  	if len(r.frontier) == 0 {
   204  		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
   205  	}
   206  	if ok, err := tryAdvance(0); err != nil {
   207  		return Event{}, err
   208  	} else if !ok {
   209  		// Try to advance the rest of the frontier, in timestamp order.
   210  		//
   211  		// To do this, sort the min-heap. A sorted min-heap is still a
   212  		// min-heap, but now we can iterate over the rest and try to
   213  		// advance in order. This path should be rare.
   214  		slices.SortFunc(r.frontier, (*batchCursor).compare)
   215  		success := false
   216  		for i := 1; i < len(r.frontier); i++ {
   217  			if ok, err = tryAdvance(i); err != nil {
   218  				return Event{}, err
   219  			} else if ok {
   220  				success = true
   221  				break
   222  			}
   223  		}
   224  		if !success {
   225  			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
   226  		}
   227  	}
   228  
   229  	// Pick off the next event on the queue. At this point, one must exist.
   230  	ev, ok := r.order.Next()
   231  	if !ok {
   232  		panic("invariant violation: advance successful, but queue is empty")
   233  	}
   234  	return ev, nil
   235  }
   236  
   237  // nextGenWithSpill reads the generation and calls nextGen while
   238  // also handling any spilled batches.
   239  func (r *Reader) nextGenWithSpill() (Event, error) {
   240  	if r.version >= version.Go126 {
   241  		return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace")
   242  	}
   243  	if r.spillErr != nil {
   244  		if r.spillErrSync {
   245  			return Event{}, r.spillErr
   246  		}
   247  		r.spillErrSync = true
   248  		r.syncs++
   249  		return syncEvent(nil, r.lastTs, r.syncs), nil
   250  	}
   251  	if r.gen != nil && r.spill == nil {
   252  		// If we have a generation from the last read,
   253  		// and there's nothing left in the frontier, and
   254  		// there's no spilled batch, indicating that there's
   255  		// no further generation, it means we're done.
   256  		// Emit the final sync event.
   257  		r.done = true
   258  		r.syncs++
   259  		return syncEvent(nil, r.lastTs, r.syncs), nil
   260  	}
   261  
   262  	// Read the next generation.
   263  	var gen *generation
   264  	gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version)
   265  	if gen == nil {
   266  		r.gen = nil
   267  		r.spillErrSync = true
   268  		r.syncs++
   269  		return syncEvent(nil, r.lastTs, r.syncs), nil
   270  	}
   271  	return r.installGen(gen)
   272  }
   273  
   274  // installGen installs the new generation into the Reader and returns
   275  // a Sync event for the new generation.
   276  func (r *Reader) installGen(gen *generation) (Event, error) {
   277  	if gen == nil {
   278  		// Emit the final sync event.
   279  		r.gen = nil
   280  		r.done = true
   281  		r.syncs++
   282  		return syncEvent(nil, r.lastTs, r.syncs), nil
   283  	}
   284  	r.gen = gen
   285  
   286  	// Reset CPU samples cursor.
   287  	r.cpuSamples = r.gen.cpuSamples
   288  
   289  	// Reset frontier.
   290  	for _, m := range r.gen.batchMs {
   291  		batches := r.gen.batches[m]
   292  		bc := &batchCursor{m: m}
   293  		ok, err := bc.nextEvent(batches, r.gen.freq)
   294  		if err != nil {
   295  			return Event{}, err
   296  		}
   297  		if !ok {
   298  			// Turns out there aren't actually any events in these batches.
   299  			continue
   300  		}
   301  		r.frontier = heapInsert(r.frontier, bc)
   302  	}
   303  	r.syncs++
   304  
   305  	// Always emit a sync event at the beginning of the generation.
   306  	return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
   307  }
   308  
   309  func dumpFrontier(frontier []*batchCursor) string {
   310  	var sb strings.Builder
   311  	for _, bc := range frontier {
   312  		spec := tracev2.Specs()[bc.ev.typ]
   313  		fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
   314  		for i, arg := range spec.Args[1:] {
   315  			fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
   316  		}
   317  		fmt.Fprintf(&sb, "]\n")
   318  	}
   319  	return sb.String()
   320  }
   321  

View as plain text