Fix Workerpool deadlock (#10283)

* Prevent deadlock on boost

* Force a boost in testchannelqueue
tokarchuk/v1.17
zeripath 5 years ago committed by GitHub
parent 15614a8368
commit 88986746d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      modules/queue/queue_channel_test.go
  2. 2
      modules/queue/workerpool.go

@ -26,16 +26,19 @@ func TestChannelQueue(t *testing.T) {
queue, err := NewChannelQueue(handle, queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{ ChannelQueueConfiguration{
WorkerPoolConfiguration: WorkerPoolConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: 20, QueueLength: 0,
MaxWorkers: 10, MaxWorkers: 10,
BlockTimeout: 1 * time.Second, BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute, BoostTimeout: 5 * time.Minute,
BoostWorkers: 5, BoostWorkers: 5,
}, },
Workers: 1, Workers: 0,
Name: "TestChannelQueue",
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, queue.(*ChannelQueue).WorkerPool.boostWorkers, 5)
go queue.Run(nilFn, nilFn) go queue.Run(nilFn, nilFn)
test1 := testData{"A", 1} test1 := testData{"A", 1}

@ -132,8 +132,8 @@ func (p *WorkerPool) pushBoost(data Data) {
p.blockTimeout /= 2 p.blockTimeout /= 2
p.lock.Unlock() p.lock.Unlock()
}() }()
p.addWorkers(ctx, boost)
p.lock.Unlock() p.lock.Unlock()
p.addWorkers(ctx, boost)
p.dataChan <- data p.dataChan <- data
} }
} }

Loading…
Cancel
Save