1
2
3
4
5 package trace
6
7 import (
8 "fmt"
9 "internal/trace/tracev2"
10 )
11
12
13 type timestamp uint64
14
15
16
17 type batch struct {
18 m threadID
19 time timestamp
20 data []byte
21 }
22
23
24
25 type threadID int64
26
27
28
29 func readBatch(b []byte) (batch, uint64, uint64, error) {
30 if len(b) == 0 {
31 return batch{}, 0, 0, fmt.Errorf("batch is empty")
32 }
33 data := make([]byte, len(b))
34 if nw := copy(data, b); nw != len(b) {
35 return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch")
36 }
37
38 if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
39 return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ)
40 }
41
42
43
44 total := 1
45 b = b[1:]
46 gen, n, err := readUvarint(b)
47 if err != nil {
48 return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
49 }
50 total += n
51 b = b[n:]
52 m, n, err := readUvarint(b)
53 if err != nil {
54 return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
55 }
56 total += n
57 b = b[n:]
58 ts, n, err := readUvarint(b)
59 if err != nil {
60 return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
61 }
62 total += n
63 b = b[n:]
64
65
66 size, n, err := readUvarint(b)
67 if err != nil {
68 return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
69 }
70 if size > tracev2.MaxBatchSize {
71 return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
72 }
73 total += n
74 total += int(size)
75 data = data[:total]
76
77
78 return batch{
79 m: threadID(m),
80 time: timestamp(ts),
81 data: data,
82 }, gen, uint64(total), nil
83 }
84
View as plain text