// Copyright 2025 The Go Authors. All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. package main import ( "os" "runtime" "runtime/pprof" "sync" ) func init() { register("Cockroach35931", Cockroach35931) } type RowReceiver_cockroach35931 interface { Push() } type inboundStreamInfo_cockroach35931 struct { receiver RowReceiver_cockroach35931 } type RowChannel_cockroach35931 struct { dataChan chan struct{} } func (rc *RowChannel_cockroach35931) Push() { // The buffer size can be either 0 or 1 when this function is entered. // We need context sensitivity or a path-condition on the buffer size // to find this bug. rc.dataChan <- struct{}{} } func (rc *RowChannel_cockroach35931) initWithBufSizeAndNumSenders(chanBufSize int) { rc.dataChan = make(chan struct{}, chanBufSize) } type flowEntry_cockroach35931 struct { flow *Flow_cockroach35931 inboundStreams map[int]*inboundStreamInfo_cockroach35931 } type flowRegistry_cockroach35931 struct { sync.Mutex flows map[int]*flowEntry_cockroach35931 } func (fr *flowRegistry_cockroach35931) getEntryLocked(id int) *flowEntry_cockroach35931 { entry, ok := fr.flows[id] if !ok { entry = &flowEntry_cockroach35931{} fr.flows[id] = entry } return entry } func (fr *flowRegistry_cockroach35931) cancelPendingStreamsLocked(id int) []RowReceiver_cockroach35931 { entry := fr.flows[id] pendingReceivers := make([]RowReceiver_cockroach35931, 0) for _, is := range entry.inboundStreams { pendingReceivers = append(pendingReceivers, is.receiver) } return pendingReceivers } type Flow_cockroach35931 struct { id int flowRegistry *flowRegistry_cockroach35931 inboundStreams map[int]*inboundStreamInfo_cockroach35931 } func (f *Flow_cockroach35931) cancel() { f.flowRegistry.Lock() timedOutReceivers := f.flowRegistry.cancelPendingStreamsLocked(f.id) f.flowRegistry.Unlock() for _, receiver := range timedOutReceivers { receiver.Push() } } func (fr *flowRegistry_cockroach35931) RegisterFlow(f *Flow_cockroach35931, inboundStreams map[int]*inboundStreamInfo_cockroach35931) { entry := fr.getEntryLocked(f.id) entry.flow = f entry.inboundStreams = inboundStreams } func makeFlowRegistry_cockroach35931() *flowRegistry_cockroach35931 { return &flowRegistry_cockroach35931{ flows: make(map[int]*flowEntry_cockroach35931), } } func Cockroach35931() { prof := pprof.Lookup("goroutineleak") defer func() { // Yield several times to allow the child goroutine to run. for i := 0; i < yieldCount; i++ { runtime.Gosched() } prof.WriteTo(os.Stdout, 2) }() go func() { fr := makeFlowRegistry_cockroach35931() left := &RowChannel_cockroach35931{} left.initWithBufSizeAndNumSenders(1) right := &RowChannel_cockroach35931{} right.initWithBufSizeAndNumSenders(1) inboundStreams := map[int]*inboundStreamInfo_cockroach35931{ 0: { receiver: left, }, 1: { receiver: right, }, } left.Push() flow := &Flow_cockroach35931{ id: 0, flowRegistry: fr, inboundStreams: inboundStreams, } fr.RegisterFlow(flow, inboundStreams) flow.cancel() }() }