1  
     2  
     3  
     4  
     5  package par
     6  
     7  import "fmt"
     8  
     9  
    10  
    11  type Queue struct {
    12  	maxActive int
    13  	st        chan queueState
    14  }
    15  
    16  type queueState struct {
    17  	active  int 
    18  	backlog []func()
    19  	idle    chan struct{} 
    20  }
    21  
    22  
    23  
    24  
    25  func NewQueue(maxActive int) *Queue {
    26  	if maxActive < 1 {
    27  		panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
    28  	}
    29  
    30  	q := &Queue{
    31  		maxActive: maxActive,
    32  		st:        make(chan queueState, 1),
    33  	}
    34  	q.st <- queueState{}
    35  	return q
    36  }
    37  
    38  
    39  
    40  
    41  
    42  func (q *Queue) Add(f func()) {
    43  	st := <-q.st
    44  	if st.active == q.maxActive {
    45  		st.backlog = append(st.backlog, f)
    46  		q.st <- st
    47  		return
    48  	}
    49  	if st.active == 0 {
    50  		
    51  		st.idle = nil
    52  	}
    53  	st.active++
    54  	q.st <- st
    55  
    56  	go func() {
    57  		for {
    58  			f()
    59  
    60  			st := <-q.st
    61  			if len(st.backlog) == 0 {
    62  				if st.active--; st.active == 0 && st.idle != nil {
    63  					close(st.idle)
    64  				}
    65  				q.st <- st
    66  				return
    67  			}
    68  			f, st.backlog = st.backlog[0], st.backlog[1:]
    69  			q.st <- st
    70  		}
    71  	}()
    72  }
    73  
    74  
    75  
    76  func (q *Queue) Idle() <-chan struct{} {
    77  	st := <-q.st
    78  	defer func() { q.st <- st }()
    79  
    80  	if st.idle == nil {
    81  		st.idle = make(chan struct{})
    82  		if st.active == 0 {
    83  			close(st.idle)
    84  		}
    85  	}
    86  
    87  	return st.idle
    88  }
    89  
View as plain text