• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 16114132341

07 Jul 2025 10:09AM UTC coverage: 76.124% (-0.07%) from 76.193%
16114132341

Pull #3430

github

orestisfl
Add mocks
Pull Request #3430: Add OpenTelemetry

147 of 205 new or added lines in 7 files covered. (71.71%)

2 existing lines in 1 file now uncovered.

9463 of 12431 relevant lines covered (76.12%)

16.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.49
/internal/infra/observability/otel.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package observability
19

20
import (
21
        "context"
22
        "errors"
23
        "fmt"
24
        "os"
25

26
        "github.com/elastic/elastic-agent-libs/logp"
27
        "github.com/go-logr/logr"
28
        "go.opentelemetry.io/otel"
29
        "go.opentelemetry.io/otel/codes"
30
        "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
31
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
32
        "go.opentelemetry.io/otel/metric"
33
        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
34
        "go.opentelemetry.io/otel/sdk/resource"
35
        sdktrace "go.opentelemetry.io/otel/sdk/trace"
36
        semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
37
        "go.opentelemetry.io/otel/trace"
38
        "go.uber.org/zap"
39

40
        "github.com/elastic/cloudbeat/version"
41
)
42

43
const (
44
        // serviceName is the value reported as the OTel service name
        // attribute on the resource built by newResource.
        serviceName    = "cloudbeat"
45
        // endpointEnvVar is the environment variable SetUpOtel checks;
        // when it is empty, OpenTelemetry setup is skipped entirely.
        endpointEnvVar = "OTEL_EXPORTER_OTLP_ENDPOINT"
46
)
47

48
// gracefulCloser groups the two lifecycle operations that ShutdownOtel
// invokes on each provider. It is embedded in both tracerProvider and
// meterProvider.
type gracefulCloser interface {
49
        // ForceFlush is called by ShutdownOtel before Shutdown.
        ForceFlush(ctx context.Context) error
50
        // Shutdown is called by ShutdownOtel after ForceFlush.
        Shutdown(ctx context.Context) error
51
}
52

53
// otelProviders bundles the trace and meter providers created by SetUpOtel.
// SetUpOtel stores the bundle in the context via contextWithOTel, and
// ShutdownOtel reads it back via otelFromContext.
type otelProviders struct {
54
        // traceProvider is the provider returned by newTracerProvider.
        traceProvider tracerProvider
55
        // meterProvider is the provider returned by newMetricsProvider.
        meterProvider meterProvider
56
}
57

58
// SetUpOtel initializes OpenTelemetry logging, tracing, and metrics providers.
59
// It configures OTLP exporters that send data to an OTLP endpoint
60
// (e.g., APM Server) configured via environment variables.
61
func SetUpOtel(ctx context.Context, logger *logp.Logger) (context.Context, error) {
1✔
62
        logger = logger.Named("otel")
1✔
63
        if os.Getenv(endpointEnvVar) == "" {
1✔
NEW
64
                logger.Infof("%s is not set, skipping OpenTelemetry setup", endpointEnvVar)
×
NEW
65
                return ctx, nil
×
NEW
66
        }
×
67

68
        wrap := wrapLogger{l: logger}
1✔
69
        otel.SetLogger(logr.New(&wrap))
1✔
70
        otel.SetErrorHandler(&wrap)
1✔
71

1✔
72
        res, err := newResource(ctx)
1✔
73
        if err != nil {
1✔
NEW
74
                return ctx, fmt.Errorf("failed to create OTel resource: %w", err)
×
NEW
75
        }
×
76

77
        mp, err := newMetricsProvider(ctx, res)
1✔
78
        if err != nil {
1✔
NEW
79
                return ctx, fmt.Errorf("failed to create metrics provider: %w", err)
×
NEW
80
        }
×
81

82
        tp, err := newTracerProvider(ctx, res)
1✔
83
        if err != nil {
1✔
NEW
84
                return ctx, fmt.Errorf("failed to create tracer provider: %w", err)
×
NEW
85
        }
×
86

87
        return contextWithOTel(ctx, otelProviders{
1✔
88
                traceProvider: tp,
1✔
89
                meterProvider: mp,
1✔
90
        }), nil
1✔
91
}
92

93
// StartSpan starts a new trace span.
94
// It's a convenience wrapper around tracer.Start().
95
func StartSpan(ctx context.Context, tracerName, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
26✔
96
        return TracerFromContext(ctx, tracerName).Start(ctx, spanName, opts...)
26✔
97
}
26✔
98

99
// FailSpan records an error in the span and sets its status to Error.
100
// It returns an error that includes the original error message.
101
// Note: If you want to record an error in a span but not mark the span as failed, use `span.RecordError(err)` instead.
NEW
102
func FailSpan(span trace.Span, msg string, err error) error {
×
NEW
103
        span.RecordError(err)
×
NEW
104
        span.SetStatus(codes.Error, err.Error())
×
NEW
105
        return fmt.Errorf("%s: %w", msg, err)
×
NEW
106
}
×
107

108
// tracerProvider is an extension of the trace.TracerProvider interface with shutdown and force flush operations.
// It is the type of otelProviders.traceProvider; SetUpOtel stores the value
// returned by newTracerProvider in that field.
108
type tracerProvider interface {
110
        trace.TracerProvider
111
        gracefulCloser
112
}
113

114
// meterProvider is an extension of the metric.MeterProvider interface with shutdown and force flush operations.
// It is the type of otelProviders.meterProvider; SetUpOtel stores the value
// returned by newMetricsProvider in that field.
114
type meterProvider interface {
116
        metric.MeterProvider
117
        gracefulCloser
118
}
119

120
// ShutdownOtel flushes and shuts down the registered OpenTelemetry providers.
121
func ShutdownOtel(ctx context.Context) error {
1✔
122
        otl := otelFromContext(ctx)
1✔
123
        return errors.Join(
1✔
124
                otl.meterProvider.ForceFlush(ctx),
1✔
125
                otl.meterProvider.Shutdown(ctx),
1✔
126
                otl.traceProvider.ForceFlush(ctx),
1✔
127
                otl.traceProvider.Shutdown(ctx),
1✔
128
        )
1✔
129
}
1✔
130

131
func newMetricsProvider(ctx context.Context, res *resource.Resource) (*sdkmetric.MeterProvider, error) {
1✔
132
        // The OTLP gRPC exporter will be configured using environment variables (e.g., OTEL_EXPORTER_OTLP_ENDPOINT).
1✔
133
        metricExporter, err := otlpmetricgrpc.New(ctx)
1✔
134
        if err != nil {
1✔
NEW
135
                return nil, fmt.Errorf("failed to create OTLP metric exporter: %w", err)
×
NEW
136
        }
×
137

138
        mp := sdkmetric.NewMeterProvider(
1✔
139
                sdkmetric.WithResource(res),
1✔
140
                sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)),
1✔
141
        )
1✔
142
        otel.SetMeterProvider(mp)
1✔
143
        return mp, nil
1✔
144
}
145

146
func newResource(ctx context.Context) (*resource.Resource, error) {
1✔
147
        res, err := resource.New(
1✔
148
                ctx,
1✔
149
                resource.WithSchemaURL(semconv.SchemaURL),
1✔
150
                resource.WithAttributes(
1✔
151
                        semconv.ServiceNameKey.String(serviceName),
1✔
152
                        semconv.ServiceVersion(version.CloudbeatSemanticVersion()),
1✔
153
                ),
1✔
154
                resource.WithTelemetrySDK(),
1✔
155
                resource.WithHost(),
1✔
156
                resource.WithContainer(),
1✔
157
                resource.WithProcess(),
1✔
158
                resource.WithFromEnv(),
1✔
159
        )
1✔
160
        if err != nil {
1✔
NEW
161
                return nil, fmt.Errorf("failed to create application resource: %w", err)
×
NEW
162
        }
×
163

164
        res, err = resource.Merge(resource.Default(), res)
1✔
165
        if err != nil {
1✔
NEW
166
                return nil, fmt.Errorf("failed to merge OTel resources: %w", err)
×
NEW
167
        }
×
168
        return res, nil
1✔
169
}
170

171
func newTracerProvider(ctx context.Context, res *resource.Resource) (*sdktrace.TracerProvider, error) {
1✔
172
        // The APM server supports OTLP over gRPC, so we use the gRPC exporter.
1✔
173
        // The OTLP gRPC exporter uses environment variables for configuration (e.g., OTEL_EXPORTER_OTLP_ENDPOINT).
1✔
174
        exporter, err := otlptracegrpc.New(ctx)
1✔
175
        if err != nil {
1✔
NEW
176
                return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err)
×
NEW
177
        }
×
178

179
        tp := sdktrace.NewTracerProvider(
1✔
180
                sdktrace.WithResource(res),
1✔
181
                sdktrace.WithBatcher(exporter), // Batches spans for better performance.
1✔
182
        )
1✔
183
        // Set the global TracerProvider to allow instrumentation libraries to use it.
1✔
184
        otel.SetTracerProvider(tp)
1✔
185
        return tp, nil
1✔
186
}
187

188
// wrapLogger is a wrapper around logp.Logger that implements the logr.LogSink and otel.ErrorHandler interfaces.
189
type wrapLogger struct {
190
        l *logp.Logger
191
}
192

NEW
193
func (w *wrapLogger) Handle(err error) {
×
NEW
194
        w.Error(err, "otel error")
×
NEW
195
}
×
196

197
func (w *wrapLogger) Init(ri logr.RuntimeInfo) {
1✔
198
        w.l = w.l.WithOptions(zap.AddCallerSkip(ri.CallDepth))
1✔
199
}
1✔
200

201
func (w *wrapLogger) Enabled(level int) bool {
13✔
202
        // From the OTel documentation:
13✔
203
        // To see Warn messages use a logger with `l.V(1).Enabled() == true`
13✔
204
        // To see Info messages use a logger with `l.V(4).Enabled() == true`
13✔
205
        // To see Debug messages use a logger with `l.V(8).Enabled() == true`.
13✔
206
        return level <= 4
13✔
207
}
13✔
208

209
func (w *wrapLogger) Info(level int, msg string, keysAndValues ...any) {
4✔
210
        if !w.Enabled(level) {
4✔
NEW
211
                return
×
NEW
212
        }
×
213
        w.l.Infow(msg, keysAndValues...)
4✔
214
}
215

NEW
216
func (w *wrapLogger) Error(err error, msg string, keysAndValues ...any) {
×
NEW
217
        w.l.With(logp.Error(err)).Errorw(msg, keysAndValues...)
×
NEW
218
}
×
219

NEW
220
func (w *wrapLogger) WithValues(keysAndValues ...any) logr.LogSink {
×
NEW
221
        return &wrapLogger{l: w.l.With(keysAndValues...)}
×
NEW
222
}
×
223

NEW
224
func (w *wrapLogger) WithName(name string) logr.LogSink {
×
NEW
225
        return &wrapLogger{l: w.l.Named(name)}
×
NEW
226
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc