Skip to content

ForceStopAsyncSend option for graceful stop in async mode #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ Since the default is zero value, Write will not time out.
Enable asynchronous I/O (connect and write) for sending events to Fluentd.
The default is false.

### ForceStopAsyncSend

When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning)
The default is false.

### RequestAck

Sets whether to request acknowledgment from Fluentd to increase the reliability
Expand Down
47 changes: 33 additions & 14 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ const (
)

type Config struct {
FluentPort int `json:"fluent_port"`
FluentHost string `json:"fluent_host"`
FluentNetwork string `json:"fluent_network"`
FluentSocketPath string `json:"fluent_socket_path"`
Timeout time.Duration `json:"timeout"`
WriteTimeout time.Duration `json:"write_timeout"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
MaxRetryWait int `json:"max_retry_wait"`
TagPrefix string `json:"tag_prefix"`
Async bool `json:"async"`
FluentPort int `json:"fluent_port"`
FluentHost string `json:"fluent_host"`
FluentNetwork string `json:"fluent_network"`
FluentSocketPath string `json:"fluent_socket_path"`
Timeout time.Duration `json:"timeout"`
WriteTimeout time.Duration `json:"write_timeout"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
MaxRetryWait int `json:"max_retry_wait"`
TagPrefix string `json:"tag_prefix"`
Async bool `json:"async"`
ForceStopAsyncSend bool `json:"force_stop_async_send"`
// Deprecated: Use Async instead
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`
Expand Down Expand Up @@ -83,8 +84,9 @@ type msgToSend struct {
type Fluent struct {
Config

pending chan *msgToSend
wg sync.WaitGroup
stopRunning chan bool
pending chan *msgToSend
wg sync.WaitGroup

muconn sync.Mutex
conn net.Conn
Expand Down Expand Up @@ -305,6 +307,10 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
// Close closes the connection, waiting for pending logs to be sent
func (f *Fluent) Close() (err error) {
if f.Config.Async {
if f.Config.ForceStopAsyncSend {
f.stopRunning <- true
close(f.stopRunning)
}
close(f.pending)
f.wg.Wait()
}
Expand Down Expand Up @@ -347,18 +353,31 @@ func (f *Fluent) connect() (err error) {
}

func (f *Fluent) run() {
drainEvents := false
var emitEventDrainMsg sync.Once
for {
select {
case entry, ok := <-f.pending:
if !ok {
f.wg.Done()
return
}
if drainEvents {
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
continue
}
err := f.write(entry)
if err != nil {
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
}
}
select {
case stopRunning, ok := <-f.stopRunning:
if stopRunning || !ok {
drainEvents = true
}
default:
}
}
}

Expand Down
25 changes: 13 additions & 12 deletions fluent/fluent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,19 @@ func TestJsonConfig(t *testing.T) {
}
var got Config
expect := Config{
FluentPort: 8888,
FluentHost: "localhost",
FluentNetwork: "tcp",
FluentSocketPath: "/var/tmp/fluent.sock",
Timeout: 3000,
WriteTimeout: 6000,
BufferLimit: 10,
RetryWait: 5,
MaxRetry: 3,
TagPrefix: "fluent",
Async: false,
MarshalAsJSON: true,
FluentPort: 8888,
FluentHost: "localhost",
FluentNetwork: "tcp",
FluentSocketPath: "/var/tmp/fluent.sock",
Timeout: 3000,
WriteTimeout: 6000,
BufferLimit: 10,
RetryWait: 5,
MaxRetry: 3,
TagPrefix: "fluent",
Async: false,
ForceStopAsyncSend: false,
MarshalAsJSON: true,
}

err = json.Unmarshal(b, &got)
Expand Down
1 change: 1 addition & 0 deletions fluent/testdata/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
"max_retry":3,
"tag_prefix":"fluent",
"async": false,
"force_stop_async_send": false,
"marshal_as_json": true
}