1
2
3
4
5
12 package main
13
14 import (
15 "os"
16 "runtime/pprof"
17 "sync"
18 "time"
19 )
20
21 func init() {
22 register("Kubernetes13135", Kubernetes13135)
23 }
24
25 var (
26 StopChannel_kubernetes13135 chan struct{}
27 )
28
29 func Util_kubernetes13135(f func(), period time.Duration, stopCh <-chan struct{}) {
30 for {
31 select {
32 case <-stopCh:
33 return
34 default:
35 }
36 func() {
37 f()
38 }()
39 time.Sleep(period)
40 }
41 }
42
43 type Store_kubernetes13135 interface {
44 Add(obj interface{})
45 Replace(obj interface{})
46 }
47
48 type Reflector_kubernetes13135 struct {
49 store Store_kubernetes13135
50 }
51
52 func (r *Reflector_kubernetes13135) ListAndWatch(stopCh <-chan struct{}) error {
53 r.syncWith()
54 return nil
55 }
56
57 func NewReflector_kubernetes13135(store Store_kubernetes13135) *Reflector_kubernetes13135 {
58 return &Reflector_kubernetes13135{
59 store: store,
60 }
61 }
62
63 func (r *Reflector_kubernetes13135) syncWith() {
64 r.store.Replace(nil)
65 }
66
67 type Cacher_kubernetes13135 struct {
68 sync.Mutex
69 initialized sync.WaitGroup
70 initOnce sync.Once
71 watchCache *WatchCache_kubernetes13135
72 reflector *Reflector_kubernetes13135
73 }
74
75 func (c *Cacher_kubernetes13135) processEvent() {
76 c.Lock()
77 defer c.Unlock()
78 }
79
80 func (c *Cacher_kubernetes13135) startCaching(stopChannel <-chan struct{}) {
81 c.Lock()
82 for {
83 err := c.reflector.ListAndWatch(stopChannel)
84 if err == nil {
85 break
86 }
87 }
88 }
89
90 type WatchCache_kubernetes13135 struct {
91 sync.RWMutex
92 onReplace func()
93 onEvent func()
94 }
95
96 func (w *WatchCache_kubernetes13135) SetOnEvent(onEvent func()) {
97 w.Lock()
98 defer w.Unlock()
99 w.onEvent = onEvent
100 }
101
102 func (w *WatchCache_kubernetes13135) SetOnReplace(onReplace func()) {
103 w.Lock()
104 defer w.Unlock()
105 w.onReplace = onReplace
106 }
107
108 func (w *WatchCache_kubernetes13135) processEvent() {
109 w.Lock()
110 defer w.Unlock()
111 if w.onEvent != nil {
112 w.onEvent()
113 }
114 }
115
116 func (w *WatchCache_kubernetes13135) Add(obj interface{}) {
117 w.processEvent()
118 }
119
120 func (w *WatchCache_kubernetes13135) Replace(obj interface{}) {
121 w.Lock()
122 defer w.Unlock()
123 if w.onReplace != nil {
124 w.onReplace()
125 }
126 }
127
128 func NewCacher_kubernetes13135(stopCh <-chan struct{}) *Cacher_kubernetes13135 {
129 watchCache := &WatchCache_kubernetes13135{}
130 cacher := &Cacher_kubernetes13135{
131 initialized: sync.WaitGroup{},
132 watchCache: watchCache,
133 reflector: NewReflector_kubernetes13135(watchCache),
134 }
135 cacher.initialized.Add(1)
136 watchCache.SetOnReplace(func() {
137 cacher.initOnce.Do(func() { cacher.initialized.Done() })
138 cacher.Unlock()
139 })
140 watchCache.SetOnEvent(cacher.processEvent)
141 go Util_kubernetes13135(func() { cacher.startCaching(stopCh) }, 0, stopCh)
142 cacher.initialized.Wait()
143 return cacher
144 }
145
146 func Kubernetes13135() {
147 prof := pprof.Lookup("goroutineleak")
148 defer func() {
149 time.Sleep(100 * time.Millisecond)
150 prof.WriteTo(os.Stdout, 2)
151 }()
152
153 StopChannel_kubernetes13135 = make(chan struct{})
154 for i := 0; i < 50; i++ {
155 go func() {
156
157
158
159 StopChannel_kubernetes13135 := make(chan struct{})
160
161 c := NewCacher_kubernetes13135(StopChannel_kubernetes13135)
162 go c.watchCache.Add(nil)
163 go close(StopChannel_kubernetes13135)
164 }()
165 }
166 }
167
View as plain text