Skip to content

Commit 238bf9b

Browse files
committed
no global prometheus metrics
1 parent 8c24464 commit 238bf9b

File tree

9 files changed

+51
-43
lines changed

9 files changed

+51
-43
lines changed

libs/common/grpc.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func StartNewGRPCServer(ctx context.Context, addr string, registerServerHook fun
4141
}
4242

4343
// dapr/grpc service
44-
service, ok := daprd.NewServiceWithListener(listener, DefaultServerOptions()...).(*daprd.Server)
44+
service, ok := daprd.NewServiceWithListener(listener, DefaultServerOptions(ctx)...).(*daprd.Server)
4545
if !ok {
4646
log.Fatal().Msg("dapr service listener is not a *daprd.Server")
4747
}
@@ -87,10 +87,13 @@ func StartNewGRPCServer(ctx context.Context, addr string, registerServerHook fun
8787
// DefaultUnaryInterceptors returns the slice of default interceptors for unary gRPC calls
8888
//
8989
// chain := grpc.ChainUnaryInterceptor(common.DefaultUnaryInterceptors()...)
90-
func DefaultUnaryInterceptors(metrics *prometheusGrpcProvider.ServerMetrics) []grpc.UnaryServerInterceptor {
90+
func DefaultUnaryInterceptors(
91+
ctx context.Context,
92+
metrics *prometheusGrpcProvider.ServerMetrics,
93+
) []grpc.UnaryServerInterceptor {
9194
return []grpc.UnaryServerInterceptor{
9295
metrics.UnaryServerInterceptor(),
93-
hwgrpc.UnaryPanicRecoverInterceptor(),
96+
hwgrpc.UnaryPanicRecoverInterceptor(ctx),
9497
hwgrpc.UnaryLoggingInterceptor,
9598
hwgrpc.UnaryErrorQualityControlInterceptor,
9699
hwgrpc.UnaryLocaleInterceptor,
@@ -104,10 +107,13 @@ func DefaultUnaryInterceptors(metrics *prometheusGrpcProvider.ServerMetrics) []g
104107
// DefaultStreamInterceptors returns the slice of default interceptors for stream gRPC calls
105108
//
106109
// chain := grpc.ChainStreamInterceptor(common.DefaultStreamInterceptors()...)
107-
func DefaultStreamInterceptors(metrics *prometheusGrpcProvider.ServerMetrics) []grpc.StreamServerInterceptor {
110+
func DefaultStreamInterceptors(
111+
ctx context.Context,
112+
metrics *prometheusGrpcProvider.ServerMetrics,
113+
) []grpc.StreamServerInterceptor {
108114
return []grpc.StreamServerInterceptor{
109115
metrics.StreamServerInterceptor(),
110-
hwgrpc.StreamPanicRecoverInterceptor(),
116+
hwgrpc.StreamPanicRecoverInterceptor(ctx),
111117
hwgrpc.StreamLoggingInterceptor,
112118
hwgrpc.StreamErrorQualityControlInterceptor,
113119
hwgrpc.StreamLocaleInterceptor,
@@ -118,16 +124,14 @@ func DefaultStreamInterceptors(metrics *prometheusGrpcProvider.ServerMetrics) []
118124
}
119125
}
120126

121-
func DefaultServerOptions() []grpc.ServerOption {
127+
func DefaultServerOptions(ctx context.Context) []grpc.ServerOption {
122128
// register new metrics collector with prometheus
123129
metrics := prometheusGrpcProvider.NewServerMetrics()
124130

125-
if !hwutil.IsRunningInGoTests() {
126-
telemetry.PrometheusRegistry().MustRegister(metrics)
127-
}
131+
telemetry.PrometheusRegistry(ctx).MustRegister(metrics)
128132

129-
unaryInterceptorChain := grpc.ChainUnaryInterceptor(DefaultUnaryInterceptors(metrics)...)
130-
streamInterceptorChain := grpc.ChainStreamInterceptor(DefaultStreamInterceptors(metrics)...)
133+
unaryInterceptorChain := grpc.ChainUnaryInterceptor(DefaultUnaryInterceptors(ctx, metrics)...)
134+
streamInterceptorChain := grpc.ChainStreamInterceptor(DefaultStreamInterceptors(ctx, metrics)...)
131135
statsHandler := grpc.StatsHandler(otelgrpc.NewServerHandler())
132136

133137
return []grpc.ServerOption{unaryInterceptorChain, streamInterceptorChain, statsHandler}

libs/common/hwgrpc/panic_interceptor.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,22 @@ func recoveryHandlerFn() recovery.RecoveryHandlerFuncContext {
3232

3333
_, _ = fmt.Fprintln(os.Stderr, string(debug.Stack()))
3434

35-
panicsRecovered.Counter().Inc()
35+
panicsRecovered.Counter(ctx).Inc()
3636

3737
return hwerr.NewStatusError(ctx, codes.Internal, "panic recovered", locale.GenericError(ctx))
3838
}
3939
}
4040

41-
func UnaryPanicRecoverInterceptor() grpc.UnaryServerInterceptor {
42-
panicsRecovered.Ensure()
41+
func UnaryPanicRecoverInterceptor(ctx context.Context) grpc.UnaryServerInterceptor {
42+
panicsRecovered.Ensure(ctx)
4343

4444
return recovery.UnaryServerInterceptor(
4545
recovery.WithRecoveryHandlerContext(recoveryHandlerFn()),
4646
)
4747
}
4848

49-
func StreamPanicRecoverInterceptor() grpc.StreamServerInterceptor {
50-
panicsRecovered.Ensure()
49+
func StreamPanicRecoverInterceptor(ctx context.Context) grpc.StreamServerInterceptor {
50+
panicsRecovered.Ensure(ctx)
5151

5252
return recovery.StreamServerInterceptor(
5353
recovery.WithRecoveryHandlerContext(recoveryHandlerFn()),

libs/common/hwgrpc/panic_interceptor_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@ type RecoverySuite struct {
3838

3939
func TestPanicRecoverInterceptor(t *testing.T) {
4040
t.Parallel()
41-
telemetry.SetupMetrics(context.Background(), nil)
41+
ctx := telemetry.SetupMetrics(context.Background(), nil)
4242
s := &RecoverySuite{
4343
InterceptorTestSuite: &testpb.InterceptorTestSuite{
4444
TestService: &recoveryAssertService{TestServiceServer: &testpb.TestPingService{}},
4545
ServerOpts: []grpc.ServerOption{
46-
grpc.StreamInterceptor(StreamPanicRecoverInterceptor()),
47-
grpc.UnaryInterceptor(UnaryPanicRecoverInterceptor()),
46+
grpc.StreamInterceptor(StreamPanicRecoverInterceptor(ctx)),
47+
grpc.UnaryInterceptor(UnaryPanicRecoverInterceptor(ctx)),
4848
},
4949
},
5050
}

libs/common/setup.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func Setup(serviceName, version string, opts ...SetupOption) context.Context {
123123
}
124124
}
125125

126-
telemetry.SetupMetrics(ctx, Shutdown)
126+
ctx = telemetry.SetupMetrics(ctx, Shutdown)
127127

128128
if len(version) == 0 && Mode == ProductionMode {
129129
log.Warn().Msg("Version is empty in production build! Recompile using ldflag '-X main.Version=<version>'")

libs/hwutil/env.go

-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package hwutil
22

33
import (
4-
"flag"
54
"os"
65

76
"github.com/rs/zerolog/log"
@@ -30,7 +29,3 @@ func HasEnv(key string) bool {
3029
_, found := os.LookupEnv(key)
3130
return found
3231
}
33-
34-
func IsRunningInGoTests() bool {
35-
return flag.Lookup("test.v") != nil
36-
}

libs/telemetry/setup.go

+24-15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"hwutil"
7+
"hwutil/errs"
78
"net/http"
89
"os"
910
"time"
@@ -16,8 +17,6 @@ import (
1617
"github.com/rs/zerolog/log"
1718
)
1819

19-
var prometheusRegistry *prometheus.Registry
20-
2120
func SetupLogging(mode, rawLevel, service, version string) {
2221
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
2322

@@ -50,11 +49,12 @@ func SetupLogging(mode, rawLevel, service, version string) {
5049
}
5150

5251
func startMetricsServer(ctx context.Context, addr string, shutdown func(error)) {
52+
reg := PrometheusRegistry(ctx)
5353
server := &http.Server{
5454
Addr: addr,
5555
Handler: promhttp.InstrumentMetricHandler(
56-
PrometheusRegistry(),
57-
promhttp.HandlerFor(PrometheusRegistry(), promhttp.HandlerOpts{}),
56+
reg,
57+
promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
5858
),
5959
ReadHeaderTimeout: time.Second * 30, // prevent potential slowloris attack
6060
}
@@ -85,28 +85,37 @@ func startMetricsServer(ctx context.Context, addr string, shutdown func(error))
8585
cancel() // prevent mem leak
8686
}
8787

88+
type promRegKey struct{}
89+
90+
func PrometheusRegistry(ctx context.Context) *prometheus.Registry {
91+
value := ctx.Value(promRegKey{})
92+
reg, ok := value.(*prometheus.Registry)
93+
if !ok {
94+
panic(errs.NewCastError("*prometheus.Registry", value))
95+
}
96+
return reg
97+
}
98+
8899
// SetupMetrics will start a new http server for prometheus to scrape from
89-
func SetupMetrics(ctx context.Context, shutdown func(error)) {
100+
func SetupMetrics(ctx context.Context, shutdown func(error)) context.Context {
90101
// create new prometheus registry, we do not use the global default one,
91102
// as it causes problems with tests
92-
prometheusRegistry = prometheus.NewRegistry()
103+
prometheusRegistry := prometheus.NewRegistry()
104+
ctx = context.WithValue(ctx, promRegKey{}, prometheusRegistry)
93105

94106
l := log.Ctx(ctx)
95107

96108
addr := hwutil.GetEnvOr("METRICS_ADDR", "")
97109

98110
if addr == "" {
99111
l.Warn().Msg("METRICS_ADDR not set, will not export metrics")
100-
return
112+
return ctx
101113
}
102114

103115
l.Info().Str("addr", addr).Msg("starting metrics server")
104116

105117
go startMetricsServer(ctx, addr, shutdown)
106-
}
107-
108-
func PrometheusRegistry() *prometheus.Registry {
109-
return prometheusRegistry
118+
return ctx
110119
}
111120

112121
// LazyCounter prevents access to PrometheusRegistry, before it is initialized
@@ -123,14 +132,14 @@ func NewLazyCounter(opts prometheus.CounterOpts) LazyCounter {
123132
}
124133
}
125134

126-
func (lc *LazyCounter) Counter() prometheus.Counter {
135+
func (lc *LazyCounter) Counter(ctx context.Context) prometheus.Counter {
127136
if lc.counter != nil {
128137
return *lc.counter
129138
}
130-
lc.counter = hwutil.PtrTo(promauto.With(prometheusRegistry).NewCounter(lc.opts))
139+
lc.counter = hwutil.PtrTo(promauto.With(PrometheusRegistry(ctx)).NewCounter(lc.opts))
131140
return *lc.counter
132141
}
133142

134-
func (lc *LazyCounter) Ensure() {
135-
lc.Counter()
143+
func (lc *LazyCounter) Ensure(ctx context.Context) {
144+
lc.Counter(ctx)
136145
}

services/property-svc/internal/property-view/api/grpc_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func server() (context.Context, pb.PropertyViewsServiceClient, *hwes_test.Aggreg
3838
ctx := common.Setup("property-svc", "test", common.WithFakeAuthOnly())
3939

4040
// Start Server
41-
grpcServer := grpc.NewServer(common.DefaultServerOptions()...)
41+
grpcServer := grpc.NewServer(common.DefaultServerOptions(ctx)...)
4242
pb.RegisterPropertyViewsServiceServer(grpcServer, grpcService)
4343
conn, closer := common_test.StartGRPCServer(ctx, grpcServer)
4444

services/tasks-svc/internal/patient/api/grpc_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func server() (context.Context, pb.PatientServiceClient, func()) {
3434
ctx := common.Setup("tasks-svc", "test", common.WithFakeAuthOnly())
3535

3636
// Start Server
37-
grpcServer := grpc.NewServer(common.DefaultServerOptions()...)
37+
grpcServer := grpc.NewServer(common.DefaultServerOptions(ctx)...)
3838
pb.RegisterPatientServiceServer(grpcServer, patientGrpcService)
3939
conn, closer := common_test.StartGRPCServer(ctx, grpcServer)
4040

services/tasks-svc/internal/task/api/grpc_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func server() (context.Context, pb.TaskServiceClient, func()) {
2727
ctx := common.Setup("tasks-svc", "test", common.WithFakeAuthOnly())
2828

2929
// Start Server
30-
grpcServer := grpc.NewServer(common.DefaultServerOptions()...)
30+
grpcServer := grpc.NewServer(common.DefaultServerOptions(ctx)...)
3131
pb.RegisterTaskServiceServer(grpcServer, taskGrpcService)
3232
conn, closer := common_test.StartGRPCServer(ctx, grpcServer)
3333

0 commit comments

Comments
 (0)