1
2
3
4
5 package main
6
7 import (
8 "os"
9 "runtime"
10 "runtime/pprof"
11 "sync"
12 "sync/atomic"
13 )
14
15 func init() {
16 register("Cockroach35073", Cockroach35073)
17 }
18
// ConsumerStatus_cockroach35073 is the state a row consumer reports back to
// its producer. It is a uint32 so that it can be read and written with the
// sync/atomic Uint32 helpers.
type ConsumerStatus_cockroach35073 uint32

// Possible consumer states. NeedMoreRows is the zero value, so a freshly
// created row source starts out in that state.
const (
	// NeedMoreRows_cockroach35073 (0): Push sends a message to the channel.
	NeedMoreRows_cockroach35073 ConsumerStatus_cockroach35073 = iota
	// DrainRequested_cockroach35073 (1): Push sends nothing.
	DrainRequested_cockroach35073
	// ConsumerClosed_cockroach35073 (2): Push sends nothing.
	ConsumerClosed_cockroach35073
)
26
const (
	// rowChannelBufSize_cockroach35073 is the channel buffer capacity used by
	// RowChannel_cockroach35073.InitWithNumSenders.
	rowChannelBufSize_cockroach35073 = 16

	// outboxBufRows_cockroach35073 is the number of rows Cockroach35073 pushes
	// up front. It equals the channel buffer capacity.
	outboxBufRows_cockroach35073 = 16
)
29
30 type rowSourceBase_cockroach35073 struct {
31 consumerStatus ConsumerStatus_cockroach35073
32 }
33
34 func (rb *rowSourceBase_cockroach35073) consumerClosed() {
35 atomic.StoreUint32((*uint32)(&rb.consumerStatus), uint32(ConsumerClosed_cockroach35073))
36 }
37
// RowChannelMsg_cockroach35073 is the element type carried on a
// RowChannel_cockroach35073's data channel. In this file only the value 0 is
// ever sent.
type RowChannelMsg_cockroach35073 int
39
40 type RowChannel_cockroach35073 struct {
41 rowSourceBase_cockroach35073
42 dataChan chan RowChannelMsg_cockroach35073
43 }
44
45 func (rc *RowChannel_cockroach35073) ConsumerClosed() {
46 rc.consumerClosed()
47 select {
48 case <-rc.dataChan:
49 default:
50 }
51 }
52
53 func (rc *RowChannel_cockroach35073) Push() ConsumerStatus_cockroach35073 {
54 consumerStatus := ConsumerStatus_cockroach35073(
55 atomic.LoadUint32((*uint32)(&rc.consumerStatus)))
56 switch consumerStatus {
57 case NeedMoreRows_cockroach35073:
58 rc.dataChan <- RowChannelMsg_cockroach35073(0)
59 case DrainRequested_cockroach35073:
60 case ConsumerClosed_cockroach35073:
61 }
62 return consumerStatus
63 }
64
65 func (rc *RowChannel_cockroach35073) InitWithNumSenders() {
66 rc.initWithBufSizeAndNumSenders(rowChannelBufSize_cockroach35073)
67 }
68
69 func (rc *RowChannel_cockroach35073) initWithBufSizeAndNumSenders(chanBufSize int) {
70 rc.dataChan = make(chan RowChannelMsg_cockroach35073, chanBufSize)
71 }
72
73 type outbox_cockroach35073 struct {
74 RowChannel_cockroach35073
75 }
76
77 func (m *outbox_cockroach35073) init() {
78 m.RowChannel_cockroach35073.InitWithNumSenders()
79 }
80
81 func (m *outbox_cockroach35073) start(wg *sync.WaitGroup) {
82 if wg != nil {
83 wg.Add(1)
84 }
85 go m.run(wg)
86 }
87
88 func (m *outbox_cockroach35073) run(wg *sync.WaitGroup) {
89 if wg != nil {
90 wg.Done()
91 }
92 }
93
94 func Cockroach35073() {
95 prof := pprof.Lookup("goroutineleak")
96 defer func() {
97
98 for i := 0; i < yieldCount; i++ {
99 runtime.Gosched()
100 }
101 prof.WriteTo(os.Stdout, 2)
102 }()
103 go func() {
104 outbox := &outbox_cockroach35073{}
105 outbox.init()
106
107 var wg sync.WaitGroup
108 for i := 0; i < outboxBufRows_cockroach35073; i++ {
109 outbox.Push()
110 }
111
112 var blockedPusherWg sync.WaitGroup
113 blockedPusherWg.Add(1)
114 go func() {
115 outbox.Push()
116 blockedPusherWg.Done()
117 }()
118
119 outbox.start(&wg)
120
121 wg.Wait()
122 outbox.RowChannel_cockroach35073.Push()
123 }()
124 }
125
View as plain text