• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

influxdata / telegraf / 297396

18 Dec 2023 01:46PM UTC coverage: 61.913% (+0.01%) from 61.899%
297396

push

circleci

web-flow
fix(outputs.bigquery): Ignore fields containing NaN or infinity (#14458)

7 of 9 new or added lines in 1 file covered. (77.78%)

4 existing lines in 1 file now uncovered.

65316 of 105496 relevant lines covered (61.91%)

82.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.48
/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go
1
//go:generate ../../../tools/readme_config_includer/generator
2
package ctrlx_datalayer
3

4
import (
5
        "bytes"
6
        "context"
7
        _ "embed"
8
        "encoding/json"
9
        "errors"
10
        "fmt"
11
        "net/http"
12
        "net/url"
13
        "strconv"
14
        "sync"
15
        "time"
16

17
        "github.com/boschrexroth/ctrlx-datalayer-golang/pkg/sseclient"
18
        "github.com/boschrexroth/ctrlx-datalayer-golang/pkg/token"
19
        "github.com/google/uuid"
20

21
        "github.com/influxdata/telegraf"
22
        "github.com/influxdata/telegraf/config"
23
        "github.com/influxdata/telegraf/internal/choice"
24
        "github.com/influxdata/telegraf/metric"
25
        httpconfig "github.com/influxdata/telegraf/plugins/common/http"
26
        "github.com/influxdata/telegraf/plugins/inputs"
27
        jsonParser "github.com/influxdata/telegraf/plugins/parsers/json"
28
)
29

30
// This plugin is based on the official ctrlX CORE API. Documentation can be found in OpenAPI format at:
31
// https://boschrexroth.github.io/rest-api-description/ctrlx-automation/ctrlx-core/
32
// Used APIs are:
33
// * ctrlX CORE - Authorization and Authentication API
34
// * ctrlX CORE - Data Layer API
35
//
36
// All communication between the device and this input plugin is based
37
// on https REST and HTML5 Server Sent Events (sse).
38

39
//go:embed sample.conf
40
var sampleConfig string
41

42
// CtrlXDataLayer encapsulated the configuration as well as the state of this plugin.
43
type CtrlXDataLayer struct {
44
        Server   string        `toml:"server"`
45
        Username config.Secret `toml:"username"`
46
        Password config.Secret `toml:"password"`
47

48
        Log          telegraf.Logger `toml:"-"`
49
        Subscription []Subscription
50

51
        url    string
52
        wg     sync.WaitGroup
53
        cancel context.CancelFunc
54

55
        acc          telegraf.Accumulator
56
        connection   *http.Client
57
        tokenManager token.TokenManager
58
        httpconfig.HTTPClientConfig
59
}
60

61
// convertTimestamp2UnixTime converts the given Data Layer timestamp of the
// payload, expressed as FILETIME (100-nanosecond intervals since
// January 1, 1601 UTC), to a time.Time.
//
// The value is split into whole seconds and a sub-second remainder before it
// is handed to time.Unix. Converting everything to nanoseconds first would
// overflow int64 for timestamps far away from the Unix epoch, e.g. t == 0
// (January 1, 1601).
func convertTimestamp2UnixTime(t int64) time.Time {
	const (
		// Number of 100-nanosecond ticks per second.
		ticksPerSecond = 10_000_000
		// Between Jan 1, 1601 and Jan 1, 1970 there are 11644473600 seconds.
		unixEpochTicks = 11_644_473_600 * ticksPerSecond
	)

	ticks := t - unixEpochTicks
	// time.Unix accepts nanosecond values outside [0, 999999999], so a negative
	// remainder (timestamps before 1970) is normalized correctly.
	return time.Unix(ticks/ticksPerSecond, (ticks%ticksPerSecond)*100)
}
938✔
69

70
// createSubscription uses the official 'ctrlX Data Layer API' to create the sse subscription.
71
func (c *CtrlXDataLayer) createSubscription(sub *Subscription) (string, error) {
941✔
72
        sseURL := c.url + subscriptionPath
941✔
73

941✔
74
        id := "telegraf_" + uuid.New().String()
941✔
75
        request := sub.createRequest(id)
941✔
76
        payload, err := json.Marshal(request)
941✔
77
        if err != nil {
941✔
78
                return "", fmt.Errorf("failed to create subscription %d payload: %w", sub.index, err)
×
79
        }
×
80

81
        requestBody := bytes.NewBuffer(payload)
941✔
82
        req, err := http.NewRequest("POST", sseURL, requestBody)
941✔
83
        if err != nil {
941✔
84
                return "", fmt.Errorf("failed to create subscription %d request: %w", sub.index, err)
×
85
        }
×
86

87
        req.Header.Add("Authorization", c.tokenManager.Token.String())
941✔
88

941✔
89
        resp, err := c.connection.Do(req)
941✔
90
        if err != nil {
941✔
UNCOV
91
                return "", fmt.Errorf("failed to do request to create sse subscription %d: %w", sub.index, err)
×
UNCOV
92
        }
×
93
        resp.Body.Close()
941✔
94

941✔
95
        if resp.StatusCode != 200 && resp.StatusCode != 201 {
942✔
96
                return "", fmt.Errorf("failed to create sse subscription %d, status: %s", sub.index, resp.Status)
1✔
97
        }
1✔
98

99
        return sseURL + "/" + id, nil
940✔
100
}
101

102
// createSubscriptionAndSseClient creates a sse subscription on the server and
103
// initializes a sse client to receive sse events from the server.
104
func (c *CtrlXDataLayer) createSubscriptionAndSseClient(sub *Subscription) (*sseclient.SseClient, error) {
940✔
105
        t, err := c.tokenManager.RequestAuthToken()
940✔
106
        if err != nil {
942✔
107
                return nil, err
2✔
108
        }
2✔
109

110
        subURL, err := c.createSubscription(sub)
938✔
111
        if err != nil {
938✔
UNCOV
112
                return nil, err
×
UNCOV
113
        }
×
114

115
        client := sseclient.NewSseClient(subURL, t.String(), c.InsecureSkipVerify)
938✔
116

938✔
117
        return client, nil
938✔
118
}
119

120
// addMetric writes sse metric into accumulator.
121
func (c *CtrlXDataLayer) addMetric(se *sseclient.SseEvent, sub *Subscription) {
937✔
122
        switch se.Event {
937✔
123
        case "update":
937✔
124
                // Received an updated value, that we translate into a metric
937✔
125
                var d sseEventData
937✔
126

937✔
127
                if err := json.Unmarshal([]byte(se.Data), &d); err != nil {
937✔
128
                        c.acc.AddError(fmt.Errorf("received malformed data from 'update' event: %w", err))
×
129
                        return
×
130
                }
×
131
                m, err := c.createMetric(&d, sub)
937✔
132
                if err != nil {
937✔
133
                        c.acc.AddError(fmt.Errorf("failed to create metrics: %w", err))
×
134
                        return
×
135
                }
×
136
                c.acc.AddMetric(m)
937✔
137
        case "error":
×
138
                // Received an error event, that we report to the accumulator
×
139
                var e sseEventError
×
140
                if err := json.Unmarshal([]byte(se.Data), &e); err != nil {
×
141
                        c.acc.AddError(fmt.Errorf("received malformed data from 'error' event: %w", err))
×
142
                        return
×
143
                }
×
144
                c.acc.AddError(fmt.Errorf("received 'error' event for node: %q", e.Instance))
×
145
        case "keepalive":
×
146
                // Keepalive events are ignored for the moment
×
147
                c.Log.Debug("Received keepalive event")
×
148
        default:
×
149
                // Received a yet unsupported event type
×
150
                c.Log.Debugf("Received unsupported event: %q", se.Event)
×
151
        }
152
}
153

154
// createMetric - create metric depending on flag 'output_json' and data type
155
func (c *CtrlXDataLayer) createMetric(em *sseEventData, sub *Subscription) (telegraf.Metric, error) {
937✔
156
        t := convertTimestamp2UnixTime(em.Timestamp)
937✔
157
        node := sub.node(em.Node)
937✔
158
        if node == nil {
937✔
159
                return nil, errors.New("node not found")
×
160
        }
×
161

162
        // default tags
163
        tags := map[string]string{
937✔
164
                "node":   em.Node,
937✔
165
                "source": c.Server,
937✔
166
        }
937✔
167

937✔
168
        // add tags of subscription if user has defined
937✔
169
        for key, value := range sub.Tags {
937✔
170
                tags[key] = value
×
171
        }
×
172

173
        // add tags of node if user has defined
174
        for key, value := range node.Tags {
937✔
175
                tags[key] = value
×
176
        }
×
177

178
        // set measurement of subscription
179
        measurement := sub.Measurement
937✔
180

937✔
181
        // get field key from node properties
937✔
182
        fieldKey := node.fieldKey()
937✔
183

937✔
184
        if fieldKey == "" {
937✔
185
                return nil, errors.New("field key not valid")
×
186
        }
×
187

188
        if sub.OutputJSONString {
937✔
189
                b, err := json.Marshal(em.Value)
×
190
                if err != nil {
×
191
                        return nil, err
×
192
                }
×
193
                fields := map[string]interface{}{fieldKey: string(b)}
×
194
                m := metric.New(measurement, tags, fields, t)
×
195
                return m, nil
×
196
        }
197

198
        switch em.Type {
937✔
199
        case "object":
×
200
                flattener := jsonParser.JSONFlattener{}
×
201
                err := flattener.FullFlattenJSON(fieldKey, em.Value, true, true)
×
202
                if err != nil {
×
203
                        return nil, err
×
204
                }
×
205

206
                m := metric.New(measurement, tags, flattener.Fields, t)
×
207
                return m, nil
×
208
        case "arbool8",
209
                "arint8", "aruint8",
210
                "arint16", "aruint16",
211
                "arint32", "aruint32",
212
                "arint64", "aruint64",
213
                "arfloat", "ardouble",
214
                "arstring",
215
                "artimestamp":
×
216
                fields := make(map[string]interface{})
×
217
                values := em.Value.([]interface{})
×
218
                for i := 0; i < len(values); i++ {
×
219
                        index := strconv.Itoa(i)
×
220
                        key := fieldKey + "_" + index
×
221
                        fields[key] = values[i]
×
222
                }
×
223
                m := metric.New(measurement, tags, fields, t)
×
224
                return m, nil
×
225
        case "bool8",
226
                "int8", "uint8",
227
                "int16", "uint16",
228
                "int32", "uint32",
229
                "int64", "uint64",
230
                "float", "double",
231
                "string",
232
                "timestamp":
937✔
233
                fields := map[string]interface{}{fieldKey: em.Value}
937✔
234
                m := metric.New(measurement, tags, fields, t)
937✔
235
                return m, nil
937✔
236
        }
237

238
        return nil, fmt.Errorf("unsupported value type: %s", em.Type)
×
239
}
240

241
// Init is for setup, and validating config
242
func (c *CtrlXDataLayer) Init() error {
×
243
        // Check all configured subscriptions for valid settings
×
244
        for i := range c.Subscription {
×
245
                sub := &c.Subscription[i]
×
246
                sub.applyDefaultSettings()
×
247
                if !choice.Contains(sub.QueueBehaviour, queueBehaviours) {
×
248
                        c.Log.Infof("The right queue behaviour values are %v", queueBehaviours)
×
249
                        return fmt.Errorf("subscription %d: setting 'queue_behaviour' %q is invalid", i, sub.QueueBehaviour)
×
250
                }
×
251
                if !choice.Contains(sub.ValueChange, valueChanges) {
×
252
                        c.Log.Infof("The right value change values are %v", valueChanges)
×
253
                        return fmt.Errorf("subscription %d: setting 'value_change' %q is invalid", i, sub.ValueChange)
×
254
                }
×
255
                if len(sub.Nodes) == 0 {
×
256
                        c.Log.Warn("A configured subscription has no nodes configured")
×
257
                }
×
258
                sub.index = i
×
259
        }
260

261
        // Generate valid communication url based on configured server address
262
        u := url.URL{
×
263
                Scheme: "https",
×
264
                Host:   c.Server,
×
265
        }
×
266
        c.url = u.String()
×
267
        if _, err := url.Parse(c.url); err != nil {
×
268
                return errors.New("invalid server address")
×
269
        }
×
270

271
        return nil
×
272
}
273

274
// Start input as service, retain the accumulator, establish the connection.
275
func (c *CtrlXDataLayer) Start(acc telegraf.Accumulator) error {
2✔
276
        var ctx context.Context
2✔
277
        ctx, c.cancel = context.WithCancel(context.Background())
2✔
278

2✔
279
        var err error
2✔
280
        c.connection, err = c.HTTPClientConfig.CreateClient(ctx, c.Log)
2✔
281
        if err != nil {
2✔
282
                return fmt.Errorf("failed to create http client: %w", err)
×
283
        }
×
284

285
        username, err := c.Username.Get()
2✔
286
        if err != nil {
2✔
287
                return fmt.Errorf("getting username failed: %w", err)
×
288
        }
×
289

290
        password, err := c.Password.Get()
2✔
291
        if err != nil {
2✔
292
                username.Destroy()
×
293
                return fmt.Errorf("getting password failed: %w", err)
×
294
        }
×
295

296
        c.tokenManager = token.TokenManager{
2✔
297
                Url:        c.url,
2✔
298
                Username:   username.String(),
2✔
299
                Password:   password.String(),
2✔
300
                Connection: c.connection,
2✔
301
        }
2✔
302
        username.Destroy()
2✔
303
        password.Destroy()
2✔
304

2✔
305
        c.acc = acc
2✔
306

2✔
307
        c.gatherLoop(ctx)
2✔
308

2✔
309
        return nil
2✔
310
}
311

312
// gatherLoop creates sse subscriptions on the Data Layer and requests the sse data
313
// the connection will be restablished if the sse subscription is broken.
314
func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) {
2✔
315
        for _, sub := range c.Subscription {
4✔
316
                c.wg.Add(1)
2✔
317
                go func(sub Subscription) {
4✔
318
                        defer c.wg.Done()
2✔
319
                        for {
941✔
320
                                select {
939✔
321
                                case <-ctx.Done():
×
322
                                        c.Log.Debugf("Gather loop for subscription %d stopped", sub.index)
×
323
                                        return
×
324
                                default:
939✔
325
                                        client, err := c.createSubscriptionAndSseClient(&sub)
939✔
326
                                        if err != nil {
941✔
327
                                                c.Log.Errorf("Creating sse client to subscription %d: %v", sub.index, err)
2✔
328
                                                time.Sleep(time.Duration(defaultReconnectInterval))
2✔
329
                                                continue
2✔
330
                                        }
331
                                        c.Log.Debugf("Created sse client to subscription %d", sub.index)
937✔
332

937✔
333
                                        // Establish connection and handle events in a callback function.
937✔
334
                                        err = client.Subscribe(ctx, func(event string, data string) {
1,874✔
335
                                                c.addMetric(&sseclient.SseEvent{
937✔
336
                                                        Event: event,
937✔
337
                                                        Data:  data,
937✔
338
                                                }, &sub)
937✔
339
                                        })
937✔
340
                                        if errors.Is(err, context.Canceled) {
937✔
341
                                                // Subscription cancelled
×
342
                                                c.Log.Debugf("Requesting data of subscription %d cancelled", sub.index)
×
343
                                                return
×
344
                                        }
×
345
                                        c.Log.Errorf("Requesting data of subscription %d failed: %v", sub.index, err)
937✔
346
                                }
347
                        }
348
                }(sub)
349
        }
350
}
351

352
// Stop input as service.
353
func (c *CtrlXDataLayer) Stop() {
×
354
        c.cancel()
×
355
        c.wg.Wait()
×
356
}
×
357

358
// Gather is called by telegraf to collect the metrics.
359
func (c *CtrlXDataLayer) Gather(_ telegraf.Accumulator) error {
×
360
        // Metrics are sent to the accumulator asynchronously in worker thread. So nothing to do here.
×
361
        return nil
×
362
}
×
363

364
// SampleConfig returns the auto-inserted sample configuration to the telegraf.
365
func (*CtrlXDataLayer) SampleConfig() string {
×
366
        return sampleConfig
×
367
}
×
368

369
// init registers the plugin in telegraf.
370
func init() {
1✔
371
        inputs.Add("ctrlx_datalayer", func() telegraf.Input {
1✔
372
                return &CtrlXDataLayer{}
×
373
        })
×
374
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc