Skip to content

Commit 05f5f84

Browse files
author
Albin Kerouanton
committed
Try to asynchronously connect to Fluentd before writing
PR fluent#77 introduced a new parameter named ForceStopAsyncSend. It can be used to tell the logger not to try to send all the log messages in its buffer before closing. Without this parameter, the logger hangs whenever it has logs to write and the target Fluentd server is down. But this new parameter is not enough: the connection is currently lazily initialized when the logger receives its first message. This blocks the select reading messages from the queue (until the connection is ready). Moreover, the connection dialing uses an exponential back-off retry. Because of that, the logger won't look for messages on the `stopRunning` channel (the channel introduced by PR fluent#77), either because it's blocked by the Sleep used for the retry or because the connection dialing is waiting for the TCP timeout. To fix these edge cases, the connection isn't initialized lazily anymore. However, it's still initialized asynchronously and with the exponential back-off retry. The Fluent.run() method has been adapted to wait for either the connection to become ready or a stop signal to arrive on the stopRunning channel before starting to dequeue logs. This fix is motivated by the issue described in: moby/moby#40063. Signed-off-by: Albin Kerouanton <[email protected]>
1 parent 28f6a3e commit 05f5f84

File tree

2 files changed

+112
-70
lines changed

2 files changed

+112
-70
lines changed

fluent/fluent.go

+99-46
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fluent
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
@@ -15,8 +16,9 @@ import (
1516
"bytes"
1617
"encoding/base64"
1718
"encoding/binary"
18-
"github.com/tinylib/msgp/msgp"
1919
"math/rand"
20+
21+
"github.com/tinylib/msgp/msgp"
2022
)
2123

2224
const (
@@ -84,6 +86,7 @@ type msgToSend struct {
8486
type Fluent struct {
8587
Config
8688

89+
ready chan bool
8790
stopRunning chan bool
8891
pending chan *msgToSend
8992
wg sync.WaitGroup
@@ -130,14 +133,16 @@ func New(config Config) (f *Fluent, err error) {
130133
}
131134
if config.Async {
132135
f = &Fluent{
133-
Config: config,
134-
pending: make(chan *msgToSend, config.BufferLimit),
136+
Config: config,
137+
ready: make(chan bool),
138+
stopRunning: make(chan bool),
139+
pending: make(chan *msgToSend, config.BufferLimit),
135140
}
136141
f.wg.Add(1)
137142
go f.run()
138143
} else {
139144
f = &Fluent{Config: config}
140-
err = f.connect()
145+
err = f.connect(context.Background())
141146
}
142147
return
143148
}
@@ -339,38 +344,111 @@ func (f *Fluent) close(c net.Conn) {
339344
}
340345

341346
// connect establishes a new connection using the specified transport.
342-
func (f *Fluent) connect() (err error) {
347+
func (f *Fluent) connect(ctx context.Context) (err error) {
348+
f.muconn.Lock()
349+
defer f.muconn.Unlock()
350+
351+
dialer := net.Dialer{Timeout: f.Config.Timeout}
343352

344353
switch f.Config.FluentNetwork {
345354
case "tcp":
346-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
355+
f.conn, err = dialer.DialContext(ctx,
356+
f.Config.FluentNetwork,
357+
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
347358
case "unix":
348-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
359+
f.conn, err = dialer.DialContext(ctx,
360+
f.Config.FluentNetwork,
361+
f.Config.FluentSocketPath)
349362
default:
350363
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
351364
}
365+
352366
return err
353367
}
354368

355-
func (f *Fluent) run() {
356-
drainEvents := false
357-
var emitEventDrainMsg sync.Once
358-
for {
369+
func (f *Fluent) connectAsync(ctx context.Context, stopAsyncConnect <-chan bool) {
370+
f.wg.Add(1)
371+
defer f.wg.Done()
372+
373+
waiter := time.After(time.Duration(0))
374+
for i := 0; i < f.Config.MaxRetry; i++ {
359375
select {
360-
case entry, ok := <-f.pending:
361-
if !ok {
362-
f.wg.Done()
363-
return
376+
case <-waiter:
377+
if f.conn != nil {
378+
f.ready <- true
364379
}
365-
if drainEvents {
366-
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
367-
continue
380+
381+
err := f.connect(ctx)
382+
if err == nil {
383+
f.ready <- true
384+
break
385+
}
386+
387+
if _, ok := err.(*ErrUnknownNetwork); ok {
388+
// No need to retry on unknown network error. Thus false is passed
389+
// to ready channel to let the other end drain the message queue.
390+
f.ready <- false
391+
break
368392
}
369-
err := f.write(entry)
370-
if err != nil {
371-
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
393+
394+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
395+
if waitTime > f.Config.MaxRetryWait {
396+
waitTime = f.Config.MaxRetryWait
372397
}
398+
399+
waiter = time.After(time.Duration(waitTime) * time.Millisecond)
400+
case <-stopAsyncConnect:
401+
break
373402
}
403+
}
404+
}
405+
406+
func (f *Fluent) run() {
407+
drainEvents := false
408+
var emitEventDrainMsg sync.Once
409+
410+
// First we need to wait for the connection to become ready. We cannot
411+
// initialize the connection lazily (eg. when the first message is
412+
// received) because it'd be done in the first for-select iteration on
413+
// f.pending and this would block that select, never giving the select on
414+
// f.stopRunning a chance to signal this goroutine to stop.
415+
ctx, cancelDialing := context.WithCancel(context.Background())
416+
stopAsyncConnect := make(chan bool)
417+
go f.connectAsync(ctx, stopAsyncConnect)
418+
select {
419+
case <-f.stopRunning:
420+
drainEvents = true
421+
// Stop any connection dialing and then tell connectAsync to stop
422+
// trying to dial the connection. This has to be done in this
423+
// specific order to make sure connectAsync() is not blocking on the
424+
// connection dialing.
425+
cancelDialing()
426+
close(stopAsyncConnect)
427+
break asyncConnect
428+
case ready, ok := <-f.ready:
429+
if !ready || !ok {
430+
drainEvents = true
431+
}
432+
break asyncConnect
433+
}
434+
435+
// At this point we can go ahead: the connection is either ready to use or
436+
// drainEvents is true and thus all logs should be discarded.
437+
for {
438+
entry, ok := <-f.pending:
439+
if !ok {
440+
f.wg.Done()
441+
return
442+
}
443+
if drainEvents {
444+
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
445+
continue
446+
}
447+
err := f.write(entry)
448+
if err != nil {
449+
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
450+
}
451+
374452
select {
375453
case stopRunning, ok := <-f.stopRunning:
376454
if stopRunning || !ok {
@@ -389,31 +467,6 @@ func (f *Fluent) write(msg *msgToSend) error {
389467
var c net.Conn
390468
for i := 0; i < f.Config.MaxRetry; i++ {
391469
c = f.conn
392-
// Connect if needed
393-
if c == nil {
394-
f.muconn.Lock()
395-
if f.conn == nil {
396-
err := f.connect()
397-
if err != nil {
398-
f.muconn.Unlock()
399-
400-
if _, ok := err.(*ErrUnknownNetwork); ok {
401-
// do not retry on unknown network error
402-
break
403-
}
404-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
405-
if waitTime > f.Config.MaxRetryWait {
406-
waitTime = f.Config.MaxRetryWait
407-
}
408-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
409-
continue
410-
}
411-
}
412-
c = f.conn
413-
f.muconn.Unlock()
414-
}
415-
416-
// We're connected, write msg
417470
t := f.Config.WriteTimeout
418471
if time.Duration(0) < t {
419472
c.SetWriteDeadline(time.Now().Add(t))

fluent/fluent_test.go

+13-24
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,10 @@ func Test_New_itShouldUseConfigValuesFromMashalAsJSONArgument(t *testing.T) {
140140
}
141141

142142
func Test_send_WritePendingToConn(t *testing.T) {
143-
f, _ := New(Config{Async: true})
143+
f, _ := New(Config{
144+
Async: true,
145+
ForceStopAsyncSend: true,
146+
})
144147

145148
conn := &Conn{}
146149
f.conn = conn
@@ -274,31 +277,17 @@ func TestJsonConfig(t *testing.T) {
274277
}
275278
}
276279

277-
func TestAsyncConnect(t *testing.T) {
278-
type result struct {
279-
f *Fluent
280-
err error
280+
func TestAsyncConnectDoesNotPreventClose(t *testing.T) {
281+
config := Config{
282+
FluentPort: 6666,
283+
Async: true,
284+
ForceStopAsyncSend: true,
281285
}
282-
ch := make(chan result, 1)
283-
go func() {
284-
config := Config{
285-
FluentPort: 8888,
286-
Async: true,
287-
}
288-
f, err := New(config)
289-
ch <- result{f: f, err: err}
290-
}()
291-
292-
select {
293-
case res := <-ch:
294-
if res.err != nil {
295-
t.Errorf("fluent.New() failed with %#v", res.err)
296-
return
297-
}
298-
res.f.Close()
299-
case <-time.After(time.Millisecond * 500):
300-
t.Error("Async must not block")
286+
f, err := New(config)
287+
if err != nil {
288+
t.Errorf("Unexpected error: %v", err)
301289
}
290+
f.Close()
302291
}
303292

304293
func Test_PostWithTimeNotTimeOut(t *testing.T) {

0 commit comments

Comments
 (0)