Skip to content

feat(client): add ping method to http/s sender #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions http_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,57 @@ func (suite *integrationTestSuite) TestServerSideError() {
sender.Close(ctx)
questdbC.Stop(ctx)
}

func (suite *integrationTestSuite) TestHttpPingSuccess() {
if testing.Short() {
suite.T().Skip("skipping integration test")
}

ctx := context.Background()

var (
sender qdb.LineSender
err error
)

questdbC, err := setupQuestDB(ctx, noAuth)
assert.NoError(suite.T(), err)

sender, err = qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(questdbC.httpAddress))
assert.NoError(suite.T(), err)

assert.Eventually(suite.T(), func() bool {
return sender.Ping(ctx) != nil
}, time.Second, 100*time.Millisecond)

sender.Close(ctx)
questdbC.Stop(ctx)

}

func (suite *integrationTestSuite) TestHttpsBasicAuthPingSuccess() {
if testing.Short() {
suite.T().Skip("skipping integration test")
}

ctx := context.Background()

var (
sender qdb.LineSender
err error
)

questdbC, err := setupQuestDB(ctx, httpBasicAuth)
assert.NoError(suite.T(), err)

sender, err = qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(questdbC.httpAddress))
assert.NoError(suite.T(), err)

assert.Eventually(suite.T(), func() bool {
return sender.Ping(ctx) != nil
}, time.Second, 100*time.Millisecond)

sender.Close(ctx)
questdbC.Stop(ctx)

}
60 changes: 48 additions & 12 deletions http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ type httpLineSender struct {
pass string
token string

client http.Client
uri string
closed bool
client http.Client
uri string
pingUri string
closed bool

// Global transport is used unless a custom transport was provided.
globalTransport *globalHttpTransport
Expand Down Expand Up @@ -156,11 +157,13 @@ func newHttpLineSender(conf *lineSenderConfig) (*httpLineSender, error) {
s.globalTransport.RegisterClient()
}

s.uri = "http"
var protocol = "http"
if conf.tlsMode != tlsDisabled {
s.uri += "s"
protocol += "s"
}
s.uri += fmt.Sprintf("://%s/write", s.address)

s.uri = fmt.Sprintf("%s://%s/write", protocol, s.address)
s.pingUri = fmt.Sprintf("%s://%s/ping", protocol, s.address)

return s, nil
}
Expand Down Expand Up @@ -337,23 +340,56 @@ func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
return nil
}

// makeRequest returns a boolean if we need to retry the request
func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) {
func (s *httpLineSender) Ping(ctx context.Context) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should execute a ping when we initialize an HTTP sender? Then we won't need a dedicated method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that seems reasonable and keeps the interface clean.

I would need to test how well this integrates into RedPanda Connect but I suspect that it's just a matter of where we return an error on failed ping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting a bit hairy now when running integration tests. I've noticed the behavior on startup where QuestDB responds at / with 200, but the /ping endpoint still 404s (probably because the database isn't fully initialized yet).

In my older tests (with the explicit .Ping()), I used an eventually assertion when testing the Ping() function to account for this.

But now with ping baked into the constructor, we don't have that option. Unless we retry ping in the sender constructor? That seems like an anti-pattern to me, but maybe it's worth creating an option for? But then that option would be at the interface level so we would need to invalidate it for tcp senders, which is ugly, and part of the reason why we're baking it into the http sender's constructor in the first place... wdyt?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could check for retriable errors and retry for retry_timeout, just like we do it in Flush. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love it because 404 is actually an important error, and could be the result of user misconfiguration of a reverse proxy, for example. I'll need a bit of time to think about this.

In the meantime, ping might not even be necessary for RedPanda Connect compatibility if we decide to go with a sender pool implementation. Using this pool, senders will be created and released during a WriteBatch call instead of cached during the Connect phase (see https://pkg.go.dev/github.com/benthosdev/benthos/[email protected]/public/service#example-package-OutputBatchedPlugin and https://pkg.go.dev/github.com/benthosdev/benthos/[email protected]/public/service#example-package-OutputBatchedPlugin for a bit more detail)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's think a bit more. Maybe we can come up with a better idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@puzpuzpuz I think it's time that we revisit this discussion. When running in Redpanda Connect, our client can emit some cryptic messages when we fail to successfully flush, making it difficult to diagnose and fix the underlying issue. I've encountered this when (mis)configuring TLS, and can imagine other users having problems, especially when setting this up for the first time.

I tried writing some code in our connector, but it didn't feel like the right place because we already have the following information stored in the client:

  1. QuestDB address
  2. Authentication (username/password or token)
  3. TLS settings

Maybe we incorporate this into the LineSenderPool by adding a Ping() method. This would keep the feature http-only AND we would not be modifying the LineSender interface. It's cleaner for me (at least in this case) to have a separate method so I can call it in the connector's Connect method.

wdyt?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about sending the ping when you create a new pooled sender? Ping method available on the pool would look weird IMO. It should be available on the sender itself, but indeed this doesn't play nicely with our common Sender interface.

buf := bytes.Buffer{}
success := http.StatusNoContent

req, err := http.NewRequest(
http.MethodPost,
s.uri,
bytes.NewReader(s.buf.Bytes()),
s.pingUri,
&buf,
)
if err != nil {
return false, err
return err
}
req.ContentLength = int64(s.BufLen())

s.setAuth(req)

resp, err := s.client.Do(req)
if err != nil {
return err
}

if resp.StatusCode != success {
return fmt.Errorf("ping returned status %d, expected %d", resp.StatusCode, success)
}

return nil

}

func (s *httpLineSender) setAuth(req *http.Request) {
if s.user != "" && s.pass != "" {
req.SetBasicAuth(s.user, s.pass)
} else if s.token != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token))
}
}

// makeRequest returns a boolean if we need to retry the request
func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) {
req, err := http.NewRequest(
http.MethodPost,
s.uri,
bytes.NewReader(s.buf.Bytes()),
)
if err != nil {
return false, err
}

s.setAuth(req)

req.ContentLength = int64(s.BufLen())

// reqTimeout = ( request.len() / request_min_throughput ) + request_timeout
// nb: conversion from int to time.Duration is in milliseconds
Expand Down
6 changes: 6 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ type LineSender interface {
// If auto-flush is enabled, the client will flush any remaining buffered
// messages before closing itself.
Close(ctx context.Context) error

// Ping will send an HTTP request to the server's /ping path
// to test connectivity.
//
// Valid only for HTTP senders. TCP senders will return an error
Ping(ctx context.Context) error
}

const (
Expand Down
4 changes: 4 additions & 0 deletions tcp_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func (s *tcpLineSender) At(ctx context.Context, ts time.Time) error {
return nil
}

func (s *tcpLineSender) Ping(ctx context.Context) error {
return errors.New("ping not supported for tcp senders")
}

// Messages returns a copy of accumulated ILP messages that are not
// flushed to the TCP connection yet. Useful for debugging purposes.
func (s *tcpLineSender) Messages() string {
Expand Down
13 changes: 13 additions & 0 deletions tcp_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ func TestErrorOnContextDeadline(t *testing.T) {
t.Fail()
}

func TestTcpPingReturnsError(t *testing.T) {
ctx := context.Background()
srv, err := newTestTcpServer(readAndDiscard)
assert.NoError(t, err)
defer srv.Close()

sender, err := qdb.NewLineSender(ctx, qdb.WithTcp(), qdb.WithAddress(srv.Addr()))
assert.NoError(t, err)
defer sender.Close(ctx)

assert.ErrorContains(t, sender.Ping(ctx), "ping not supported")
}

func BenchmarkLineSenderBatch1000(b *testing.B) {
ctx := context.Background()

Expand Down
4 changes: 3 additions & 1 deletion test/haproxy.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
global
maxconn 256

frontend httpfront
bind 0.0.0.0:8888
mode http
Expand Down Expand Up @@ -26,4 +29,3 @@ frontend httpbasicauthfront
mode http
http-request auth unless { http_auth(httpcreds) }
default_backend http

Loading