Skip to content

Commit 48922ce

Browse files
author
Albin Kerouanton
committed
Try to asynchronously connect to Fluentd before writing
PR fluent#77 introduced a new parameter named ForceStopAsyncSend. It can be used to tell the logger to not try to send all the log messages in its buffer before closing. Without this parameter, the logger hangs out whenever the target Fluentd server is down. Also the logger is currently lazily initializing the connection when it receives its first log. But this is a problem when the Fluentd server has never been available as the connection initialization blocks the select signaling the log channel should be drained.
1 parent 28f6a3e commit 48922ce

File tree

1 file changed

+40
-26
lines changed

1 file changed

+40
-26
lines changed

fluent/fluent.go

+40-26
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import (
1515
"bytes"
1616
"encoding/base64"
1717
"encoding/binary"
18-
"github.com/tinylib/msgp/msgp"
1918
"math/rand"
19+
20+
"github.com/tinylib/msgp/msgp"
2021
)
2122

2223
const (
@@ -340,6 +341,8 @@ func (f *Fluent) close(c net.Conn) {
340341

341342
// connect establishes a new connection using the specified transport.
342343
func (f *Fluent) connect() (err error) {
344+
f.muconn.Lock()
345+
defer f.muconn.Unlock()
343346

344347
switch f.Config.FluentNetwork {
345348
case "tcp":
@@ -355,6 +358,42 @@ func (f *Fluent) connect() (err error) {
355358
func (f *Fluent) run() {
356359
drainEvents := false
357360
var emitEventDrainMsg sync.Once
361+
362+
// First we need to wait for the connection to become ready to make sure
363+
// it won't be initialized during the first for-select iteration. Otherwise
364+
// this would block the select from f.pending without letting the change to
365+
// the select on f.stopRunning to signal its end to this goroutine.
366+
for i := 0; i < f.Config.MaxRetry; i++ {
367+
select {
368+
case stopRunning, ok := <-f.stopRunning:
369+
if stopRunning || !ok {
370+
drainEvents = true
371+
}
372+
default:
373+
err := f.connect()
374+
if err == nil {
375+
break
376+
}
377+
378+
if _, ok := err.(*ErrUnknownNetwork); ok {
379+
// No need to retry on unknown network error. Thus ready channel
380+
// is closed and received logs are discarded.
381+
drainEvents = true
382+
break
383+
}
384+
385+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
386+
if waitTime > f.Config.MaxRetryWait {
387+
waitTime = f.Config.MaxRetryWait
388+
}
389+
390+
time.Sleep(time.Duration(waitTime) * time.Millisecond)
391+
}
392+
}
393+
394+
// At this point we can go ahead: even if a message is send to
395+
// f.stopRunning right after the connection become ready, the following
396+
// for-select loop will get it and proceed accordingly.
358397
for {
359398
select {
360399
case entry, ok := <-f.pending:
@@ -389,31 +428,6 @@ func (f *Fluent) write(msg *msgToSend) error {
389428
var c net.Conn
390429
for i := 0; i < f.Config.MaxRetry; i++ {
391430
c = f.conn
392-
// Connect if needed
393-
if c == nil {
394-
f.muconn.Lock()
395-
if f.conn == nil {
396-
err := f.connect()
397-
if err != nil {
398-
f.muconn.Unlock()
399-
400-
if _, ok := err.(*ErrUnknownNetwork); ok {
401-
// do not retry on unknown network error
402-
break
403-
}
404-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
405-
if waitTime > f.Config.MaxRetryWait {
406-
waitTime = f.Config.MaxRetryWait
407-
}
408-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
409-
continue
410-
}
411-
}
412-
c = f.conn
413-
f.muconn.Unlock()
414-
}
415-
416-
// We're connected, write msg
417431
t := f.Config.WriteTimeout
418432
if time.Duration(0) < t {
419433
c.SetWriteDeadline(time.Now().Add(t))

0 commit comments

Comments
 (0)