Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)

* move shutdownfns, terminatefns and hammerfns out of separate goroutines

Coalesce the shutdownfns etc into a list of functions that get run at shutdown
rather then have them run at goroutines blocked on selects.

This may help reduce the background select/poll load in certain
configurations.

* The LevelDB queues can actually wait on empty instead of polling

Slight refactor to cause leveldb queues to wait on empty instead of polling.

* Shutdown the shadow level queue once it is empty

* Remove bytefifo additional goroutine for readToChan as it can just be run in run

* Remove additional removeWorkers goroutine for workers

* Simplify the AtShutdown and AtTerminate functions and add Channel Flusher

* Add shutdown flusher to CUQ

* move persistable channel shutdown stuff to Shutdown Fn

* Ensure that UPCQ has the correct config

* handle shutdown during the flushing

* reduce risk of race between zeroBoost and addWorkers

* prevent double shutdown

Signed-off-by: Andrew Thornton <art27@cantab.net>
tokarchuk/v1.17
zeripath 4 years ago committed by GitHub
parent 9f19c2b8cc
commit ba526ceffe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      modules/graceful/context.go
  2. 116
      modules/graceful/manager.go
  3. 26
      modules/graceful/manager_unix.go
  4. 28
      modules/graceful/manager_windows.go
  5. 8
      modules/indexer/code/indexer.go
  6. 4
      modules/indexer/issues/indexer.go
  7. 18
      modules/queue/bytefifo.go
  8. 6
      modules/queue/manager.go
  9. 6
      modules/queue/queue.go
  10. 167
      modules/queue/queue_bytefifo.go
  11. 60
      modules/queue/queue_channel.go
  12. 5
      modules/queue/queue_channel_test.go
  13. 9
      modules/queue/queue_disk.go
  14. 75
      modules/queue/queue_disk_channel.go
  15. 29
      modules/queue/queue_disk_channel_test.go
  16. 9
      modules/queue/queue_disk_test.go
  17. 18
      modules/queue/queue_redis.go
  18. 8
      modules/queue/queue_wrapped.go
  19. 57
      modules/queue/unique_queue_channel.go
  20. 11
      modules/queue/unique_queue_disk.go
  21. 82
      modules/queue/unique_queue_disk_channel.go
  22. 21
      modules/queue/unique_queue_redis.go
  23. 53
      modules/queue/workerpool.go
  24. 5
      services/pull/check_test.go

@ -6,17 +6,9 @@ package graceful
import ( import (
"context" "context"
"fmt"
"time" "time"
) )
// Errors for context.Err()
var (
ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown")
ErrHammer = fmt.Errorf("Graceful Manager called Hammer")
ErrTerminate = fmt.Errorf("Graceful Manager called Terminate")
)
// ChannelContext is a context that wraps a channel and error as a context // ChannelContext is a context that wraps a channel and error as a context
type ChannelContext struct { type ChannelContext struct {
done <-chan struct{} done <-chan struct{}
@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} {
// Callers using this context should ensure that they are registered as a running server // Callers using this context should ensure that they are registered as a running server
// in order that they are waited for. // in order that they are waited for.
func (g *Manager) ShutdownContext() context.Context { func (g *Manager) ShutdownContext() context.Context {
return &ChannelContext{ return g.shutdownCtx
done: g.IsShutdown(),
err: ErrShutdown,
}
} }
// HammerContext returns a context.Context that is Done at hammer // HammerContext returns a context.Context that is Done at hammer
// Callers using this context should ensure that they are registered as a running server // Callers using this context should ensure that they are registered as a running server
// in order that they are waited for. // in order that they are waited for.
func (g *Manager) HammerContext() context.Context { func (g *Manager) HammerContext() context.Context {
return &ChannelContext{ return g.hammerCtx
done: g.IsHammer(),
err: ErrHammer,
}
} }
// TerminateContext returns a context.Context that is Done at terminate // TerminateContext returns a context.Context that is Done at terminate
// Callers using this context should ensure that they are registered as a terminating server // Callers using this context should ensure that they are registered as a terminating server
// in order that they are waited for. // in order that they are waited for.
func (g *Manager) TerminateContext() context.Context { func (g *Manager) TerminateContext() context.Context {
return &ChannelContext{ return g.terminateCtx
done: g.IsTerminate(),
err: ErrTerminate,
}
} }

@ -54,8 +54,8 @@ func InitManager(ctx context.Context) {
}) })
} }
// CallbackWithContext is combined runnable and context to watch to see if the caller has finished // WithCallback is a runnable to call when the caller has finished
type CallbackWithContext func(ctx context.Context, callback func()) type WithCallback func(callback func())
// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate // RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
// After the callback to atShutdown is called and is complete, the main function must return. // After the callback to atShutdown is called and is complete, the main function must return.
@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func())
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
// - users must therefore be careful to only call these as necessary. // - users must therefore be careful to only call these as necessary.
// If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate.
type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func())) type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks // RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
// After the callback to atShutdown is called and is complete, the main function must return. // After the callback to atShutdown is called and is complete, the main function must return.
@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
g.doShutdown() g.doShutdown()
} }
}() }()
run(func(ctx context.Context, atShutdown func()) { run(func(atShutdown func()) {
go func() { g.lock.Lock()
select { defer g.lock.Unlock()
case <-g.IsShutdown(): g.toRunAtShutdown = append(g.toRunAtShutdown,
atShutdown() func() {
case <-ctx.Done(): defer func() {
return if err := recover(); err != nil {
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
g.doShutdown()
} }
}() }()
}, func(ctx context.Context, atTerminate func()) { atShutdown()
g.RunAtTerminate(ctx, atTerminate) })
}, func(atTerminate func()) {
g.RunAtTerminate(atTerminate)
}) })
} }
@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
// (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.)
// The callback function provided to atTerminate must return once termination is complete. // The callback function provided to atTerminate must return once termination is complete.
// Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary.
type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext) type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback)
// RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks // RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks
// After the atShutdown channel is closed, the main function must return once shutdown is complete. // After the atShutdown channel is closed, the main function must return once shutdown is complete.
@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) {
g.doShutdown() g.doShutdown()
} }
}() }()
run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { run(g.IsShutdown(), func(atTerminate func()) {
g.RunAtTerminate(ctx, atTerminate) g.RunAtTerminate(atTerminate)
}) })
} }
@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) {
} }
// RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination
func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { func (g *Manager) RunAtTerminate(terminate func()) {
g.terminateWaitGroup.Add(1) g.terminateWaitGroup.Add(1)
go func() { g.lock.Lock()
defer g.lock.Unlock()
g.toRunAtTerminate = append(g.toRunAtTerminate,
func() {
defer g.terminateWaitGroup.Done() defer g.terminateWaitGroup.Done()
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
} }
}() }()
select {
case <-g.IsTerminate():
terminate() terminate()
case <-ctx.Done(): })
}
}()
} }
// RunAtShutdown creates a go-routine to run the provided function at shutdown // RunAtShutdown creates a go-routine to run the provided function at shutdown
func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
go func() { g.lock.Lock()
defer g.lock.Unlock()
g.toRunAtShutdown = append(g.toRunAtShutdown,
func() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
} }
}() }()
select { select {
case <-g.IsShutdown():
shutdown()
case <-ctx.Done(): case <-ctx.Done():
return
default:
shutdown()
} }
}() })
} }
// RunAtHammer creates a go-routine to run the provided function at shutdown // RunAtHammer creates a go-routine to run the provided function at shutdown
func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { func (g *Manager) RunAtHammer(hammer func()) {
go func() { g.lock.Lock()
defer g.lock.Unlock()
g.toRunAtHammer = append(g.toRunAtHammer,
func() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
} }
}() }()
select {
case <-g.IsHammer():
hammer() hammer()
case <-ctx.Done(): })
}
}()
} }
func (g *Manager) doShutdown() { func (g *Manager) doShutdown() {
if !g.setStateTransition(stateRunning, stateShuttingDown) { if !g.setStateTransition(stateRunning, stateShuttingDown) {
return return
} }
g.lock.Lock() g.lock.Lock()
close(g.shutdown) g.shutdownCtxCancel()
for _, fn := range g.toRunAtShutdown {
go fn()
}
g.lock.Unlock() g.lock.Unlock()
if setting.GracefulHammerTime >= 0 { if setting.GracefulHammerTime >= 0 {
@ -203,7 +212,7 @@ func (g *Manager) doShutdown() {
g.doTerminate() g.doTerminate()
g.WaitForTerminate() g.WaitForTerminate()
g.lock.Lock() g.lock.Lock()
close(g.done) g.doneCtxCancel()
g.lock.Unlock() g.lock.Unlock()
}() }()
} }
@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) {
time.Sleep(d) time.Sleep(d)
g.lock.Lock() g.lock.Lock()
select { select {
case <-g.hammer: case <-g.hammerCtx.Done():
default: default:
log.Warn("Setting Hammer condition") log.Warn("Setting Hammer condition")
close(g.hammer) g.hammerCtxCancel()
for _, fn := range g.toRunAtHammer {
go fn()
}
} }
g.lock.Unlock() g.lock.Unlock()
} }
@ -226,10 +238,13 @@ func (g *Manager) doTerminate() {
} }
g.lock.Lock() g.lock.Lock()
select { select {
case <-g.terminate: case <-g.terminateCtx.Done():
default: default:
log.Warn("Terminating") log.Warn("Terminating")
close(g.terminate) g.terminateCtxCancel()
for _, fn := range g.toRunAtTerminate {
go fn()
}
} }
g.lock.Unlock() g.lock.Unlock()
} }
@ -242,7 +257,7 @@ func (g *Manager) IsChild() bool {
// IsShutdown returns a channel which will be closed at shutdown. // IsShutdown returns a channel which will be closed at shutdown.
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
func (g *Manager) IsShutdown() <-chan struct{} { func (g *Manager) IsShutdown() <-chan struct{} {
return g.shutdown return g.shutdownCtx.Done()
} }
// IsHammer returns a channel which will be closed at hammer // IsHammer returns a channel which will be closed at hammer
@ -250,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} {
// Servers running within the running server wait group should respond to IsHammer // Servers running within the running server wait group should respond to IsHammer
// if not shutdown already // if not shutdown already
func (g *Manager) IsHammer() <-chan struct{} { func (g *Manager) IsHammer() <-chan struct{} {
return g.hammer return g.hammerCtx.Done()
} }
// IsTerminate returns a channel which will be closed at terminate // IsTerminate returns a channel which will be closed at terminate
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// IsTerminate will only close once all running servers have stopped // IsTerminate will only close once all running servers have stopped
func (g *Manager) IsTerminate() <-chan struct{} { func (g *Manager) IsTerminate() <-chan struct{} {
return g.terminate return g.terminateCtx.Done()
} }
// ServerDone declares a running server done and subtracts one from the // ServerDone declares a running server done and subtracts one from the
@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() {
// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating
func (g *Manager) Done() <-chan struct{} { func (g *Manager) Done() <-chan struct{} {
return g.done return g.doneCtx.Done()
} }
// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate // Err allows the manager to be viewed as a context.Context done at Terminate
func (g *Manager) Err() error { func (g *Manager) Err() error {
select { return g.doneCtx.Err()
case <-g.Done():
return ErrTerminate
default:
return nil
}
} }
// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values // Value allows the manager to be viewed as a context.Context done at Terminate
func (g *Manager) Value(key interface{}) interface{} { func (g *Manager) Value(key interface{}) interface{} {
return nil return g.doneCtx.Value(key)
} }
// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context
func (g *Manager) Deadline() (deadline time.Time, ok bool) { func (g *Manager) Deadline() (deadline time.Time, ok bool) {
return return g.doneCtx.Deadline()
} }

@ -25,13 +25,21 @@ type Manager struct {
forked bool forked bool
lock *sync.RWMutex lock *sync.RWMutex
state state state state
shutdown chan struct{} shutdownCtx context.Context
hammer chan struct{} hammerCtx context.Context
terminate chan struct{} terminateCtx context.Context
done chan struct{} doneCtx context.Context
shutdownCtxCancel context.CancelFunc
hammerCtxCancel context.CancelFunc
terminateCtxCancel context.CancelFunc
doneCtxCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup
toRunAtShutdown []func()
toRunAtHammer []func()
toRunAtTerminate []func()
} }
func newGracefulManager(ctx context.Context) *Manager { func newGracefulManager(ctx context.Context) *Manager {
@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager {
} }
func (g *Manager) start(ctx context.Context) { func (g *Manager) start(ctx context.Context) {
// Make channels // Make contexts
g.terminate = make(chan struct{}) g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx)
g.shutdown = make(chan struct{}) g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx)
g.hammer = make(chan struct{}) g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx)
g.done = make(chan struct{}) g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx)
// Set the running state & handle signals // Set the running state & handle signals
g.setState(stateRunning) g.setState(stateRunning)

@ -36,14 +36,22 @@ type Manager struct {
isChild bool isChild bool
lock *sync.RWMutex lock *sync.RWMutex
state state state state
shutdown chan struct{} shutdownCtx context.Context
hammer chan struct{} hammerCtx context.Context
terminate chan struct{} terminateCtx context.Context
done chan struct{} doneCtx context.Context
shutdownCtxCancel context.CancelFunc
hammerCtxCancel context.CancelFunc
terminateCtxCancel context.CancelFunc
doneCtxCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup
shutdownRequested chan struct{} shutdownRequested chan struct{}
toRunAtShutdown []func()
toRunAtHammer []func()
toRunAtTerminate []func()
} }
func newGracefulManager(ctx context.Context) *Manager { func newGracefulManager(ctx context.Context) *Manager {
@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager {
} }
func (g *Manager) start() { func (g *Manager) start() {
// Make contexts
g.terminateCtx, g.terminateCtxCancel = context.WithCancel(g.ctx)
g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(g.ctx)
g.hammerCtx, g.hammerCtxCancel = context.WithCancel(g.ctx)
g.doneCtx, g.doneCtxCancel = context.WithCancel(g.ctx)
// Make channels // Make channels
g.terminate = make(chan struct{})
g.shutdown = make(chan struct{})
g.hammer = make(chan struct{})
g.done = make(chan struct{})
g.shutdownRequested = make(chan struct{}) g.shutdownRequested = make(chan struct{})
// Set the running state // Set the running state
@ -171,7 +181,7 @@ hammerLoop:
default: default:
log.Debug("Unexpected control request: %v", change.Cmd) log.Debug("Unexpected control request: %v", change.Cmd)
} }
case <-g.hammer: case <-g.hammerCtx.Done():
break hammerLoop break hammerLoop
} }
} }

@ -115,7 +115,13 @@ func Init() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
graceful.GetManager().RunAtTerminate(ctx, func() { graceful.GetManager().RunAtTerminate(func() {
select {
case <-ctx.Done():
return
default:
}
cancel()
log.Debug("Closing repository indexer") log.Debug("Closing repository indexer")
indexer.Close() indexer.Close()
log.Info("PID: %d Repository Indexer closed", os.Getpid()) log.Info("PID: %d Repository Indexer closed", os.Getpid())

@ -160,7 +160,7 @@ func InitIssueIndexer(syncReindex bool) {
} }
populate = !exist populate = !exist
holder.set(issueIndexer) holder.set(issueIndexer)
graceful.GetManager().RunAtTerminate(context.Background(), func() { graceful.GetManager().RunAtTerminate(func() {
log.Debug("Closing issue indexer") log.Debug("Closing issue indexer")
issueIndexer := holder.get() issueIndexer := holder.get()
if issueIndexer != nil { if issueIndexer != nil {
@ -170,7 +170,7 @@ func InitIssueIndexer(syncReindex bool) {
}) })
log.Debug("Created Bleve Indexer") log.Debug("Created Bleve Indexer")
case "elasticsearch": case "elasticsearch":
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) { graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
if err != nil { if err != nil {
log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)

@ -4,14 +4,16 @@
package queue package queue
import "context"
// ByteFIFO defines a FIFO that takes a byte array // ByteFIFO defines a FIFO that takes a byte array
type ByteFIFO interface { type ByteFIFO interface {
// Len returns the length of the fifo // Len returns the length of the fifo
Len() int64 Len(ctx context.Context) int64
// PushFunc pushes data to the end of the fifo and calls the callback if it is added // PushFunc pushes data to the end of the fifo and calls the callback if it is added
PushFunc(data []byte, fn func() error) error PushFunc(ctx context.Context, data []byte, fn func() error) error
// Pop pops data from the start of the fifo // Pop pops data from the start of the fifo
Pop() ([]byte, error) Pop(ctx context.Context) ([]byte, error)
// Close this fifo // Close this fifo
Close() error Close() error
} }
@ -20,7 +22,7 @@ type ByteFIFO interface {
type UniqueByteFIFO interface { type UniqueByteFIFO interface {
ByteFIFO ByteFIFO
// Has returns whether the fifo contains this data // Has returns whether the fifo contains this data
Has(data []byte) (bool, error) Has(ctx context.Context, data []byte) (bool, error)
} }
var _ ByteFIFO = &DummyByteFIFO{} var _ ByteFIFO = &DummyByteFIFO{}
@ -29,12 +31,12 @@ var _ ByteFIFO = &DummyByteFIFO{}
type DummyByteFIFO struct{} type DummyByteFIFO struct{}
// PushFunc returns nil // PushFunc returns nil
func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
return nil return nil
} }
// Pop returns nil // Pop returns nil
func (*DummyByteFIFO) Pop() ([]byte, error) { func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) {
return []byte{}, nil return []byte{}, nil
} }
@ -44,7 +46,7 @@ func (*DummyByteFIFO) Close() error {
} }
// Len is always 0 // Len is always 0
func (*DummyByteFIFO) Len() int64 { func (*DummyByteFIFO) Len(ctx context.Context) int64 {
return 0 return 0
} }
@ -56,6 +58,6 @@ type DummyUniqueByteFIFO struct {
} }
// Has always returns false // Has always returns false
func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return false, nil return false, nil
} }

@ -187,14 +187,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
if flushable, ok := mq.Managed.(Flushable); ok { if flushable, ok := mq.Managed.(Flushable); ok {
log.Debug("Flushing (flushable) queue: %s", mq.Name) log.Debug("Flushing (flushable) queue: %s", mq.Name)
go func(q *ManagedQueue) { go func(q *ManagedQueue) {
localCtx, localCancel := context.WithCancel(ctx) localCtx, localCtxCancel := context.WithCancel(ctx)
pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true)
err := flushable.FlushWithContext(localCtx) err := flushable.FlushWithContext(localCtx)
if err != nil && err != ctx.Err() { if err != nil && err != ctx.Err() {
cancel() cancel()
} }
q.CancelWorkers(pid) q.CancelWorkers(pid)
localCancel() localCtxCancel()
wg.Done() wg.Done()
}(mq) }(mq)
} else { } else {

@ -57,7 +57,7 @@ type Named interface {
// Queues will handle their own contents in the Run method // Queues will handle their own contents in the Run method
type Queue interface { type Queue interface {
Flushable Flushable
Run(atShutdown, atTerminate func(context.Context, func())) Run(atShutdown, atTerminate func(func()))
Push(Data) error Push(Data) error
} }
@ -74,7 +74,7 @@ type DummyQueue struct {
} }
// Run does nothing // Run does nothing
func (*DummyQueue) Run(_, _ func(context.Context, func())) {} func (*DummyQueue) Run(_, _ func(func())) {}
// Push fakes a push of data to the queue // Push fakes a push of data to the queue
func (*DummyQueue) Push(Data) error { func (*DummyQueue) Push(Data) error {
@ -122,7 +122,7 @@ type Immediate struct {
} }
// Run does nothing // Run does nothing
func (*Immediate) Run(_, _ func(context.Context, func())) {} func (*Immediate) Run(_, _ func(func())) {}
// Push fakes a push of data to the queue // Push fakes a push of data to the queue
func (q *Immediate) Push(data Data) error { func (q *Immediate) Push(data Data) error {

@ -19,6 +19,7 @@ type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration WorkerPoolConfiguration
Workers int Workers int
Name string Name string
WaitOnEmpty bool
} }
var _ Queue = &ByteFIFOQueue{} var _ Queue = &ByteFIFOQueue{}
@ -28,12 +29,16 @@ type ByteFIFOQueue struct {
*WorkerPool *WorkerPool
byteFIFO ByteFIFO byteFIFO ByteFIFO
typ Type typ Type
closed chan struct{} shutdownCtx context.Context
terminated chan struct{} shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
terminateCtxCancel context.CancelFunc
exemplar interface{} exemplar interface{}
workers int workers int
name string name string
lock sync.Mutex lock sync.Mutex
waitOnEmpty bool
pushed chan struct{}
} }
// NewByteFIFOQueue creates a new ByteFIFOQueue // NewByteFIFOQueue creates a new ByteFIFOQueue
@ -44,15 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
} }
config := configInterface.(ByteFIFOQueueConfiguration) config := configInterface.(ByteFIFOQueueConfiguration)
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
return &ByteFIFOQueue{ return &ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO, byteFIFO: byteFIFO,
typ: typ, typ: typ,
closed: make(chan struct{}), shutdownCtx: shutdownCtx,
terminated: make(chan struct{}), shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
terminateCtxCancel: terminateCtxCancel,
exemplar: exemplar, exemplar: exemplar,
workers: config.Workers, workers: config.Workers,
name: config.Name, name: config.Name,
waitOnEmpty: config.WaitOnEmpty,
pushed: make(chan struct{}, 1),
}, nil }, nil
} }
@ -76,7 +88,15 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if err != nil { if err != nil {
return err return err
} }
return q.byteFIFO.PushFunc(bs, fn) if q.waitOnEmpty {
defer func() {
select {
case q.pushed <- struct{}{}:
default:
}
}()
}
return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
} }
// IsEmpty checks if the queue is empty // IsEmpty checks if the queue is empty
@ -86,135 +106,160 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
if !q.WorkerPool.IsEmpty() { if !q.WorkerPool.IsEmpty() {
return false return false
} }
return q.byteFIFO.Len() == 0 return q.byteFIFO.Len(q.terminateCtx) == 0
} }
// Run runs the bytefifo queue // Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(context.Background(), q.Shutdown) atShutdown(q.Shutdown)
atTerminate(context.Background(), q.Terminate) atTerminate(q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name) log.Debug("%s: %s Starting", q.typ, q.name)
go func() {
_ = q.AddWorkers(q.workers, 0) _ = q.AddWorkers(q.workers, 0)
}()
go q.readToChan() log.Trace("%s: %s Now running", q.typ, q.name)
q.readToChan()
log.Trace("%s: %s Waiting til closed", q.typ, q.name) <-q.shutdownCtx.Done()
<-q.closed
log.Trace("%s: %s Waiting til done", q.typ, q.name) log.Trace("%s: %s Waiting til done", q.typ, q.name)
q.Wait() q.Wait()
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
ctx, cancel := context.WithCancel(context.Background()) q.CleanUp(q.terminateCtx)
atTerminate(ctx, cancel) q.terminateCtxCancel()
q.CleanUp(ctx)
cancel()
} }
const maxBackOffTime = time.Second * 3
func (q *ByteFIFOQueue) readToChan() { func (q *ByteFIFOQueue) readToChan() {
// handle quick cancels // handle quick cancels
select { select {
case <-q.closed: case <-q.shutdownCtx.Done():
// tell the pool to shutdown. // tell the pool to shutdown.
q.cancel() q.baseCtxCancel()
return return
default: default:
} }
// Default backoff values
backOffTime := time.Millisecond * 100 backOffTime := time.Millisecond * 100
maxBackOffTime := time.Second * 3
loop:
for { for {
success, resetBackoff := q.doPop() err := q.doPop()
if resetBackoff { if err == errQueueEmpty {
log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
select {
case <-q.pushed:
// reset backOffTime
backOffTime = 100 * time.Millisecond backOffTime = 100 * time.Millisecond
continue loop
case <-q.shutdownCtx.Done():
// Oops we've been shutdown whilst waiting
// Make sure the worker pool is shutdown too
q.baseCtxCancel()
return
}
} }
if success { // Reset the backOffTime if there is no error or an unmarshalError
select { if err == nil || err == errUnmarshal {
case <-q.closed: backOffTime = 100 * time.Millisecond
// tell the pool to shutdown.
q.cancel()
return
default:
} }
} else {
if err != nil {
// Need to Backoff
select { select {
case <-q.closed: case <-q.shutdownCtx.Done():
// tell the pool to shutdown. // Oops we've been shutdown whilst backing off
q.cancel() // Make sure the worker pool is shutdown too
q.baseCtxCancel()
return return
case <-time.After(backOffTime): case <-time.After(backOffTime):
} // OK we've waited - so backoff a bit
backOffTime += backOffTime / 2 backOffTime += backOffTime / 2
if backOffTime > maxBackOffTime { if backOffTime > maxBackOffTime {
backOffTime = maxBackOffTime backOffTime = maxBackOffTime
} }
continue loop
}
}
select {
case <-q.shutdownCtx.Done():
// Oops we've been shutdown
// Make sure the worker pool is shutdown too
q.baseCtxCancel()
return
default:
continue loop
} }
} }
} }
func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) { var errQueueEmpty = fmt.Errorf("empty queue")
var errEmptyBytes = fmt.Errorf("empty bytes")
var errUnmarshal = fmt.Errorf("failed to unmarshal")
func (q *ByteFIFOQueue) doPop() error {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
bs, err := q.byteFIFO.Pop() bs, err := q.byteFIFO.Pop(q.shutdownCtx)
if err != nil { if err != nil {
if err == context.Canceled {
q.baseCtxCancel()
return err
}
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
return return err
} }
if len(bs) == 0 { if len(bs) == 0 {
return if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 {
return errQueueEmpty
}
return errEmptyBytes
} }
resetBackoff = true
data, err := unmarshalAs(bs, q.exemplar) data, err := unmarshalAs(bs, q.exemplar)
if err != nil { if err != nil {
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
return return errUnmarshal
} }
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
q.WorkerPool.Push(data) q.WorkerPool.Push(data)
success = true return nil
return
} }
// Shutdown processing from this queue // Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() { func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name) log.Trace("%s: %s Shutting down", q.typ, q.name)
q.lock.Lock()
select { select {
case <-q.closed: case <-q.shutdownCtx.Done():
return
default: default:
close(q.closed)
} }
q.lock.Unlock() q.shutdownCtxCancel()
log.Debug("%s: %s Shutdown", q.typ, q.name) log.Debug("%s: %s Shutdown", q.typ, q.name)
} }
// IsShutdown returns a channel which is closed when this Queue is shutdown // IsShutdown returns a channel which is closed when this Queue is shutdown
func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} { func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} {
return q.closed return q.shutdownCtx.Done()
} }
// Terminate this queue and close the queue // Terminate this queue and close the queue
func (q *ByteFIFOQueue) Terminate() { func (q *ByteFIFOQueue) Terminate() {
log.Trace("%s: %s Terminating", q.typ, q.name) log.Trace("%s: %s Terminating", q.typ, q.name)
q.Shutdown() q.Shutdown()
q.lock.Lock()
select { select {
case <-q.terminated: case <-q.terminateCtx.Done():
q.lock.Unlock()
return return
default: default:
} }
close(q.terminated)
q.lock.Unlock()
if log.IsDebug() { if log.IsDebug() {
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx))
} }
q.terminateCtxCancel()
if err := q.byteFIFO.Close(); err != nil { if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
} }
@ -223,7 +268,7 @@ func (q *ByteFIFOQueue) Terminate() {
// IsTerminated returns a channel which is closed when this Queue is terminated // IsTerminated returns a channel which is closed when this Queue is terminated
func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} { func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} {
return q.terminated return q.terminateCtx.Done()
} }
var _ UniqueQueue = &ByteFIFOUniqueQueue{} var _ UniqueQueue = &ByteFIFOUniqueQueue{}
@ -240,14 +285,18 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
return nil, err return nil, err
} }
config := configInterface.(ByteFIFOQueueConfiguration) config := configInterface.(ByteFIFOQueueConfiguration)
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
return &ByteFIFOUniqueQueue{ return &ByteFIFOUniqueQueue{
ByteFIFOQueue: ByteFIFOQueue{ ByteFIFOQueue: ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO, byteFIFO: byteFIFO,
typ: typ, typ: typ,
closed: make(chan struct{}), shutdownCtx: shutdownCtx,
terminated: make(chan struct{}), shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
terminateCtxCancel: terminateCtxCancel,
exemplar: exemplar, exemplar: exemplar,
workers: config.Workers, workers: config.Workers,
name: config.Name, name: config.Name,
@ -265,5 +314,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
return q.byteFIFO.(UniqueByteFIFO).Has(bs) return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs)
} }

@ -27,6 +27,10 @@ type ChannelQueueConfiguration struct {
// It is basically a very thin wrapper around a WorkerPool // It is basically a very thin wrapper around a WorkerPool
type ChannelQueue struct { type ChannelQueue struct {
*WorkerPool *WorkerPool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
terminateCtxCancel context.CancelFunc
exemplar interface{} exemplar interface{}
workers int workers int
name string name string
@ -42,8 +46,16 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
if config.BatchLength == 0 { if config.BatchLength == 0 {
config.BatchLength = 1 config.BatchLength = 1
} }
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelQueue{ queue := &ChannelQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
terminateCtxCancel: terminateCtxCancel,
exemplar: exemplar, exemplar: exemplar,
workers: config.Workers, workers: config.Workers,
name: config.Name, name: config.Name,
@ -53,17 +65,11 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
} }
// Run starts to run the queue // Run starts to run the queue
func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(context.Background(), func() { atShutdown(q.Shutdown)
log.Warn("ChannelQueue: %s is not shutdownable!", q.name) atTerminate(q.Terminate)
})
atTerminate(context.Background(), func() {
log.Warn("ChannelQueue: %s is not terminatable!", q.name)
})
log.Debug("ChannelQueue: %s Starting", q.name) log.Debug("ChannelQueue: %s Starting", q.name)
go func() {
_ = q.AddWorkers(q.workers, 0) _ = q.AddWorkers(q.workers, 0)
}()
} }
// Push will push data into the queue // Push will push data into the queue
@ -75,6 +81,42 @@ func (q *ChannelQueue) Push(data Data) error {
return nil return nil
} }
// Shutdown processing from this queue
func (q *ChannelQueue) Shutdown() {
q.lock.Lock()
defer q.lock.Unlock()
select {
case <-q.shutdownCtx.Done():
log.Trace("ChannelQueue: %s Already Shutting down", q.name)
return
default:
}
log.Trace("ChannelQueue: %s Shutting down", q.name)
go func() {
log.Trace("ChannelQueue: %s Flushing", q.name)
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
return
}
log.Debug("ChannelQueue: %s Flushed", q.name)
}()
q.shutdownCtxCancel()
log.Debug("ChannelQueue: %s Shutdown", q.name)
}
// Terminate this queue and close the queue
func (q *ChannelQueue) Terminate() {
log.Trace("ChannelQueue: %s Terminating", q.name)
q.Shutdown()
select {
case <-q.terminateCtx.Done():
return
default:
}
q.terminateCtxCancel()
log.Debug("ChannelQueue: %s Terminated", q.name)
}
// Name returns the name of this queue // Name returns the name of this queue
func (q *ChannelQueue) Name() string { func (q *ChannelQueue) Name() string {
return q.name return q.name

@ -5,7 +5,6 @@
package queue package queue
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -21,7 +20,7 @@ func TestChannelQueue(t *testing.T) {
} }
} }
nilFn := func(_ context.Context, _ func()) {} nilFn := func(_ func()) {}
queue, err := NewChannelQueue(handle, queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{ ChannelQueueConfiguration{
@ -61,7 +60,7 @@ func TestChannelQueue_Batch(t *testing.T) {
} }
} }
nilFn := func(_ context.Context, _ func()) {} nilFn := func(_ func()) {}
queue, err := NewChannelQueue(handle, queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{ ChannelQueueConfiguration{

@ -5,6 +5,8 @@
package queue package queue
import ( import (
"context"
"code.gitea.io/gitea/modules/nosql" "code.gitea.io/gitea/modules/nosql"
"gitea.com/lunny/levelqueue" "gitea.com/lunny/levelqueue"
@ -37,6 +39,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
if len(config.ConnectionString) == 0 { if len(config.ConnectionString) == 0 {
config.ConnectionString = config.DataDir config.ConnectionString = config.DataDir
} }
config.WaitOnEmpty = true
byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName) byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil { if err != nil {
@ -82,7 +85,7 @@ func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, erro
} }
// PushFunc will push data into the fifo // PushFunc will push data into the fifo
func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
if fn != nil { if fn != nil {
if err := fn(); err != nil { if err := fn(); err != nil {
return err return err
@ -92,7 +95,7 @@ func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
} }
// Pop pops data from the start of the fifo // Pop pops data from the start of the fifo
func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.internal.RPop() data, err := fifo.internal.RPop()
if err != nil && err != levelqueue.ErrNotFound { if err != nil && err != levelqueue.ErrNotFound {
return nil, err return nil, err
@ -108,7 +111,7 @@ func (fifo *LevelQueueByteFIFO) Close() error {
} }
// Len returns the length of the fifo // Len returns the length of the fifo
func (fifo *LevelQueueByteFIFO) Len() int64 { func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 {
return fifo.internal.Len() return fifo.internal.Len()
} }

@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error {
} }
// Run starts to run the queue // Run starts to run the queue
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
q.lock.Lock() q.lock.Lock()
if q.internal == nil { if q.internal == nil {
@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
} else { } else {
q.lock.Unlock() q.lock.Unlock()
} }
atShutdown(context.Background(), q.Shutdown) atShutdown(q.Shutdown)
atTerminate(context.Background(), q.Terminate) atTerminate(q.Terminate)
// Just run the level queue - we shut it down later
go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
go func() {
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
}()
log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
<-q.closed // Just run the level queue - we shut it down once it's flushed
log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) go q.internal.Run(func(_ func()) {}, func(_ func()) {})
q.channelQueue.cancel()
q.internal.(*LevelQueue).cancel()
log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() { go func() {
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) for !q.IsEmpty() {
for data := range q.channelQueue.dataChan { _ = q.internal.Flush(0)
_ = q.internal.Push(data) select {
atomic.AddInt64(&q.channelQueue.numInQueue, -1) case <-time.After(100 * time.Millisecond):
case <-q.internal.(*LevelQueue).shutdownCtx.Done():
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
return
} }
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) }
log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
q.internal.(*LevelQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelQueue).qid)
}() }()
log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } else {
log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
q.internal.(*LevelQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelQueue).qid)
}
} }
// Flush flushes the queue and blocks till the queue is empty // Flush flushes the queue and blocks till the queue is empty
@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool {
func (q *PersistableChannelQueue) Shutdown() { func (q *PersistableChannelQueue) Shutdown() {
log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock()
select { select {
case <-q.closed: case <-q.closed:
q.lock.Unlock()
return
default: default:
}
q.channelQueue.Shutdown()
if q.internal != nil { if q.internal != nil {
q.internal.(*LevelQueue).Shutdown() q.internal.(*LevelQueue).Shutdown()
} }
close(q.closed) close(q.closed)
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) q.lock.Unlock()
log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
q.channelQueue.baseCtxCancel()
q.internal.(*LevelQueue).baseCtxCancel()
log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
} }
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
} }
// Terminate this queue and close the queue // Terminate this queue and close the queue
@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() {
q.Shutdown() q.Shutdown()
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
q.channelQueue.Terminate()
if q.internal != nil { if q.internal != nil {
q.internal.(*LevelQueue).Terminate() q.internal.(*LevelQueue).Terminate()
} }

@ -5,10 +5,8 @@
package queue package queue
import ( import (
"context"
"io/ioutil" "io/ioutil"
"testing" "testing"
"time"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -36,13 +34,15 @@ func TestPersistableChannelQueue(t *testing.T) {
BatchLength: 2, BatchLength: 2,
QueueLength: 20, QueueLength: 20,
Workers: 1, Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10, MaxWorkers: 10,
Name: "first",
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) { go queue.Run(func(shutdown func()) {
queueShutdown = append(queueShutdown, shutdown) queueShutdown = append(queueShutdown, shutdown)
}, func(_ context.Context, terminate func()) { }, func(terminate func()) {
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
}) })
@ -64,13 +64,18 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.Equal(t, test2.TestString, result2.TestString) assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt) assert.Equal(t, test2.TestInt, result2.TestInt)
// test1 is a testData not a *testData so will be rejected
err = queue.Push(test1) err = queue.Push(test1)
assert.Error(t, err) assert.Error(t, err)
// Now shutdown the queue
for _, callback := range queueShutdown { for _, callback := range queueShutdown {
callback() callback()
} }
time.Sleep(200 * time.Millisecond)
// Wait til it is closed
<-queue.(*PersistableChannelQueue).closed
err = queue.Push(&test1) err = queue.Push(&test1)
assert.NoError(t, err) assert.NoError(t, err)
err = queue.Push(&test2) err = queue.Push(&test2)
@ -80,23 +85,33 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.Fail(t, "Handler processing should have stopped") assert.Fail(t, "Handler processing should have stopped")
default: default:
} }
// terminate the queue
for _, callback := range queueTerminate { for _, callback := range queueTerminate {
callback() callback()
} }
select {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
default:
}
// Reopen queue // Reopen queue
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
DataDir: tmpDir, DataDir: tmpDir,
BatchLength: 2, BatchLength: 2,
QueueLength: 20, QueueLength: 20,
Workers: 1, Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10, MaxWorkers: 10,
Name: "second",
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) { go queue.Run(func(shutdown func()) {
queueShutdown = append(queueShutdown, shutdown) queueShutdown = append(queueShutdown, shutdown)
}, func(_ context.Context, terminate func()) { }, func(terminate func()) {
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
}) })

@ -5,7 +5,6 @@
package queue package queue
import ( import (
"context"
"io/ioutil" "io/ioutil"
"sync" "sync"
"testing" "testing"
@ -49,11 +48,11 @@ func TestLevelQueue(t *testing.T) {
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) { go queue.Run(func(shutdown func()) {
lock.Lock() lock.Lock()
queueShutdown = append(queueShutdown, shutdown) queueShutdown = append(queueShutdown, shutdown)
lock.Unlock() lock.Unlock()
}, func(_ context.Context, terminate func()) { }, func(terminate func()) {
lock.Lock() lock.Lock()
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
lock.Unlock() lock.Unlock()
@ -123,11 +122,11 @@ func TestLevelQueue(t *testing.T) {
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) { go queue.Run(func(shutdown func()) {
lock.Lock() lock.Lock()
queueShutdown = append(queueShutdown, shutdown) queueShutdown = append(queueShutdown, shutdown)
lock.Unlock() lock.Unlock()
}, func(_ context.Context, terminate func()) { }, func(terminate func()) {
lock.Lock() lock.Lock()
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
lock.Unlock() lock.Unlock()

@ -6,7 +6,6 @@ package queue
import ( import (
"context" "context"
"fmt"
"code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
@ -47,8 +46,6 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
return nil, err return nil, err
} }
byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))
queue := &RedisQueue{ queue := &RedisQueue{
ByteFIFOQueue: byteFIFOQueue, ByteFIFOQueue: byteFIFOQueue,
} }
@ -73,8 +70,8 @@ var _ ByteFIFO = &RedisByteFIFO{}
// RedisByteFIFO represents a ByteFIFO formed from a redisClient // RedisByteFIFO represents a ByteFIFO formed from a redisClient
type RedisByteFIFO struct { type RedisByteFIFO struct {
ctx context.Context
client redisClient client redisClient
queueName string queueName string
} }
@ -89,7 +86,6 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
fifo := &RedisByteFIFO{ fifo := &RedisByteFIFO{
queueName: config.QueueName, queueName: config.QueueName,
} }
fifo.ctx = graceful.GetManager().TerminateContext()
fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString) fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString)
if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil { if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil {
return nil, err return nil, err
@ -98,18 +94,18 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
} }
// PushFunc pushes data to the end of the fifo and calls the callback if it is added // PushFunc pushes data to the end of the fifo and calls the callback if it is added
func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error { func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
if fn != nil { if fn != nil {
if err := fn(); err != nil { if err := fn(); err != nil {
return err return err
} }
} }
return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() return fifo.client.RPush(ctx, fifo.queueName, data).Err()
} }
// Pop pops data from the start of the fifo // Pop pops data from the start of the fifo
func (fifo *RedisByteFIFO) Pop() ([]byte, error) { func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
if err == nil || err == redis.Nil { if err == nil || err == redis.Nil {
return data, nil return data, nil
} }
@ -122,8 +118,8 @@ func (fifo *RedisByteFIFO) Close() error {
} }
// Len returns the length of the fifo // Len returns the length of the fifo
func (fifo *RedisByteFIFO) Len() int64 { func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 {
val, err := fifo.client.LLen(fifo.ctx, fifo.queueName).Result() val, err := fifo.client.LLen(ctx, fifo.queueName).Result()
if err != nil { if err != nil {
log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err) log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
return -1 return -1

@ -38,7 +38,7 @@ type delayedStarter struct {
} }
// setInternal must be called with the lock locked. // setInternal must be called with the lock locked.
func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error { func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc, exemplar interface{}) error {
var ctx context.Context var ctx context.Context
var cancel context.CancelFunc var cancel context.CancelFunc
if q.timeout > 0 { if q.timeout > 0 {
@ -49,9 +49,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
defer cancel() defer cancel()
// Ensure we also stop at shutdown // Ensure we also stop at shutdown
atShutdown(ctx, func() { atShutdown(cancel)
cancel()
})
i := 1 i := 1
for q.internal == nil { for q.internal == nil {
@ -221,7 +219,7 @@ func (q *WrappedQueue) IsEmpty() bool {
} }
// Run starts to run the queue and attempts to create the internal queue // Run starts to run the queue and attempts to create the internal queue
func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { func (q *WrappedQueue) Run(atShutdown, atTerminate func(func())) {
log.Debug("WrappedQueue: %s Starting", q.name) log.Debug("WrappedQueue: %s Starting", q.name)
q.lock.Lock() q.lock.Lock()
if q.internal == nil { if q.internal == nil {

@ -30,6 +30,10 @@ type ChannelUniqueQueue struct {
*WorkerPool *WorkerPool
lock sync.Mutex lock sync.Mutex
table map[Data]bool table map[Data]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
terminateCtxCancel context.CancelFunc
exemplar interface{} exemplar interface{}
workers int workers int
name string name string
@ -45,8 +49,16 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
if config.BatchLength == 0 { if config.BatchLength == 0 {
config.BatchLength = 1 config.BatchLength = 1
} }
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelUniqueQueue{ queue := &ChannelUniqueQueue{
table: map[Data]bool{}, table: map[Data]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
terminateCtxCancel: terminateCtxCancel,
exemplar: exemplar, exemplar: exemplar,
workers: config.Workers, workers: config.Workers,
name: config.Name, name: config.Name,
@ -65,17 +77,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
} }
// Run starts to run the queue // Run starts to run the queue
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(context.Background(), func() { atShutdown(q.Shutdown)
log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) atTerminate(q.Terminate)
})
atTerminate(context.Background(), func() {
log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
})
log.Debug("ChannelUniqueQueue: %s Starting", q.name) log.Debug("ChannelUniqueQueue: %s Starting", q.name)
go func() {
_ = q.AddWorkers(q.workers, 0) _ = q.AddWorkers(q.workers, 0)
}()
} }
// Push will push data into the queue if the data is not already in the queue // Push will push data into the queue if the data is not already in the queue
@ -122,6 +128,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
return has, nil return has, nil
} }
// Shutdown processing from this queue
func (q *ChannelUniqueQueue) Shutdown() {
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
select {
case <-q.shutdownCtx.Done():
return
default:
}
go func() {
log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
return
}
log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
}()
q.shutdownCtxCancel()
log.Debug("ChannelUniqueQueue: %s Shutdown", q.name)
}
// Terminate this queue and close the queue
func (q *ChannelUniqueQueue) Terminate() {
log.Trace("ChannelUniqueQueue: %s Terminating", q.name)
q.Shutdown()
select {
case <-q.terminateCtx.Done():
return
default:
}
q.terminateCtxCancel()
log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
}
// Name returns the name of this queue // Name returns the name of this queue
func (q *ChannelUniqueQueue) Name() string { func (q *ChannelUniqueQueue) Name() string {
return q.name return q.name

@ -5,6 +5,8 @@
package queue package queue
import ( import (
"context"
"code.gitea.io/gitea/modules/nosql" "code.gitea.io/gitea/modules/nosql"
"gitea.com/lunny/levelqueue" "gitea.com/lunny/levelqueue"
@ -41,6 +43,7 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
if len(config.ConnectionString) == 0 { if len(config.ConnectionString) == 0 {
config.ConnectionString = config.DataDir config.ConnectionString = config.DataDir
} }
config.WaitOnEmpty = true
byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName) byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil { if err != nil {
@ -86,12 +89,12 @@ func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueBy
} }
// PushFunc pushes data to the end of the fifo and calls the callback if it is added // PushFunc pushes data to the end of the fifo and calls the callback if it is added
func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error { func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
return fifo.internal.LPushFunc(data, fn) return fifo.internal.LPushFunc(data, fn)
} }
// Pop pops data from the start of the fifo // Pop pops data from the start of the fifo
func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.internal.RPop() data, err := fifo.internal.RPop()
if err != nil && err != levelqueue.ErrNotFound { if err != nil && err != levelqueue.ErrNotFound {
return nil, err return nil, err
@ -100,12 +103,12 @@ func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
} }
// Len returns the length of the fifo // Len returns the length of the fifo
func (fifo *LevelUniqueQueueByteFIFO) Len() int64 { func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 {
return fifo.internal.Len() return fifo.internal.Len()
} }
// Has returns whether the fifo contains this data // Has returns whether the fifo contains this data
func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return fifo.internal.Has(data) return fifo.internal.Has(data)
} }

@ -36,7 +36,7 @@ type PersistableChannelUniqueQueueConfiguration struct {
// task cannot be processed twice or more at the same time. Uniqueness is // task cannot be processed twice or more at the same time. Uniqueness is
// only guaranteed whilst the task is waiting in the queue. // only guaranteed whilst the task is waiting in the queue.
type PersistableChannelUniqueQueue struct { type PersistableChannelUniqueQueue struct {
*ChannelUniqueQueue channelQueue *ChannelUniqueQueue
delayedStarter delayedStarter
lock sync.Mutex lock sync.Mutex
closed chan struct{} closed chan struct{}
@ -85,7 +85,7 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
} }
queue := &PersistableChannelUniqueQueue{ queue := &PersistableChannelUniqueQueue{
ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue), channelQueue: channelUniqueQueue.(*ChannelUniqueQueue),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
@ -138,14 +138,14 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err
case <-q.closed: case <-q.closed:
return q.internal.(UniqueQueue).PushFunc(data, fn) return q.internal.(UniqueQueue).PushFunc(data, fn)
default: default:
return q.ChannelUniqueQueue.PushFunc(data, fn) return q.channelQueue.PushFunc(data, fn)
} }
} }
// Has will test if the queue has the data // Has will test if the queue has the data
func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
// This is more difficult... // This is more difficult...
has, err := q.ChannelUniqueQueue.Has(data) has, err := q.channelQueue.Has(data)
if err != nil || has { if err != nil || has {
return has, err return has, err
} }
@ -158,7 +158,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
} }
// Run starts to run the queue // Run starts to run the queue
func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name) log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
q.lock.Lock() q.lock.Lock()
@ -170,7 +170,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context
log.Error("Unable push to channelled queue: %v", err) log.Error("Unable push to channelled queue: %v", err)
} }
} }
}, q.exemplar) }, q.channelQueue.exemplar)
q.lock.Unlock() q.lock.Unlock()
if err != nil { if err != nil {
log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
@ -179,53 +179,73 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context
} else { } else {
q.lock.Unlock() q.lock.Unlock()
} }
atShutdown(context.Background(), q.Shutdown) atShutdown(q.Shutdown)
atTerminate(context.Background(), q.Terminate) atTerminate(q.Terminate)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
// Just run the level queue - we shut it down later
go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go func() { go func() {
_ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) _ = q.internal.Flush(0)
log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
}() }()
} else {
log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name) log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
<-q.closed q.internal.(*LevelUniqueQueue).Shutdown()
log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
q.internal.(*LevelUniqueQueue).cancel()
q.ChannelUniqueQueue.cancel()
log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
q.ChannelUniqueQueue.Wait()
q.internal.(*LevelUniqueQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.ChannelUniqueQueue.dataChan {
_ = q.internal.Push(data)
} }
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name)
} }
// Flush flushes the queue // Flush flushes the queue
func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
return q.ChannelUniqueQueue.Flush(timeout) return q.channelQueue.Flush(timeout)
}
// FlushWithContext flushes the queue
func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
return q.channelQueue.FlushWithContext(ctx)
}
// IsEmpty checks if a queue is empty
func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
return q.channelQueue.IsEmpty()
} }
// Shutdown processing this queue // Shutdown processing this queue
func (q *PersistableChannelUniqueQueue) Shutdown() { func (q *PersistableChannelUniqueQueue) Shutdown() {
log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name) log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock()
select { select {
case <-q.closed: case <-q.closed:
q.lock.Unlock()
return
default: default:
if q.internal != nil { if q.internal != nil {
q.internal.(*LevelUniqueQueue).Shutdown() q.internal.(*LevelUniqueQueue).Shutdown()
} }
close(q.closed) close(q.closed)
q.lock.Unlock()
} }
log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
q.internal.(*LevelUniqueQueue).baseCtxCancel()
q.channelQueue.baseCtxCancel()
log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
q.channelQueue.Wait()
q.internal.(*LevelUniqueQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
}
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
} }

@ -5,9 +5,8 @@
package queue package queue
import ( import (
"fmt" "context"
"code.gitea.io/gitea/modules/graceful"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
@ -51,8 +50,6 @@ func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
return nil, err return nil, err
} }
byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))
queue := &RedisUniqueQueue{ queue := &RedisUniqueQueue{
ByteFIFOUniqueQueue: byteFIFOQueue, ByteFIFOUniqueQueue: byteFIFOQueue,
} }
@ -92,8 +89,8 @@ func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniq
} }
// PushFunc pushes data to the end of the fifo and calls the callback if it is added // PushFunc pushes data to the end of the fifo and calls the callback if it is added
func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result() added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
if err != nil { if err != nil {
return err return err
} }
@ -105,12 +102,12 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
return err return err
} }
} }
return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() return fifo.client.RPush(ctx, fifo.queueName, data).Err()
} }
// Pop pops data from the start of the fifo // Pop pops data from the start of the fifo
func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
return data, err return data, err
} }
@ -119,13 +116,13 @@ func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
return data, nil return data, nil
} }
err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err() err = fifo.client.SRem(ctx, fifo.setName, data).Err()
return data, err return data, err
} }
// Has returns whether the fifo contains this data // Has returns whether the fifo contains this data
func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) { func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result() return fifo.client.SIsMember(ctx, fifo.setName, data).Result()
} }
func init() { func init() {

@ -21,7 +21,7 @@ import (
type WorkerPool struct { type WorkerPool struct {
lock sync.Mutex lock sync.Mutex
baseCtx context.Context baseCtx context.Context
cancel context.CancelFunc baseCtxCancel context.CancelFunc
cond *sync.Cond cond *sync.Cond
qid int64 qid int64
maxNumberOfWorkers int maxNumberOfWorkers int
@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
dataChan := make(chan Data, config.QueueLength) dataChan := make(chan Data, config.QueueLength)
pool := &WorkerPool{ pool := &WorkerPool{
baseCtx: ctx, baseCtx: ctx,
cancel: cancel, baseCtxCancel: cancel,
batchLength: config.BatchLength, batchLength: config.BatchLength,
dataChan: dataChan, dataChan: dataChan,
handle: handle, handle: handle,
@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) {
} }
func (p *WorkerPool) zeroBoost() { func (p *WorkerPool) zeroBoost() {
ctx, cancel := context.WithCancel(p.baseCtx) ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
mq := GetManager().GetManagedQueue(p.qid) mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@ -94,26 +94,14 @@ func (p *WorkerPool) zeroBoost() {
start := time.Now() start := time.Now()
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
go func() { cancel = func() {
select {
case <-ctx.Done():
case <-time.After(p.boostTimeout):
}
mq.RemoveWorkers(pid) mq.RemoveWorkers(pid)
cancel() }
}()
} else { } else {
log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
go func() {
select {
case <-ctx.Done():
case <-time.After(p.boostTimeout):
}
cancel()
}()
} }
p.lock.Unlock() p.lock.Unlock()
p.addWorkers(ctx, boost) p.addWorkers(ctx, cancel, boost)
} }
func (p *WorkerPool) pushBoost(data Data) { func (p *WorkerPool) pushBoost(data Data) {
@ -140,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) {
return return
} }
p.blockTimeout *= 2 p.blockTimeout *= 2
ctx, cancel := context.WithCancel(p.baseCtx) boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx)
mq := GetManager().GetManagedQueue(p.qid) mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@ -150,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now() start := time.Now()
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false)
go func() { go func() {
<-ctx.Done() <-boostCtx.Done()
mq.RemoveWorkers(pid) mq.RemoveWorkers(pid)
cancel() boostCtxCancel()
}() }()
} else { } else {
log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
} }
go func() { go func() {
<-time.After(p.boostTimeout) <-time.After(p.boostTimeout)
cancel() boostCtxCancel()
p.lock.Lock() p.lock.Lock()
p.blockTimeout /= 2 p.blockTimeout /= 2
p.lock.Unlock() p.lock.Unlock()
}() }()
p.lock.Unlock() p.lock.Unlock()
p.addWorkers(ctx, boost) p.addWorkers(boostCtx, boostCtxCancel, boost)
p.dataChan <- data p.dataChan <- data
} }
} }
@ -243,28 +231,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is
mq := GetManager().GetManagedQueue(p.qid) mq := GetManager().GetManagedQueue(p.qid)
if mq != nil { if mq != nil {
pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher) pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
go func() {
<-ctx.Done()
mq.RemoveWorkers(pid)
cancel()
}()
log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid) log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
} else { return ctx, func() {
mq.RemoveWorkers(pid)
}
}
log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
}
return ctx, cancel return ctx, cancel
} }
// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit // AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
ctx, cancel := p.commonRegisterWorkers(number, timeout, false) ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
p.addWorkers(ctx, number) p.addWorkers(ctx, cancel, number)
return cancel return cancel
} }
// addWorkers adds workers to the pool // addWorkers adds workers to the pool
func (p *WorkerPool) addWorkers(ctx context.Context, number int) { func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) {
for i := 0; i < number; i++ { for i := 0; i < number; i++ {
p.lock.Lock() p.lock.Lock()
if p.cond == nil { if p.cond == nil {
@ -279,11 +264,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
p.numberOfWorkers-- p.numberOfWorkers--
if p.numberOfWorkers == 0 { if p.numberOfWorkers == 0 {
p.cond.Broadcast() p.cond.Broadcast()
cancel()
} else if p.numberOfWorkers < 0 { } else if p.numberOfWorkers < 0 {
// numberOfWorkers can't go negative but... // numberOfWorkers can't go negative but...
log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid) log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
p.numberOfWorkers = 0 p.numberOfWorkers = 0
p.cond.Broadcast() p.cond.Broadcast()
cancel()
} }
p.lock.Unlock() p.lock.Unlock()
}() }()

@ -6,7 +6,6 @@
package pull package pull
import ( import (
"context"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -54,9 +53,9 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
assert.True(t, has) assert.True(t, has)
assert.NoError(t, err) assert.NoError(t, err)
prQueue.Run(func(_ context.Context, shutdown func()) { prQueue.Run(func(shutdown func()) {
queueShutdown = append(queueShutdown, shutdown) queueShutdown = append(queueShutdown, shutdown)
}, func(_ context.Context, terminate func()) { }, func(terminate func()) {
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
}) })

Loading…
Cancel
Save