
pomerium / pomerium · build 18690754649 (push via GitHub, web-flow)

21 Oct 2025 04:27PM UTC coverage: 53.953% (+0.02%) from 53.929%

endpoints: add paths (#5888)

## Summary
Add additional paths to the `endpoints` package.


## Checklist

- [ ] reference any related issues
- [x] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`, `dependencies`, `ci`)
- [x] ready for review

60 of 76 new or added lines in 22 files covered. (78.95%)

8 existing lines in 5 files now uncovered.

27424 of 50829 relevant lines covered (53.95%)

86.61 hits per line

Source File: /pkg/envoy/resource_monitor_linux.go (82.24% covered)
//go:build linux

package envoy

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"slices"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	envoy_config_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
	envoy_config_overload_v3 "github.com/envoyproxy/go-control-plane/envoy/config/overload/v3"
	envoy_extensions_resource_monitors_injected_resource_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/resource_monitors/injected_resource/v3"
	typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
	atomicfs "github.com/natefinch/atomic"
	"golang.org/x/sys/unix"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"github.com/pomerium/pomerium/config"
	"github.com/pomerium/pomerium/internal/log"
	"github.com/pomerium/pomerium/internal/telemetry/metrics"
)

type CgroupFilePath int

const (
	RootPath CgroupFilePath = iota
	MemoryUsagePath
	MemoryLimitPath
)

type CgroupDriver interface {
	CgroupForPid(pid int) (string, error)
	Path(cgroup string, kind CgroupFilePath) string
	Validate(cgroup string) error
	MemoryUsage(cgroup string) (uint64, error)
	MemoryLimit(cgroup string) (uint64, error)
}

var (
	overloadActions = []struct {
		ActionName string
		Trigger    *envoy_config_overload_v3.Trigger
	}{
		// At 90%, envoy will shrink its heap every 10 seconds
		// https://github.com/envoyproxy/envoy/blob/v1.30.1/source/common/memory/heap_shrinker.cc
		{"shrink_heap", memUsageThreshold(0.9)},

		// At >85% memory usage, gradually start reducing timeouts, by up to 50%.
		// https://www.envoyproxy.io/docs/envoy/latest/configuration/operations/overload_manager/overload_manager#reducing-timeouts
		// https://github.com/envoyproxy/envoy/blob/v1.30.1/source/server/overload_manager_impl.cc#L565-L572
		{"reduce_timeouts", memUsageScaled(0.85, 0.95)},

		// At 90%, start resetting streams using the most memory. As memory usage
		// increases, the eligibility threshold is reduced.
		// https://www.envoyproxy.io/docs/envoy/latest/configuration/operations/overload_manager/overload_manager#reset-streams
		// https://github.com/envoyproxy/envoy/blob/v1.30.1/source/server/worker_impl.cc#L180
		{"reset_high_memory_stream", memUsageScaled(0.90, 0.98)},

		// At 95%, stop accepting new connections, but keep existing ones open.
		// https://github.com/envoyproxy/envoy/blob/v1.30.1/source/server/worker_impl.cc#L168-L174
		{"stop_accepting_connections", memUsageThreshold(0.95)},

		// At 98%, disable HTTP keepalive. This prevents new http/2 streams and
		// ends all existing ones.
		// https://github.com/envoyproxy/envoy/blob/v1.30.1/source/common/http/conn_manager_impl.cc#L1735-L1755
		{"disable_http_keepalive", memUsageThreshold(0.98)},

		// At 99%, drop all new requests.
		// https://github.com/envoyproxy/envoy/blob/v1.30.1/source/common/http/conn_manager_impl.cc#L1203-L1225
		{"stop_accepting_requests", memUsageThreshold(0.99)},
	}
	overloadActionConfigs = map[string]*anypb.Any{
		"reduce_timeouts": marshalAny(&envoy_config_overload_v3.ScaleTimersOverloadActionConfig{
			TimerScaleFactors: []*envoy_config_overload_v3.ScaleTimersOverloadActionConfig_ScaleTimer{
				{
					Timer: envoy_config_overload_v3.ScaleTimersOverloadActionConfig_HTTP_DOWNSTREAM_CONNECTION_IDLE,
					OverloadAdjust: &envoy_config_overload_v3.ScaleTimersOverloadActionConfig_ScaleTimer_MinScale{
						MinScale: &typev3.Percent{
							Value: 50, // reduce the idle timeout by 50% at most
						},
					},
				},
			},
		}),
	}
	recordActionThresholdsOnce sync.Once
	computedActionThresholds   = make(map[string]float64)
)

func init() {
	for _, action := range overloadActions {
		var minThreshold float64
		switch trigger := action.Trigger.TriggerOneof.(type) {
		case *envoy_config_overload_v3.Trigger_Scaled:
			minThreshold = trigger.Scaled.ScalingThreshold
		case *envoy_config_overload_v3.Trigger_Threshold:
			minThreshold = trigger.Threshold.Value
		}
		computedActionThresholds[action.ActionName] = minThreshold
	}
}

func recordActionThresholds() {
	recordActionThresholdsOnce.Do(func() {
		for name, minThreshold := range computedActionThresholds {
			metrics.RecordEnvoyOverloadActionThreshold(context.Background(), name, minThreshold)
		}
	})
}

const (
	groupMemory = "memory"

	metricCgroupMemorySaturation = "cgroup_memory_saturation"
)

type ResourceMonitorOptions struct {
	driver CgroupDriver
}

type ResourceMonitorOption func(*ResourceMonitorOptions)

func (o *ResourceMonitorOptions) apply(opts ...ResourceMonitorOption) {
	for _, op := range opts {
		op(o)
	}
}

// WithCgroupDriver overrides the cgroup driver used for the resource monitor.
// If unset, it will be chosen automatically.
func WithCgroupDriver(driver CgroupDriver) ResourceMonitorOption {
	return func(o *ResourceMonitorOptions) {
		o.driver = driver
	}
}

// NewSharedResourceMonitor creates a new ResourceMonitor suitable for running
// envoy in the same cgroup as the parent process. It reports the cgroup's
// memory saturation to envoy as an injected resource. This allows envoy to
// react to actual memory pressure in the cgroup, taking into account memory
// usage from pomerium itself.
func NewSharedResourceMonitor(ctx context.Context, src config.Source, tempDir string, opts ...ResourceMonitorOption) (ResourceMonitor, error) {
	options := ResourceMonitorOptions{}
	options.apply(opts...)
	if options.driver == nil {
		var err error
		options.driver, err = DetectCgroupDriver()
		if err != nil {
			return nil, err
		}
	}
	recordActionThresholds()

	selfCgroup, err := options.driver.CgroupForPid(os.Getpid())
	if err != nil {
		return nil, fmt.Errorf("failed to look up cgroup: %w", err)
	}
	if err := options.driver.Validate(selfCgroup); err != nil {
		return nil, fmt.Errorf("cgroup not valid for resource monitoring: %w", err)
	}

	if err := os.MkdirAll(filepath.Join(tempDir, "resource_monitor", groupMemory), 0o755); err != nil {
		return nil, fmt.Errorf("failed to create resource monitor directory: %w", err)
	}

	s := &sharedResourceMonitor{
		ResourceMonitorOptions: options,
		cgroup:                 selfCgroup,
		tempDir:                filepath.Join(tempDir, "resource_monitor"),
	}
	readInitialConfig := make(chan struct{})
	src.OnConfigChange(ctx, func(ctx context.Context, c *config.Config) {
		<-readInitialConfig
		s.onConfigChange(ctx, c)
	})
	s.onConfigChange(ctx, src.GetConfig())
	close(readInitialConfig)

	if err := s.writeMetricFile(groupMemory, metricCgroupMemorySaturation, "0", 0o644); err != nil {
		return nil, fmt.Errorf("failed to initialize metrics: %w", err)
	}
	return s, nil
}

type sharedResourceMonitor struct {
	ResourceMonitorOptions
	cgroup  string
	tempDir string
	enabled atomic.Bool
}

func (s *sharedResourceMonitor) onConfigChange(_ context.Context, cfg *config.Config) {
	if cfg == nil || cfg.Options == nil {
		s.enabled.Store(config.DefaultRuntimeFlags()[config.RuntimeFlagEnvoyResourceManager])
		return
	}
	s.enabled.Store(cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagEnvoyResourceManager))
}

func (s *sharedResourceMonitor) metricFilename(group, name string) string {
	return filepath.Join(s.tempDir, group, name)
}

func memUsageScaled(scaling, saturation float64) *envoy_config_overload_v3.Trigger {
	return &envoy_config_overload_v3.Trigger{
		Name: "envoy.resource_monitors.injected_resource",
		TriggerOneof: &envoy_config_overload_v3.Trigger_Scaled{
			Scaled: &envoy_config_overload_v3.ScaledTrigger{
				ScalingThreshold:    scaling,
				SaturationThreshold: saturation,
			},
		},
	}
}

func memUsageThreshold(threshold float64) *envoy_config_overload_v3.Trigger {
	return &envoy_config_overload_v3.Trigger{
		Name: "envoy.resource_monitors.injected_resource",
		TriggerOneof: &envoy_config_overload_v3.Trigger_Threshold{
			Threshold: &envoy_config_overload_v3.ThresholdTrigger{
				Value: threshold,
			},
		},
	}
}

func (s *sharedResourceMonitor) ApplyBootstrapConfig(bootstrap *envoy_config_bootstrap_v3.Bootstrap) {
	if bootstrap.OverloadManager == nil {
		bootstrap.OverloadManager = &envoy_config_overload_v3.OverloadManager{}
	}

	bootstrap.OverloadManager.ResourceMonitors = append(bootstrap.OverloadManager.ResourceMonitors,
		&envoy_config_overload_v3.ResourceMonitor{
			Name: "envoy.resource_monitors.injected_resource",
			ConfigType: &envoy_config_overload_v3.ResourceMonitor_TypedConfig{
				TypedConfig: marshalAny(&envoy_extensions_resource_monitors_injected_resource_v3.InjectedResourceConfig{
					Filename: s.metricFilename(groupMemory, metricCgroupMemorySaturation),
				}),
			},
		},
	)

	for _, action := range overloadActions {
		bootstrap.OverloadManager.Actions = append(bootstrap.OverloadManager.Actions,
			&envoy_config_overload_v3.OverloadAction{
				Name:        fmt.Sprintf("envoy.overload_actions.%s", action.ActionName),
				Triggers:    []*envoy_config_overload_v3.Trigger{action.Trigger},
				TypedConfig: overloadActionConfigs[action.ActionName],
			},
		)
	}

	bootstrap.OverloadManager.BufferFactoryConfig = &envoy_config_overload_v3.BufferFactoryConfig{
		// https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/overload/v3/overload.proto#config-overload-v3-bufferfactoryconfig
		MinimumAccountToTrackPowerOfTwo: 20,
	}
}

var (
	monitorInitialTickDelay = 1 * time.Second
	monitorMaxTickInterval  = 10 * time.Second
	monitorMinTickInterval  = 250 * time.Millisecond
)

func (s *sharedResourceMonitor) Run(ctx context.Context, envoyPid int) error {
	envoyCgroup, err := s.driver.CgroupForPid(envoyPid)
	if err != nil {
		return fmt.Errorf("failed to look up cgroup for envoy process: %w", err)
	}
	if envoyCgroup != s.cgroup {
		return fmt.Errorf("envoy process is not in the expected cgroup: %s", envoyCgroup)
	}
	log.Ctx(ctx).Debug().Str("service", "envoy").Str("cgroup", s.cgroup).Msg("starting resource monitor")

	ctx, ca := context.WithCancelCause(ctx)

	limitWatcher := &memoryLimitWatcher{
		limitFilePath: filepath.Clean("/" + s.driver.Path(s.cgroup, MemoryLimitPath)),
	}

	watcherExited := make(chan struct{})
	if err := limitWatcher.Watch(ctx); err != nil {
		ca(nil)
		return fmt.Errorf("failed to start watch on cgroup memory limit: %w", err)
	}
	go func() {
		limitWatcher.Wait()
		ca(errors.New("memory limit watcher stopped"))
		close(watcherExited)
	}()

	// Set initial values for state metrics
	s.updateActionStates(ctx, 0)

	// The interval at which we check memory usage is scaled based on the current
	// memory saturation. When memory usage is low, we check less frequently, and
	// as the saturation increases, we also increase the frequency of checks. Most
	// of the thresholds at which some action is taken to reduce memory usage are
	// very high (e.g. 95% of the limit). As memory usage approaches this limit,
	// it becomes increasingly important to have accurate data, since memory usage
	// can change rapidly; we want to avoid hitting the limit, but also delay
	// taking disruptive actions for as long as possible.

	// the envoy default interval for the builtin heap monitor is 1s

	tick := time.NewTimer(monitorInitialTickDelay)
	var lastValue string
LOOP:
	for {
		select {
		case <-ctx.Done():
			tick.Stop()
			break LOOP
		case <-tick.C:
			var saturation float64
			if s.enabled.Load() {
				if limit := limitWatcher.Value(); limit > 0 {
					usage, err := s.driver.MemoryUsage(s.cgroup)
					if err != nil {
						log.Ctx(ctx).Error().Err(err).Msg("failed to get memory saturation")
						continue
					}
					saturation = max(0.0, min(1.0, float64(usage)/float64(limit)))
				}
			}

			saturationStr := fmt.Sprintf("%.6f", saturation)
			nextInterval := computeScaledTickInterval(saturation)

			if saturationStr != lastValue {
				lastValue = saturationStr
				if err := s.writeMetricFile(groupMemory, metricCgroupMemorySaturation, saturationStr, 0o644); err != nil {
					log.Ctx(ctx).Error().Err(err).Msg("failed to write metric file")
				}
				s.updateActionStates(ctx, saturation)
				metrics.RecordEnvoyCgroupMemorySaturation(ctx, s.cgroup, saturation)
				log.Ctx(ctx).Debug().
					Str("service", "envoy").
					Str("metric", metricCgroupMemorySaturation).
					Str("value", saturationStr).
					Dur("interval_ms", nextInterval).
					Msg("updated metric")
			}

			tick.Reset(nextInterval)
		}
	}

	<-watcherExited
	return context.Cause(ctx)
}

// Returns a value between monitorMinTickInterval and monitorMaxTickInterval, based
// on the given saturation value in the range [0.0, 1.0].
func computeScaledTickInterval(saturation float64) time.Duration {
	return monitorMaxTickInterval - (time.Duration(float64(monitorMaxTickInterval-monitorMinTickInterval) * max(0.0, min(1.0, saturation)))).
		Round(time.Millisecond)
}

func (s *sharedResourceMonitor) updateActionStates(ctx context.Context, pct float64) {
	for name, minThreshold := range computedActionThresholds {
		var state int64
		if pct >= minThreshold {
			state = 1
		}
		metrics.RecordEnvoyOverloadActionState(ctx,
			metrics.EnvoyOverloadActionStateTags{
				Cgroup:     s.cgroup,
				ActionName: name,
			},
			state,
		)
	}
}

func (s *sharedResourceMonitor) writeMetricFile(group, name, data string, mode fs.FileMode) error {
	// Logic here is similar to atomic.WriteFile, but because envoy watches the
	// parent directory for changes to any file, we write the temp file one level
	// up before moving it into the watched location, to avoid triggering inotify
	// events for the temp file.
	f, err := os.CreateTemp(s.tempDir, name)
	if err != nil {
		return err
	}
	tempFilename := f.Name()
	defer os.Remove(tempFilename)
	defer f.Close()
	if _, err := f.Write([]byte(data)); err != nil {
		return err
	}
	if err := f.Sync(); err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	if err := os.Chmod(tempFilename, mode); err != nil {
		return err
	}
	if err := atomicfs.ReplaceFile(tempFilename, filepath.Join(s.tempDir, group, name)); err != nil {
		return err
	}
	return nil
}

type cgroupV2Driver struct {
	fs   fs.FS
	root string
}

func (d *cgroupV2Driver) Path(cgroup string, kind CgroupFilePath) string {
	switch kind {
	case RootPath:
		return d.root
	case MemoryUsagePath:
		return filepath.Join(d.root, cgroup, "memory.current")
	case MemoryLimitPath:
		return filepath.Join(d.root, cgroup, "memory.max")
	}
	return ""
}

func (d *cgroupV2Driver) CgroupForPid(pid int) (string, error) {
	data, err := fs.ReadFile(d.fs, fmt.Sprintf("proc/%d/cgroup", pid))
	if err != nil {
		return "", err
	}
	return parseCgroupName(data)
}

// MemoryUsage implements CgroupDriver.
func (d *cgroupV2Driver) MemoryUsage(cgroup string) (uint64, error) {
	current, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryUsagePath))
	if err != nil {
		return 0, err
	}
	return strconv.ParseUint(strings.TrimSpace(string(current)), 10, 64)
}

// MemoryLimit implements CgroupDriver.
func (d *cgroupV2Driver) MemoryLimit(cgroup string) (uint64, error) {
	data, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryLimitPath))
	if err != nil {
		return 0, err
	}
	v := strings.TrimSpace(string(data))
	if v == "max" {
		return 0, nil
	}
	return strconv.ParseUint(v, 10, 64)
}

// Validate implements CgroupDriver.
func (d *cgroupV2Driver) Validate(cgroup string) error {
	if typ, err := fs.ReadFile(d.fs, filepath.Join(d.root, cgroup, "cgroup.type")); err != nil {
		return err
	} else if strings.TrimSpace(string(typ)) != "domain" {
		return errors.New("not a domain cgroup")
	}

	if controllers, err := d.enabledSubtreeControllers(cgroup); err != nil {
		return err
	} else if len(controllers) > 0 {
		return errors.New("not a leaf cgroup")
	}

	if controllers, err := d.enabledControllers(cgroup); err != nil {
		return err
	} else if !slices.Contains(controllers, "memory") {
		return errors.New("memory controller not enabled")
	}

	return nil
}

func (d *cgroupV2Driver) enabledControllers(cgroup string) ([]string, error) {
	data, err := fs.ReadFile(d.fs, filepath.Join(d.root, cgroup, "cgroup.controllers"))
	if err != nil {
		return nil, err
	}
	return strings.Fields(string(data)), nil
}

func (d *cgroupV2Driver) enabledSubtreeControllers(cgroup string) ([]string, error) {
	data, err := fs.ReadFile(d.fs, filepath.Join(d.root, cgroup, "cgroup.subtree_control"))
	if err != nil {
		return nil, err
	}
	return strings.Fields(string(data)), nil
}

var _ CgroupDriver = (*cgroupV2Driver)(nil)

type cgroupV1Driver struct {
	fs   fs.FS
	root string
}

func (d *cgroupV1Driver) Path(cgroup string, kind CgroupFilePath) string {
	switch kind {
	case RootPath:
		return d.root
	case MemoryUsagePath:
		return filepath.Join(d.root, "memory", cgroup, "memory.usage_in_bytes")
	case MemoryLimitPath:
		return filepath.Join(d.root, "memory", cgroup, "memory.limit_in_bytes")
	}
	return ""
}

func (d *cgroupV1Driver) CgroupForPid(pid int) (string, error) {
	data, err := fs.ReadFile(d.fs, fmt.Sprintf("proc/%d/cgroup", pid))
	if err != nil {
		return "", err
	}
	name, err := parseCgroupName(data)
	if err != nil {
		return "", err
	}

	mountinfo, err := fs.ReadFile(d.fs, fmt.Sprintf("proc/%d/mountinfo", pid))
	if err != nil {
		return "", err
	}
	scanner := bufio.NewScanner(bytes.NewReader(mountinfo))
	for scanner.Scan() {
		line := strings.Fields(scanner.Text())
		if len(line) < 5 {
			continue
		}

		// Entries 3 and 4 contain the root path and the mountpoint, respectively.
		// each resource will contain a separate mountpoint for the same path, so
		// we can just pick one.
		if line[4] == fmt.Sprintf("/%s/memory", d.root) {
			mountpoint, err := filepath.Rel(line[3], name)
			if err != nil {
				return "", err
			}
			return filepath.Clean("/" + mountpoint), nil
		}
	}
	return "", errors.New("cgroup not found")
}

// MemoryUsage implements CgroupDriver.
func (d *cgroupV1Driver) MemoryUsage(cgroup string) (uint64, error) {
	current, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryUsagePath))
	if err != nil {
		return 0, err
	}
	return strconv.ParseUint(strings.TrimSpace(string(current)), 10, 64)
}

// MemoryLimit implements CgroupDriver.
func (d *cgroupV1Driver) MemoryLimit(cgroup string) (uint64, error) {
	data, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryLimitPath))
	if err != nil {
		return 0, err
	}
	v := strings.TrimSpace(string(data))
	if v == "max" {
		return 0, nil
	}
	return strconv.ParseUint(v, 10, 64)
}

// Validate implements CgroupDriver.
func (d *cgroupV1Driver) Validate(cgroup string) error {
	memoryPath := filepath.Join(d.root, "memory", cgroup)
	info, err := fs.Stat(d.fs, memoryPath)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return errors.New("memory controller not enabled")
		}
		return fmt.Errorf("failed to stat cgroup: %w", err)
	}
	if !info.IsDir() {
		return fmt.Errorf("%s is not a directory", memoryPath)
	}
	return nil
}

var _ CgroupDriver = (*cgroupV1Driver)(nil)

func DetectCgroupDriver() (CgroupDriver, error) {
	osFs := os.DirFS("/")

	// fast path: cgroup2 only
	var stat unix.Statfs_t
	if err := unix.Statfs("/sys/fs/cgroup", &stat); err != nil {
		return nil, err
	}
	if stat.Type == unix.CGROUP2_SUPER_MAGIC {
		return &cgroupV2Driver{root: "sys/fs/cgroup", fs: osFs}, nil
	}

	// find the hybrid mountpoint, or fall back to v1
	mountpoint, isV2, err := findMountpoint(osFs)
	if err != nil {
		return nil, err
	}
	if isV2 {
		return &cgroupV2Driver{root: mountpoint, fs: osFs}, nil
	}
	return &cgroupV1Driver{root: mountpoint, fs: osFs}, nil
}

func parseCgroupName(contents []byte) (string, error) {
	scan := bufio.NewScanner(bytes.NewReader(contents))
	for scan.Scan() {
		line := scan.Text()
		if strings.HasPrefix(line, "0::") {
			return strings.Split(strings.TrimPrefix(strings.TrimSpace(line), "0::"), " ")[0], nil
		}
	}
	return "", errors.New("cgroup not found")
}

func findMountpoint(fsys fs.FS) (mountpoint string, isV2 bool, err error) {
	mounts, err := fs.ReadFile(fsys, fmt.Sprintf("proc/%d/mountinfo", os.Getpid()))
	if err != nil {
		return "", false, err
	}
	scanner := bufio.NewScanner(bytes.NewReader(mounts))
	var cgv1Root string
	for scanner.Scan() {
		line := strings.Fields(scanner.Text())
		fsType := line[slices.Index(line, "-")+1]
		switch fsType {
		case "cgroup2":
			return line[4][1:], true, nil
		case "cgroup":
			if cgv1Root == "" {
				cgv1Root = filepath.Dir(line[4][1:])
			}
		}
	}
	if cgv1Root == "" {
		return "", false, errors.New("no cgroup mount found")
	}
	return cgv1Root, false, nil
}

func marshalAny(msg proto.Message) *anypb.Any {
	data := new(anypb.Any)
	_ = anypb.MarshalFrom(data, msg, proto.MarshalOptions{
		AllowPartial:  true,
		Deterministic: true,
	})
	return data
}

type memoryLimitWatcher struct {
	limitFilePath string

	value atomic.Uint64

	watches sync.WaitGroup
}

func (w *memoryLimitWatcher) Value() uint64 {
	return w.value.Load()
}

func (w *memoryLimitWatcher) readValue() (uint64, error) {
	data, err := os.ReadFile(w.limitFilePath)
	if err != nil {
		return 0, err
	}
	v := strings.TrimSpace(string(data))
	if v == "max" {
		return 0, nil
	}
	return strconv.ParseUint(v, 10, 64)
}

func (w *memoryLimitWatcher) Watch(ctx context.Context) error {
	fd, err := unix.InotifyInit1(unix.IN_CLOEXEC)
	if err != nil {
		return err
	}
	closeInotify := sync.OnceFunc(func() {
		log.Ctx(ctx).Debug().Msg("stopping memory limit watcher")
		unix.Close(fd)
	})
	log.Ctx(ctx).Debug().Str("file", w.limitFilePath).Msg("starting watch")
	wd, err := unix.InotifyAddWatch(fd, w.limitFilePath, unix.IN_MODIFY)
	if err != nil {
		closeInotify()
		return fmt.Errorf("failed to watch %s: %w", w.limitFilePath, err)
	}
	w.watches.Add(1)
	closeWatch := sync.OnceFunc(func() {
		log.Ctx(ctx).Debug().Str("file", w.limitFilePath).Msg("stopping watch")
		_, _ = unix.InotifyRmWatch(fd, uint32(wd))
		closeInotify()
		w.watches.Done()
	})

	// perform the initial read synchronously and only after setting up the watch
	v, err := w.readValue()
	if err != nil {
		closeWatch()
		return err
	}
	w.value.Store(v)
	log.Ctx(ctx).Debug().Uint64("bytes", v).Msg("current memory limit")

	context.AfterFunc(ctx, closeWatch) // to unblock unix.Read below
	go func() {
		defer closeWatch()
		var buf [unix.SizeofInotifyEvent]byte
		for ctx.Err() == nil {
			v, err := w.readValue()
			if err != nil {
				log.Ctx(ctx).Error().Err(err).Msg("error reading memory limit")
			} else if prev := w.value.Swap(v); prev != v {
				log.Ctx(ctx).Debug().
					Uint64("prev", prev).
					Uint64("current", v).
					Msg("memory limit updated")
			}
			// After ctx is canceled, inotify_rm_watch sends an IN_IGNORED event,
			// which unblocks this read and allows the loop to exit.
			n, err := unix.Read(fd, buf[:])
			if err != nil {
				if errors.Is(err, unix.EINTR) {
					continue
				}
				return
			}
			if n == unix.SizeofInotifyEvent {
				event := (*unix.InotifyEvent)(unsafe.Pointer(&buf))
				if (event.Mask & unix.IN_IGNORED) != 0 {
					// watch was removed, or the file was deleted (this can happen if
					// the memory controller is removed from the parent's subtree_control)
					log.Ctx(ctx).Info().Str("file", w.limitFilePath).Msg("watched file removed")
					return
				}
			}
		}
	}()

	return nil
}

// Wait blocks until all watches have been closed.
//
// Example use:
//
//	ctx, ca := context.WithCancel(context.Background())
//	w := &memoryLimitWatcher{...}
//	w.Watch(ctx)
//	...
//	ca()
//	w.Wait() // blocks until the previous watch is closed
func (w *memoryLimitWatcher) Wait() {
	w.watches.Wait()
}
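
For orientation, here is a minimal sketch of how this monitor could be wired into an Envoy launch sequence. It assumes the `ResourceMonitor` interface (defined elsewhere in this package) exposes the `ApplyBootstrapConfig` and `Run` methods implemented above, and that the caller already has a `config.Source`, an Envoy bootstrap, and the Envoy PID from the surrounding code; the function name is hypothetical and this is illustrative only, not the actual pomerium startup path.

```go
// startSharedResourceMonitor is a hypothetical wiring example, not pomerium's
// actual startup code.
func startSharedResourceMonitor(
	ctx context.Context,
	src config.Source,
	tempDir string,
	bootstrap *envoy_config_bootstrap_v3.Bootstrap,
	envoyPid int,
) error {
	// The cgroup driver is detected automatically (cgroup v2, hybrid, or v1)
	// when WithCgroupDriver is not supplied.
	rm, err := NewSharedResourceMonitor(ctx, src, tempDir)
	if err != nil {
		return err
	}

	// Inject the injected-resource monitor and the overload actions into
	// Envoy's bootstrap before the Envoy process is started.
	rm.ApplyBootstrapConfig(bootstrap)

	// Blocks until ctx is canceled or the memory limit watcher stops; while
	// running, it periodically writes the cgroup memory saturation to the
	// metric file that Envoy watches.
	return rm.Run(ctx, envoyPid)
}
```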