• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mvisonneau / gitlab-ci-pipelines-exporter / 15584626810

11 Jun 2025 12:17PM UTC coverage: 64.904% (-0.5%) from 65.4%
15584626810

push

github

web-flow
tools: moved to new go 1.24 approach using go get -tool and upgraded to golangci v2 (#1009)

26 of 49 new or added lines in 17 files covered. (53.06%)

22 existing lines in 1 file now uncovered.

3645 of 5616 relevant lines covered (64.9%)

3.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.08
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5

6
        "github.com/google/uuid"
7
        "github.com/pkg/errors"
8
        "github.com/redis/go-redis/extra/redisotel/v9"
9
        "github.com/redis/go-redis/v9"
10
        log "github.com/sirupsen/logrus"
11
        "github.com/vmihailenco/taskq/v4"
12
        "go.opentelemetry.io/otel"
13
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
14
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
15
        "go.opentelemetry.io/otel/sdk/resource"
16
        sdktrace "go.opentelemetry.io/otel/sdk/trace"
17
        semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
18

19
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/config"
20
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/gitlab"
21
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/ratelimit"
22
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/schemas"
23
        "github.com/mvisonneau/gitlab-ci-pipelines-exporter/pkg/store"
24
)
25

26
const tracerName = "gitlab-ci-pipelines-exporter"
27

28
// Controller holds the necessary clients to run the app and handle requests.
29
type Controller struct {
30
        Config         config.Config
31
        Redis          *redis.Client
32
        Gitlab         *gitlab.Client
33
        Store          store.Store
34
        TaskController TaskController
35

36
        // UUID is used to identify this controller/process amongst others when
37
        // the exporter is running in cluster mode, leveraging Redis.
38
        UUID uuid.UUID
39
}
40

41
// New creates a new controller.
42
func New(ctx context.Context, cfg config.Config, version string) (c Controller, err error) {
21✔
43
        c.Config = cfg
21✔
44
        c.UUID = uuid.New()
21✔
45

21✔
46
        if err = configureTracing(ctx, cfg.OpenTelemetry.GRPCEndpoint); err != nil {
21✔
47
                return
×
48
        }
×
49

50
        if err = c.configureRedis(ctx, cfg.Redis.URL); err != nil {
21✔
51
                return
×
52
        }
×
53

54
        c.TaskController = NewTaskController(ctx, c.Redis, cfg.Gitlab.MaximumJobsQueueSize)
21✔
55
        c.registerTasks()
21✔
56

21✔
57
        c.Store = store.New(ctx, c.Redis, c.Config.Projects)
21✔
58

21✔
59
        if err = c.configureGitlab(cfg.Gitlab, version); err != nil {
21✔
60
                return
×
61
        }
×
62

63
        // Start the scheduler
64
        c.Schedule(ctx, cfg.Pull, cfg.GarbageCollect)
21✔
65

21✔
66
        return
21✔
67
}
68

69
func (c *Controller) registerTasks() {
21✔
70
        for n, h := range map[schemas.TaskType]interface{}{
21✔
71
                schemas.TaskTypeGarbageCollectEnvironments:   c.TaskHandlerGarbageCollectEnvironments,
21✔
72
                schemas.TaskTypeGarbageCollectMetrics:        c.TaskHandlerGarbageCollectMetrics,
21✔
73
                schemas.TaskTypeGarbageCollectProjects:       c.TaskHandlerGarbageCollectProjects,
21✔
74
                schemas.TaskTypeGarbageCollectRefs:           c.TaskHandlerGarbageCollectRefs,
21✔
75
                schemas.TaskTypePullEnvironmentMetrics:       c.TaskHandlerPullEnvironmentMetrics,
21✔
76
                schemas.TaskTypePullEnvironmentsFromProject:  c.TaskHandlerPullEnvironmentsFromProject,
21✔
77
                schemas.TaskTypePullEnvironmentsFromProjects: c.TaskHandlerPullEnvironmentsFromProjects,
21✔
78
                schemas.TaskTypePullMetrics:                  c.TaskHandlerPullMetrics,
21✔
79
                schemas.TaskTypePullProject:                  c.TaskHandlerPullProject,
21✔
80
                schemas.TaskTypePullProjectsFromWildcard:     c.TaskHandlerPullProjectsFromWildcard,
21✔
81
                schemas.TaskTypePullProjectsFromWildcards:    c.TaskHandlerPullProjectsFromWildcards,
21✔
82
                schemas.TaskTypePullRefMetrics:               c.TaskHandlerPullRefMetrics,
21✔
83
                schemas.TaskTypePullRefsFromProject:          c.TaskHandlerPullRefsFromProject,
21✔
84
                schemas.TaskTypePullRefsFromProjects:         c.TaskHandlerPullRefsFromProjects,
21✔
85
        } {
315✔
86
                _, _ = c.TaskController.TaskMap.Register(string(n), &taskq.TaskConfig{
294✔
87
                        Handler:    h,
294✔
88
                        RetryLimit: 1,
294✔
89
                })
294✔
90
        }
294✔
91
}
92

93
func (c *Controller) unqueueTask(ctx context.Context, tt schemas.TaskType, uniqueID string) {
6✔
94
        if err := c.Store.UnqueueTask(ctx, tt, uniqueID); err != nil {
6✔
95
                log.WithContext(ctx).
×
96
                        WithFields(log.Fields{
×
97
                                "task_type":      tt,
×
98
                                "task_unique_id": uniqueID,
×
99
                        }).
×
100
                        WithError(err).
×
101
                        Warn("unqueuing task")
×
102
        }
×
103
}
104

105
func configureTracing(ctx context.Context, grpcEndpoint string) error {
21✔
106
        if len(grpcEndpoint) == 0 {
42✔
107
                log.Debug("opentelemetry.grpc_endpoint is not configured, skipping open telemetry support")
21✔
108

21✔
109
                return nil
21✔
110
        }
21✔
111

112
        log.WithFields(log.Fields{
×
113
                "opentelemetry_grpc_endpoint": grpcEndpoint,
×
114
        }).Info("opentelemetry gRPC endpoint provided, initializing connection..")
×
115

×
116
        traceClient := otlptracegrpc.NewClient(
×
117
                otlptracegrpc.WithInsecure(),
×
118
                otlptracegrpc.WithEndpoint(grpcEndpoint),
×
NEW
119
        )
×
120

×
121
        traceExp, err := otlptrace.New(ctx, traceClient)
×
122
        if err != nil {
×
123
                return err
×
124
        }
×
125

126
        res, err := resource.New(ctx,
×
127
                resource.WithFromEnv(),
×
128
                resource.WithProcess(),
×
129
                resource.WithTelemetrySDK(),
×
130
                resource.WithHost(),
×
131
                resource.WithAttributes(
×
132
                        semconv.ServiceNameKey.String("gitlab-ci-pipelines-exporter"),
×
133
                ),
×
134
        )
×
135
        if err != nil {
×
136
                return err
×
137
        }
×
138

139
        bsp := sdktrace.NewBatchSpanProcessor(traceExp)
×
140
        tracerProvider := sdktrace.NewTracerProvider(
×
141
                sdktrace.WithSampler(sdktrace.AlwaysSample()),
×
142
                sdktrace.WithResource(res),
×
143
                sdktrace.WithSpanProcessor(bsp),
×
144
        )
×
145

×
146
        otel.SetTracerProvider(tracerProvider)
×
147

×
148
        return nil
×
149
}
150

151
func (c *Controller) configureGitlab(cfg config.Gitlab, version string) (err error) {
22✔
152
        var rl ratelimit.Limiter
22✔
153

22✔
154
        if c.Redis != nil {
22✔
155
                rl = ratelimit.NewRedisLimiter(c.Redis, cfg.MaximumRequestsPerSecond)
×
156
        } else {
22✔
157
                rl = ratelimit.NewLocalLimiter(cfg.MaximumRequestsPerSecond, cfg.BurstableRequestsPerSecond)
22✔
158
        }
22✔
159

160
        c.Gitlab, err = gitlab.NewClient(gitlab.ClientConfig{
22✔
161
                URL:              cfg.URL,
22✔
162
                Token:            cfg.Token,
22✔
163
                DisableTLSVerify: !cfg.EnableTLSVerify,
22✔
164
                UserAgentVersion: version,
22✔
165
                RateLimiter:      rl,
22✔
166
                ReadinessURL:     cfg.HealthURL,
22✔
167
        })
22✔
168

22✔
169
        return
22✔
170
}
171

172
func (c *Controller) configureRedis(ctx context.Context, url string) (err error) {
21✔
173
        ctx, span := otel.Tracer(tracerName).Start(ctx, "controller:configureRedis")
21✔
174
        defer span.End()
21✔
175

21✔
176
        if len(url) <= 0 {
42✔
177
                log.Debug("redis url is not configured, skipping configuration & using local driver")
21✔
178

21✔
179
                return
21✔
180
        }
21✔
181

182
        log.Info("redis url configured, initializing connection..")
×
183

×
184
        var opt *redis.Options
×
185

×
186
        if opt, err = redis.ParseURL(url); err != nil {
×
187
                return
×
188
        }
×
189

190
        c.Redis = redis.NewClient(opt)
×
191

×
192
        if err = redisotel.InstrumentTracing(c.Redis); err != nil {
×
193
                return
×
194
        }
×
195

196
        if _, err := c.Redis.Ping(ctx).Result(); err != nil {
×
197
                return errors.Wrap(err, "connecting to redis")
×
198
        }
×
199

200
        log.Info("connected to redis")
×
201

×
202
        return
×
203
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc