Skip to content

Commit f2bc148

Browse files
author
Albin Kerouanton
committed
Try to asynchronously connect to Fluentd before writing
PR fluent#77 introduced a new parameter named ForceStopAsyncSend. It can be used to tell the logger not to try to send all the log messages in its buffer before closing. Without this parameter, the logger hangs whenever the target Fluentd server is down. In addition, the logger currently initializes the connection lazily, when it receives its first log. This is a problem when the Fluentd server has never been available, because the connection initialization blocks the select that signals the log channel should be drained.
1 parent 28f6a3e commit f2bc148

File tree

1 file changed

+43
-26
lines changed

1 file changed

+43
-26
lines changed

fluent/fluent.go

+43-26
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import (
1515
"bytes"
1616
"encoding/base64"
1717
"encoding/binary"
18-
"github.com/tinylib/msgp/msgp"
1918
"math/rand"
19+
20+
"github.com/tinylib/msgp/msgp"
2021
)
2122

2223
const (
@@ -340,6 +341,8 @@ func (f *Fluent) close(c net.Conn) {
340341

341342
// connect establishes a new connection using the specified transport.
342343
func (f *Fluent) connect() (err error) {
344+
f.muconn.Lock()
345+
defer f.muconn.Unlock()
343346

344347
switch f.Config.FluentNetwork {
345348
case "tcp":
@@ -355,6 +358,45 @@ func (f *Fluent) connect() (err error) {
355358
func (f *Fluent) run() {
356359
drainEvents := false
357360
var emitEventDrainMsg sync.Once
361+
362+
// First we need to wait for the connection to become ready to make sure
363+
// it won't be initialized during the first for-select iteration. Otherwise
364+
// this would block the select on f.pending without giving the select on
365+
// f.stopRunning a chance to signal its end to this goroutine.
366+
var wait <-chan time.Time
367+
for i := 0; i < f.Config.MaxRetry; i++ {
368+
select {
369+
case stopRunning, ok := <-f.stopRunning:
370+
if stopRunning || !ok {
371+
drainEvents = true
372+
}
373+
break
374+
case <-wait:
375+
}
376+
377+
err := f.connect()
378+
if err == nil {
379+
break
380+
}
381+
382+
if _, ok := err.(*ErrUnknownNetwork); ok {
383+
// No need to retry on unknown network error. Thus ready channel
384+
// is closed and received logs are discarded.
385+
drainEvents = true
386+
break
387+
}
388+
389+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
390+
if waitTime > f.Config.MaxRetryWait {
391+
waitTime = f.Config.MaxRetryWait
392+
}
393+
394+
wait = time.After(time.Duration(waitTime) * time.Millisecond)
395+
}
396+
397+
// At this point we can go ahead: even if a message is sent to
398+
// f.stopRunning right after the connection becomes ready, the following
399+
// for-select loop will get it and proceed accordingly.
358400
for {
359401
select {
360402
case entry, ok := <-f.pending:
@@ -389,31 +431,6 @@ func (f *Fluent) write(msg *msgToSend) error {
389431
var c net.Conn
390432
for i := 0; i < f.Config.MaxRetry; i++ {
391433
c = f.conn
392-
// Connect if needed
393-
if c == nil {
394-
f.muconn.Lock()
395-
if f.conn == nil {
396-
err := f.connect()
397-
if err != nil {
398-
f.muconn.Unlock()
399-
400-
if _, ok := err.(*ErrUnknownNetwork); ok {
401-
// do not retry on unknown network error
402-
break
403-
}
404-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
405-
if waitTime > f.Config.MaxRetryWait {
406-
waitTime = f.Config.MaxRetryWait
407-
}
408-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
409-
continue
410-
}
411-
}
412-
c = f.conn
413-
f.muconn.Unlock()
414-
}
415-
416-
// We're connected, write msg
417434
t := f.Config.WriteTimeout
418435
if time.Duration(0) < t {
419436
c.SetWriteDeadline(time.Now().Add(t))

0 commit comments

Comments
 (0)