1
2
3
4
5 package main
6
7 import (
8 "os"
9 "runtime"
10 "runtime/pprof"
11 "sync"
12 )
13
14 func init() {
15 register("Cockroach35931", Cockroach35931)
16 }
17
18 type RowReceiver_cockroach35931 interface {
19 Push()
20 }
21
22 type inboundStreamInfo_cockroach35931 struct {
23 receiver RowReceiver_cockroach35931
24 }
25
26 type RowChannel_cockroach35931 struct {
27 dataChan chan struct{}
28 }
29
30 func (rc *RowChannel_cockroach35931) Push() {
31
32
33
34 rc.dataChan <- struct{}{}
35 }
36
37 func (rc *RowChannel_cockroach35931) initWithBufSizeAndNumSenders(chanBufSize int) {
38 rc.dataChan = make(chan struct{}, chanBufSize)
39 }
40
41 type flowEntry_cockroach35931 struct {
42 flow *Flow_cockroach35931
43 inboundStreams map[int]*inboundStreamInfo_cockroach35931
44 }
45
46 type flowRegistry_cockroach35931 struct {
47 sync.Mutex
48 flows map[int]*flowEntry_cockroach35931
49 }
50
51 func (fr *flowRegistry_cockroach35931) getEntryLocked(id int) *flowEntry_cockroach35931 {
52 entry, ok := fr.flows[id]
53 if !ok {
54 entry = &flowEntry_cockroach35931{}
55 fr.flows[id] = entry
56 }
57 return entry
58 }
59
60 func (fr *flowRegistry_cockroach35931) cancelPendingStreamsLocked(id int) []RowReceiver_cockroach35931 {
61 entry := fr.flows[id]
62 pendingReceivers := make([]RowReceiver_cockroach35931, 0)
63 for _, is := range entry.inboundStreams {
64 pendingReceivers = append(pendingReceivers, is.receiver)
65 }
66 return pendingReceivers
67 }
68
69 type Flow_cockroach35931 struct {
70 id int
71 flowRegistry *flowRegistry_cockroach35931
72 inboundStreams map[int]*inboundStreamInfo_cockroach35931
73 }
74
75 func (f *Flow_cockroach35931) cancel() {
76 f.flowRegistry.Lock()
77 timedOutReceivers := f.flowRegistry.cancelPendingStreamsLocked(f.id)
78 f.flowRegistry.Unlock()
79
80 for _, receiver := range timedOutReceivers {
81 receiver.Push()
82 }
83 }
84
85 func (fr *flowRegistry_cockroach35931) RegisterFlow(f *Flow_cockroach35931, inboundStreams map[int]*inboundStreamInfo_cockroach35931) {
86 entry := fr.getEntryLocked(f.id)
87 entry.flow = f
88 entry.inboundStreams = inboundStreams
89 }
90
91 func makeFlowRegistry_cockroach35931() *flowRegistry_cockroach35931 {
92 return &flowRegistry_cockroach35931{
93 flows: make(map[int]*flowEntry_cockroach35931),
94 }
95 }
96
97 func Cockroach35931() {
98 prof := pprof.Lookup("goroutineleak")
99 defer func() {
100
101 for i := 0; i < yieldCount; i++ {
102 runtime.Gosched()
103 }
104 prof.WriteTo(os.Stdout, 2)
105 }()
106 go func() {
107 fr := makeFlowRegistry_cockroach35931()
108
109 left := &RowChannel_cockroach35931{}
110 left.initWithBufSizeAndNumSenders(1)
111 right := &RowChannel_cockroach35931{}
112 right.initWithBufSizeAndNumSenders(1)
113
114 inboundStreams := map[int]*inboundStreamInfo_cockroach35931{
115 0: {
116 receiver: left,
117 },
118 1: {
119 receiver: right,
120 },
121 }
122
123 left.Push()
124
125 flow := &Flow_cockroach35931{
126 id: 0,
127 flowRegistry: fr,
128 inboundStreams: inboundStreams,
129 }
130
131 fr.RegisterFlow(flow, inboundStreams)
132
133 flow.cancel()
134 }()
135 }
136
View as plain text