Only use boost workers for leveldb shadow queues (#15696)

* The leveldb shadow queue of a persistable channel queue should always start with 0
workers and just use boost to add additional workers if necessary.

* create a zero boost so that if there are no workers in a pool - boost to start the workers

* actually set timeout appropriately on boosted workers

Signed-off-by: Andrew Thornton <art27@cantab.net>
tokarchuk/v1.17
zeripath 4 years ago committed by GitHub
parent 6ebd833780
commit 0590176a23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      modules/queue/queue_disk_channel.go
  2. 10
      modules/queue/unique_queue_disk_channel.go
  3. 40
      modules/queue/workerpool.go

@ -75,10 +75,10 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BatchLength: config.BatchLength, BatchLength: config.BatchLength,
BlockTimeout: 1 * time.Second, BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute, BoostTimeout: 5 * time.Minute,
BoostWorkers: 5, BoostWorkers: 1,
MaxWorkers: 6, MaxWorkers: 5,
}, },
Workers: 1, Workers: 0,
Name: config.Name + "-level", Name: config.Name + "-level",
}, },
DataDir: config.DataDir, DataDir: config.DataDir,

@ -73,12 +73,12 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
WorkerPoolConfiguration: WorkerPoolConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: config.QueueLength, QueueLength: config.QueueLength,
BatchLength: config.BatchLength, BatchLength: config.BatchLength,
BlockTimeout: 0, BlockTimeout: 1 * time.Second,
BoostTimeout: 0, BoostTimeout: 5 * time.Minute,
BoostWorkers: 0, BoostWorkers: 1,
MaxWorkers: 1, MaxWorkers: 5,
}, },
Workers: 1, Workers: 0,
Name: config.Name + "-level", Name: config.Name + "-level",
}, },
DataDir: config.DataDir, DataDir: config.DataDir,

@ -70,7 +70,11 @@ func (p *WorkerPool) Push(data Data) {
atomic.AddInt64(&p.numInQueue, 1) atomic.AddInt64(&p.numInQueue, 1)
p.lock.Lock() p.lock.Lock()
if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
if p.numberOfWorkers == 0 {
p.zeroBoost()
} else {
p.lock.Unlock() p.lock.Unlock()
}
p.pushBoost(data) p.pushBoost(data)
} else { } else {
p.lock.Unlock() p.lock.Unlock()
@ -78,6 +82,40 @@ func (p *WorkerPool) Push(data Data) {
} }
} }
func (p *WorkerPool) zeroBoost() {
ctx, cancel := context.WithCancel(p.baseCtx)
mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
boost = p.maxNumberOfWorkers - p.numberOfWorkers
}
if mq != nil {
log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
start := time.Now()
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
go func() {
select {
case <-ctx.Done():
case <-time.After(p.boostTimeout):
}
mq.RemoveWorkers(pid)
cancel()
}()
} else {
log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
go func() {
select {
case <-ctx.Done():
case <-time.After(p.boostTimeout):
}
cancel()
}()
}
p.lock.Unlock()
p.addWorkers(ctx, boost)
}
func (p *WorkerPool) pushBoost(data Data) { func (p *WorkerPool) pushBoost(data Data) {
select { select {
case p.dataChan <- data: case p.dataChan <- data:
@ -112,7 +150,7 @@ func (p *WorkerPool) pushBoost(data Data) {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now() start := time.Now()
pid := mq.RegisterWorkers(boost, start, false, start, cancel, false) pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
go func() { go func() {
<-ctx.Done() <-ctx.Done()
mq.RemoveWorkers(pid) mq.RemoveWorkers(pid)

Loading…
Cancel
Save