1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "errors"
10 "fmt"
11 "io"
12 "slices"
13 "strings"
14
15 "internal/trace/internal/tracev1"
16 "internal/trace/tracev2"
17 "internal/trace/version"
18 )
19
20
21
22
23
24
25 type Reader struct {
26 version version.Version
27 r *bufio.Reader
28 lastTs Time
29 gen *generation
30 frontier []*batchCursor
31 cpuSamples []cpuSample
32 order ordering
33 syncs int
34 readGenErr error
35 done bool
36
37
38
39
40
41
42
43
44
45
46 spill *spilledBatch
47 spillErr error
48 spillErrSync bool
49
50 v1Events *traceV1Converter
51 }
52
53
54 func NewReader(r io.Reader) (*Reader, error) {
55 br := bufio.NewReader(r)
56 v, err := version.ReadHeader(br)
57 if err != nil {
58 return nil, err
59 }
60 switch v {
61 case version.Go111, version.Go119, version.Go121:
62 tr, err := tracev1.Parse(br, v)
63 if err != nil {
64 return nil, err
65 }
66 return &Reader{
67 v1Events: convertV1Trace(tr),
68 }, nil
69 case version.Go122, version.Go123, version.Go125, version.Go126:
70 return &Reader{
71 version: v,
72 r: br,
73 order: ordering{
74 traceVer: v,
75 mStates: make(map[ThreadID]*mState),
76 pStates: make(map[ProcID]*pState),
77 gStates: make(map[GoID]*gState),
78 activeTasks: make(map[TaskID]taskState),
79 },
80 }, nil
81 default:
82 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
83 }
84 }
85
86
87
88
89 func (r *Reader) ReadEvent() (e Event, err error) {
90
91 if r.done {
92 return Event{}, io.EOF
93 }
94
95
96 if r.v1Events != nil {
97 if r.syncs == 0 {
98
99 ev, ok := r.v1Events.events.Peek()
100 if ok {
101 r.syncs++
102 return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
103 }
104 }
105 ev, err := r.v1Events.next()
106 if err == io.EOF {
107
108 r.done = true
109 r.syncs++
110 return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
111 } else if err != nil {
112 return Event{}, err
113 }
114 return ev, nil
115 }
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 defer func() {
135 if err != nil {
136 return
137 }
138 if err = e.validateTableIDs(); err != nil {
139 return
140 }
141 if e.base.time <= r.lastTs {
142 e.base.time = r.lastTs + 1
143 }
144 r.lastTs = e.base.time
145 }()
146
147
148 if ev, ok := r.order.Next(); ok {
149 return ev, nil
150 }
151
152
153 if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
154 if r.version < version.Go126 {
155 return r.nextGenWithSpill()
156 }
157 if r.readGenErr != nil {
158 return Event{}, r.readGenErr
159 }
160 gen, err := readGeneration(r.r, r.version)
161 if err != nil {
162
163
164
165 r.readGenErr = err
166 r.gen = nil
167 r.syncs++
168 return syncEvent(nil, r.lastTs, r.syncs), nil
169 }
170 return r.installGen(gen)
171 }
172 tryAdvance := func(i int) (bool, error) {
173 bc := r.frontier[i]
174
175 if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
176 return ok, err
177 }
178
179
180 ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
181 if err != nil {
182 return false, err
183 }
184 if ok {
185
186 heapUpdate(r.frontier, i)
187 } else {
188
189 r.frontier = heapRemove(r.frontier, i)
190 }
191 return true, nil
192 }
193
194 if len(r.cpuSamples) != 0 {
195 if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
196 e := r.cpuSamples[0].asEvent(r.gen.evTable)
197 r.cpuSamples = r.cpuSamples[1:]
198 return e, nil
199 }
200 }
201
202
203 if len(r.frontier) == 0 {
204 return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
205 }
206 if ok, err := tryAdvance(0); err != nil {
207 return Event{}, err
208 } else if !ok {
209
210
211
212
213
214 slices.SortFunc(r.frontier, (*batchCursor).compare)
215 success := false
216 for i := 1; i < len(r.frontier); i++ {
217 if ok, err = tryAdvance(i); err != nil {
218 return Event{}, err
219 } else if ok {
220 success = true
221 break
222 }
223 }
224 if !success {
225 return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
226 }
227 }
228
229
230 ev, ok := r.order.Next()
231 if !ok {
232 panic("invariant violation: advance successful, but queue is empty")
233 }
234 return ev, nil
235 }
236
237
238
239 func (r *Reader) nextGenWithSpill() (Event, error) {
240 if r.version >= version.Go126 {
241 return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace")
242 }
243 if r.spillErr != nil {
244 if r.spillErrSync {
245 return Event{}, r.spillErr
246 }
247 r.spillErrSync = true
248 r.syncs++
249 return syncEvent(nil, r.lastTs, r.syncs), nil
250 }
251 if r.gen != nil && r.spill == nil {
252
253
254
255
256
257 r.done = true
258 r.syncs++
259 return syncEvent(nil, r.lastTs, r.syncs), nil
260 }
261
262
263 var gen *generation
264 gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version)
265 if gen == nil {
266 r.gen = nil
267 r.spillErrSync = true
268 r.syncs++
269 return syncEvent(nil, r.lastTs, r.syncs), nil
270 }
271 return r.installGen(gen)
272 }
273
274
275
276 func (r *Reader) installGen(gen *generation) (Event, error) {
277 if gen == nil {
278
279 r.gen = nil
280 r.done = true
281 r.syncs++
282 return syncEvent(nil, r.lastTs, r.syncs), nil
283 }
284 r.gen = gen
285
286
287 r.cpuSamples = r.gen.cpuSamples
288
289
290 for _, m := range r.gen.batchMs {
291 batches := r.gen.batches[m]
292 bc := &batchCursor{m: m}
293 ok, err := bc.nextEvent(batches, r.gen.freq)
294 if err != nil {
295 return Event{}, err
296 }
297 if !ok {
298
299 continue
300 }
301 r.frontier = heapInsert(r.frontier, bc)
302 }
303 r.syncs++
304
305
306 return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
307 }
308
309 func dumpFrontier(frontier []*batchCursor) string {
310 var sb strings.Builder
311 for _, bc := range frontier {
312 spec := tracev2.Specs()[bc.ev.typ]
313 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
314 for i, arg := range spec.Args[1:] {
315 fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
316 }
317 fmt.Fprintf(&sb, "]\n")
318 }
319 return sb.String()
320 }
321
View as plain text