Skip to content

Commit 824efa8

Browse files
authored
Merge branch 'main' into steve/add-ping
2 parents 36803c2 + 9c27421 commit 824efa8

13 files changed

+848
-81
lines changed

README.md

+81-10
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Golang client for QuestDB's [Influx Line Protocol](https://questdb.io/docs/refer
99
The library requires Go 1.19 or newer.
1010

1111
Features:
12-
* Context-aware API.
12+
* [Context](https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go)-aware API.
1313
* Optimized for batch writes.
1414
* Supports TLS encryption and ILP authentication.
1515
* Automatic write retries and connection reuse for ILP over HTTP.
@@ -42,24 +42,41 @@ func main() {
4242
log.Fatal(err)
4343
}
4444
// Make sure to close the sender on exit to release resources.
45-
defer sender.Close()
45+
defer sender.Close(ctx)
46+
4647
// Send a few ILP messages.
48+
tradedTs, err := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z")
49+
if err != nil {
50+
log.Fatal(err)
51+
}
4752
err = sender.
48-
Table("trades").
49-
Symbol("name", "test_ilp1").
50-
Float64Column("value", 12.4).
51-
AtNow(ctx)
53+
Table("trades_go").
54+
Symbol("pair", "USDGBP").
55+
Symbol("type", "buy").
56+
Float64Column("traded_price", 0.83).
57+
Float64Column("limit_price", 0.84).
58+
Int64Column("qty", 100).
59+
At(ctx, tradedTs)
60+
if err != nil {
61+
log.Fatal(err)
62+
}
63+
64+
tradedTs, err = time.Parse(time.RFC3339, "2022-08-06T15:04:06.987654Z")
5265
if err != nil {
5366
log.Fatal(err)
5467
}
5568
err = sender.
56-
Table("trades").
57-
Symbol("name", "test_ilp2").
58-
Float64Column("value", 11.4).
59-
At(ctx, time.Now().UnixNano())
69+
Table("trades_go").
70+
Symbol("pair", "GBPJPY").
71+
Symbol("type", "sell").
72+
Float64Column("traded_price", 135.97).
73+
Float64Column("limit_price", 0.84).
74+
Int64Column("qty", 400).
75+
At(ctx, tradedTs)
6076
if err != nil {
6177
log.Fatal(err)
6278
}
79+
6380
// Make sure that the messages are sent over the network.
6481
err = sender.Flush(ctx)
6582
if err != nil {
@@ -75,6 +92,60 @@ To connect via TCP, set the configuration string to:
7592
// ...
7693
```
7794

95+
## Pooled Line Senders
96+
97+
**Warning: Experimental feature designed for use with HTTP senders ONLY**
98+
99+
Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
100+
to pool previously-used `LineSender`s so they can be reused without having
101+
to allocate and instantiate new senders.
102+
103+
A LineSenderPool is thread-safe and can be used to concurrently obtain senders
104+
across multiple goroutines.
105+
106+
Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
107+
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
108+
execution block to Close the sender at the end of the goroutine.
109+
110+
Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:
111+
112+
```go
113+
package main
114+
115+
import (
116+
"context"
117+
118+
qdb "github.com/questdb/go-questdb-client/v3"
119+
)
120+
121+
func main() {
122+
ctx := context.TODO()
123+
124+
pool := qdb.PoolFromConf("http::addr=localhost:9000")
125+
defer func() {
126+
err := pool.Close(ctx)
127+
if err != nil {
128+
panic(err)
129+
}
130+
}()
131+
132+
sender, err := pool.Sender(ctx)
133+
if err != nil {
134+
panic(err)
135+
}
136+
137+
sender.Table("prices").
138+
Symbol("ticker", "AAPL").
139+
Float64Column("price", 123.45).
140+
AtNow(ctx)
141+
142+
// Close call returns the sender back to the pool
143+
if err := sender.Close(ctx); err != nil {
144+
panic(err)
145+
}
146+
}
147+
```
148+
78149
## Migration from v2
79150

80151
v2 code example:

conf_parse.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type configData struct {
3737
}
3838

3939
func confFromStr(conf string) (*lineSenderConfig, error) {
40-
senderConf := &lineSenderConfig{}
40+
var senderConf *lineSenderConfig
4141

4242
data, err := parseConfigStr(conf)
4343
if err != nil {
@@ -46,14 +46,14 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
4646

4747
switch data.Schema {
4848
case "http":
49-
senderConf.senderType = httpSenderType
49+
senderConf = newLineSenderConfig(httpSenderType)
5050
case "https":
51-
senderConf.senderType = httpSenderType
51+
senderConf = newLineSenderConfig(httpSenderType)
5252
senderConf.tlsMode = tlsEnabled
5353
case "tcp":
54-
senderConf.senderType = tcpSenderType
54+
senderConf = newLineSenderConfig(tcpSenderType)
5555
case "tcps":
56-
senderConf.senderType = tcpSenderType
56+
senderConf = newLineSenderConfig(tcpSenderType)
5757
senderConf.tlsMode = tlsEnabled
5858
default:
5959
return nil, fmt.Errorf("invalid schema: %s", data.Schema)
@@ -90,6 +90,7 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
9090
case "token_y":
9191
// Some clients require public key.
9292
// But since Go sender doesn't need it, we ignore the values.
93+
continue
9394
case "auto_flush":
9495
if v == "off" {
9596
senderConf.autoFlushRows = 0

conf_test.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,15 @@ func TestHappyCasesFromConf(t *testing.T) {
432432
actual, err := qdb.ConfFromStr(tc.config)
433433
assert.NoError(t, err)
434434

435-
expected := &qdb.LineSenderConfig{}
435+
var expected *qdb.LineSenderConfig
436+
switch tc.config[0] {
437+
case 'h':
438+
expected = qdb.NewLineSenderConfig(qdb.HttpSenderType)
439+
case 't':
440+
expected = qdb.NewLineSenderConfig(qdb.TcpSenderType)
441+
default:
442+
assert.FailNow(t, "happy case configs must start with either 'http' or 'tcp'")
443+
}
436444
for _, opt := range tc.expectedOpts {
437445
opt(expected)
438446
}

export_test.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,16 @@ type (
2929
ConfigData = configData
3030
TcpLineSender = tcpLineSender
3131
LineSenderConfig = lineSenderConfig
32+
SenderType = senderType
3233
)
3334

3435
var (
35-
GlobalTransport = globalTransport
36+
GlobalTransport = globalTransport
37+
NoSenderType SenderType = noSenderType
38+
HttpSenderType SenderType = httpSenderType
39+
TcpSenderType SenderType = tcpSenderType
40+
DefaultAutoFlushInterval = defaultAutoFlushInterval
41+
DefaultAutoFlushRows = defaultAutoFlushRows
3642
)
3743

3844
func NewBuffer(initBufSize int, maxBufSize int, fileNameLimit int) Buffer {
@@ -58,6 +64,10 @@ func Messages(s LineSender) string {
5864
}
5965

6066
func MsgCount(s LineSender) int {
67+
if ps, ok := s.(*pooledSender); ok {
68+
hs, _ := ps.wrapped.(*httpLineSender)
69+
return hs.MsgCount()
70+
}
6171
if hs, ok := s.(*httpLineSender); ok {
6272
return hs.MsgCount()
6373
}
@@ -76,3 +86,7 @@ func BufLen(s LineSender) int {
7686
}
7787
panic("unexpected struct")
7888
}
89+
90+
func NewLineSenderConfig(t SenderType) *LineSenderConfig {
91+
return newLineSenderConfig(t)
92+
}

http_sender.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"context"
3030
"crypto/tls"
3131
"encoding/json"
32-
"errors"
3332
"fmt"
3433
"io"
3534
"math/big"
@@ -178,7 +177,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
178177
)
179178

180179
if s.closed {
181-
return errors.New("cannot flush a closed LineSender")
180+
return errClosedSenderFlush
182181
}
183182

184183
err := s.buf.LastErr()
@@ -189,7 +188,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
189188
}
190189
if s.buf.HasTable() {
191190
s.buf.DiscardPendingMsg()
192-
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
191+
return errFlushWithPendingMessage
193192
}
194193

195194
if s.buf.msgCount == 0 {
@@ -287,7 +286,7 @@ func (s *httpLineSender) BoolColumn(name string, val bool) LineSender {
287286

288287
func (s *httpLineSender) Close(ctx context.Context) error {
289288
if s.closed {
290-
return nil
289+
return errDoubleSenderClose
291290
}
292291

293292
var err error
@@ -311,7 +310,7 @@ func (s *httpLineSender) AtNow(ctx context.Context) error {
311310

312311
func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
313312
if s.closed {
314-
return errors.New("cannot queue new messages on a closed LineSender")
313+
return errClosedSenderAt
315314
}
316315

317316
sendTs := true

http_sender_test.go

+46-9
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,12 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
502502

503503
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))
504504

505+
// Sleep past the default interval
506+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
507+
508+
// Check that the number of messages hasn't changed
509+
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))
510+
505511
// Send one additional message and ensure that all are flushed
506512
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
507513
assert.NoError(t, err)
@@ -511,8 +517,6 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
511517

512518
func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
513519
ctx := context.Background()
514-
autoFlushRows := 10
515-
autoFlushInterval := time.Duration(autoFlushRows-1) * time.Millisecond
516520

517521
srv, err := newTestHttpServer(readAndDiscard)
518522
assert.NoError(t, err)
@@ -524,21 +528,54 @@ func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
524528
ctx,
525529
qdb.WithHttp(),
526530
qdb.WithAddress(srv.Addr()),
527-
qdb.WithAutoFlushRows(autoFlushRows),
528-
qdb.WithAutoFlushInterval(autoFlushInterval),
531+
qdb.WithAutoFlushRows(qdb.DefaultAutoFlushRows),
532+
qdb.WithAutoFlushInterval(qdb.DefaultAutoFlushInterval),
529533
qdb.WithAutoFlushDisabled(),
530534
)
531535
assert.NoError(t, err)
532536
defer sender.Close(ctx)
533537

534538
// Send autoFlushRows + 1 messages and ensure all are buffered
535-
for i := 0; i < autoFlushRows+1; i++ {
539+
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
536540
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
537541
assert.NoError(t, err)
538-
time.Sleep(time.Millisecond)
539542
}
540543

541-
assert.Equal(t, autoFlushRows+1, qdb.MsgCount(sender))
544+
// Sleep past the default interval
545+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
546+
547+
assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
548+
}
549+
550+
func TestNoFlushWhenAutoFlushRowsAndIntervalAre0(t *testing.T) {
551+
ctx := context.Background()
552+
553+
srv, err := newTestHttpServer(readAndDiscard)
554+
assert.NoError(t, err)
555+
defer srv.Close()
556+
557+
// opts are processed sequentially, so AutoFlushDisabled will
558+
// override AutoFlushRows
559+
sender, err := qdb.NewLineSender(
560+
ctx,
561+
qdb.WithHttp(),
562+
qdb.WithAddress(srv.Addr()),
563+
qdb.WithAutoFlushRows(0),
564+
qdb.WithAutoFlushInterval(0),
565+
)
566+
assert.NoError(t, err)
567+
defer sender.Close(ctx)
568+
569+
// Send autoFlushRows + 1 messages and ensure all are buffered
570+
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
571+
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
572+
assert.NoError(t, err)
573+
}
574+
575+
// Sleep past the default interval
576+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
577+
578+
assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
542579
}
543580

544581
func TestSenderDoubleClose(t *testing.T) {
@@ -564,7 +601,7 @@ func TestSenderDoubleClose(t *testing.T) {
564601
assert.NoError(t, err)
565602

566603
err = sender.Close(ctx)
567-
assert.NoError(t, err)
604+
assert.Error(t, err)
568605
}
569606

570607
func TestErrorOnFlushWhenSenderIsClosed(t *testing.T) {
@@ -625,7 +662,7 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {
625662

626663
err = sender.Close(ctx)
627664
assert.NoError(t, err)
628-
assert.Empty(t, qdb.Messages(sender))
665+
assert.NotEmpty(t, qdb.Messages(sender))
629666
}
630667

631668
func TestSuccessAfterRetries(t *testing.T) {

integration_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que
192192
)
193193
if setupProxy || auth == httpBasicAuth || auth == httpBearerAuth {
194194
req = testcontainers.ContainerRequest{
195-
Image: "haproxy:2.6.0",
195+
Image: "haproxy:2.6.4",
196196
ExposedPorts: []string{"8443/tcp", "8444/tcp", "8445/tcp", "8888/tcp"},
197197
WaitingFor: wait.ForHTTP("/").WithPort("8888"),
198198
Networks: []string{networkName},

0 commit comments

Comments
 (0)