• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mvisonneau / gitlab-ci-pipelines-exporter / 12893008104

21 Jan 2025 05:50PM UTC coverage: 64.413% (-0.8%) from 65.207%
12893008104

Pull #983

github

xNok
fix: a couple issue with the rebase
Pull Request #983: feat: Improve refs garbage collection for redis Redis using TTL

42 of 138 new or added lines in 7 files covered. (30.43%)

1 existing line in 1 file now uncovered.

3640 of 5651 relevant lines covered (64.41%)

3.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.66
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5

6
        "github.com/google/uuid"
7
        "github.com/pkg/errors"
8
        "github.com/redis/go-redis/extra/redisotel/v9"
9
        "github.com/redis/go-redis/v9"
10
        log "github.com/sirupsen/logrus"
11
        "github.com/vmihailenco/taskq/v4"
12
        "go.opentelemetry.io/otel"
13
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
14
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
15
        "go.opentelemetry.io/otel/sdk/resource"
16
        sdktrace "go.opentelemetry.io/otel/sdk/trace"
17
        semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
18
        "google.golang.org/grpc"
19

20
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/config"
21
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/gitlab"
22
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/ratelimit"
23
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/schemas"
24
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/store"
25
)
26

27
const tracerName = "gitlab-ci-pipelines-exporter"
28

29
// Controller holds the necessary clients to run the app and handle requests.
30
type Controller struct {
31
        Config         config.Config
32
        Redis          *redis.Client
33
        Gitlab         *gitlab.Client
34
        Store          store.Store
35
        TaskController TaskController
36

37
        // UUID is used to identify this controller/process amongst others when
38
        // the exporter is running in cluster mode, leveraging Redis.
39
        UUID uuid.UUID
40
}
41

42
// New creates a new controller.
43
func New(ctx context.Context, cfg config.Config, version string) (c Controller, err error) {
21✔
44
        c.Config = cfg
21✔
45
        c.UUID = uuid.New()
21✔
46

21✔
47
        if err = configureTracing(ctx, cfg.OpenTelemetry.GRPCEndpoint); err != nil {
21✔
48
                return
×
49
        }
×
50

51
        if err = c.configureRedis(ctx, &cfg.Redis); err != nil {
21✔
52
                return
×
53
        }
×
54

55
        c.TaskController = NewTaskController(ctx, c.Redis, cfg.Gitlab.MaximumJobsQueueSize)
21✔
56
        c.registerTasks()
21✔
57

21✔
58
        var redisStore *store.Redis
21✔
59
        if c.Redis != nil {
21✔
NEW
60
                redisStore = store.NewRedisStore(c.Redis, store.WithTTLConfig(&store.RedisTTLConfig{
×
NEW
61
                        Project: cfg.Redis.ProjectTTL,
×
NEW
62
                        Ref:     cfg.Redis.RefTTL,
×
NEW
63
                        Metric:  cfg.Redis.MetricTTL,
×
NEW
64
                }))
×
NEW
65
        }
×
66

67
        c.Store = store.New(ctx, redisStore, c.Config.Projects)
21✔
68

21✔
69
        if err = c.configureGitlab(cfg.Gitlab, version); err != nil {
21✔
70
                return
×
71
        }
×
72

73
        // Start the scheduler
74
        c.Schedule(ctx, cfg.Pull, cfg.GarbageCollect)
21✔
75

21✔
76
        return
21✔
77
}
78

79
func (c *Controller) registerTasks() {
21✔
80
        for n, h := range map[schemas.TaskType]interface{}{
21✔
81
                schemas.TaskTypeGarbageCollectEnvironments:   c.TaskHandlerGarbageCollectEnvironments,
21✔
82
                schemas.TaskTypeGarbageCollectMetrics:        c.TaskHandlerGarbageCollectMetrics,
21✔
83
                schemas.TaskTypeGarbageCollectProjects:       c.TaskHandlerGarbageCollectProjects,
21✔
84
                schemas.TaskTypeGarbageCollectRefs:           c.TaskHandlerGarbageCollectRefs,
21✔
85
                schemas.TaskTypePullEnvironmentMetrics:       c.TaskHandlerPullEnvironmentMetrics,
21✔
86
                schemas.TaskTypePullEnvironmentsFromProject:  c.TaskHandlerPullEnvironmentsFromProject,
21✔
87
                schemas.TaskTypePullEnvironmentsFromProjects: c.TaskHandlerPullEnvironmentsFromProjects,
21✔
88
                schemas.TaskTypePullMetrics:                  c.TaskHandlerPullMetrics,
21✔
89
                schemas.TaskTypePullProject:                  c.TaskHandlerPullProject,
21✔
90
                schemas.TaskTypePullProjectsFromWildcard:     c.TaskHandlerPullProjectsFromWildcard,
21✔
91
                schemas.TaskTypePullProjectsFromWildcards:    c.TaskHandlerPullProjectsFromWildcards,
21✔
92
                schemas.TaskTypePullRefMetrics:               c.TaskHandlerPullRefMetrics,
21✔
93
                schemas.TaskTypePullRefsFromProject:          c.TaskHandlerPullRefsFromProject,
21✔
94
                schemas.TaskTypePullRefsFromProjects:         c.TaskHandlerPullRefsFromProjects,
21✔
95
        } {
315✔
96
                _, _ = c.TaskController.TaskMap.Register(string(n), &taskq.TaskConfig{
294✔
97
                        Handler:    h,
294✔
98
                        RetryLimit: 1,
294✔
99
                })
294✔
100
        }
294✔
101
}
102

103
func (c *Controller) unqueueTask(ctx context.Context, tt schemas.TaskType, uniqueID string) {
6✔
104
        if err := c.Store.UnqueueTask(ctx, tt, uniqueID); err != nil {
6✔
105
                log.WithContext(ctx).
×
106
                        WithFields(log.Fields{
×
107
                                "task_type":      tt,
×
108
                                "task_unique_id": uniqueID,
×
109
                        }).
×
110
                        WithError(err).
×
111
                        Warn("unqueuing task")
×
112
        }
×
113
}
114

115
func configureTracing(ctx context.Context, grpcEndpoint string) error {
21✔
116
        if len(grpcEndpoint) == 0 {
42✔
117
                log.Debug("opentelemetry.grpc_endpoint is not configured, skipping open telemetry support")
21✔
118

21✔
119
                return nil
21✔
120
        }
21✔
121

122
        log.WithFields(log.Fields{
×
123
                "opentelemetry_grpc_endpoint": grpcEndpoint,
×
124
        }).Info("opentelemetry gRPC endpoint provided, initializing connection..")
×
125

×
126
        traceClient := otlptracegrpc.NewClient(
×
127
                otlptracegrpc.WithInsecure(),
×
128
                otlptracegrpc.WithEndpoint(grpcEndpoint),
×
129
                otlptracegrpc.WithDialOption(grpc.WithBlock()))
×
130

×
131
        traceExp, err := otlptrace.New(ctx, traceClient)
×
132
        if err != nil {
×
133
                return err
×
134
        }
×
135

136
        res, err := resource.New(ctx,
×
137
                resource.WithFromEnv(),
×
138
                resource.WithProcess(),
×
139
                resource.WithTelemetrySDK(),
×
140
                resource.WithHost(),
×
141
                resource.WithAttributes(
×
142
                        semconv.ServiceNameKey.String("gitlab-ci-pipelines-exporter"),
×
143
                ),
×
144
        )
×
145
        if err != nil {
×
146
                return err
×
147
        }
×
148

149
        bsp := sdktrace.NewBatchSpanProcessor(traceExp)
×
150
        tracerProvider := sdktrace.NewTracerProvider(
×
151
                sdktrace.WithSampler(sdktrace.AlwaysSample()),
×
152
                sdktrace.WithResource(res),
×
153
                sdktrace.WithSpanProcessor(bsp),
×
154
        )
×
155

×
156
        otel.SetTracerProvider(tracerProvider)
×
157

×
158
        return nil
×
159
}
160

161
func (c *Controller) configureGitlab(cfg config.Gitlab, version string) (err error) {
22✔
162
        var rl ratelimit.Limiter
22✔
163

22✔
164
        if c.Redis != nil {
22✔
165
                rl = ratelimit.NewRedisLimiter(c.Redis, cfg.MaximumRequestsPerSecond)
×
166
        } else {
22✔
167
                rl = ratelimit.NewLocalLimiter(cfg.MaximumRequestsPerSecond, cfg.BurstableRequestsPerSecond)
22✔
168
        }
22✔
169

170
        c.Gitlab, err = gitlab.NewClient(gitlab.ClientConfig{
22✔
171
                URL:              cfg.URL,
22✔
172
                Token:            cfg.Token,
22✔
173
                DisableTLSVerify: !cfg.EnableTLSVerify,
22✔
174
                UserAgentVersion: version,
22✔
175
                RateLimiter:      rl,
22✔
176
                ReadinessURL:     cfg.HealthURL,
22✔
177
        })
22✔
178

22✔
179
        return
22✔
180
}
181

182
func (c *Controller) configureRedis(ctx context.Context, config *config.Redis) (err error) {
21✔
183
        ctx, span := otel.Tracer(tracerName).Start(ctx, "controller:configureRedis")
21✔
184
        defer span.End()
21✔
185

21✔
186
        if len(config.URL) <= 0 {
42✔
187
                log.Debug("redis url is not configured, skipping configuration & using local driver")
21✔
188

21✔
189
                return
21✔
190
        }
21✔
191

192
        log.Info("redis url configured, initializing connection..")
×
193

×
194
        var opt *redis.Options
×
195

×
NEW
196
        if opt, err = redis.ParseURL(config.URL); err != nil {
×
197
                return
×
198
        }
×
199

200
        c.Redis = redis.NewClient(opt)
×
201

×
202
        if err = redisotel.InstrumentTracing(c.Redis); err != nil {
×
203
                return
×
204
        }
×
205

206
        if _, err := c.Redis.Ping(ctx).Result(); err != nil {
×
207
                return errors.Wrap(err, "connecting to redis")
×
208
        }
×
209

210
        log.Info("connected to redis")
×
211

×
212
        return
×
213
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc