Source file src/runtime/testdata/testgoroutineleakprofile/goker/grpc1275.go

     1  // Copyright 2025 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a MIT
     3  // license that can be found in the LICENSE file.
     4  
     5  /*
     6   * Project: grpc-go
     7   * Issue or PR  : https://github.com/grpc/grpc-go/pull/1275
     8   * Buggy version: (missing)
     9   * fix commit-id: 0669f3f89e0330e94bb13fa1ce8cc704aab50c9c
    10   * Flaky: 100/100
    11   */
    12  package main
    13  
    14  import (
    15  	"io"
    16  	"os"
    17  	"runtime/pprof"
    18  	"time"
    19  )
    20  
    21  func init() {
    22  	register("Grpc1275", Grpc1275)
    23  }
    24  
    25  type recvBuffer_grpc1275 struct {
    26  	c chan bool
    27  }
    28  
    29  func (b *recvBuffer_grpc1275) get() <-chan bool {
    30  	return b.c
    31  }
    32  
    33  type recvBufferReader_grpc1275 struct {
    34  	recv *recvBuffer_grpc1275
    35  }
    36  
    37  func (r *recvBufferReader_grpc1275) Read(p []byte) (int, error) {
    38  	select {
    39  	case <-r.recv.get():
    40  	}
    41  	return 0, nil
    42  }
    43  
    44  type Stream_grpc1275 struct {
    45  	trReader io.Reader
    46  }
    47  
    48  func (s *Stream_grpc1275) Read(p []byte) (int, error) {
    49  	return io.ReadFull(s.trReader, p)
    50  }
    51  
    52  type http2Client_grpc1275 struct{}
    53  
    54  func (t *http2Client_grpc1275) CloseStream(s *Stream_grpc1275) {
    55  	// It is the client.CloseSream() method called by the
    56  	// main goroutine that should send the message, but it
    57  	// is not. The patch is to send out this message.
    58  }
    59  
    60  func (t *http2Client_grpc1275) NewStream() *Stream_grpc1275 {
    61  	return &Stream_grpc1275{
    62  		trReader: &recvBufferReader_grpc1275{
    63  			recv: &recvBuffer_grpc1275{
    64  				c: make(chan bool),
    65  			},
    66  		},
    67  	}
    68  }
    69  
    70  func testInflightStreamClosing_grpc1275() {
    71  	client := &http2Client_grpc1275{}
    72  	stream := client.NewStream()
    73  	donec := make(chan bool)
    74  	go func() { // G2
    75  		defer close(donec)
    76  		stream.Read([]byte{1})
    77  	}()
    78  
    79  	client.CloseStream(stream)
    80  
    81  	timeout := time.NewTimer(300 * time.Nanosecond)
    82  	select {
    83  	case <-donec:
    84  		if !timeout.Stop() {
    85  			<-timeout.C
    86  		}
    87  	case <-timeout.C:
    88  	}
    89  }
    90  
    91  ///
    92  /// G1 									G2
    93  /// testInflightStreamClosing()
    94  /// 									stream.Read()
    95  /// 									io.ReadFull()
    96  /// 									<- r.recv.get()
    97  /// CloseStream()
    98  /// <- donec
    99  /// ------------G1 timeout, G2 leak---------------------
   100  ///
   101  
   102  func Grpc1275() {
   103  	prof := pprof.Lookup("goroutineleak")
   104  	defer func() {
   105  		time.Sleep(100 * time.Millisecond)
   106  		prof.WriteTo(os.Stdout, 2)
   107  	}()
   108  	go func() {
   109  		testInflightStreamClosing_grpc1275() // G1
   110  	}()
   111  }
   112  

View as plain text