1
2
3
4
5
12 package main
13
14 import (
15 "errors"
16 "os"
17 "runtime"
18 "runtime/pprof"
19 "sync"
20 "time"
21 )
22
23 func init() {
24 register("Moby27782", Moby27782)
25 }
26
27 type Event_moby27782 struct {
28 Op Op_moby27782
29 }
30
// Op_moby27782 is a set of operation flags stored in a uint32. Each constant
// below occupies its own bit, so values can be combined with bitwise OR.
type Op_moby27782 uint32

// Operation flags. Values are consecutive powers of two generated with iota.
const (
	// Create_moby27782 has value 1 (bit 0).
	Create_moby27782 Op_moby27782 = 1 << iota

	// Write_moby27782 has value 2 (bit 1).
	Write_moby27782

	// Remove_moby27782 has value 4 (bit 2).
	Remove_moby27782

	// Rename_moby27782 has value 8 (bit 3).
	Rename_moby27782

	// Chmod_moby27782 has value 16 (bit 4).
	Chmod_moby27782
)
40
41 func newEvent(op Op_moby27782) Event_moby27782 {
42 return Event_moby27782{op}
43 }
44
45 func (e *Event_moby27782) ignoreLinux(w *Watcher_moby27782) bool {
46 if e.Op != Write_moby27782 {
47 w.mu.Lock()
48 defer w.mu.Unlock()
49 w.cv.Broadcast()
50 return true
51 }
52 runtime.Gosched()
53 return false
54 }
55
56 type Watcher_moby27782 struct {
57 Events chan Event_moby27782
58 mu sync.Mutex
59 cv *sync.Cond
60 done chan struct{}
61 }
62
63 func NewWatcher_moby27782() *Watcher_moby27782 {
64 w := &Watcher_moby27782{
65 Events: make(chan Event_moby27782),
66 done: make(chan struct{}),
67 }
68 w.cv = sync.NewCond(&w.mu)
69 go w.readEvents()
70 return w
71 }
72
73 func (w *Watcher_moby27782) readEvents() {
74 defer close(w.Events)
75 for {
76 if w.isClosed() {
77 return
78 }
79 event := newEvent(Write_moby27782)
80 if !event.ignoreLinux(w) {
81 runtime.Gosched()
82 select {
83 case w.Events <- event:
84 case <-w.done:
85 return
86 }
87 }
88 }
89 }
90
91 func (w *Watcher_moby27782) isClosed() bool {
92 select {
93 case <-w.done:
94 return true
95 default:
96 return false
97 }
98 }
99
100 func (w *Watcher_moby27782) Close() {
101 if w.isClosed() {
102 return
103 }
104 close(w.done)
105 }
106
107 func (w *Watcher_moby27782) Remove() {
108 w.mu.Lock()
109 defer w.mu.Unlock()
110 exists := true
111 for exists {
112 w.cv.Wait()
113 runtime.Gosched()
114 }
115 }
116
117 type FileWatcher_moby27782 interface {
118 Events() <-chan Event_moby27782
119 Remove()
120 Close()
121 }
122
123 func New_moby27782() FileWatcher_moby27782 {
124 return NewEventWatcher_moby27782()
125 }
126
127 func NewEventWatcher_moby27782() FileWatcher_moby27782 {
128 return &fsNotifyWatcher_moby27782{NewWatcher_moby27782()}
129 }
130
131 type fsNotifyWatcher_moby27782 struct {
132 *Watcher_moby27782
133 }
134
135 func (w *fsNotifyWatcher_moby27782) Events() <-chan Event_moby27782 {
136 return w.Watcher_moby27782.Events
137 }
138
139 func watchFile_moby27782() FileWatcher_moby27782 {
140 fileWatcher := New_moby27782()
141 return fileWatcher
142 }
143
// LogWatcher_moby27782 carries a close notification that can be triggered at
// most once.
type LogWatcher_moby27782 struct {
	// closeOnce ensures closeNotifier is closed only a single time.
	closeOnce sync.Once

	// closeNotifier is closed by Close; WatchClose hands it out to observers.
	closeNotifier chan struct{}
}

// Close closes the notification channel. Calls after the first are no-ops
// because the close runs through closeOnce.
func (w *LogWatcher_moby27782) Close() {
	shutdown := func() {
		close(w.closeNotifier)
	}
	w.closeOnce.Do(shutdown)
}

// WatchClose returns a receive-only view of the channel that Close closes.
func (w *LogWatcher_moby27782) WatchClose() <-chan struct{} {
	return w.closeNotifier
}

// NewLogWatcher_moby27782 returns a LogWatcher_moby27782 with an open,
// unbuffered notification channel.
func NewLogWatcher_moby27782() *LogWatcher_moby27782 {
	lw := new(LogWatcher_moby27782)
	lw.closeNotifier = make(chan struct{})
	return lw
}
164
165 func followLogs_moby27782(logWatcher *LogWatcher_moby27782) {
166 fileWatcher := watchFile_moby27782()
167 defer func() {
168 fileWatcher.Close()
169 }()
170 waitRead := func() {
171 runtime.Gosched()
172 select {
173 case <-fileWatcher.Events():
174 case <-logWatcher.WatchClose():
175 fileWatcher.Remove()
176 return
177 }
178 }
179 handleDecodeErr := func() {
180 waitRead()
181 }
182 handleDecodeErr()
183 }
184
185 type Container_moby27782 struct {
186 LogDriver *JSONFileLogger_moby27782
187 }
188
189 func (container *Container_moby27782) InitializeStdio() {
190 if err := container.startLogging(); err != nil {
191 container.Reset()
192 }
193 }
194
195 func (container *Container_moby27782) startLogging() error {
196 l := &JSONFileLogger_moby27782{
197 readers: make(map[*LogWatcher_moby27782]struct{}),
198 }
199 container.LogDriver = l
200 l.ReadLogs()
201 return errors.New("Some error")
202 }
203
204 func (container *Container_moby27782) Reset() {
205 if container.LogDriver != nil {
206 container.LogDriver.Close()
207 }
208 }
209
210 type JSONFileLogger_moby27782 struct {
211 mu sync.Mutex
212 readers map[*LogWatcher_moby27782]struct{}
213 }
214
215 func (l *JSONFileLogger_moby27782) ReadLogs() *LogWatcher_moby27782 {
216 logWatcher := NewLogWatcher_moby27782()
217 go l.readLogs(logWatcher)
218 return logWatcher
219 }
220
221 func (l *JSONFileLogger_moby27782) readLogs(logWatcher *LogWatcher_moby27782) {
222 l.mu.Lock()
223 defer l.mu.Unlock()
224
225 l.readers[logWatcher] = struct{}{}
226 followLogs_moby27782(logWatcher)
227 }
228
229 func (l *JSONFileLogger_moby27782) Close() {
230 l.mu.Lock()
231 defer l.mu.Unlock()
232
233 for r := range l.readers {
234 r.Close()
235 delete(l.readers, r)
236 }
237 }
238
239 func Moby27782() {
240 prof := pprof.Lookup("goroutineleak")
241 defer func() {
242 time.Sleep(100 * time.Millisecond)
243 prof.WriteTo(os.Stdout, 2)
244 }()
245
246 for i := 0; i < 10000; i++ {
247 go (&Container_moby27782{}).InitializeStdio()
248 }
249 }
250
View as plain text