• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

influxdata / telegraf / 296794

15 Dec 2023 01:25PM UTC coverage: 61.92% (-0.002%) from 61.922%
296794

push

circleci

web-flow
chore(linters): Enable equalFold, preferStringWriter, and stringXbytes checkers for gocritic. (#14452)

12 of 15 new or added lines in 15 files covered. (80.0%)

15 existing lines in 5 files now uncovered.

65309 of 105474 relevant lines covered (61.92%)

82.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.74
/plugins/processors/execd/execd.go
1
//go:generate ../../../tools/readme_config_includer/generator
2
package execd
3

4
import (
5
        "bufio"
6
        _ "embed"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "strings"
11
        "time"
12

13
        "github.com/influxdata/telegraf"
14
        "github.com/influxdata/telegraf/config"
15
        "github.com/influxdata/telegraf/internal/process"
16
        "github.com/influxdata/telegraf/plugins/parsers/influx"
17
        "github.com/influxdata/telegraf/plugins/processors"
18
        "github.com/influxdata/telegraf/plugins/serializers"
19
)
20

21
//go:embed sample.conf
22
var sampleConfig string
23

24
type Execd struct {
25
        Command      []string        `toml:"command"`
26
        Environment  []string        `toml:"environment"`
27
        RestartDelay config.Duration `toml:"restart_delay"`
28
        Log          telegraf.Logger
29

30
        parser     telegraf.Parser
31
        serializer serializers.Serializer
32
        acc        telegraf.Accumulator
33
        process    *process.Process
34
}
35

36
func New() *Execd {
8✔
37
        return &Execd{
8✔
38
                RestartDelay: config.Duration(10 * time.Second),
8✔
39
        }
8✔
40
}
8✔
41

42
func (e *Execd) SetParser(p telegraf.Parser) {
8✔
43
        e.parser = p
8✔
44
}
8✔
45

46
func (e *Execd) SetSerializer(s telegraf.Serializer) {
8✔
47
        e.serializer = s
8✔
48
}
8✔
49

50
func (*Execd) SampleConfig() string {
×
51
        return sampleConfig
×
52
}
×
53

54
func (e *Execd) Start(acc telegraf.Accumulator) error {
5✔
55
        e.acc = acc
5✔
56

5✔
57
        var err error
5✔
58
        e.process, err = process.New(e.Command, e.Environment)
5✔
59
        if err != nil {
5✔
60
                return fmt.Errorf("error creating new process: %w", err)
×
61
        }
×
62
        e.process.Log = e.Log
5✔
63
        e.process.RestartDelay = time.Duration(e.RestartDelay)
5✔
64
        e.process.ReadStdoutFn = e.cmdReadOut
5✔
65
        e.process.ReadStderrFn = e.cmdReadErr
5✔
66

5✔
67
        if err = e.process.Start(); err != nil {
5✔
68
                // if there was only one argument, and it contained spaces, warn the user
×
69
                // that they may have configured it wrong.
×
70
                if len(e.Command) == 1 && strings.Contains(e.Command[0], " ") {
×
71
                        e.Log.Warn("The processors.execd Command contained spaces but no arguments. " +
×
72
                                "This setting expects the program and arguments as an array of strings, " +
×
73
                                "not as a space-delimited string. See the plugin readme for an example.")
×
74
                }
×
75
                return fmt.Errorf("failed to start process %s: %w", e.Command, err)
×
76
        }
77

78
        return nil
5✔
79
}
80

81
func (e *Execd) Add(m telegraf.Metric, _ telegraf.Accumulator) error {
26✔
82
        b, err := e.serializer.Serialize(m)
26✔
83
        if err != nil {
26✔
84
                return fmt.Errorf("metric serializing error: %w", err)
×
85
        }
×
86

87
        _, err = e.process.Stdin.Write(b)
26✔
88
        if err != nil {
26✔
89
                return fmt.Errorf("error writing to process stdin: %w", err)
×
90
        }
×
91

92
        // We cannot maintain tracking metrics at the moment because input/output
93
        // is done asynchronously and we don't have any metric metadata to tie the
94
        // output metric back to the original input metric.
95
        m.Drop()
26✔
96
        return nil
26✔
97
}
98

99
func (e *Execd) Stop() {
5✔
100
        e.process.Stop()
5✔
101
}
5✔
102

103
func (e *Execd) cmdReadOut(out io.Reader) {
5✔
104
        // Prefer using the StreamParser when parsing influx format.
5✔
105
        if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser {
7✔
106
                e.cmdReadOutStream(out)
2✔
107
                return
2✔
108
        }
2✔
109

110
        scanner := bufio.NewScanner(out)
3✔
111
        scanBuf := make([]byte, 4096)
3✔
112
        scanner.Buffer(scanBuf, 262144)
3✔
113

3✔
114
        for scanner.Scan() {
33✔
115
                metrics, err := e.parser.Parse(scanner.Bytes())
30✔
116
                if err != nil {
30✔
117
                        e.Log.Errorf("Parse error: %s", err)
×
118
                }
×
119

120
                for _, metric := range metrics {
45✔
121
                        e.acc.AddMetric(metric)
15✔
122
                }
15✔
123
        }
124

125
        if err := scanner.Err(); err != nil {
3✔
UNCOV
126
                e.Log.Errorf("Error reading stdout: %s", err)
×
UNCOV
127
        }
×
128
}
129

130
func (e *Execd) cmdReadOutStream(out io.Reader) {
2✔
131
        parser := influx.NewStreamParser(out)
2✔
132

2✔
133
        for {
15✔
134
                metric, err := parser.Next()
13✔
135

13✔
136
                if err != nil {
15✔
137
                        // Stop parsing when we've reached the end.
2✔
138
                        if errors.Is(err, influx.EOF) {
4✔
139
                                break
2✔
140
                        }
141

142
                        var parseErr *influx.ParseError
×
143
                        if errors.As(err, &parseErr) {
×
144
                                // Continue past parse errors.
×
145
                                e.acc.AddError(parseErr)
×
146
                                continue
×
147
                        }
148

149
                        // Stop reading on any non-recoverable error.
150
                        e.acc.AddError(err)
×
151
                        return
×
152
                }
153

154
                e.acc.AddMetric(metric)
11✔
155
        }
156
}
157

158
func (e *Execd) cmdReadErr(out io.Reader) {
5✔
159
        scanner := bufio.NewScanner(out)
5✔
160

5✔
161
        for scanner.Scan() {
8✔
162
                e.Log.Errorf("stderr: %q", scanner.Text())
3✔
163
        }
3✔
164

165
        if err := scanner.Err(); err != nil {
5✔
UNCOV
166
                e.Log.Errorf("Error reading stderr: %s", err)
×
UNCOV
167
        }
×
168
}
169

170
func (e *Execd) Init() error {
×
171
        if len(e.Command) == 0 {
×
172
                return errors.New("no command specified")
×
173
        }
×
174
        return nil
×
175
}
176

177
func init() {
3✔
178
        processors.AddStreaming("execd", func() telegraf.StreamingProcessor {
3✔
179
                return New()
×
180
        })
×
181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc