• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

enbility / spine-go / 10973819786

21 Sep 2024 03:50PM UTC coverage: 93.707% (+0.2%) from 93.465%
10973819786

push

github

DerAndereAndi
Merge branch 'release/v0.7.0'

626 of 646 new or added lines in 36 files covered. (96.9%)

2 existing lines in 2 files now uncovered.

5018 of 5355 relevant lines covered (93.71%)

88.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/spine/feature_local.go
1
package spine
2

3
import (
4
        "errors"
5
        "fmt"
6
        "reflect"
7
        "sync"
8
        "time"
9

10
        "github.com/enbility/ship-go/logging"
11
        "github.com/enbility/spine-go/api"
12
        "github.com/enbility/spine-go/model"
13
        "github.com/enbility/spine-go/util"
14
)
15

16
// FeatureLocal is a feature of a local entity. It embeds the shared Feature
// data and adds function data storage, response and result callbacks, write
// approval bookkeeping and the lists of bindings and subscriptions to remote
// features.
type FeatureLocal struct {
17
        *Feature
18

19
        // entity is the local entity this feature belongs to (see Entity and Device).
        entity              api.EntityLocalInterface
20
        // functionDataMap holds the function data per function type; it is
        // filled by NewFeatureLocal from CreateFunctionData.
        functionDataMap     map[model.FunctionType]api.FunctionDataCmdInterface
21
        // muxResponseCB is held while accessing responseMsgCallback,
        // resultCallbacks, writeApprovalCallbacks and pendingWriteApprovals.
        muxResponseCB       sync.Mutex
22
        // responseMsgCallback holds callbacks per msgCounterReference; they are
        // removed once they have been invoked.
        responseMsgCallback map[model.MsgCounterType][]func(result api.ResponseMessage)
23
        // resultCallbacks are invoked for result messages handled by this feature.
        resultCallbacks     []func(result api.ResponseMessage)
24

25
        // writeTimeout is the delay after which a pending write approval times
        // out; NewFeatureLocal sets it to defaultMaxResponseDelay.
        writeTimeout           time.Duration
26
        // writeApprovalCallbacks are invoked for incoming write messages.
        writeApprovalCallbacks []api.WriteApprovalCallbackFunc
27
        // muxWriteReceived is held by ApproveOrDenyWrite while counting approvals.
        muxWriteReceived       sync.Mutex
28
        // writeApprovalReceived counts approvals per remote SKI and message counter.
        writeApprovalReceived  map[string]map[model.MsgCounterType]int
29
        // pendingWriteApprovals holds the timeout timer per remote SKI and message counter.
        pendingWriteApprovals  map[string]map[model.MsgCounterType]*time.Timer
30

31
        bindings      []*model.FeatureAddressType // bindings to remote features
32
        subscriptions []*model.FeatureAddressType // subscriptions to remote features
33

34
        // mux is held while accessing bindings, subscriptions and the function
        // data in DataCopy and updateData.
        mux sync.Mutex
35
}
36

37
func NewFeatureLocal(id uint, entity api.EntityLocalInterface, ftype model.FeatureTypeType, role model.RoleType) *FeatureLocal {
204✔
38
        res := &FeatureLocal{
204✔
39
                Feature: NewFeature(
204✔
40
                        featureAddressType(id, entity.Address()),
204✔
41
                        ftype,
204✔
42
                        role),
204✔
43
                entity:                entity,
204✔
44
                functionDataMap:       make(map[model.FunctionType]api.FunctionDataCmdInterface),
204✔
45
                responseMsgCallback:   make(map[model.MsgCounterType][]func(result api.ResponseMessage)),
204✔
46
                writeApprovalReceived: make(map[string]map[model.MsgCounterType]int),
204✔
47
                pendingWriteApprovals: make(map[string]map[model.MsgCounterType]*time.Timer),
204✔
48
                writeTimeout:          defaultMaxResponseDelay,
204✔
49
        }
204✔
50

204✔
51
        for _, fd := range CreateFunctionData[api.FunctionDataCmdInterface](ftype) {
885✔
52
                res.functionDataMap[fd.FunctionType()] = fd
681✔
53
        }
681✔
54
        res.operations = make(map[model.FunctionType]api.OperationsInterface)
204✔
55

204✔
56
        return res
204✔
57
}
58

59
// Compile-time check that *FeatureLocal implements api.FeatureLocalInterface.
var _ api.FeatureLocalInterface = (*FeatureLocal)(nil)
60

61
/* FeatureLocalInterface */
62

63
func (r *FeatureLocal) Device() api.DeviceLocalInterface {
76✔
64
        return r.entity.Device()
76✔
65
}
76✔
66

67
func (r *FeatureLocal) Entity() api.EntityLocalInterface {
8✔
68
        return r.entity
8✔
69
}
8✔
70

71
// Add supported function to the feature if its role is Server or Special
72
func (r *FeatureLocal) AddFunctionType(function model.FunctionType, read, write bool) {
506✔
73
        if r.role != model.RoleTypeServer && r.role != model.RoleTypeSpecial {
506✔
74
                return
×
75
        }
×
76
        if r.operations[function] != nil {
506✔
77
                return
×
78
        }
×
79
        writePartial := false
506✔
80
        if write {
530✔
81
                // partials are not supported on all features and functions, so check if this function supports it
24✔
82
                if fctData := r.functionData(function); fctData != nil {
48✔
83
                        writePartial = fctData.SupportsPartialWrite()
24✔
84
                }
24✔
85
        }
86
        // partial reads are currently not supported!
87
        r.operations[function] = NewOperations(read, false, write, writePartial)
506✔
88

506✔
89
        if r.role == model.RoleTypeServer &&
506✔
90
                r.ftype == model.FeatureTypeTypeDeviceDiagnosis &&
506✔
91
                function == model.FunctionTypeDeviceDiagnosisHeartbeatData {
509✔
92
                // Update HeartbeatManager
3✔
93
                r.Entity().HeartbeatManager().SetLocalFeature(r.Entity(), r)
3✔
94
        }
3✔
95
}
96

97
func (r *FeatureLocal) Functions() []model.FunctionType {
1✔
98
        var fcts []model.FunctionType
1✔
99

1✔
100
        for key := range r.operations {
2✔
101
                fcts = append(fcts, key)
1✔
102
        }
1✔
103

104
        return fcts
1✔
105
}
106

107
// Add a callback function to be invoked when SPINE message comes in with a given msgCounterReference value
108
//
109
// Returns an error if there is already a callback for the msgCounter set
110
func (r *FeatureLocal) AddResponseCallback(msgCounterReference model.MsgCounterType, function func(msg api.ResponseMessage)) error {
2✔
111
        r.muxResponseCB.Lock()
2✔
112
        defer r.muxResponseCB.Unlock()
2✔
113

2✔
114
        if _, ok := r.responseMsgCallback[msgCounterReference]; ok {
3✔
115
                for _, cb := range r.responseMsgCallback[msgCounterReference] {
2✔
116
                        if reflect.ValueOf(cb).Pointer() == reflect.ValueOf(function).Pointer() {
2✔
117
                                return errors.New("callback already set")
1✔
118
                        }
1✔
119
                }
120
        }
121

122
        r.responseMsgCallback[msgCounterReference] = append(r.responseMsgCallback[msgCounterReference], function)
1✔
123

1✔
124
        return nil
1✔
125
}
126

127
func (r *FeatureLocal) processResponseMsgCallbacks(msgCounterReference model.MsgCounterType, msg api.ResponseMessage) {
4✔
128
        r.muxResponseCB.Lock()
4✔
129
        defer r.muxResponseCB.Unlock()
4✔
130

4✔
131
        cbs, ok := r.responseMsgCallback[msgCounterReference]
4✔
132
        if !ok {
8✔
133
                return
4✔
134
        }
4✔
135

NEW
136
        for _, cb := range cbs {
×
NEW
137
                go cb(msg)
×
NEW
138
        }
×
139

140
        delete(r.responseMsgCallback, msgCounterReference)
×
141
}
142

143
// Add a callback function to be invoked when a result message comes in for this feature
144
func (r *FeatureLocal) AddResultCallback(function func(msg api.ResponseMessage)) {
1✔
145
        r.muxResponseCB.Lock()
1✔
146
        defer r.muxResponseCB.Unlock()
1✔
147

1✔
148
        r.resultCallbacks = append(r.resultCallbacks, function)
1✔
149
}
1✔
150

151
func (r *FeatureLocal) processResultCallbacks(msg api.ResponseMessage) {
2✔
152
        r.muxResponseCB.Lock()
2✔
153
        defer r.muxResponseCB.Unlock()
2✔
154

2✔
155
        for _, cb := range r.resultCallbacks {
2✔
156
                go cb(msg)
×
157
        }
×
158
}
159

160
func (r *FeatureLocal) AddWriteApprovalCallback(function api.WriteApprovalCallbackFunc) error {
9✔
161
        if r.Role() != model.RoleTypeServer {
10✔
162
                return errors.New("only allowed on a server feature")
1✔
163
        }
1✔
164

165
        r.muxResponseCB.Lock()
8✔
166
        defer r.muxResponseCB.Unlock()
8✔
167

8✔
168
        r.writeApprovalCallbacks = append(r.writeApprovalCallbacks, function)
8✔
169

8✔
170
        return nil
8✔
171
}
172

173
func (r *FeatureLocal) processWriteApprovalCallbacks(msg *api.Message) {
6✔
174
        r.muxResponseCB.Lock()
6✔
175
        defer r.muxResponseCB.Unlock()
6✔
176

6✔
177
        for _, cb := range r.writeApprovalCallbacks {
14✔
178
                go cb(msg)
8✔
179
        }
8✔
180
}
181

182
func (r *FeatureLocal) addPendingApproval(msg *api.Message) {
6✔
183
        if r.Role() != model.RoleTypeServer ||
6✔
184
                msg.DeviceRemote == nil ||
6✔
185
                msg.RequestHeader == nil ||
6✔
186
                msg.RequestHeader.MsgCounter == nil {
7✔
187
                return
1✔
188
        }
1✔
189

190
        ski := msg.DeviceRemote.Ski()
5✔
191

5✔
192
        newTimer := time.AfterFunc(r.writeTimeout, func() {
6✔
193
                r.muxResponseCB.Lock()
1✔
194
                delete(r.pendingWriteApprovals[ski], *msg.RequestHeader.MsgCounter)
1✔
195
                r.muxResponseCB.Unlock()
1✔
196

1✔
197
                err := model.NewErrorTypeFromString("write not approved in time by application")
1✔
198
                _ = msg.FeatureRemote.Device().Sender().ResultError(msg.RequestHeader, r.Address(), err)
1✔
199
        })
1✔
200

201
        r.muxResponseCB.Lock()
5✔
202
        if _, ok := r.pendingWriteApprovals[ski]; !ok {
10✔
203
                r.pendingWriteApprovals[ski] = make(map[model.MsgCounterType]*time.Timer)
5✔
204
        }
5✔
205
        r.pendingWriteApprovals[ski][*msg.RequestHeader.MsgCounter] = newTimer
5✔
206
        r.muxResponseCB.Unlock()
5✔
207
}
208

209
func (r *FeatureLocal) ApproveOrDenyWrite(msg *api.Message, err model.ErrorType) {
8✔
210
        if r.Role() != model.RoleTypeServer ||
8✔
211
                msg.DeviceRemote == nil {
9✔
212
                return
1✔
213
        }
1✔
214

215
        ski := msg.DeviceRemote.Ski()
7✔
216

7✔
217
        r.muxResponseCB.Lock()
7✔
218
        timer, ok := r.pendingWriteApprovals[ski][*msg.RequestHeader.MsgCounter]
7✔
219
        count := len(r.writeApprovalCallbacks)
7✔
220
        r.muxResponseCB.Unlock()
7✔
221

7✔
222
        // if there is no timer running, we are too late and error has already been sent
7✔
223
        if !ok || timer == nil {
8✔
224
                return
1✔
225
        }
1✔
226

227
        // do we have enough approvals?
228
        r.muxWriteReceived.Lock()
6✔
229
        defer r.muxWriteReceived.Unlock()
6✔
230
        if count > 1 && err.ErrorNumber == 0 {
9✔
231
                amount, ok := r.writeApprovalReceived[ski][*msg.RequestHeader.MsgCounter]
3✔
232
                if ok {
4✔
233
                        r.writeApprovalReceived[ski][*msg.RequestHeader.MsgCounter] = amount + 1
1✔
234
                } else {
3✔
235
                        r.writeApprovalReceived[ski] = make(map[model.MsgCounterType]int)
2✔
236
                        r.writeApprovalReceived[ski][*msg.RequestHeader.MsgCounter] = 1
2✔
237
                }
2✔
238
                // do we have enough approve messages, if not exit
239
                if r.writeApprovalReceived[ski][*msg.RequestHeader.MsgCounter] < count {
5✔
240
                        return
2✔
241
                }
2✔
242
        }
243

244
        timer.Stop()
4✔
245

4✔
246
        delete(r.writeApprovalReceived[ski], *msg.RequestHeader.MsgCounter)
4✔
247

4✔
248
        r.muxResponseCB.Lock()
4✔
249
        defer r.muxResponseCB.Unlock()
4✔
250
        delete(r.pendingWriteApprovals[ski], *msg.RequestHeader.MsgCounter)
4✔
251

4✔
252
        if err.ErrorNumber == 0 {
6✔
253
                r.processWrite(msg)
2✔
254
                return
2✔
255
        }
2✔
256

257
        _ = msg.FeatureRemote.Device().Sender().ResultError(msg.RequestHeader, r.Address(), &err)
2✔
258
}
259

260
func (r *FeatureLocal) SetWriteApprovalTimeout(duration time.Duration) {
1✔
261
        r.writeTimeout = duration
1✔
262
}
1✔
263

264
func (r *FeatureLocal) CleanWriteApprovalCaches(ski string) {
6✔
265
        r.muxResponseCB.Lock()
6✔
266
        defer r.muxResponseCB.Unlock()
6✔
267

6✔
268
        delete(r.pendingWriteApprovals, ski)
6✔
269
        delete(r.writeApprovalReceived, ski)
6✔
270
}
6✔
271

272
// Remove subscriptions and bindings from local cache for a remote device
273
// used if a remote device is getting disconnected
274
func (r *FeatureLocal) CleanRemoteDeviceCaches(remoteAddress *model.DeviceAddressType) {
9✔
275
        if remoteAddress == nil ||
9✔
276
                remoteAddress.Device == nil {
15✔
277
                return
6✔
278
        }
6✔
279

280
        r.mux.Lock()
3✔
281
        defer r.mux.Unlock()
3✔
282

3✔
283
        var subscriptions []*model.FeatureAddressType
3✔
284

3✔
285
        for _, item := range r.subscriptions {
6✔
286
                if item.Device == nil ||
3✔
287
                        *item.Device != *remoteAddress.Device {
4✔
288
                        subscriptions = append(subscriptions, item)
1✔
289
                }
1✔
290
        }
291

292
        r.subscriptions = subscriptions
3✔
293

3✔
294
        var bindings []*model.FeatureAddressType
3✔
295

3✔
296
        for _, item := range r.bindings {
6✔
297
                if item.Device == nil ||
3✔
298
                        *item.Device != *remoteAddress.Device {
4✔
299
                        bindings = append(bindings, item)
1✔
300
                }
1✔
301
        }
302

303
        r.bindings = bindings
3✔
304
}
305

306
// Remove subscriptions and bindings from local cache for a remote entity
307
// used if a remote entity is removed
308
func (r *FeatureLocal) CleanRemoteEntityCaches(remoteAddress *model.EntityAddressType) {
11✔
309
        if remoteAddress == nil ||
11✔
310
                remoteAddress.Device == nil ||
11✔
311
                remoteAddress.Entity == nil {
14✔
312
                return
3✔
313
        }
3✔
314

315
        r.mux.Lock()
8✔
316
        defer r.mux.Unlock()
8✔
317

8✔
318
        var subscriptions []*model.FeatureAddressType
8✔
319

8✔
320
        for _, item := range r.subscriptions {
11✔
321
                if item.Device == nil || item.Entity == nil ||
3✔
322
                        *item.Device != *remoteAddress.Device ||
3✔
323
                        !reflect.DeepEqual(item.Entity, remoteAddress.Entity) {
5✔
324
                        subscriptions = append(subscriptions, item)
2✔
325
                }
2✔
326
        }
327

328
        r.subscriptions = subscriptions
8✔
329

8✔
330
        var bindings []*model.FeatureAddressType
8✔
331

8✔
332
        for _, item := range r.bindings {
10✔
333
                if item.Device == nil || item.Entity == nil ||
2✔
334
                        *item.Device != *remoteAddress.Device ||
2✔
335
                        !reflect.DeepEqual(item.Entity, remoteAddress.Entity) {
3✔
336
                        bindings = append(bindings, item)
1✔
337
                }
1✔
338
        }
339

340
        r.bindings = bindings
8✔
341
}
342

343
func (r *FeatureLocal) DataCopy(function model.FunctionType) any {
31✔
344
        r.mux.Lock()
31✔
345
        defer r.mux.Unlock()
31✔
346

31✔
347
        fctData := r.functionData(function)
31✔
348
        if fctData == nil {
33✔
349
                return nil
2✔
350
        }
2✔
351

352
        return fctData.DataCopyAny()
29✔
353
}
354

355
func (r *FeatureLocal) SetData(function model.FunctionType, data any) {
63✔
356
        fctData, err := r.updateData(false, function, data, nil, nil)
63✔
357

63✔
358
        if err != nil {
63✔
359
                logging.Log().Debug(err.String())
×
360
        }
×
361

362
        if fctData != nil && err == nil {
126✔
363
                r.Device().NotifySubscribers(r.Address(), fctData.NotifyOrWriteCmdType(nil, nil, false, nil))
63✔
364
        }
63✔
365
}
366

367
func (r *FeatureLocal) UpdateData(function model.FunctionType, data any, filterPartial *model.FilterType, filterDelete *model.FilterType) *model.ErrorType {
5✔
368
        fctData, err := r.updateData(false, function, data, filterPartial, filterDelete)
5✔
369

5✔
370
        if err != nil {
5✔
371
                logging.Log().Debug(err.String())
×
372
        }
×
373

374
        if fctData != nil && err == nil {
10✔
375
                var deleteSelector, deleteElements, partialSelector any
5✔
376

5✔
377
                if filterDelete != nil {
6✔
378
                        if fDelete, err := filterDelete.Data(); err == nil {
2✔
379
                                if fDelete.Selector != nil {
2✔
380
                                        deleteSelector = fDelete.Selector
1✔
381
                                }
1✔
382
                                if fDelete.Elements != nil {
2✔
383
                                        deleteElements = fDelete.Elements
1✔
384
                                }
1✔
385
                        }
386
                }
387

388
                if filterPartial != nil {
9✔
389
                        if fPartial, err := filterPartial.Data(); err == nil && fPartial.Selector != nil {
4✔
390
                                partialSelector = fPartial.Selector
×
391
                        }
×
392
                }
393

394
                r.Device().NotifySubscribers(r.Address(), fctData.NotifyOrWriteCmdType(deleteSelector, partialSelector, partialSelector == nil, deleteElements))
5✔
395
        }
396

397
        return err
5✔
398
}
399

400
func (r *FeatureLocal) updateData(remoteWrite bool, function model.FunctionType, data any, filterPartial *model.FilterType, filterDelete *model.FilterType) (api.FunctionDataCmdInterface, *model.ErrorType) {
72✔
401
        r.mux.Lock()
72✔
402
        defer r.mux.Unlock()
72✔
403

72✔
404
        fctData := r.functionData(function)
72✔
405
        if fctData == nil {
72✔
406
                return nil, model.NewErrorTypeFromString("data not found")
×
407
        }
×
408

409
        _, err := fctData.UpdateDataAny(remoteWrite, true, data, filterPartial, filterDelete)
72✔
410

72✔
411
        return fctData, err
72✔
412
}
413

414
func (r *FeatureLocal) RequestRemoteData(
415
        function model.FunctionType,
416
        selector any,
417
        elements any,
418
        destination api.FeatureRemoteInterface) (*model.MsgCounterType, *model.ErrorType) {
4✔
419
        fd := r.functionData(function)
4✔
420
        if fd == nil {
5✔
421
                return nil, model.NewErrorTypeFromString("function data not found")
1✔
422
        }
1✔
423

424
        cmd := fd.ReadCmdType(selector, elements)
3✔
425

3✔
426
        return r.RequestRemoteDataBySenderAddress(cmd, destination.Device().Sender(), destination.Device().Ski(), destination.Address(), destination.MaxResponseDelayDuration())
3✔
427
}
428

429
func (r *FeatureLocal) RequestRemoteDataBySenderAddress(
430
        cmd model.CmdType,
431
        sender api.SenderInterface,
432
        deviceSki string,
433
        destinationAddress *model.FeatureAddressType,
434
        maxDelay time.Duration) (*model.MsgCounterType, *model.ErrorType) {
33✔
435
        msgCounter, err := sender.Request(model.CmdClassifierTypeRead, r.Address(), destinationAddress, false, []model.CmdType{cmd})
33✔
436
        if err == nil {
65✔
437
                return msgCounter, nil
32✔
438
        }
32✔
439

440
        return msgCounter, model.NewErrorType(model.ErrorNumberTypeGeneralError, err.Error())
1✔
441
}
442

443
// check if there already is a subscription to a remote feature
444
func (r *FeatureLocal) HasSubscriptionToRemote(remoteAddress *model.FeatureAddressType) bool {
12✔
445
        r.mux.Lock()
12✔
446
        defer r.mux.Unlock()
12✔
447

12✔
448
        for _, item := range r.subscriptions {
27✔
449
                if reflect.DeepEqual(*remoteAddress, *item) {
23✔
450
                        return true
8✔
451
                }
8✔
452
        }
453

454
        return false
4✔
455
}
456

457
// SubscribeToRemote to a remote feature
458
func (r *FeatureLocal) SubscribeToRemote(remoteAddress *model.FeatureAddressType) (*model.MsgCounterType, *model.ErrorType) {
18✔
459
        if remoteAddress.Device == nil {
21✔
460
                return nil, model.NewErrorTypeFromString("device not found")
3✔
461
        }
3✔
462
        remoteDevice := r.entity.Device().RemoteDeviceForAddress(*remoteAddress.Device)
15✔
463
        if remoteDevice == nil {
18✔
464
                return nil, model.NewErrorTypeFromString("device not found")
3✔
465
        }
3✔
466

467
        if r.Role() == model.RoleTypeServer {
13✔
468
                return nil, model.NewErrorTypeFromString(fmt.Sprintf("the server feature '%s' cannot request a subscription", r.Feature.String()))
1✔
469
        }
1✔
470

471
        msgCounter, err := remoteDevice.Sender().Subscribe(r.Address(), remoteAddress, r.ftype)
11✔
472
        if err != nil {
11✔
473
                return nil, model.NewErrorTypeFromString(err.Error())
×
474
        }
×
475

476
        r.mux.Lock()
11✔
477
        r.subscriptions = append(r.subscriptions, remoteAddress)
11✔
478
        r.mux.Unlock()
11✔
479

11✔
480
        return msgCounter, nil
11✔
481
}
482

483
// Remove a subscriptions to a remote feature
484
func (r *FeatureLocal) RemoveRemoteSubscription(remoteAddress *model.FeatureAddressType) (*model.MsgCounterType, *model.ErrorType) {
4✔
485
        if remoteAddress.Device == nil {
4✔
486
                return nil, model.NewErrorTypeFromString("device not found")
×
487
        }
×
488
        remoteDevice := r.entity.Device().RemoteDeviceForAddress(*remoteAddress.Device)
4✔
489
        if remoteDevice == nil {
5✔
490
                return nil, model.NewErrorTypeFromString("device not found")
1✔
491
        }
1✔
492

493
        msgCounter, err := remoteDevice.Sender().Unsubscribe(r.Address(), remoteAddress)
3✔
494
        if err != nil {
3✔
495
                return nil, model.NewErrorTypeFromString("device not found")
×
496
        }
×
497

498
        var subscriptions []*model.FeatureAddressType
3✔
499

3✔
500
        r.mux.Lock()
3✔
501
        defer r.mux.Unlock()
3✔
502

3✔
503
        for _, item := range r.subscriptions {
6✔
504
                if reflect.DeepEqual(item, remoteAddress) {
5✔
505
                        continue
2✔
506
                }
507

508
                subscriptions = append(subscriptions, item)
1✔
509
        }
510

511
        r.subscriptions = subscriptions
3✔
512

3✔
513
        return msgCounter, nil
3✔
514
}
515

516
// Remove all subscriptions to remote features
517
func (r *FeatureLocal) RemoveAllRemoteSubscriptions() {
8✔
518
        for _, item := range r.subscriptions {
9✔
519
                _, _ = r.RemoveRemoteSubscription(item)
1✔
520
        }
1✔
521
}
522

523
// check if there already is a binding to a remote feature
524
func (r *FeatureLocal) HasBindingToRemote(remoteAddress *model.FeatureAddressType) bool {
11✔
525
        r.mux.Lock()
11✔
526
        defer r.mux.Unlock()
11✔
527

11✔
528
        for _, item := range r.bindings {
25✔
529
                if reflect.DeepEqual(*remoteAddress, *item) {
21✔
530
                        return true
7✔
531
                }
7✔
532
        }
533

534
        return false
4✔
535
}
536

537
// BindToRemote to a remote feature
538
func (r *FeatureLocal) BindToRemote(remoteAddress *model.FeatureAddressType) (*model.MsgCounterType, *model.ErrorType) {
9✔
539
        if remoteAddress.Device == nil {
9✔
540
                return nil, model.NewErrorTypeFromString("device not found")
×
541
        }
×
542
        remoteDevice := r.entity.Device().RemoteDeviceForAddress(*remoteAddress.Device)
9✔
543
        if remoteDevice == nil {
10✔
544
                return nil, model.NewErrorTypeFromString("device not found")
1✔
545
        }
1✔
546

547
        if r.Role() == model.RoleTypeServer {
9✔
548
                return nil, model.NewErrorTypeFromString(fmt.Sprintf("the server feature '%s' cannot request a binding", r.Feature.String()))
1✔
549
        }
1✔
550

551
        msgCounter, err := remoteDevice.Sender().Bind(r.Address(), remoteAddress, r.ftype)
7✔
552
        if err != nil {
7✔
553
                return nil, model.NewErrorTypeFromString(err.Error())
×
554
        }
×
555

556
        r.mux.Lock()
7✔
557
        r.bindings = append(r.bindings, remoteAddress)
7✔
558
        r.mux.Unlock()
7✔
559

7✔
560
        return msgCounter, nil
7✔
561
}
562

563
// Remove a binding to a remote feature
564
func (r *FeatureLocal) RemoveRemoteBinding(remoteAddress *model.FeatureAddressType) (*model.MsgCounterType, *model.ErrorType) {
4✔
565
        if remoteAddress.Device == nil {
4✔
566
                return nil, model.NewErrorTypeFromString("device not found")
×
567
        }
×
568
        remoteDevice := r.entity.Device().RemoteDeviceForAddress(*remoteAddress.Device)
4✔
569
        if remoteDevice == nil {
5✔
570
                return nil, model.NewErrorTypeFromString("device not found")
1✔
571
        }
1✔
572

573
        msgCounter, err := remoteDevice.Sender().Unbind(r.Address(), remoteAddress)
3✔
574
        if err != nil {
3✔
575
                return nil, model.NewErrorTypeFromString(err.Error())
×
576
        }
×
577

578
        var bindings []*model.FeatureAddressType
3✔
579

3✔
580
        r.mux.Lock()
3✔
581
        defer r.mux.Unlock()
3✔
582

3✔
583
        for _, item := range r.bindings {
6✔
584
                if reflect.DeepEqual(item, remoteAddress) {
5✔
585
                        continue
2✔
586
                }
587

588
                bindings = append(bindings, item)
1✔
589
        }
590

591
        r.bindings = bindings
3✔
592

3✔
593
        return msgCounter, nil
3✔
594
}
595

596
// Remove all subscriptions to remote features
597
func (r *FeatureLocal) RemoveAllRemoteBindings() {
8✔
598
        for _, item := range r.bindings {
9✔
599
                _, _ = r.RemoveRemoteBinding(item)
1✔
600
        }
1✔
601
}
602

603
func (r *FeatureLocal) HandleMessage(message *api.Message) *model.ErrorType {
23✔
604
        cmdData, err := message.Cmd.Data()
23✔
605
        if err != nil {
24✔
606
                return model.NewErrorType(model.ErrorNumberTypeCommandNotSupported, err.Error())
1✔
607
        }
1✔
608
        if cmdData.Function == nil {
22✔
609
                return model.NewErrorType(model.ErrorNumberTypeCommandNotSupported, "No function found for cmd data")
×
610
        }
×
611

612
        switch message.CmdClassifier {
22✔
613
        case model.CmdClassifierTypeResult:
3✔
614
                if err := r.processResult(message); err != nil {
4✔
615
                        return err
1✔
616
                }
1✔
617
        case model.CmdClassifierTypeRead:
6✔
618
                if err := r.processRead(*cmdData.Function, message.RequestHeader, message.FeatureRemote); err != nil {
11✔
619
                        return err
5✔
620
                }
5✔
621
        case model.CmdClassifierTypeReply:
3✔
622
                if err := r.processReply(message); err != nil {
3✔
623
                        return err
×
624
                }
×
625
        case model.CmdClassifierTypeNotify:
2✔
626
                if err := r.processNotify(*cmdData.Function, cmdData.Value, message.FilterPartial, message.FilterDelete, message.FeatureRemote); err != nil {
3✔
627
                        return err
1✔
628
                }
1✔
629
        case model.CmdClassifierTypeWrite:
7✔
630
                // if there is a write permission check callback set, invoke this instead of directly allowing the write
7✔
631
                if len(r.writeApprovalCallbacks) > 0 {
13✔
632
                        r.addPendingApproval(message)
6✔
633
                        r.processWriteApprovalCallbacks(message)
6✔
634
                } else {
7✔
635
                        // this method handles ack and error results, so no need to return an error
1✔
636
                        r.processWrite(message)
1✔
637
                }
1✔
638
        default:
1✔
639
                return model.NewErrorTypeFromString(fmt.Sprintf("CmdClassifier not implemented: %s", message.CmdClassifier))
1✔
640
        }
641

642
        return nil
14✔
643
}
644

645
func (r *FeatureLocal) processResult(message *api.Message) *model.ErrorType {
3✔
646
        if message.Cmd.ResultData == nil || message.Cmd.ResultData.ErrorNumber == nil {
4✔
647
                return model.NewErrorType(
1✔
648
                        model.ErrorNumberTypeGeneralError,
1✔
649
                        fmt.Sprintf("ResultData CmdClassifierType %s not implemented", message.CmdClassifier))
1✔
650
        }
1✔
651

652
        if *message.Cmd.ResultData.ErrorNumber != model.ErrorNumberTypeNoError {
4✔
653
                // error numbers explained in Resource Spec 3.11
2✔
654
                errorString := fmt.Sprintf("Error Result received %d", *message.Cmd.ResultData.ErrorNumber)
2✔
655
                if message.Cmd.ResultData.Description != nil {
4✔
656
                        errorString += fmt.Sprintf(": %s", *message.Cmd.ResultData.Description)
2✔
657
                }
2✔
658
                logging.Log().Debug(errorString)
2✔
659
        }
660

661
        // we don't need to populate this message if there is no MsgCounterReference
662
        if message.RequestHeader == nil || message.RequestHeader.MsgCounterReference == nil {
2✔
663
                return nil
×
664
        }
×
665

666
        responseMsg := api.ResponseMessage{
2✔
667
                MsgCounterReference: *message.RequestHeader.MsgCounterReference,
2✔
668
                Data:                message.Cmd.ResultData,
2✔
669
                FeatureLocal:        r,
2✔
670
                FeatureRemote:       message.FeatureRemote,
2✔
671
                EntityRemote:        message.EntityRemote,
2✔
672
                DeviceRemote:        message.DeviceRemote,
2✔
673
        }
2✔
674

2✔
675
        r.processResponseMsgCallbacks(*message.RequestHeader.MsgCounterReference, responseMsg)
2✔
676
        r.processResultCallbacks(responseMsg)
2✔
677

2✔
678
        return nil
2✔
679
}
680

681
func (r *FeatureLocal) processRead(function model.FunctionType, requestHeader *model.HeaderType, featureRemote api.FeatureRemoteInterface) *model.ErrorType {
6✔
682
        // is this a read request to a local server/special feature?
6✔
683
        if r.role == model.RoleTypeClient {
9✔
684
                // Read requests to a client feature are not allowed
3✔
685
                return model.NewErrorTypeFromNumber(model.ErrorNumberTypeCommandRejected)
3✔
686
        }
3✔
687

688
        fd := r.functionData(function)
3✔
689
        if fd == nil {
4✔
690
                return model.NewErrorTypeFromString("function data not found")
1✔
691
        }
1✔
692

693
        cmd := fd.ReplyCmdType(false)
2✔
694
        if err := featureRemote.Device().Sender().Reply(requestHeader, r.Address(), cmd); err != nil {
3✔
695
                return model.NewErrorTypeFromString(err.Error())
1✔
696
        }
1✔
697

698
        return nil
1✔
699
}
700

701
func (r *FeatureLocal) processReply(message *api.Message) *model.ErrorType {
3✔
702
        // function model.FunctionType, data any, filterPartial *model.FilterType, filterDelete *model.FilterType, featureRemote api.FeatureRemoteInterface)
3✔
703

3✔
704
        // the error is handled already in the caller
3✔
705
        cmdData, _ := message.Cmd.Data()
3✔
706
        featureRemote := message.FeatureRemote
3✔
707

3✔
708
        if _, err := featureRemote.UpdateData(true, *cmdData.Function, cmdData.Value, message.FilterPartial, message.FilterDelete); err != nil {
3✔
709
                return err
×
710
        }
×
711

712
        // the data was updated, so send an event, other event handlers may watch out for this as well
713
        payload := api.EventPayload{
3✔
714
                Ski:           featureRemote.Device().Ski(),
3✔
715
                EventType:     api.EventTypeDataChange,
3✔
716
                ChangeType:    api.ElementChangeUpdate,
3✔
717
                Feature:       featureRemote,
3✔
718
                Device:        featureRemote.Device(),
3✔
719
                Entity:        featureRemote.Entity(),
3✔
720
                LocalFeature:  r,
3✔
721
                Function:      *cmdData.Function,
3✔
722
                CmdClassifier: util.Ptr(model.CmdClassifierTypeReply),
3✔
723
                Data:          cmdData.Value,
3✔
724
        }
3✔
725
        Events.Publish(payload)
3✔
726

3✔
727
        // we don't need to populate this message if there is no MsgCounterReference
3✔
728
        if message.RequestHeader == nil || message.RequestHeader.MsgCounterReference == nil {
4✔
729
                return nil
1✔
730
        }
1✔
731

732
        responseMsg := api.ResponseMessage{
2✔
733
                MsgCounterReference: *message.RequestHeader.MsgCounterReference,
2✔
734
                Data:                cmdData.Value,
2✔
735
                FeatureLocal:        r,
2✔
736
                FeatureRemote:       message.FeatureRemote,
2✔
737
                EntityRemote:        message.EntityRemote,
2✔
738
                DeviceRemote:        message.DeviceRemote,
2✔
739
        }
2✔
740

2✔
741
        r.processResponseMsgCallbacks(*message.RequestHeader.MsgCounterReference, responseMsg)
2✔
742

2✔
743
        return nil
2✔
744
}
745

746
func (r *FeatureLocal) processNotify(function model.FunctionType, data any, filterPartial *model.FilterType, filterDelete *model.FilterType, featureRemote api.FeatureRemoteInterface) *model.ErrorType {
2✔
747
        if _, err := featureRemote.UpdateData(true, function, data, filterPartial, filterDelete); err != nil {
3✔
748
                return err
1✔
749
        }
1✔
750

751
        payload := api.EventPayload{
1✔
752
                Ski:           featureRemote.Device().Ski(),
1✔
753
                EventType:     api.EventTypeDataChange,
1✔
754
                ChangeType:    api.ElementChangeUpdate,
1✔
755
                Feature:       featureRemote,
1✔
756
                Device:        featureRemote.Device(),
1✔
757
                Entity:        featureRemote.Entity(),
1✔
758
                LocalFeature:  r,
1✔
759
                Function:      function,
1✔
760
                CmdClassifier: util.Ptr(model.CmdClassifierTypeNotify),
1✔
761
                Data:          data,
1✔
762
        }
1✔
763
        Events.Publish(payload)
1✔
764

1✔
765
        return nil
1✔
766
}
767

768
func (r *FeatureLocal) processWrite(msg *api.Message) {
3✔
769
        if err := r.executeWrite(msg); err != nil {
3✔
770
                _ = msg.FeatureRemote.Device().Sender().ResultError(msg.RequestHeader, r.Address(), err)
×
771
        } else if msg.RequestHeader != nil {
6✔
772
                ackRequest := msg.RequestHeader.AckRequest
3✔
773
                if ackRequest != nil && *ackRequest {
6✔
774
                        _ = msg.FeatureRemote.Device().Sender().ResultSuccess(msg.RequestHeader, r.Address())
3✔
775
                }
3✔
776
        }
777
}
778

779
func (r *FeatureLocal) executeWrite(msg *api.Message) *model.ErrorType {
3✔
780
        cmdData, err := msg.Cmd.Data()
3✔
781
        if err != nil {
3✔
782
                return model.NewErrorType(model.ErrorNumberTypeCommandNotSupported, err.Error())
×
783
        }
×
784
        if cmdData.Function == nil {
3✔
785
                return model.NewErrorType(model.ErrorNumberTypeCommandNotSupported, "No function found for cmd data")
×
786
        }
×
787

788
        fctData, err1 := r.updateData(true, *cmdData.Function, cmdData.Value, msg.FilterPartial, msg.FilterDelete)
3✔
789
        if err1 != nil {
3✔
790
                return err1
×
791
        } else if fctData == nil {
3✔
792
                return model.NewErrorTypeFromString("function not found")
×
793
        }
×
794

795
        r.Device().NotifySubscribers(r.Address(), fctData.NotifyOrWriteCmdType(nil, nil, false, nil))
3✔
796

3✔
797
        payload := api.EventPayload{
3✔
798
                Ski:           msg.FeatureRemote.Device().Ski(),
3✔
799
                EventType:     api.EventTypeDataChange,
3✔
800
                ChangeType:    api.ElementChangeUpdate,
3✔
801
                Feature:       msg.FeatureRemote,
3✔
802
                Device:        msg.FeatureRemote.Device(),
3✔
803
                Entity:        msg.FeatureRemote.Entity(),
3✔
804
                LocalFeature:  r,
3✔
805
                Function:      *cmdData.Function,
3✔
806
                CmdClassifier: util.Ptr(model.CmdClassifierTypeWrite),
3✔
807
                Data:          cmdData.Value,
3✔
808
        }
3✔
809
        Events.Publish(payload)
3✔
810

3✔
811
        return nil
3✔
812
}
813

814
func (r *FeatureLocal) functionData(function model.FunctionType) api.FunctionDataCmdInterface {
134✔
815
        fd, found := r.functionDataMap[function]
134✔
816
        if !found {
138✔
817
                logging.Log().Errorf("Data was not found for function '%s'", function)
4✔
818
                return nil
4✔
819
        }
4✔
820
        return fd
130✔
821
}
822

823
func (r *FeatureLocal) Information() *model.NodeManagementDetailedDiscoveryFeatureInformationType {
6✔
824
        var funs []model.FunctionPropertyType
6✔
825
        for fun, operations := range r.operations {
27✔
826
                var functionType = model.FunctionType(fun)
21✔
827
                sf := model.FunctionPropertyType{
21✔
828
                        Function:           &functionType,
21✔
829
                        PossibleOperations: operations.Information(),
21✔
830
                }
21✔
831

21✔
832
                funs = append(funs, sf)
21✔
833
        }
21✔
834

835
        res := model.NodeManagementDetailedDiscoveryFeatureInformationType{
6✔
836
                Description: &model.NetworkManagementFeatureDescriptionDataType{
6✔
837
                        FeatureAddress:    r.Address(),
6✔
838
                        FeatureType:       &r.ftype,
6✔
839
                        Role:              &r.role,
6✔
840
                        Description:       r.description,
6✔
841
                        SupportedFunction: funs,
6✔
842
                },
6✔
843
        }
6✔
844

6✔
845
        return &res
6✔
846
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc