Skip to content

Commit aa2e889

Browse files
committed
Improve error handling
1 parent 1b442eb commit aa2e889

File tree

4 files changed

+28
-13
lines changed

4 files changed

+28
-13
lines changed

http_sender.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"context"
3030
"crypto/tls"
3131
"encoding/json"
32-
"errors"
3332
"fmt"
3433
"io"
3534
"math/big"
@@ -176,7 +175,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
176175
)
177176

178177
if s.closed {
179-
return errors.New("cannot flush a closed LineSender")
178+
return errClosedSenderFlush
180179
}
181180

182181
err := s.buf.LastErr()
@@ -187,7 +186,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
187186
}
188187
if s.buf.HasTable() {
189188
s.buf.DiscardPendingMsg()
190-
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
189+
return errFlushWithPendingMessage
191190
}
192191

193192
if s.buf.msgCount == 0 {
@@ -285,7 +284,7 @@ func (s *httpLineSender) BoolColumn(name string, val bool) LineSender {
285284

286285
func (s *httpLineSender) Close(ctx context.Context) error {
287286
if s.closed {
288-
return errors.New("double http sender close")
287+
return errDoubleSenderClose
289288
}
290289

291290
var err error
@@ -309,7 +308,7 @@ func (s *httpLineSender) AtNow(ctx context.Context) error {
309308

310309
func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
311310
if s.closed {
312-
return errors.New("cannot queue new messages on a closed LineSender")
311+
return errClosedSenderAt
313312
}
314313

315314
sendTs := true

sender.go

+7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ import (
3535
"time"
3636
)
3737

38+
var (
39+
errClosedSenderFlush = errors.New("cannot flush a closed LineSender")
40+
errFlushWithPendingMessage = errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
41+
errClosedSenderAt = errors.New("cannot queue new messages on a closed LineSender")
42+
errDoubleSenderClose = errors.New("double sender close")
43+
)
44+
3845
// LineSender allows you to insert rows into QuestDB by sending ILP
3946
// messages over HTTP or TCP protocol.
4047
//

sender_pool.go

+15-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ import (
3535
"time"
3636
)
3737

38+
var (
39+
errAcquireFromClosedPool = errors.New("cannot acquire a LineSender from a closed LineSenderPool")
40+
errHttpOnlySender = errors.New("tcp/s not supported for pooled senders, use http/s only")
41+
errPooledSenderClose = errors.New("error closing one or more LineSenders in the pool")
42+
)
43+
3844
// LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to
3945
// Acquire a sender from the pool and Release it back to the pool when it's done being used.
4046
//
@@ -72,7 +78,7 @@ type LineSenderPoolOption func(*LineSenderPool)
7278
// [WithMaxSenders] option.
7379
func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error) {
7480
if strings.HasPrefix(conf, "tcp") {
75-
return nil, errors.New("tcp/s not supported for pooled senders, use http/s only")
81+
return nil, errHttpOnlySender
7682
}
7783

7884
pool := &LineSenderPool{
@@ -145,14 +151,18 @@ func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error) {
145151
defer p.mu.Unlock()
146152

147153
if p.closed {
148-
return nil, errors.New("cannot Acquire a LineSender from a closed LineSenderPool")
154+
return nil, errAcquireFromClosedPool
149155
}
150156

151157
// We may have to wait for a free sender
152158
for len(p.freeSenders) == 0 && p.numSenders == p.maxSenders {
153159
p.cond.Wait()
154160
}
155161

162+
if p.closed {
163+
return nil, errAcquireFromClosedPool
164+
}
165+
156166
if len(p.freeSenders) > 0 {
157167
// Pop sender off the slice and return it
158168
s := p.freeSenders[len(p.freeSenders)-1]
@@ -168,7 +178,7 @@ func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error) {
168178
for _, opt := range p.opts {
169179
opt(conf)
170180
if conf.senderType == tcpSenderType {
171-
return nil, errors.New("tcp/s not supported for pooled senders, use http/s only")
181+
return nil, errHttpOnlySender
172182
}
173183
}
174184
s, err = newHttpLineSender(conf)
@@ -247,7 +257,7 @@ func (p *LineSenderPool) Close(ctx context.Context) error {
247257
return nil
248258
}
249259

250-
err := errors.New("error closing one or more LineSenders in the pool")
260+
err := errPooledSenderClose
251261
for _, senderErr := range senderErrors {
252262
err = fmt.Errorf("%s %w", err, senderErr)
253263
}
@@ -340,7 +350,7 @@ func (ps *pooledSender) Flush(ctx context.Context) error {
340350

341351
func (ps *pooledSender) Close(ctx context.Context) error {
342352
if atomic.AddUint64(&ps.tick, 1)&1 == 1 {
343-
return errors.New("double pooled sender close")
353+
return errDoubleSenderClose
344354
}
345355
return ps.pool.free(ctx, ps)
346356
}

tcp_sender.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"crypto/rand"
3434
"crypto/tls"
3535
"encoding/base64"
36-
"errors"
3736
"fmt"
3837
"math/big"
3938
"net"
@@ -137,7 +136,7 @@ func newTcpLineSender(ctx context.Context, conf *lineSenderConfig) (*tcpLineSend
137136

138137
func (s *tcpLineSender) Close(_ context.Context) error {
139138
if s.conn == nil {
140-
return errors.New("double tcp sender close")
139+
return errDoubleSenderClose
141140
}
142141
conn := s.conn
143142
s.conn = nil
@@ -193,7 +192,7 @@ func (s *tcpLineSender) Flush(ctx context.Context) error {
193192
}
194193
if s.buf.HasTable() {
195194
s.buf.DiscardPendingMsg()
196-
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
195+
return errFlushWithPendingMessage
197196
}
198197

199198
if err = ctx.Err(); err != nil {

0 commit comments

Comments
 (0)