• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

influxdata / telegraf / 297304

15 Dec 2023 02:36PM UTC coverage: 61.897% (-0.03%) from 61.923%
297304

push

circleci

web-flow
chore(linters): Enable rangeValCopy and hugeParams checkers for gocritic (#14454)

31 of 73 new or added lines in 14 files covered. (42.47%)

25 existing lines in 12 files now uncovered.

65294 of 105488 relevant lines covered (61.9%)

82.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.6
/plugins/inputs/docker_log/docker_log.go
1
//go:generate ../../../tools/readme_config_includer/generator
2
package docker_log
3

4
import (
5
        "bufio"
6
        "bytes"
7
        "context"
8
        "crypto/tls"
9
        _ "embed"
10
        "errors"
11
        "fmt"
12
        "io"
13
        "strings"
14
        "sync"
15
        "time"
16
        "unicode"
17

18
        "github.com/docker/docker/api/types"
19
        "github.com/docker/docker/api/types/filters"
20
        "github.com/docker/docker/pkg/stdcopy"
21

22
        "github.com/influxdata/telegraf"
23
        "github.com/influxdata/telegraf/config"
24
        "github.com/influxdata/telegraf/filter"
25
        "github.com/influxdata/telegraf/internal/docker"
26
        tlsint "github.com/influxdata/telegraf/plugins/common/tls"
27
        "github.com/influxdata/telegraf/plugins/inputs"
28
)
29

30
//go:embed sample.conf
31
var sampleConfig string
32

33
const (
34
        defaultEndpoint = "unix:///var/run/docker.sock"
35
)
36

37
var (
38
        containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
39
        // ensure *DockerLogs implements telegraf.ServiceInput
40
        _ telegraf.ServiceInput = (*DockerLogs)(nil)
41
)
42

43
type DockerLogs struct {
44
        Endpoint              string          `toml:"endpoint"`
45
        FromBeginning         bool            `toml:"from_beginning"`
46
        Timeout               config.Duration `toml:"timeout"`
47
        LabelInclude          []string        `toml:"docker_label_include"`
48
        LabelExclude          []string        `toml:"docker_label_exclude"`
49
        ContainerInclude      []string        `toml:"container_name_include"`
50
        ContainerExclude      []string        `toml:"container_name_exclude"`
51
        ContainerStateInclude []string        `toml:"container_state_include"`
52
        ContainerStateExclude []string        `toml:"container_state_exclude"`
53
        IncludeSourceTag      bool            `toml:"source_tag"`
54

55
        tlsint.ClientConfig
56

57
        newEnvClient func() (Client, error)
58
        newClient    func(string, *tls.Config) (Client, error)
59

60
        client          Client
61
        labelFilter     filter.Filter
62
        containerFilter filter.Filter
63
        stateFilter     filter.Filter
64
        opts            types.ContainerListOptions
65
        wg              sync.WaitGroup
66
        mu              sync.Mutex
67
        containerList   map[string]context.CancelFunc
68

69
        // State of the plugin mapping container-ID to the timestamp of the
70
        // last record processed
71
        lastRecord    map[string]time.Time
72
        lastRecordMtx sync.Mutex
73
}
74

75
func (*DockerLogs) SampleConfig() string {
×
76
        return sampleConfig
×
77
}
×
78

79
func (d *DockerLogs) Init() error {
3✔
80
        var err error
3✔
81
        if d.Endpoint == "ENV" {
3✔
82
                d.client, err = d.newEnvClient()
×
83
                if err != nil {
×
84
                        return err
×
85
                }
×
86
        } else {
3✔
87
                tlsConfig, err := d.ClientConfig.TLSConfig()
3✔
88
                if err != nil {
3✔
89
                        return err
×
90
                }
×
91
                d.client, err = d.newClient(d.Endpoint, tlsConfig)
3✔
92
                if err != nil {
3✔
93
                        return err
×
94
                }
×
95
        }
96

97
        // Create filters
98
        err = d.createLabelFilters()
3✔
99
        if err != nil {
3✔
100
                return err
×
101
        }
×
102
        err = d.createContainerFilters()
3✔
103
        if err != nil {
3✔
104
                return err
×
105
        }
×
106
        err = d.createContainerStateFilters()
3✔
107
        if err != nil {
3✔
108
                return err
×
109
        }
×
110

111
        filterArgs := filters.NewArgs()
3✔
112
        for _, state := range containerStates {
24✔
113
                if d.stateFilter.Match(state) {
24✔
114
                        filterArgs.Add("status", state)
3✔
115
                }
3✔
116
        }
117

118
        if filterArgs.Len() != 0 {
6✔
119
                d.opts = types.ContainerListOptions{
3✔
120
                        Filters: filterArgs,
3✔
121
                }
3✔
122
        }
3✔
123

124
        d.lastRecord = make(map[string]time.Time)
3✔
125

3✔
126
        return nil
3✔
127
}
128

129
// State persistence interfaces
130
func (d *DockerLogs) GetState() interface{} {
×
131
        d.lastRecordMtx.Lock()
×
132
        recordOffsets := make(map[string]time.Time, len(d.lastRecord))
×
133
        for k, v := range d.lastRecord {
×
134
                recordOffsets[k] = v
×
135
        }
×
136
        d.lastRecordMtx.Unlock()
×
137

×
138
        return recordOffsets
×
139
}
140

141
func (d *DockerLogs) SetState(state interface{}) error {
×
142
        recordOffsets, ok := state.(map[string]time.Time)
×
143
        if !ok {
×
144
                return fmt.Errorf("state has wrong type %T", state)
×
145
        }
×
146
        d.lastRecordMtx.Lock()
×
147
        for k, v := range recordOffsets {
×
148
                d.lastRecord[k] = v
×
149
        }
×
150
        d.lastRecordMtx.Unlock()
×
151

×
152
        return nil
×
153
}
154

155
func (d *DockerLogs) addToContainerList(containerID string, cancel context.CancelFunc) {
2✔
156
        d.mu.Lock()
2✔
157
        defer d.mu.Unlock()
2✔
158
        d.containerList[containerID] = cancel
2✔
159
}
2✔
160

161
func (d *DockerLogs) removeFromContainerList(containerID string) {
2✔
162
        d.mu.Lock()
2✔
163
        defer d.mu.Unlock()
2✔
164
        delete(d.containerList, containerID)
2✔
165
}
2✔
166

167
func (d *DockerLogs) containerInContainerList(containerID string) bool {
2✔
168
        d.mu.Lock()
2✔
169
        defer d.mu.Unlock()
2✔
170
        _, ok := d.containerList[containerID]
2✔
171
        return ok
2✔
172
}
2✔
173

174
func (d *DockerLogs) cancelTails() {
3✔
175
        d.mu.Lock()
3✔
176
        defer d.mu.Unlock()
3✔
177
        for _, cancel := range d.containerList {
3✔
UNCOV
178
                cancel()
×
UNCOV
179
        }
×
180
}
181

182
func (d *DockerLogs) matchedContainerName(names []string) string {
2✔
183
        // Check if all container names are filtered; in practice I believe
2✔
184
        // this array is always of length 1.
2✔
185
        for _, name := range names {
4✔
186
                trimmedName := strings.TrimPrefix(name, "/")
2✔
187
                match := d.containerFilter.Match(trimmedName)
2✔
188
                if match {
4✔
189
                        return trimmedName
2✔
190
                }
2✔
191
        }
192
        return ""
×
193
}
194

195
func (d *DockerLogs) Gather(acc telegraf.Accumulator) error {
3✔
196
        ctx := context.Background()
3✔
197
        acc.SetPrecision(time.Nanosecond)
3✔
198

3✔
199
        ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout))
3✔
200
        defer cancel()
3✔
201
        containers, err := d.client.ContainerList(ctx, d.opts)
3✔
202
        if err != nil {
3✔
203
                return err
×
204
        }
×
205

206
        for _, container := range containers {
5✔
207
                if d.containerInContainerList(container.ID) {
2✔
208
                        continue
×
209
                }
210

211
                containerName := d.matchedContainerName(container.Names)
2✔
212
                if containerName == "" {
2✔
213
                        continue
×
214
                }
215

216
                ctx, cancel := context.WithCancel(context.Background())
2✔
217
                d.addToContainerList(container.ID, cancel)
2✔
218

2✔
219
                // Start a new goroutine for every new container that has logs to collect
2✔
220
                d.wg.Add(1)
2✔
221
                go func(container types.Container) {
4✔
222
                        defer d.wg.Done()
2✔
223
                        defer d.removeFromContainerList(container.ID)
2✔
224

2✔
225
                        err = d.tailContainerLogs(ctx, acc, container, containerName)
2✔
226
                        if err != nil && !errors.Is(err, context.Canceled) {
2✔
227
                                acc.AddError(err)
×
228
                        }
×
229
                }(container)
230
        }
231
        return nil
3✔
232
}
233

234
func (d *DockerLogs) hasTTY(ctx context.Context, container types.Container) (bool, error) {
2✔
235
        ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout))
2✔
236
        defer cancel()
2✔
237
        c, err := d.client.ContainerInspect(ctx, container.ID)
2✔
238
        if err != nil {
2✔
239
                return false, err
×
240
        }
×
241
        return c.Config.Tty, nil
2✔
242
}
243

244
func (d *DockerLogs) tailContainerLogs(
245
        ctx context.Context,
246
        acc telegraf.Accumulator,
247
        container types.Container,
248
        containerName string,
249
) error {
2✔
250
        imageName, imageVersion := docker.ParseImage(container.Image)
2✔
251
        tags := map[string]string{
2✔
252
                "container_name":    containerName,
2✔
253
                "container_image":   imageName,
2✔
254
                "container_version": imageVersion,
2✔
255
        }
2✔
256

2✔
257
        if d.IncludeSourceTag {
4✔
258
                tags["source"] = hostnameFromID(container.ID)
2✔
259
        }
2✔
260

261
        // Add matching container labels as tags
262
        for k, label := range container.Labels {
2✔
263
                if d.labelFilter.Match(k) {
×
264
                        tags[k] = label
×
265
                }
×
266
        }
267

268
        hasTTY, err := d.hasTTY(ctx, container)
2✔
269
        if err != nil {
2✔
270
                return err
×
271
        }
×
272

273
        since := time.Time{}.Format(time.RFC3339Nano)
2✔
274
        if !d.FromBeginning {
4✔
275
                d.lastRecordMtx.Lock()
2✔
276
                if ts, ok := d.lastRecord[container.ID]; ok {
2✔
277
                        since = ts.Format(time.RFC3339Nano)
×
278
                }
×
279
                d.lastRecordMtx.Unlock()
2✔
280
        }
281

282
        logOptions := types.ContainerLogsOptions{
2✔
283
                ShowStdout: true,
2✔
284
                ShowStderr: true,
2✔
285
                Timestamps: true,
2✔
286
                Details:    false,
2✔
287
                Follow:     true,
2✔
288
                Since:      since,
2✔
289
        }
2✔
290

2✔
291
        logReader, err := d.client.ContainerLogs(ctx, container.ID, logOptions)
2✔
292
        if err != nil {
2✔
293
                return err
×
294
        }
×
295

296
        // If the container is using a TTY, there is only a single stream
297
        // (stdout), and data is copied directly from the container output stream,
298
        // no extra multiplexing or headers.
299
        //
300
        // If the container is *not* using a TTY, streams for stdout and stderr are
301
        // multiplexed.
302
        var last time.Time
2✔
303
        if hasTTY {
3✔
304
                last, err = tailStream(acc, tags, container.ID, logReader, "tty")
1✔
305
        } else {
2✔
306
                last, err = tailMultiplexed(acc, tags, container.ID, logReader)
1✔
307
        }
1✔
308
        if err != nil {
2✔
309
                return err
×
310
        }
×
311

312
        if ts, ok := d.lastRecord[container.ID]; !ok || ts.Before(last) {
4✔
313
                d.lastRecordMtx.Lock()
2✔
314
                d.lastRecord[container.ID] = last
2✔
315
                d.lastRecordMtx.Unlock()
2✔
316
        }
2✔
317

318
        return nil
2✔
319
}
320

321
// parseLine splits a log line of the form "<RFC3339Nano timestamp> <message>"
// into its timestamp and message.
//
// Leading whitespace of the message is preserved while trailing whitespace
// (including the line terminator) is removed. A line that consists of a
// timestamp only yields an empty message. An error is returned when the
// timestamp cannot be parsed.
func parseLine(line []byte) (time.Time, string, error) {
	// Remove whitespace from the end of the line first. This keeps any
	// leading space of the message (for example in stacktraces), drops
	// annoying end of line characters, and ensures that a timestamp-only
	// line does not carry the line terminator into the timestamp token.
	trimmed := bytes.TrimRightFunc(line, unicode.IsSpace)

	// Everything up to the first space is the timestamp, the rest is the
	// message. Without a space the whole line is the timestamp.
	tsPart := trimmed
	var message []byte
	if sep := bytes.IndexByte(trimmed, ' '); sep >= 0 {
		tsPart, message = trimmed[:sep], trimmed[sep+1:]
	}

	tsString := string(tsPart)
	ts, err := time.Parse(time.RFC3339Nano, tsString)
	if err != nil {
		return time.Time{}, "", fmt.Errorf("error parsing timestamp %q: %w", tsString, err)
	}

	return ts, string(message), nil
}
343

344
func tailStream(
345
        acc telegraf.Accumulator,
346
        baseTags map[string]string,
347
        containerID string,
348
        reader io.ReadCloser,
349
        stream string,
350
) (time.Time, error) {
3✔
351
        defer reader.Close()
3✔
352

3✔
353
        tags := make(map[string]string, len(baseTags)+1)
3✔
354
        for k, v := range baseTags {
15✔
355
                tags[k] = v
12✔
356
        }
12✔
357
        tags["stream"] = stream
3✔
358

3✔
359
        r := bufio.NewReaderSize(reader, 64*1024)
3✔
360

3✔
361
        var lastTs time.Time
3✔
362
        for {
7✔
363
                line, err := r.ReadBytes('\n')
4✔
364

4✔
365
                if len(line) != 0 {
6✔
366
                        ts, message, err := parseLine(line)
2✔
367
                        if err != nil {
2✔
368
                                acc.AddError(err)
×
369
                        } else {
2✔
370
                                acc.AddFields("docker_log", map[string]interface{}{
2✔
371
                                        "container_id": containerID,
2✔
372
                                        "message":      message,
2✔
373
                                }, tags, ts)
2✔
374
                        }
2✔
375

376
                        // Store the last processed timestamp
377
                        if ts.After(lastTs) {
4✔
378
                                lastTs = ts
2✔
379
                        }
2✔
380
                }
381

382
                if err != nil {
7✔
383
                        if err == io.EOF {
6✔
384
                                return lastTs, nil
3✔
385
                        }
3✔
386
                        return time.Time{}, err
×
387
                }
388
        }
389
}
390

391
func tailMultiplexed(
392
        acc telegraf.Accumulator,
393
        tags map[string]string,
394
        containerID string,
395
        src io.ReadCloser,
396
) (time.Time, error) {
1✔
397
        outReader, outWriter := io.Pipe()
1✔
398
        errReader, errWriter := io.Pipe()
1✔
399

1✔
400
        var tsStdout, tsStderr time.Time
1✔
401
        var wg sync.WaitGroup
1✔
402
        wg.Add(1)
1✔
403
        go func() {
2✔
404
                defer wg.Done()
1✔
405
                var err error
1✔
406
                tsStdout, err = tailStream(acc, tags, containerID, outReader, "stdout")
1✔
407
                if err != nil {
1✔
408
                        acc.AddError(err)
×
409
                }
×
410
        }()
411

412
        wg.Add(1)
1✔
413
        go func() {
2✔
414
                defer wg.Done()
1✔
415
                var err error
1✔
416
                tsStderr, err = tailStream(acc, tags, containerID, errReader, "stderr")
1✔
417
                if err != nil {
1✔
418
                        acc.AddError(err)
×
419
                }
×
420
        }()
421

422
        _, err := stdcopy.StdCopy(outWriter, errWriter, src)
1✔
423

1✔
424
        // Ignore the returned errors as we cannot do anything if the closing fails
1✔
425
        _ = outWriter.Close()
1✔
426
        _ = errWriter.Close()
1✔
427
        _ = src.Close()
1✔
428
        wg.Wait()
1✔
429

1✔
430
        if err != nil {
1✔
431
                return time.Time{}, err
×
432
        }
×
433
        if tsStdout.After(tsStderr) {
2✔
434
                return tsStdout, nil
1✔
435
        }
1✔
436
        return tsStderr, nil
×
437
}
438

439
// Start is a noop which is required for a *DockerLogs to implement
440
// the telegraf.ServiceInput interface
441
func (d *DockerLogs) Start(telegraf.Accumulator) error {
×
442
        return nil
×
443
}
×
444

445
func (d *DockerLogs) Stop() {
3✔
446
        d.cancelTails()
3✔
447
        d.wg.Wait()
3✔
448
}
3✔
449

450
// The following functions were inherited from the telegraf docker input plugin.
451
func (d *DockerLogs) createContainerFilters() error {
3✔
452
        containerFilter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
3✔
453
        if err != nil {
3✔
454
                return err
×
455
        }
×
456
        d.containerFilter = containerFilter
3✔
457
        return nil
3✔
458
}
459

460
func (d *DockerLogs) createLabelFilters() error {
3✔
461
        labelFilter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude)
3✔
462
        if err != nil {
3✔
463
                return err
×
464
        }
×
465
        d.labelFilter = labelFilter
3✔
466
        return nil
3✔
467
}
468

469
func (d *DockerLogs) createContainerStateFilters() error {
3✔
470
        if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 {
6✔
471
                d.ContainerStateInclude = []string{"running"}
3✔
472
        }
3✔
473
        stateFilter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude)
3✔
474
        if err != nil {
3✔
475
                return err
×
476
        }
×
477
        d.stateFilter = stateFilter
3✔
478
        return nil
3✔
479
}
480

481
func init() {
1✔
482
        inputs.Add("docker_log", func() telegraf.Input {
1✔
483
                return &DockerLogs{
×
484
                        Timeout:       config.Duration(time.Second * 5),
×
485
                        Endpoint:      defaultEndpoint,
×
486
                        newEnvClient:  NewEnvClient,
×
487
                        newClient:     NewClient,
×
488
                        containerList: make(map[string]context.CancelFunc),
×
489
                }
×
490
        })
×
491
}
492

493
// hostnameFromID returns the first 12 bytes of id, or id unchanged when it
// is not longer than that.
func hostnameFromID(id string) string {
	const shortIDLen = 12

	if len(id) <= shortIDLen {
		return id
	}
	return id[:shortIDLen]
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc