diff --git a/http_sender.go b/http_sender.go index 2142961..7d9e163 100644 --- a/http_sender.go +++ b/http_sender.go @@ -155,13 +155,15 @@ func newHttpLineSender(conf *lineSenderConfig) (*httpLineSender, error) { s.globalTransport.RegisterClient() } - s.uri = "http" + var protocol = "http" if conf.tlsMode != tlsDisabled { - s.uri += "s" + protocol += "s" } - s.uri += fmt.Sprintf("://%s/write", s.address) - return s, nil + s.uri = fmt.Sprintf("%s://%s/write", protocol, s.address) + + var pingUri = fmt.Sprintf("%s://%s/ping", protocol, s.address) + return s, s.ping(pingUri) } func (s *httpLineSender) Flush(ctx context.Context) error { @@ -336,23 +338,56 @@ func (s *httpLineSender) At(ctx context.Context, ts time.Time) error { return nil } -// makeRequest returns a boolean if we need to retry the request -func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) { +func (s *httpLineSender) ping(uri string) error { + buf := bytes.Buffer{} + success := http.StatusNoContent + req, err := http.NewRequest( http.MethodPost, - s.uri, - bytes.NewReader(s.buf.Bytes()), + uri, + &buf, ) if err != nil { - return false, err + return err } - req.ContentLength = int64(s.BufLen()) + s.setAuth(req) + + resp, err := s.client.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != success { + return fmt.Errorf("ping returned status %d, expected %d", resp.StatusCode, success) + } + + return nil + +} + +func (s *httpLineSender) setAuth(req *http.Request) { if s.user != "" && s.pass != "" { req.SetBasicAuth(s.user, s.pass) } else if s.token != "" { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token)) } +} + +// makeRequest returns a boolean if we need to retry the request +func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) { + req, err := http.NewRequest( + http.MethodPost, + s.uri, + bytes.NewReader(s.buf.Bytes()), + ) + if err != nil { + return false, err + } + + s.setAuth(req) + + req.ContentLength = int64(s.BufLen()) // reqTimeout = ( request.len() / request_min_throughput ) + request_timeout // nb: conversion from int to time.Duration is in milliseconds diff --git a/http_sender_test.go b/http_sender_test.go index 67d12ad..3f7e4e2 100644 --- a/http_sender_test.go +++ b/http_sender_test.go @@ -93,19 +93,19 @@ func TestHttpHappyCasesFromConf(t *testing.T) { } func TestHttpHappyCasesFromEnv(t *testing.T) { - var ( - addr = "localhost:1111" - ) + // Set up test server to handle pings from constructor + s, err := newTestServerWithProtocol(readAndDiscard, "http") + assert.NoError(t, err) testCases := []httpConfigTestCase{ { name: "addr only", - config: fmt.Sprintf("http::addr=%s", addr), + config: fmt.Sprintf("http::addr=%s", s.addr), }, { name: "auto flush", config: fmt.Sprintf("http::addr=%s;auto_flush_rows=100;auto_flush_interval=1000;", - addr), + s.addr), }, } diff --git a/tcp_sender_test.go b/tcp_sender_test.go index d738d93..a576352 100644 --- a/tcp_sender_test.go +++ b/tcp_sender_test.go @@ -300,7 +300,6 @@ func TestErrorOnContextDeadline(t *testing.T) { } t.Fail() } - func BenchmarkLineSenderBatch1000(b *testing.B) { ctx := context.Background() diff --git a/utils_test.go b/utils_test.go index 8ecef81..9823458 100644 --- a/utils_test.go +++ b/utils_test.go @@ -194,6 +194,12 @@ func (s *testServer) serveHttp() { err error ) + // handle ping + if r.URL.Path == "/ping" { + w.WriteHeader(http.StatusNoContent) + return + } + switch s.serverType { case failFirstThenSendToBackChannel: if atomic.AddInt64(&reqs, 1) == 1 {