@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p . cond . Broadcast ( )
p . cond . Broadcast ( )
cancel ( )
cancel ( )
}
}
if p . hasNoWorkerScaling ( ) {
select {
case <- p . baseCtx . Done ( ) :
// Don't warn if the baseCtx is shutdown
default :
log . Warn (
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n" +
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required." , p . qid )
}
p . pause ( )
}
select {
select {
case <- p . baseCtx . Done ( ) :
case <- p . baseCtx . Done ( ) :
// this worker queue is shut-down don't reboost
// Don't warn or check for ongoing work if the baseCtx is shutdown
case <- p . paused :
// Don't warn or check for ongoing work if the pool is paused
default :
default :
if p . numberOfWorkers == 0 && atomic . LoadInt64 ( & p . numInQueue ) > 0 {
if p . hasNoWorkerScaling ( ) {
log . Warn (
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n" +
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required." , p . qid )
p . pause ( )
} else if p . numberOfWorkers == 0 && atomic . LoadInt64 ( & p . numInQueue ) > 0 {
// OK there are no workers but... there's still work to be done -> Reboost
// OK there are no workers but... there's still work to be done -> Reboost
p . zeroBoost ( )
p . zeroBoost ( )
// p.lock will be unlocked by zeroBoost
// p.lock will be unlocked by zeroBoost
@ -385,14 +381,37 @@ func (p *WorkerPool) pause() {
// Resume resumes the WorkerPool
// Resume resumes the WorkerPool
func ( p * WorkerPool ) Resume ( ) {
func ( p * WorkerPool ) Resume ( ) {
p . lock . Lock ( )
p . lock . Lock ( ) // can't defer unlock because of the zeroBoost at the end
defer p . lock . Unlock ( )
select {
select {
case <- p . resumed :
case <- p . resumed :
// already resumed - there's nothing to do
p . lock . Unlock ( )
return
default :
default :
p . paused = make ( chan struct { } )
close ( p . resumed )
}
}
p . paused = make ( chan struct { } )
close ( p . resumed )
// OK now we need to check if we need to add some workers...
if p . numberOfWorkers > 0 || p . hasNoWorkerScaling ( ) || atomic . LoadInt64 ( & p . numInQueue ) == 0 {
// We either have workers, can't scale or there's no work to be done -> so just resume
p . lock . Unlock ( )
return
}
// OK we got some work but no workers we need to think about boosting
select {
case <- p . baseCtx . Done ( ) :
// don't bother boosting if the baseCtx is done
p . lock . Unlock ( )
return
default :
}
// OK we'd better add some boost workers!
p . zeroBoost ( )
// p.zeroBoost will unlock the lock
}
}
// CleanUp will drain the remaining contents of the channel
// CleanUp will drain the remaining contents of the channel