• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

laiambryant / telemetry-ingestor / 20312992440

17 Dec 2025 06:21PM UTC coverage: 81.574% (-12.2%) from 93.737%
20312992440

push

github

425 of 521 relevant lines covered (81.57%)

15.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.55
/processor/processor.go
1
package processor
2

3
import (
4
        "bufio"
5
        "encoding/json"
6
        "log/slog"
7
        "os"
8
        "sync"
9

10
        "github.com/laiambryant/telemetry-ingestor/config"
11
        "github.com/laiambryant/telemetry-ingestor/sender"
12
        "github.com/laiambryant/telemetry-ingestor/stats"
13
        s "github.com/laiambryant/telemetry-ingestor/structs"
14
)
15

16
type LastTelemetryData struct {
17
        Traces  s.TelemetryData
18
        Logs    s.TelemetryData
19
        Metrics s.TelemetryData
20
}
21

22
const (
23
        resourceSpansField   = "resourceSpans"
24
        resourceLogsField    = "resourceLogs"
25
        resourceMetricsField = "resourceMetrics"
26
)
27

28
func OpenTelemetryFile(filePath string, maxBufferCapacity int) (*os.File, *bufio.Scanner, error) {
29

30
        if _, err := os.Stat(filePath); os.IsNotExist(err) {
31
                return nil, nil, &FileNotFoundError{FilePath: filePath}
12✔
32
        }
12✔
33

13✔
34
        file, err := os.Open(filePath)
1✔
35
        if err != nil {
1✔
36
                return nil, nil, &FileOpenError{FilePath: filePath, Err: err}
37
        }
11✔
38

11✔
39
        scanner := bufio.NewScanner(file)
×
40
        buf := make([]byte, maxBufferCapacity)
×
41
        scanner.Buffer(buf, maxBufferCapacity)
42

11✔
43
        return file, scanner, nil
11✔
44
}
11✔
45

11✔
46
func ParseTelemetryLine(line string, lineNum int) (s.TelemetryData, error) {
11✔
47
        if len(line) == 0 {
48
                return nil, nil
49
        }
21✔
50

23✔
51
        var data s.TelemetryData
2✔
52
        if err := json.Unmarshal([]byte(line), &data); err != nil {
2✔
53
                slog.Error("Error parsing line", "line", lineNum, "error", err)
54
                return nil, err
19✔
55
        }
21✔
56

2✔
57
        return data, nil
2✔
58
}
2✔
59

60
func ProcessTelemetryInSendAllMode(data s.TelemetryData, lineNum int, config *config.Config, jobChan chan<- s.TelemetryJob) {
17✔
61
        if _, hasTraces := data[resourceSpansField]; hasTraces {
62
                payload := map[string]any{resourceSpansField: data[resourceSpansField]}
63
                jobChan <- s.TelemetryJob{
11✔
64
                        Endpoint:      config.OtelEndpoint,
17✔
65
                        Payload:       payload,
6✔
66
                        TelemetryType: s.TelemetryTraces,
6✔
67
                        LineNum:       lineNum,
6✔
68
                }
6✔
69
        }
6✔
70

6✔
71
        if _, hasLogs := data[resourceLogsField]; hasLogs {
6✔
72
                payload := map[string]any{resourceLogsField: data[resourceLogsField]}
6✔
73
                jobChan <- s.TelemetryJob{
74
                        Endpoint:      config.OtelLogsEndpoint,
15✔
75
                        Payload:       payload,
4✔
76
                        TelemetryType: s.TelemetryLogs,
4✔
77
                        LineNum:       lineNum,
4✔
78
                }
4✔
79
        }
4✔
80

4✔
81
        if _, hasMetrics := data[resourceMetricsField]; hasMetrics {
4✔
82
                payload := map[string]any{resourceMetricsField: data[resourceMetricsField]}
4✔
83
                jobChan <- s.TelemetryJob{
84
                        Endpoint:      config.OtelMetricsEndpoint,
14✔
85
                        Payload:       payload,
3✔
86
                        TelemetryType: s.TelemetryMetrics,
3✔
87
                        LineNum:       lineNum,
3✔
88
                }
3✔
89
        }
3✔
90
}
3✔
91

3✔
92
func UpdateLastTelemetryData(data s.TelemetryData, lastData *LastTelemetryData) {
3✔
93
        if _, hasTraces := data[resourceSpansField]; hasTraces {
94
                lastData.Traces = data
95
        }
6✔
96
        if _, hasLogs := data[resourceLogsField]; hasLogs {
10✔
97
                lastData.Logs = data
4✔
98
        }
4✔
99
        if _, hasMetrics := data[resourceMetricsField]; hasMetrics {
8✔
100
                lastData.Metrics = data
2✔
101
        }
2✔
102
}
8✔
103

2✔
104
func StartWorkerPool(numWorkers int, stats *stats.SendStats) (chan s.TelemetryJob, *sync.WaitGroup) {
2✔
105
        jobChan := make(chan s.TelemetryJob, numWorkers*2)
106
        wg := &sync.WaitGroup{}
107

6✔
108
        for i := range numWorkers {
6✔
109
                wg.Add(1)
6✔
110
                go worker(i+1, jobChan, wg, stats)
6✔
111
        }
17✔
112

11✔
113
        return jobChan, wg
11✔
114
}
11✔
115

116
func ProcessFileInSendAllMode(scanner *bufio.Scanner, config *config.Config, stats *stats.SendStats) error {
6✔
117
        jobChan, wg := StartWorkerPool(config.Workers, stats)
118

119
        lineNum := 0
6✔
120
        lineCount := 0
6✔
121

6✔
122
        for scanner.Scan() {
6✔
123
                lineNum++
6✔
124
                line := scanner.Text()
6✔
125

20✔
126
                data, err := ParseTelemetryLine(line, lineNum)
14✔
127
                if err != nil || data == nil {
14✔
128
                        continue
14✔
129
                }
14✔
130

17✔
131
                lineCount++
3✔
132
                ProcessTelemetryInSendAllMode(data, lineNum, config, jobChan)
133
        }
134

11✔
135
        if err := scanner.Err(); err != nil {
11✔
136
                return err
137
        }
138

7✔
139
        slog.Info("Finished reading file", "total_lines", lineCount)
1✔
140

1✔
141
        close(jobChan)
142
        slog.Info("Waiting for workers to finish")
5✔
143
        wg.Wait()
5✔
144
        stats.PrintSummary()
5✔
145

5✔
146
        return nil
5✔
147
}
5✔
148

5✔
149
func ProcessFileInLastMode(scanner *bufio.Scanner) (*LastTelemetryData, int, error) {
5✔
150
        lastData := &LastTelemetryData{}
151
        lineNum := 0
152
        lineCount := 0
5✔
153

5✔
154
        for scanner.Scan() {
5✔
155
                lineNum++
5✔
156
                line := scanner.Text()
5✔
157

12✔
158
                data, err := ParseTelemetryLine(line, lineNum)
7✔
159
                if err != nil || data == nil {
7✔
160
                        continue
7✔
161
                }
7✔
162

8✔
163
                lineCount++
1✔
164
                UpdateLastTelemetryData(data, lastData)
165
        }
166

6✔
167
        if err := scanner.Err(); err != nil {
6✔
168
                return nil, lineCount, err
169
        }
170

6✔
171
        slog.Info("Finished reading file", "total_lines", lineCount)
1✔
172
        return lastData, lineCount, nil
1✔
173
}
174

4✔
175
func SendLastTelemetryData(lastData *LastTelemetryData, config *config.Config, stats *stats.SendStats) {
4✔
176
        slog.Info("Sending last instances to OTel Collector")
177

178
        if lastData.Traces != nil {
5✔
179
                payload := map[string]any{
5✔
180
                        resourceSpansField: lastData.Traces[resourceSpansField],
5✔
181
                }
9✔
182
                if err := sender.SendToOTel(config.OtelEndpoint, payload, s.TelemetryTraces, stats); err != nil {
4✔
183
                        slog.Error("Failed to send traces", "error", err)
4✔
184
                }
4✔
185
        }
5✔
186

1✔
187
        if lastData.Logs != nil {
1✔
188
                payload := map[string]any{
189
                        resourceLogsField: lastData.Logs[resourceLogsField],
190
                }
8✔
191
                if err := sender.SendToOTel(config.OtelLogsEndpoint, payload, s.TelemetryLogs, stats); err != nil {
3✔
192
                        slog.Error("Failed to send logs", "error", err)
3✔
193
                }
3✔
194
        }
4✔
195

1✔
196
        if lastData.Metrics != nil {
1✔
197
                payload := map[string]any{
198
                        resourceMetricsField: lastData.Metrics[resourceMetricsField],
199
                }
8✔
200
                if err := sender.SendToOTel(config.OtelMetricsEndpoint, payload, s.TelemetryMetrics, stats); err != nil {
3✔
201
                        slog.Error("Failed to send metrics", "error", err)
3✔
202
                }
3✔
203
        }
4✔
204

1✔
205
        stats.PrintSummary()
1✔
206
}
207

208
func IngestTelemetry(filePath string, cfg *config.Config) error {
5✔
209
        slog.Info("Reading telemetry data", "file", filePath)
210
        if cfg.SendAll {
211
                slog.Info("Mode: Sending all telemetry lines")
10✔
212
        } else {
10✔
213
                slog.Info("Mode: Scanning file to find last instances of each telemetry type")
14✔
214
        }
4✔
215

10✔
216
        file, scanner, err := OpenTelemetryFile(filePath, cfg.MaxBufferCapacity)
6✔
217
        if err != nil {
6✔
218
                return err
219
        }
10✔
220
        defer file.Close()
11✔
221

1✔
222
        stats := &stats.SendStats{}
1✔
223

9✔
224
        if cfg.SendAll {
9✔
225
                return ProcessFileInSendAllMode(scanner, cfg, stats)
9✔
226
        }
9✔
227

13✔
228
        lastData, _, err := ProcessFileInLastMode(scanner)
4✔
229
        if err != nil {
4✔
230
                return &FileReadError{FilePath: filePath, Err: err}
231
        }
5✔
232

6✔
233
        SendLastTelemetryData(lastData, cfg, stats)
1✔
234
        return nil
1✔
235
}
236

4✔
237
func worker(id int, jobs <-chan s.TelemetryJob, wg *sync.WaitGroup, stats *stats.SendStats) {
4✔
238
        defer wg.Done()
239
        for job := range jobs {
240
                if err := sender.SendToOTel(job.Endpoint, job.Payload, job.TelemetryType, stats); err != nil {
11✔
241
                        slog.Error("Worker failed to send telemetry", "worker", id, "type", job.TelemetryType, "line", job.LineNum, "error", err)
11✔
242
                }
24✔
243
        }
16✔
244
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc