Skip to content

Commit f37c2dc

Browse files
committed
Merge remote-tracking branch 'origin/main' into jv_change_code_examples
2 parents 0bccb77 + 9c27421 commit f37c2dc

13 files changed

+844
-73
lines changed

README.md

+75-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Golang client for QuestDB's [Influx Line Protocol](https://questdb.io/docs/refer
99
The library requires Go 1.19 or newer.
1010

1111
Features:
12-
* Context-aware API.
12+
* [Context](https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go)-aware API.
1313
* Optimized for batch writes.
1414
* Supports TLS encryption and ILP authentication.
1515
* Automatic write retries and connection reuse for ILP over HTTP.
@@ -50,6 +50,8 @@ func main() {
5050
Float64Column("price", 2615.54).
5151
Float64Column("amount", 0.00044).
5252
AtNow(ctx) // timestamp will be set at the server side
53+
54+
tradedTs, err := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z")
5355
if err != nil {
5456
log.Fatal(err)
5557
}
@@ -61,10 +63,27 @@ func main() {
6163
Symbol("side", "sell").
6264
Float64Column("price", 39269.98).
6365
Float64Column("amount", 0.001).
64-
At(ctx, time.Now())
66+
At(ctx, tradedTs)
67+
if err != nil {
68+
log.Fatal(err)
69+
}
70+
71+
tradedTs, err = time.Parse(time.RFC3339, "2022-08-06T15:04:06.987654Z")
6572
if err != nil {
6673
log.Fatal(err)
6774
}
75+
err = sender.
76+
Table("trades_go").
77+
Symbol("pair", "GBPJPY").
78+
Symbol("type", "sell").
79+
Float64Column("traded_price", 135.97).
80+
Float64Column("limit_price", 0.84).
81+
Int64Column("qty", 400).
82+
At(ctx, tradedTs)
83+
if err != nil {
84+
log.Fatal(err)
85+
}
86+
6887
// Make sure that the messages are sent over the network.
6988
err = sender.Flush(ctx)
7089
if err != nil {
@@ -80,6 +99,60 @@ HTTP is the recommended transport to use. To connect via TCP, set the configurat
8099
// ...
81100
```
82101

102+
## Pooled Line Senders
103+
104+
**Warning: Experimental feature designed for use with HTTP senders ONLY**
105+
106+
Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
107+
to pool previously-used `LineSender`s so they can be reused without having
108+
to allocate and instantiate new senders.
109+
110+
A LineSenderPool is thread-safe and can be used to concurrently obtain senders
111+
across multiple goroutines.
112+
113+
Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
114+
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
115+
execution block to Close the sender at the end of the goroutine.
116+
117+
Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:
118+
119+
```go
120+
package main
121+
122+
import (
123+
"context"
124+
125+
qdb "github.com/questdb/go-questdb-client/v3"
126+
)
127+
128+
func main() {
129+
ctx := context.TODO()
130+
131+
pool := qdb.PoolFromConf("http::addr=localhost:9000")
132+
defer func() {
133+
err := pool.Close(ctx)
134+
if err != nil {
135+
panic(err)
136+
}
137+
}()
138+
139+
sender, err := pool.Sender(ctx)
140+
if err != nil {
141+
panic(err)
142+
}
143+
144+
sender.Table("prices").
145+
Symbol("ticker", "AAPL").
146+
Float64Column("price", 123.45).
147+
AtNow(ctx)
148+
149+
// Close call returns the sender back to the pool
150+
if err := sender.Close(ctx); err != nil {
151+
panic(err)
152+
}
153+
}
154+
```
155+
83156
## Migration from v2
84157

85158
v2 code example:

conf_parse.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type configData struct {
3737
}
3838

3939
func confFromStr(conf string) (*lineSenderConfig, error) {
40-
senderConf := &lineSenderConfig{}
40+
var senderConf *lineSenderConfig
4141

4242
data, err := parseConfigStr(conf)
4343
if err != nil {
@@ -46,14 +46,14 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
4646

4747
switch data.Schema {
4848
case "http":
49-
senderConf.senderType = httpSenderType
49+
senderConf = newLineSenderConfig(httpSenderType)
5050
case "https":
51-
senderConf.senderType = httpSenderType
51+
senderConf = newLineSenderConfig(httpSenderType)
5252
senderConf.tlsMode = tlsEnabled
5353
case "tcp":
54-
senderConf.senderType = tcpSenderType
54+
senderConf = newLineSenderConfig(tcpSenderType)
5555
case "tcps":
56-
senderConf.senderType = tcpSenderType
56+
senderConf = newLineSenderConfig(tcpSenderType)
5757
senderConf.tlsMode = tlsEnabled
5858
default:
5959
return nil, fmt.Errorf("invalid schema: %s", data.Schema)
@@ -90,6 +90,7 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
9090
case "token_y":
9191
// Some clients require public key.
9292
// But since Go sender doesn't need it, we ignore the values.
93+
continue
9394
case "auto_flush":
9495
if v == "off" {
9596
senderConf.autoFlushRows = 0

conf_test.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,15 @@ func TestHappyCasesFromConf(t *testing.T) {
432432
actual, err := qdb.ConfFromStr(tc.config)
433433
assert.NoError(t, err)
434434

435-
expected := &qdb.LineSenderConfig{}
435+
var expected *qdb.LineSenderConfig
436+
switch tc.config[0] {
437+
case 'h':
438+
expected = qdb.NewLineSenderConfig(qdb.HttpSenderType)
439+
case 't':
440+
expected = qdb.NewLineSenderConfig(qdb.TcpSenderType)
441+
default:
442+
assert.FailNow(t, "happy case configs must start with either 'http' or 'tcp'")
443+
}
436444
for _, opt := range tc.expectedOpts {
437445
opt(expected)
438446
}

export_test.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,16 @@ type (
2929
ConfigData = configData
3030
TcpLineSender = tcpLineSender
3131
LineSenderConfig = lineSenderConfig
32+
SenderType = senderType
3233
)
3334

3435
var (
35-
GlobalTransport = globalTransport
36+
GlobalTransport = globalTransport
37+
NoSenderType SenderType = noSenderType
38+
HttpSenderType SenderType = httpSenderType
39+
TcpSenderType SenderType = tcpSenderType
40+
DefaultAutoFlushInterval = defaultAutoFlushInterval
41+
DefaultAutoFlushRows = defaultAutoFlushRows
3642
)
3743

3844
func NewBuffer(initBufSize int, maxBufSize int, fileNameLimit int) Buffer {
@@ -58,6 +64,10 @@ func Messages(s LineSender) string {
5864
}
5965

6066
func MsgCount(s LineSender) int {
67+
if ps, ok := s.(*pooledSender); ok {
68+
hs, _ := ps.wrapped.(*httpLineSender)
69+
return hs.MsgCount()
70+
}
6171
if hs, ok := s.(*httpLineSender); ok {
6272
return hs.MsgCount()
6373
}
@@ -76,3 +86,7 @@ func BufLen(s LineSender) int {
7686
}
7787
panic("unexpected struct")
7888
}
89+
90+
func NewLineSenderConfig(t SenderType) *LineSenderConfig {
91+
return newLineSenderConfig(t)
92+
}

http_sender.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"context"
3030
"crypto/tls"
3131
"encoding/json"
32-
"errors"
3332
"fmt"
3433
"io"
3534
"math/big"
@@ -176,7 +175,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
176175
)
177176

178177
if s.closed {
179-
return errors.New("cannot flush a closed LineSender")
178+
return errClosedSenderFlush
180179
}
181180

182181
err := s.buf.LastErr()
@@ -187,7 +186,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
187186
}
188187
if s.buf.HasTable() {
189188
s.buf.DiscardPendingMsg()
190-
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
189+
return errFlushWithPendingMessage
191190
}
192191

193192
if s.buf.msgCount == 0 {
@@ -285,7 +284,7 @@ func (s *httpLineSender) BoolColumn(name string, val bool) LineSender {
285284

286285
func (s *httpLineSender) Close(ctx context.Context) error {
287286
if s.closed {
288-
return nil
287+
return errDoubleSenderClose
289288
}
290289

291290
var err error
@@ -309,7 +308,7 @@ func (s *httpLineSender) AtNow(ctx context.Context) error {
309308

310309
func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
311310
if s.closed {
312-
return errors.New("cannot queue new messages on a closed LineSender")
311+
return errClosedSenderAt
313312
}
314313

315314
sendTs := true

http_sender_test.go

+46-9
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,12 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
502502

503503
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))
504504

505+
// Sleep past the default interval
506+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
507+
508+
// Check that the number of messages hasn't changed
509+
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))
510+
505511
// Send one additional message and ensure that all are flushed
506512
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
507513
assert.NoError(t, err)
@@ -511,8 +517,6 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
511517

512518
func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
513519
ctx := context.Background()
514-
autoFlushRows := 10
515-
autoFlushInterval := time.Duration(autoFlushRows-1) * time.Millisecond
516520

517521
srv, err := newTestHttpServer(readAndDiscard)
518522
assert.NoError(t, err)
@@ -524,21 +528,54 @@ func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
524528
ctx,
525529
qdb.WithHttp(),
526530
qdb.WithAddress(srv.Addr()),
527-
qdb.WithAutoFlushRows(autoFlushRows),
528-
qdb.WithAutoFlushInterval(autoFlushInterval),
531+
qdb.WithAutoFlushRows(qdb.DefaultAutoFlushRows),
532+
qdb.WithAutoFlushInterval(qdb.DefaultAutoFlushInterval),
529533
qdb.WithAutoFlushDisabled(),
530534
)
531535
assert.NoError(t, err)
532536
defer sender.Close(ctx)
533537

534538
// Send autoFlushRows + 1 messages and ensure all are buffered
535-
for i := 0; i < autoFlushRows+1; i++ {
539+
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
536540
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
537541
assert.NoError(t, err)
538-
time.Sleep(time.Millisecond)
539542
}
540543

541-
assert.Equal(t, autoFlushRows+1, qdb.MsgCount(sender))
544+
// Sleep past the default interval
545+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
546+
547+
assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
548+
}
549+
550+
func TestNoFlushWhenAutoFlushRowsAndIntervalAre0(t *testing.T) {
551+
ctx := context.Background()
552+
553+
srv, err := newTestHttpServer(readAndDiscard)
554+
assert.NoError(t, err)
555+
defer srv.Close()
556+
557+
// opts are processed sequentially, so AutoFlushDisabled will
558+
// override AutoFlushRows
559+
sender, err := qdb.NewLineSender(
560+
ctx,
561+
qdb.WithHttp(),
562+
qdb.WithAddress(srv.Addr()),
563+
qdb.WithAutoFlushRows(0),
564+
qdb.WithAutoFlushInterval(0),
565+
)
566+
assert.NoError(t, err)
567+
defer sender.Close(ctx)
568+
569+
// Send autoFlushRows + 1 messages and ensure all are buffered
570+
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
571+
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
572+
assert.NoError(t, err)
573+
}
574+
575+
// Sleep past the default interval
576+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
577+
578+
assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
542579
}
543580

544581
func TestSenderDoubleClose(t *testing.T) {
@@ -564,7 +601,7 @@ func TestSenderDoubleClose(t *testing.T) {
564601
assert.NoError(t, err)
565602

566603
err = sender.Close(ctx)
567-
assert.NoError(t, err)
604+
assert.Error(t, err)
568605
}
569606

570607
func TestErrorOnFlushWhenSenderIsClosed(t *testing.T) {
@@ -625,7 +662,7 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {
625662

626663
err = sender.Close(ctx)
627664
assert.NoError(t, err)
628-
assert.Empty(t, qdb.Messages(sender))
665+
assert.NotEmpty(t, qdb.Messages(sender))
629666
}
630667

631668
func TestSuccessAfterRetries(t *testing.T) {

integration_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que
192192
)
193193
if setupProxy || auth == httpBasicAuth || auth == httpBearerAuth {
194194
req = testcontainers.ContainerRequest{
195-
Image: "haproxy:2.6.0",
195+
Image: "haproxy:2.6.4",
196196
ExposedPorts: []string{"8443/tcp", "8444/tcp", "8445/tcp", "8888/tcp"},
197197
WaitingFor: wait.ForHTTP("/").WithPort("8888"),
198198
Networks: []string{networkName},

0 commit comments

Comments
 (0)