|
|
@ -56,7 +56,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h |
|
|
|
for q.internal == nil { |
|
|
|
for q.internal == nil { |
|
|
|
select { |
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
case <-ctx.Done(): |
|
|
|
return fmt.Errorf("Timedout creating queue %v with cfg %s in %s", q.underlying, q.cfg, q.name) |
|
|
|
return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, q.cfg, q.name) |
|
|
|
default: |
|
|
|
default: |
|
|
|
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) |
|
|
|
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) |
|
|
|
if err == nil { |
|
|
|
if err == nil { |
|
|
@ -64,11 +64,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h |
|
|
|
break |
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
if err.Error() != "resource temporarily unavailable" { |
|
|
|
if err.Error() != "resource temporarily unavailable" { |
|
|
|
log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %s error: %v", i, q.underlying, q.name, q.cfg, err) |
|
|
|
log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %#v error: %v", i, q.underlying, q.name, q.cfg, err) |
|
|
|
} |
|
|
|
} |
|
|
|
i++ |
|
|
|
i++ |
|
|
|
if q.maxAttempts > 0 && i > q.maxAttempts { |
|
|
|
if q.maxAttempts > 0 && i > q.maxAttempts { |
|
|
|
return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, q.cfg, err) |
|
|
|
return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) |
|
|
|
} |
|
|
|
} |
|
|
|
sleepTime := 100 * time.Millisecond |
|
|
|
sleepTime := 100 * time.Millisecond |
|
|
|
if q.timeout > 0 && q.maxAttempts > 0 { |
|
|
|
if q.timeout > 0 && q.maxAttempts > 0 { |
|
|
|