Source file src/runtime/testdata/testgoroutineleakprofile/goker/kubernetes13135.go

     1  // Copyright 2025 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a MIT
     3  // license that can be found in the LICENSE file.
     4  
     5  /*
     6   * Project: kubernetes
     7   * Issue or PR  : https://github.com/kubernetes/kubernetes/pull/13135
     8   * Buggy version: 6ced66249d4fd2a81e86b4a71d8df0139fe5ceae
     9   * fix commit-id: a12b7edc42c5c06a2e7d9f381975658692951d5a
    10   * Flaky: 93/100
    11   */
    12  package main
    13  
    14  import (
    15  	"os"
    16  	"runtime/pprof"
    17  	"sync"
    18  	"time"
    19  )
    20  
    21  func init() {
    22  	register("Kubernetes13135", Kubernetes13135)
    23  }
    24  
    25  var (
    26  	StopChannel_kubernetes13135 chan struct{}
    27  )
    28  
    29  func Util_kubernetes13135(f func(), period time.Duration, stopCh <-chan struct{}) {
    30  	for {
    31  		select {
    32  		case <-stopCh:
    33  			return
    34  		default:
    35  		}
    36  		func() {
    37  			f()
    38  		}()
    39  		time.Sleep(period)
    40  	}
    41  }
    42  
// Store_kubernetes13135 is the sink a Reflector_kubernetes13135 writes to.
// In this file it is implemented by *WatchCache_kubernetes13135.
type Store_kubernetes13135 interface {
	// Add reports a single object to the store.
	Add(obj interface{})
	// Replace reports a full replacement of the store's contents.
	Replace(obj interface{})
}
    47  
// Reflector_kubernetes13135 pushes updates into a Store_kubernetes13135.
type Reflector_kubernetes13135 struct {
	// store receives the Replace call issued by syncWith.
	store Store_kubernetes13135
}
    51  
// ListAndWatch performs a single syncWith and always returns nil.
// stopCh is accepted but never consulted.
func (r *Reflector_kubernetes13135) ListAndWatch(stopCh <-chan struct{}) error {
	r.syncWith()
	return nil
}
    56  
    57  func NewReflector_kubernetes13135(store Store_kubernetes13135) *Reflector_kubernetes13135 {
    58  	return &Reflector_kubernetes13135{
    59  		store: store,
    60  	}
    61  }
    62  
// syncWith calls Replace(nil) on the reflector's store.
func (r *Reflector_kubernetes13135) syncWith() {
	r.store.Replace(nil)
}
    66  
// Cacher_kubernetes13135 couples a WatchCache with the Reflector feeding it.
//
// The embedded Mutex is locked by startCaching and unlocked by the onReplace
// callback installed in NewCacher_kubernetes13135; processEvent also takes
// it. initialized is waited on by NewCacher_kubernetes13135 and released by
// the onReplace callback, with initOnce ensuring Done is called only once.
type Cacher_kubernetes13135 struct {
	sync.Mutex
	initialized sync.WaitGroup
	initOnce    sync.Once
	watchCache  *WatchCache_kubernetes13135
	reflector   *Reflector_kubernetes13135
}
    74  
// processEvent acquires the Cacher mutex and releases it again on return.
// NewCacher_kubernetes13135 installs it as the WatchCache onEvent callback,
// so it runs while WatchCache.processEvent is holding the WatchCache lock.
func (c *Cacher_kubernetes13135) processEvent() {
	c.Lock()
	defer c.Unlock()
}
    79  
    80  func (c *Cacher_kubernetes13135) startCaching(stopChannel <-chan struct{}) {
    81  	c.Lock()
    82  	for {
    83  		err := c.reflector.ListAndWatch(stopChannel)
    84  		if err == nil {
    85  			break
    86  		}
    87  	}
    88  }
    89  
    90  type WatchCache_kubernetes13135 struct {
    91  	sync.RWMutex
    92  	onReplace func()
    93  	onEvent   func()
    94  }
    95  
    96  func (w *WatchCache_kubernetes13135) SetOnEvent(onEvent func()) {
    97  	w.Lock()
    98  	defer w.Unlock()
    99  	w.onEvent = onEvent
   100  }
   101  
   102  func (w *WatchCache_kubernetes13135) SetOnReplace(onReplace func()) {
   103  	w.Lock()
   104  	defer w.Unlock()
   105  	w.onReplace = onReplace
   106  }
   107  
   108  func (w *WatchCache_kubernetes13135) processEvent() {
   109  	w.Lock()
   110  	defer w.Unlock()
   111  	if w.onEvent != nil {
   112  		w.onEvent()
   113  	}
   114  }
   115  
// Add ignores obj and forwards to processEvent, which runs the onEvent
// callback (if any) under the WatchCache lock.
func (w *WatchCache_kubernetes13135) Add(obj interface{}) {
	w.processEvent()
}
   119  
   120  func (w *WatchCache_kubernetes13135) Replace(obj interface{}) {
   121  	w.Lock()
   122  	defer w.Unlock()
   123  	if w.onReplace != nil {
   124  		w.onReplace()
   125  	}
   126  }
   127  
// NewCacher_kubernetes13135 builds a Cacher whose reflector writes into a
// fresh WatchCache, starts a goroutine (G2) that keeps calling startCaching
// until a receive from stopCh succeeds, and blocks until the onReplace
// callback has run for the first time.
func NewCacher_kubernetes13135(stopCh <-chan struct{}) *Cacher_kubernetes13135 {
	watchCache := &WatchCache_kubernetes13135{}
	cacher := &Cacher_kubernetes13135{
		initialized: sync.WaitGroup{},
		watchCache:  watchCache,
		reflector:   NewReflector_kubernetes13135(watchCache),
	}
	// Balanced by the Done call in the onReplace callback below.
	cacher.initialized.Add(1)
	watchCache.SetOnReplace(func() {
		// Only the first Replace signals initialization.
		cacher.initOnce.Do(func() { cacher.initialized.Done() })
		// Releases the mutex that startCaching locked. This runs inside
		// WatchCache.Replace, i.e. while the WatchCache lock is still held.
		cacher.Unlock()
	})
	// The event callback takes the Cacher mutex while WatchCache.processEvent
	// holds the WatchCache lock, the opposite order to startCaching/Replace.
	watchCache.SetOnEvent(cacher.processEvent)
	go Util_kubernetes13135(func() { cacher.startCaching(stopCh) }, 0, stopCh) // G2
	cacher.initialized.Wait()
	return cacher
}
   145  
// Kubernetes13135 starts 50 independent instances of the scenario and, on
// return, writes the "goroutineleak" profile to stdout.
//
// Each instance (G1) builds a Cacher, which starts G2 looping over
// startCaching, and then starts G3 calling watchCache.Add plus one more
// goroutine that closes the stop channel. G2 takes the Cacher mutex and then
// the WatchCache lock (in Replace); G3 takes the WatchCache lock and then the
// Cacher mutex (in the onEvent callback). If the two interleave so that each
// holds its first lock, neither can acquire its second and both stay blocked.
func Kubernetes13135() {
	prof := pprof.Lookup("goroutineleak")
	defer func() {
		// Presumably gives the spawned goroutines time to run (and block)
		// before the profile is dumped.
		time.Sleep(100 * time.Millisecond)
		prof.WriteTo(os.Stdout, 2)
	}()

	StopChannel_kubernetes13135 = make(chan struct{})
	for i := 0; i < 50; i++ {
		go func() {
			// Use a per-instance channel that shadows the package-level one.
			// Sharing a single global channel between concurrent instances
			// would cause a deadlock that does not actually exist in the
			// original microbenchmark.
			StopChannel_kubernetes13135 := make(chan struct{})

			c := NewCacher_kubernetes13135(StopChannel_kubernetes13135) // G1
			go c.watchCache.Add(nil)                                    // G3
			go close(StopChannel_kubernetes13135)
		}()
	}
}
   167  

View as plain text