Fix race in log (#16490)

A race has been detected in #1441 relating to getting log levels.

This PR protects the GetLevel and GetStacktraceLevel calls with a RW mutex.

Signed-off-by: Andrew Thornton <art27@cantab.net>
tokarchuk/v1.17
zeripath 3 years ago committed by GitHub
parent 97381aad5d
commit 49bd9a1111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      modules/log/event.go
  2. 1
      modules/queue/queue_disk_channel_test.go

@ -143,7 +143,7 @@ type MultiChannelledLog struct {
name string name string
bufferLength int64 bufferLength int64
queue chan *Event queue chan *Event
mutex sync.Mutex rwmutex sync.RWMutex
loggers map[string]EventLogger loggers map[string]EventLogger
flush chan bool flush chan bool
close chan bool close chan bool
@ -173,10 +173,10 @@ func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog
// AddLogger adds a logger to this MultiChannelledLog // AddLogger adds a logger to this MultiChannelledLog
func (m *MultiChannelledLog) AddLogger(logger EventLogger) error { func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
m.mutex.Lock() m.rwmutex.Lock()
name := logger.GetName() name := logger.GetName()
if _, has := m.loggers[name]; has { if _, has := m.loggers[name]; has {
m.mutex.Unlock() m.rwmutex.Unlock()
return ErrDuplicateName{name} return ErrDuplicateName{name}
} }
m.loggers[name] = logger m.loggers[name] = logger
@ -186,7 +186,7 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
if logger.GetStacktraceLevel() < m.stacktraceLevel { if logger.GetStacktraceLevel() < m.stacktraceLevel {
m.stacktraceLevel = logger.GetStacktraceLevel() m.stacktraceLevel = logger.GetStacktraceLevel()
} }
m.mutex.Unlock() m.rwmutex.Unlock()
go m.Start() go m.Start()
return nil return nil
} }
@ -195,15 +195,15 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
// NB: If you delete the last sublogger this logger will simply drop // NB: If you delete the last sublogger this logger will simply drop
// log events // log events
func (m *MultiChannelledLog) DelLogger(name string) bool { func (m *MultiChannelledLog) DelLogger(name string) bool {
m.mutex.Lock() m.rwmutex.Lock()
logger, has := m.loggers[name] logger, has := m.loggers[name]
if !has { if !has {
m.mutex.Unlock() m.rwmutex.Unlock()
return false return false
} }
delete(m.loggers, name) delete(m.loggers, name)
m.internalResetLevel() m.internalResetLevel()
m.mutex.Unlock() m.rwmutex.Unlock()
logger.Flush() logger.Flush()
logger.Close() logger.Close()
return true return true
@ -211,15 +211,15 @@ func (m *MultiChannelledLog) DelLogger(name string) bool {
// GetEventLogger returns a sub logger from this MultiChannelledLog // GetEventLogger returns a sub logger from this MultiChannelledLog
func (m *MultiChannelledLog) GetEventLogger(name string) EventLogger { func (m *MultiChannelledLog) GetEventLogger(name string) EventLogger {
m.mutex.Lock() m.rwmutex.RLock()
defer m.mutex.Unlock() defer m.rwmutex.RUnlock()
return m.loggers[name] return m.loggers[name]
} }
// GetEventLoggerNames returns a list of names // GetEventLoggerNames returns a list of names
func (m *MultiChannelledLog) GetEventLoggerNames() []string { func (m *MultiChannelledLog) GetEventLoggerNames() []string {
m.mutex.Lock() m.rwmutex.RLock()
defer m.mutex.Unlock() defer m.rwmutex.RUnlock()
var keys []string var keys []string
for k := range m.loggers { for k := range m.loggers {
keys = append(keys, k) keys = append(keys, k)
@ -228,12 +228,12 @@ func (m *MultiChannelledLog) GetEventLoggerNames() []string {
} }
func (m *MultiChannelledLog) closeLoggers() { func (m *MultiChannelledLog) closeLoggers() {
m.mutex.Lock() m.rwmutex.Lock()
for _, logger := range m.loggers { for _, logger := range m.loggers {
logger.Flush() logger.Flush()
logger.Close() logger.Close()
} }
m.mutex.Unlock() m.rwmutex.Unlock()
m.closed <- true m.closed <- true
} }
@ -249,8 +249,8 @@ func (m *MultiChannelledLog) Resume() {
// ReleaseReopen causes this logger to tell its subloggers to release and reopen // ReleaseReopen causes this logger to tell its subloggers to release and reopen
func (m *MultiChannelledLog) ReleaseReopen() error { func (m *MultiChannelledLog) ReleaseReopen() error {
m.mutex.Lock() m.rwmutex.Lock()
defer m.mutex.Unlock() defer m.rwmutex.Unlock()
var accumulatedErr error var accumulatedErr error
for _, logger := range m.loggers { for _, logger := range m.loggers {
if err := logger.ReleaseReopen(); err != nil { if err := logger.ReleaseReopen(); err != nil {
@ -266,13 +266,13 @@ func (m *MultiChannelledLog) ReleaseReopen() error {
// Start processing the MultiChannelledLog // Start processing the MultiChannelledLog
func (m *MultiChannelledLog) Start() { func (m *MultiChannelledLog) Start() {
m.mutex.Lock() m.rwmutex.Lock()
if m.started { if m.started {
m.mutex.Unlock() m.rwmutex.Unlock()
return return
} }
m.started = true m.started = true
m.mutex.Unlock() m.rwmutex.Unlock()
paused := false paused := false
for { for {
if paused { if paused {
@ -286,11 +286,11 @@ func (m *MultiChannelledLog) Start() {
m.closeLoggers() m.closeLoggers()
return return
} }
m.mutex.Lock() m.rwmutex.RLock()
for _, logger := range m.loggers { for _, logger := range m.loggers {
logger.Flush() logger.Flush()
} }
m.mutex.Unlock() m.rwmutex.RUnlock()
case <-m.close: case <-m.close:
m.closeLoggers() m.closeLoggers()
return return
@ -307,24 +307,24 @@ func (m *MultiChannelledLog) Start() {
m.closeLoggers() m.closeLoggers()
return return
} }
m.mutex.Lock() m.rwmutex.RLock()
for _, logger := range m.loggers { for _, logger := range m.loggers {
err := logger.LogEvent(event) err := logger.LogEvent(event)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
} }
m.mutex.Unlock() m.rwmutex.RUnlock()
case _, ok := <-m.flush: case _, ok := <-m.flush:
if !ok { if !ok {
m.closeLoggers() m.closeLoggers()
return return
} }
m.mutex.Lock() m.rwmutex.RLock()
for _, logger := range m.loggers { for _, logger := range m.loggers {
logger.Flush() logger.Flush()
} }
m.mutex.Unlock() m.rwmutex.RUnlock()
case <-m.close: case <-m.close:
m.closeLoggers() m.closeLoggers()
return return
@ -359,11 +359,15 @@ func (m *MultiChannelledLog) Flush() {
// GetLevel gets the level of this MultiChannelledLog // GetLevel gets the level of this MultiChannelledLog
func (m *MultiChannelledLog) GetLevel() Level { func (m *MultiChannelledLog) GetLevel() Level {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.level return m.level
} }
// GetStacktraceLevel gets the level of this MultiChannelledLog // GetStacktraceLevel gets the level of this MultiChannelledLog
func (m *MultiChannelledLog) GetStacktraceLevel() Level { func (m *MultiChannelledLog) GetStacktraceLevel() Level {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.stacktraceLevel return m.stacktraceLevel
} }
@ -384,8 +388,8 @@ func (m *MultiChannelledLog) internalResetLevel() Level {
// ResetLevel will reset the level of this MultiChannelledLog // ResetLevel will reset the level of this MultiChannelledLog
func (m *MultiChannelledLog) ResetLevel() Level { func (m *MultiChannelledLog) ResetLevel() Level {
m.mutex.Lock() m.rwmutex.Lock()
defer m.mutex.Unlock() defer m.rwmutex.Unlock()
return m.internalResetLevel() return m.internalResetLevel()
} }

@ -16,7 +16,6 @@ import (
func TestPersistableChannelQueue(t *testing.T) { func TestPersistableChannelQueue(t *testing.T) {
handleChan := make(chan *testData) handleChan := make(chan *testData)
handle := func(data ...Data) { handle := func(data ...Data) {
assert.True(t, len(data) == 2)
for _, datum := range data { for _, datum := range data {
testDatum := datum.(*testData) testDatum := datum.(*testData)
handleChan <- testDatum handleChan <- testDatum

Loading…
Cancel
Save