Skip to content

Commit 4cd3c36

Browse files
committed
Track Probe collection+closers in Manager with ProbeReference struct
1 parent 6b65138 commit 4cd3c36

File tree

2 files changed

+70
-89
lines changed

2 files changed

+70
-89
lines changed

internal/pkg/instrumentation/manager.go

+55-26
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const (
4545
// Manager handles the management of [probe.Probe] instances.
4646
type Manager struct {
4747
logger *slog.Logger
48-
probes map[probe.ID]probe.Probe
48+
probes map[probe.ID]ProbeReference
4949
otelController *opentelemetry.Controller
5050
cp ConfigProvider
5151
exe *link.Executable
@@ -59,11 +59,19 @@ type Manager struct {
5959
collectionOpts *ebpf.CollectionOptions
6060
}
6161

62+
// ProbeReference is used by the Manager to track an initialized reference
63+
// to a Probe and its related resources such as its ebpf.Collection and io.Closers.
64+
type ProbeReference struct {
65+
probe probe.Probe
66+
collection *ebpf.Collection
67+
closers []io.Closer
68+
}
69+
6270
// NewManager returns a new [Manager].
6371
func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, pid process.ID, cp ConfigProvider, probes ...probe.Probe) (*Manager, error) {
6472
m := &Manager{
6573
logger: logger,
66-
probes: make(map[probe.ID]probe.Probe),
74+
probes: make(map[probe.ID]ProbeReference),
6775
otelController: otelController,
6876
cp: cp,
6977
}
@@ -132,7 +140,10 @@ func (m *Manager) registerProbe(p probe.Probe) error {
132140
return err
133141
}
134142

135-
m.probes[id] = p
143+
m.probes[id] = ProbeReference{
144+
probe: p,
145+
closers: make([]io.Closer, 0),
146+
}
136147
return nil
137148
}
138149

@@ -146,7 +157,7 @@ func (m *Manager) filterUnusedProbes() {
146157

147158
for name, inst := range m.probes {
148159
funcsFound := false
149-
for _, s := range inst.Manifest().Symbols {
160+
for _, s := range inst.probe.Manifest().Symbols {
150161
if len(s.DependsOn) == 0 {
151162
if _, exists := existingFuncMap[s.Symbol]; exists {
152163
funcsFound = true
@@ -213,15 +224,15 @@ func (m *Manager) applyConfig(c Config) error {
213224

214225
if currentlyEnabled && !newEnabled {
215226
m.logger.Info("Disabling probe", "id", id)
216-
err = errors.Join(err, p.Close())
227+
err = errors.Join(err, m.CloseProbe(p))
217228
continue
218229
}
219230

220231
if !currentlyEnabled && newEnabled {
221232
m.logger.Info("Enabling probe", "id", id)
222233
err = errors.Join(err, m.LoadProbe(p, id, c))
223234
if err == nil {
224-
m.runProbe(p)
235+
m.runProbe(p.probe)
225236
}
226237
continue
227238
}
@@ -296,7 +307,7 @@ func (m *Manager) runProbes(ctx context.Context) (context.Context, error) {
296307

297308
for id, p := range m.probes {
298309
if isProbeEnabled(id, m.currentConfig) {
299-
m.runProbe(p)
310+
m.runProbe(p.probe)
300311
}
301312
}
302313

@@ -390,29 +401,31 @@ func (m *Manager) loadProbes() error {
390401
return nil
391402
}
392403

393-
func (m *Manager) LoadProbe(i probe.Probe, name probe.ID, cfg Config) error {
404+
func (m *Manager) LoadProbe(i ProbeReference, name probe.ID, cfg Config) error {
394405
m.logger.Info("loading probe", "name", name)
395406

396-
err := i.Init(cfg.SamplingConfig)
407+
spec, err := i.probe.Spec()
397408
if err != nil {
398409
return errors.Join(err, m.cleanup())
399410
}
400411

401-
spec, err := i.Spec()
412+
err = m.InjectProbeConsts(i.probe, spec)
402413
if err != nil {
403-
return errors.Join(err, m.cleanup())
414+
return err
404415
}
405416

406-
err = m.InjectProbeConsts(i, spec)
417+
c, err := utils.InitializeEBPFCollection(spec, m.collectionOpts)
407418
if err != nil {
408419
return err
409420
}
421+
i.collection = c
410422

411-
c, err := utils.InitializeEBPFCollection(spec, m.collectionOpts)
423+
reader, err := i.probe.InitStartupConfig(c, cfg.SamplingConfig)
412424
if err != nil {
413-
return err
425+
return errors.Join(err, m.cleanup())
414426
}
415-
i.SetCollection(c)
427+
i.closers = append(i.closers, reader)
428+
416429
return nil
417430
}
418431

@@ -437,8 +450,8 @@ func (m *Manager) InjectProbeConsts(i probe.Probe, spec *ebpf.CollectionSpec) er
437450
return inject.Constants(spec, opts...)
438451
}
439452

440-
func (m *Manager) loadUprobesFromProbe(i probe.Probe) error {
441-
for _, up := range i.GetUprobes() {
453+
func (m *Manager) loadUprobesFromProbe(i ProbeReference) error {
454+
for _, up := range i.probe.GetUprobes() {
442455
var skip bool
443456
for _, pc := range up.PackageConstraints {
444457
if pc.Constraints.Check(m.proc.Modules[pc.Package]) {
@@ -448,17 +461,17 @@ func (m *Manager) loadUprobesFromProbe(i probe.Probe) error {
448461
var logFn func(string, ...any)
449462
switch pc.FailureMode {
450463
case probe.FailureModeIgnore:
451-
logFn = i.GetLogger().Debug
464+
logFn = i.probe.GetLogger().Debug
452465
case probe.FailureModeWarn:
453-
logFn = i.GetLogger().Warn
466+
logFn = i.probe.GetLogger().Warn
454467
default:
455468
// Unknown and FailureModeError.
456469
return fmt.Errorf("uprobe %s package constraint (%s) not met, version %v", up.Sym, pc.Constraints.String(), m.proc.Modules[pc.Package])
457470
}
458471

459472
logFn(
460473
"package constraint not meet, skipping uprobe",
461-
"probe", i.Manifest().ID,
474+
"probe", i.probe.Manifest().ID,
462475
"symbol", up.Sym,
463476
"package", pc.Package,
464477
"constraint", pc.Constraints.String(),
@@ -472,22 +485,23 @@ func (m *Manager) loadUprobesFromProbe(i probe.Probe) error {
472485
continue
473486
}
474487

475-
err := m.loadUprobe(up, i.GetCollection())
488+
err := m.loadUprobe(up, i.collection)
476489
if err != nil {
477490
var logFn func(string, ...any)
478491
switch up.FailureMode {
479492
case probe.FailureModeIgnore:
480-
logFn = i.GetLogger().Debug
493+
logFn = i.probe.GetLogger().Debug
481494
case probe.FailureModeWarn:
482-
logFn = i.GetLogger().Warn
495+
logFn = i.probe.GetLogger().Warn
483496
default:
484497
// Unknown and FailureModeError.
485498
return err
486499
}
487-
logFn("failed to load uprobe", "probe", i.Manifest().ID, "symbol", up.Sym, "error", err)
500+
logFn("failed to load uprobe", "probe", i.probe.Manifest().ID, "symbol", up.Sym, "error", err)
488501
continue
489502
}
490-
_ = i.UpdateClosers(up)
503+
504+
i.closers = append(i.closers, up)
491505
}
492506
return nil
493507
}
@@ -555,11 +569,26 @@ func (m *Manager) mount() error {
555569
return bpffsMount(m.proc)
556570
}
557571

572+
func (m *Manager) CloseProbe(p ProbeReference) error {
573+
if p.collection != nil {
574+
p.collection.Close()
575+
}
576+
577+
var err error
578+
for _, c := range p.closers {
579+
err = errors.Join(err, c.Close())
580+
}
581+
if err == nil {
582+
p.probe.GetLogger().Debug("Closed", "Probe", p.probe.Manifest().ID)
583+
}
584+
return err
585+
}
586+
558587
func (m *Manager) cleanup() error {
559588
ctx := context.Background()
560589
err := m.cp.Shutdown(context.Background())
561590
for _, i := range m.probes {
562-
err = errors.Join(err, i.Close())
591+
err = errors.Join(err, m.CloseProbe(i))
563592
}
564593

565594
// Wait for all probes to close so we know there is no more telemetry being

internal/pkg/instrumentation/probe/probe.go

+15-63
Original file line numberDiff line numberDiff line change
@@ -33,36 +33,25 @@ type Probe interface {
3333
// the information about the package the Probe instruments.
3434
Manifest() Manifest
3535

36-
// Init initializes the Probe, setting up the reader, closers, and sampling config.
37-
Init(*sampling.Config) error
36+
// InitStartupConfig sets up initialization config options for the Probe,
37+
// such as its sampling config, sets up its BPFObj as a closer, and initializes
38+
// the Probe's reader, returning it as an io.Closer.
39+
InitStartupConfig(*ebpf.Collection, *sampling.Config) (io.Closer, error)
3840

3941
// Run runs the events processing loop.
4042
Run(func(ptrace.ScopeSpans))
4143

42-
// Close stops the Probe.
43-
Close() error
44-
4544
// GetLogger returns the *slog.Logger associated with the Probe.
4645
GetLogger() *slog.Logger
4746

4847
// Spec returns the *ebpf.CollectionSpec for the Probe.
4948
Spec() (*ebpf.CollectionSpec, error)
5049

51-
// SetCollection sets the *ebpf.Collection for the Probe.
52-
SetCollection(*ebpf.Collection)
53-
54-
// GetCollection returns the *ebpf.Collection for the Probe.
55-
GetCollection() *ebpf.Collection
56-
5750
// GetUprobes returns a list of *Uprobes for the Probe.
5851
GetUprobes() []*Uprobe
5952

6053
// GetConsts returns a list of Consts for the Probe.
6154
GetConsts() []Const
62-
63-
// UpdateClosers updates the closers for the Probe to the io.Closers passed to it,
64-
// and returns the new list of io.Closers for the Probe.
65-
UpdateClosers(...io.Closer) []io.Closer
6655
}
6756

6857
// Base is a base implementation of [Probe].
@@ -92,7 +81,6 @@ type Base[BPFObj any, BPFEvent any] struct {
9281
ProcessRecord func(perf.Record) (*BPFEvent, error)
9382

9483
reader *perf.Reader
95-
collection *ebpf.Collection
9684
closers []io.Closer
9785
samplingManager *sampling.Manager
9886
}
@@ -130,40 +118,33 @@ func (i *Base[BPFObj, BPFEvent]) Spec() (*ebpf.CollectionSpec, error) {
130118
return i.SpecFn()
131119
}
132120

133-
// Init initializes the Probe, setting up the io.Closers, Reader, and Sampling config.
134-
func (i *Base[BPFObj, BPFEvent]) Init(sampler *sampling.Config) error {
121+
func (i *Base[BPFObj, BPFEvent]) InitStartupConfig(c *ebpf.Collection, sampler *sampling.Config) (io.Closer, error) {
135122
obj := new(BPFObj)
136123
if c, ok := ((interface{})(obj)).(io.Closer); ok {
137124
i.closers = append(i.closers, c)
138125
}
139126

140-
err := i.initReader()
127+
samplingManager, err := sampling.NewSamplingManager(c, sampler)
141128
if err != nil {
142-
return err
129+
return nil, err
143130
}
131+
i.samplingManager = samplingManager
144132

145-
i.samplingManager, err = sampling.NewSamplingManager(i.collection, sampler)
133+
buf, ok := c.Maps[DefaultBufferMapName]
134+
if !ok {
135+
return nil, fmt.Errorf("%s map not found", DefaultBufferMapName)
136+
}
137+
i.reader, err = perf.NewReader(buf, PerfBufferDefaultSizeInPages*os.Getpagesize())
146138
if err != nil {
147-
return err
139+
return nil, err
148140
}
149-
150-
i.closers = append(i.closers, i.reader)
151-
152-
return nil
141+
return i.reader, nil
153142
}
154143

155144
func (i *Base[BPFObj, BPFEvent]) GetLogger() *slog.Logger {
156145
return i.Logger
157146
}
158147

159-
func (i *Base[BPFObj, BPFEvent]) SetCollection(c *ebpf.Collection) {
160-
i.collection = c
161-
}
162-
163-
func (i *Base[BPFObj, BPFEvent]) GetCollection() *ebpf.Collection {
164-
return i.collection
165-
}
166-
167148
func (i *Base[BPFObj, BPFEvent]) GetUprobes() []*Uprobe {
168149
return i.Uprobes
169150
}
@@ -177,20 +158,6 @@ func (i *Base[BPFObj, BPFEvent]) GetConsts() []Const {
177158
return i.Consts
178159
}
179160

180-
func (i *Base[BPFObj, BPFEvent]) initReader() error {
181-
buf, ok := i.collection.Maps[DefaultBufferMapName]
182-
if !ok {
183-
return fmt.Errorf("%s map not found", DefaultBufferMapName)
184-
}
185-
var err error
186-
i.reader, err = perf.NewReader(buf, PerfBufferDefaultSizeInPages*os.Getpagesize())
187-
if err != nil {
188-
return err
189-
}
190-
i.closers = append(i.closers, i.reader)
191-
return nil
192-
}
193-
194161
// read reads a new BPFEvent from the perf Reader.
195162
func (i *Base[BPFObj, BPFEvent]) read() (*BPFEvent, error) {
196163
record, err := i.reader.Read()
@@ -221,21 +188,6 @@ func (i *Base[BPFObj, BPFEvent]) read() (*BPFEvent, error) {
221188
return event, nil
222189
}
223190

224-
// Close stops the Probe.
225-
func (i *Base[BPFObj, BPFEvent]) Close() error {
226-
if i.collection != nil {
227-
i.collection.Close()
228-
}
229-
var err error
230-
for _, c := range i.closers {
231-
err = errors.Join(err, c.Close())
232-
}
233-
if err == nil {
234-
i.Logger.Debug("Closed", "Probe", i.ID)
235-
}
236-
return err
237-
}
238-
239191
type SpanProducer[BPFObj any, BPFEvent any] struct {
240192
Base[BPFObj, BPFEvent]
241193

0 commit comments

Comments
 (0)