• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mvisonneau / gitlab-ci-pipelines-exporter / 17455371724

04 Sep 2025 06:29AM UTC coverage: 64.156% (-0.8%) from 64.935%
17455371724

push

github

web-flow
feat: Improve refs garbage collection for Redis using TTL (#983)

* feat: add merge request handle to garbage collect closed mr refs

* refactor: simplify the code

we already have a deleteRef function

* fix: if no pipeline found check for merge results pipeline

* fix: add missing return case when no pipeline found

* feat: add ttl on redis field

Only Redis 7.4 supports expiration on hash fields, so I had to implement a workaround using keys.

* fix: continue if key has expired

* chore: better debug for garbage collection

* fix: debug log reporting wrong progress

* fix: a couple of issues with the rebase

42 of 138 new or added lines in 7 files covered. (30.43%)

1 existing line in 1 file now uncovered.

3671 of 5722 relevant lines covered (64.16%)

3.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.66
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5

6
        "github.com/google/uuid"
7
        "github.com/pkg/errors"
8
        "github.com/redis/go-redis/extra/redisotel/v9"
9
        "github.com/redis/go-redis/v9"
10
        log "github.com/sirupsen/logrus"
11
        "github.com/vmihailenco/taskq/v4"
12
        "go.opentelemetry.io/otel"
13
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
14
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
15
        "go.opentelemetry.io/otel/sdk/resource"
16
        sdktrace "go.opentelemetry.io/otel/sdk/trace"
17
        semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
18

19
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/config"
20
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/gitlab"
21
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/ratelimit"
22
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/schemas"
23
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/store"
24
)
25

26
// tracerName is the name passed to otel.Tracer when starting spans in this package.
const tracerName = "gitlab-ci-pipelines-exporter"
27

28
// Controller holds the necessary clients to run the app and handle requests.
29
type Controller struct {
30
        Config         config.Config
31
        Redis          *redis.Client
32
        Gitlab         *gitlab.Client
33
        Store          store.Store
34
        TaskController TaskController
35

36
        // UUID is used to identify this controller/process amongst others when
37
        // the exporter is running in cluster mode, leveraging Redis.
38
        UUID uuid.UUID
39
}
40

41
// New creates a new controller.
42
func New(ctx context.Context, cfg config.Config, version string) (c Controller, err error) {
21✔
43
        c.Config = cfg
21✔
44
        c.UUID = uuid.New()
21✔
45

21✔
46
        if err = configureTracing(ctx, cfg.OpenTelemetry.GRPCEndpoint); err != nil {
21✔
47
                return
×
48
        }
×
49

50
        if err = c.configureRedis(ctx, &cfg.Redis); err != nil {
21✔
51
                return
×
52
        }
×
53

54
        c.TaskController = NewTaskController(ctx, c.Redis, cfg.Gitlab.MaximumJobsQueueSize)
21✔
55
        c.registerTasks()
21✔
56

21✔
57
        var redisStore *store.Redis
21✔
58
        if c.Redis != nil {
21✔
NEW
59
                redisStore = store.NewRedisStore(c.Redis, store.WithTTLConfig(&store.RedisTTLConfig{
×
NEW
60
                        Project: cfg.Redis.ProjectTTL,
×
NEW
61
                        Ref:     cfg.Redis.RefTTL,
×
NEW
62
                        Metric:  cfg.Redis.MetricTTL,
×
NEW
63
                }))
×
NEW
64
        }
×
65

66
        c.Store = store.New(ctx, redisStore, c.Config.Projects)
21✔
67

21✔
68
        if err = c.configureGitlab(cfg.Gitlab, version); err != nil {
21✔
69
                return
×
70
        }
×
71

72
        // Start the scheduler
73
        c.Schedule(ctx, cfg.Pull, cfg.GarbageCollect)
21✔
74

21✔
75
        return
21✔
76
}
77

78
func (c *Controller) registerTasks() {
21✔
79
        for n, h := range map[schemas.TaskType]interface{}{
21✔
80
                schemas.TaskTypeGarbageCollectEnvironments:   c.TaskHandlerGarbageCollectEnvironments,
21✔
81
                schemas.TaskTypeGarbageCollectMetrics:        c.TaskHandlerGarbageCollectMetrics,
21✔
82
                schemas.TaskTypeGarbageCollectProjects:       c.TaskHandlerGarbageCollectProjects,
21✔
83
                schemas.TaskTypeGarbageCollectRefs:           c.TaskHandlerGarbageCollectRefs,
21✔
84
                schemas.TaskTypePullEnvironmentMetrics:       c.TaskHandlerPullEnvironmentMetrics,
21✔
85
                schemas.TaskTypePullEnvironmentsFromProject:  c.TaskHandlerPullEnvironmentsFromProject,
21✔
86
                schemas.TaskTypePullEnvironmentsFromProjects: c.TaskHandlerPullEnvironmentsFromProjects,
21✔
87
                schemas.TaskTypePullMetrics:                  c.TaskHandlerPullMetrics,
21✔
88
                schemas.TaskTypePullProject:                  c.TaskHandlerPullProject,
21✔
89
                schemas.TaskTypePullProjectsFromWildcard:     c.TaskHandlerPullProjectsFromWildcard,
21✔
90
                schemas.TaskTypePullProjectsFromWildcards:    c.TaskHandlerPullProjectsFromWildcards,
21✔
91
                schemas.TaskTypePullRefMetrics:               c.TaskHandlerPullRefMetrics,
21✔
92
                schemas.TaskTypePullRefsFromProject:          c.TaskHandlerPullRefsFromProject,
21✔
93
                schemas.TaskTypePullRefsFromProjects:         c.TaskHandlerPullRefsFromProjects,
21✔
94
        } {
315✔
95
                _, _ = c.TaskController.TaskMap.Register(string(n), &taskq.TaskConfig{
294✔
96
                        Handler:    h,
294✔
97
                        RetryLimit: 1,
294✔
98
                })
294✔
99
        }
294✔
100
}
101

102
func (c *Controller) unqueueTask(ctx context.Context, tt schemas.TaskType, uniqueID string) {
6✔
103
        if err := c.Store.UnqueueTask(ctx, tt, uniqueID); err != nil {
6✔
104
                log.WithContext(ctx).
×
105
                        WithFields(log.Fields{
×
106
                                "task_type":      tt,
×
107
                                "task_unique_id": uniqueID,
×
108
                        }).
×
109
                        WithError(err).
×
110
                        Warn("unqueuing task")
×
111
        }
×
112
}
113

114
func configureTracing(ctx context.Context, grpcEndpoint string) error {
21✔
115
        if len(grpcEndpoint) == 0 {
42✔
116
                log.Debug("opentelemetry.grpc_endpoint is not configured, skipping open telemetry support")
21✔
117

21✔
118
                return nil
21✔
119
        }
21✔
120

121
        log.WithFields(log.Fields{
×
122
                "opentelemetry_grpc_endpoint": grpcEndpoint,
×
123
        }).Info("opentelemetry gRPC endpoint provided, initializing connection..")
×
124

×
125
        traceClient := otlptracegrpc.NewClient(
×
126
                otlptracegrpc.WithInsecure(),
×
127
                otlptracegrpc.WithEndpoint(grpcEndpoint),
×
128
        )
×
129

×
130
        traceExp, err := otlptrace.New(ctx, traceClient)
×
131
        if err != nil {
×
132
                return err
×
133
        }
×
134

135
        res, err := resource.New(ctx,
×
136
                resource.WithFromEnv(),
×
137
                resource.WithProcess(),
×
138
                resource.WithTelemetrySDK(),
×
139
                resource.WithHost(),
×
140
                resource.WithAttributes(
×
141
                        semconv.ServiceNameKey.String("gitlab-ci-pipelines-exporter"),
×
142
                ),
×
143
        )
×
144
        if err != nil {
×
145
                return err
×
146
        }
×
147

148
        bsp := sdktrace.NewBatchSpanProcessor(traceExp)
×
149
        tracerProvider := sdktrace.NewTracerProvider(
×
150
                sdktrace.WithSampler(sdktrace.AlwaysSample()),
×
151
                sdktrace.WithResource(res),
×
152
                sdktrace.WithSpanProcessor(bsp),
×
153
        )
×
154

×
155
        otel.SetTracerProvider(tracerProvider)
×
156

×
157
        return nil
×
158
}
159

160
func (c *Controller) configureGitlab(cfg config.Gitlab, version string) (err error) {
22✔
161
        var rl ratelimit.Limiter
22✔
162

22✔
163
        if c.Redis != nil {
22✔
164
                rl = ratelimit.NewRedisLimiter(c.Redis, cfg.MaximumRequestsPerSecond)
×
165
        } else {
22✔
166
                rl = ratelimit.NewLocalLimiter(cfg.MaximumRequestsPerSecond, cfg.BurstableRequestsPerSecond)
22✔
167
        }
22✔
168

169
        c.Gitlab, err = gitlab.NewClient(gitlab.ClientConfig{
22✔
170
                URL:              cfg.URL,
22✔
171
                Token:            cfg.Token,
22✔
172
                DisableTLSVerify: !cfg.EnableTLSVerify,
22✔
173
                UserAgentVersion: version,
22✔
174
                RateLimiter:      rl,
22✔
175
                ReadinessURL:     cfg.HealthURL,
22✔
176
        })
22✔
177

22✔
178
        return
22✔
179
}
180

181
func (c *Controller) configureRedis(ctx context.Context, config *config.Redis) (err error) {
21✔
182
        ctx, span := otel.Tracer(tracerName).Start(ctx, "controller:configureRedis")
21✔
183
        defer span.End()
21✔
184

21✔
185
        if len(config.URL) <= 0 {
42✔
186
                log.Debug("redis url is not configured, skipping configuration & using local driver")
21✔
187

21✔
188
                return
21✔
189
        }
21✔
190

191
        log.Info("redis url configured, initializing connection..")
×
192

×
193
        var opt *redis.Options
×
194

×
NEW
195
        if opt, err = redis.ParseURL(config.URL); err != nil {
×
196
                return
×
197
        }
×
198

199
        c.Redis = redis.NewClient(opt)
×
200

×
201
        if err = redisotel.InstrumentTracing(c.Redis); err != nil {
×
202
                return
×
203
        }
×
204

205
        if _, err := c.Redis.Ping(ctx).Result(); err != nil {
×
206
                return errors.Wrap(err, "connecting to redis")
×
207
        }
×
208

209
        log.Info("connected to redis")
×
210

×
211
        return
×
212
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc