• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

enbility / spine-go / 10973819786

21 Sep 2024 03:50PM UTC coverage: 93.707% (+0.2%) from 93.465%
10973819786

push

github

DerAndereAndi
Merge branch 'release/v0.7.0'

626 of 646 new or added lines in 36 files covered. (96.9%)

2 existing lines in 2 files now uncovered.

5018 of 5355 relevant lines covered (93.71%)

88.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.83
/spine/send.go
1
package spine
2

3
import (
4
        "crypto/sha256"
5
        "encoding/hex"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "sort"
10
        "sync"
11
        "sync/atomic"
12

13
        shipapi "github.com/enbility/ship-go/api"
14
        "github.com/enbility/ship-go/logging"
15
        "github.com/enbility/spine-go/api"
16
        "github.com/enbility/spine-go/model"
17
        "github.com/enbility/spine-go/util"
18
        "github.com/golanguzb70/lrucache"
19
)
20

21
// reqMsgCacheData maps the message counter of a sent request to the hash
// string stored for that request.
type reqMsgCacheData map[model.MsgCounterType]string
22

23
type Sender struct {
24
        msgNum uint64 // 64bit values need to be defined on top of the struct to make atomic commands work on 32bit systems
25

26
        // we cache the last 100 notify messages, so we can find the matching item for result errors being returned
27
        datagramNotifyCache *lrucache.LRUCache[model.MsgCounterType, model.DatagramType]
28

29
        writeHandler shipapi.ShipConnectionDataWriterInterface
30

31
        reqMsgCache reqMsgCacheData // cache for unanswered request messages, so we can filter duplicates and not send them
32

33
        muxNotifyCache sync.RWMutex
34
        muxReadCache   sync.RWMutex
35
}
36

37
// Compile-time assertion that *Sender satisfies api.SenderInterface.
var _ api.SenderInterface = (*Sender)(nil)
38

39
func NewSender(writeI shipapi.ShipConnectionDataWriterInterface) api.SenderInterface {
36✔
40
        cache := lrucache.New[model.MsgCounterType, model.DatagramType](100, 0)
36✔
41
        return &Sender{
36✔
42
                datagramNotifyCache: &cache,
36✔
43
                writeHandler:        writeI,
36✔
44
                reqMsgCache:         make(reqMsgCacheData),
36✔
45
        }
36✔
46
}
36✔
47

48
// return the datagram for a given msgCounter (only availbe for Notify messasges!), error if not found
49
func (c *Sender) DatagramForMsgCounter(msgCounter model.MsgCounterType) (model.DatagramType, error) {
2✔
50
        c.muxNotifyCache.RLock()
2✔
51
        defer c.muxNotifyCache.RUnlock()
2✔
52

2✔
53
        if datagram, ok := c.datagramNotifyCache.Get(msgCounter); ok {
3✔
54
                return datagram, nil
1✔
55
        }
1✔
56

57
        return model.DatagramType{}, errors.New("msgCounter not found")
1✔
58
}
59

60
func (c *Sender) sendSpineMessage(datagram model.DatagramType) error {
125✔
61
        // pack into datagram
125✔
62
        data := model.Datagram{
125✔
63
                Datagram: datagram,
125✔
64
        }
125✔
65

125✔
66
        // marshal
125✔
67
        msg, err := json.Marshal(data)
125✔
68
        if err != nil {
125✔
69
                return err
×
70
        }
×
71

72
        if c.writeHandler == nil {
126✔
73
                return errors.New("outgoing interface implementation not set")
1✔
74
        }
1✔
75

76
        if msg == nil {
124✔
77
                return errors.New("message is nil")
×
78
        }
×
79

80
        logging.Log().Debug(datagram.PrintMessageOverview(true, "", ""))
124✔
81

124✔
82
        // write to channel
124✔
83
        c.writeHandler.WriteShipMessageWithPayload(msg)
124✔
84

124✔
85
        return nil
124✔
86
}
87

88
// Caching of outgoing and unanswered requests, so we can filter duplicates
89
func (c *Sender) hashForMessage(destinationAddress *model.FeatureAddressType, cmd []model.CmdType) string {
99✔
90
        cmdString, err := json.Marshal(cmd)
99✔
91
        if err != nil {
99✔
NEW
92
                return ""
×
NEW
93
        }
×
94

95
        sig := fmt.Sprintf("%s-%s", destinationAddress.String(), cmdString)
99✔
96
        shaBytes := sha256.Sum256([]byte(sig))
99✔
97
        return hex.EncodeToString(shaBytes[:])
99✔
98
}
99

100
func (c *Sender) msgCounterForHashFromCache(hash string) *model.MsgCounterType {
99✔
101
        c.muxReadCache.RLock()
99✔
102
        defer c.muxReadCache.RUnlock()
99✔
103

99✔
104
        for msgCounter, h := range c.reqMsgCache {
962✔
105
                if h == hash {
871✔
106
                        return &msgCounter
8✔
107
                }
8✔
108
        }
109

110
        return nil
91✔
111
}
112

113
func (c *Sender) hasMsgCounterInCache(msgCounter model.MsgCounterType) bool {
9✔
114
        c.muxReadCache.RLock()
9✔
115
        defer c.muxReadCache.RUnlock()
9✔
116

9✔
117
        _, ok := c.reqMsgCache[msgCounter]
9✔
118

9✔
119
        return ok
9✔
120
}
9✔
121

122
func (c *Sender) addMsgCounterHashToCache(msgCounter model.MsgCounterType, hash string) {
91✔
123
        c.muxReadCache.Lock()
91✔
124
        defer c.muxReadCache.Unlock()
91✔
125

91✔
126
        // cleanup cache, keep only the last 20 messages
91✔
127
        if len(c.reqMsgCache) > 20 {
121✔
128
                keys := make([]uint64, 0, len(c.reqMsgCache))
30✔
129
                for k := range c.reqMsgCache {
660✔
130
                        keys = append(keys, uint64(k))
630✔
131
                }
630✔
132
                sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
2,635✔
133

134
                // oldest key is the one with the lowest msgCounterValue
135
                oldestKey := keys[0]
30✔
136
                delete(c.reqMsgCache, model.MsgCounterType(oldestKey))
30✔
137
        }
138

139
        c.reqMsgCache[msgCounter] = hash
91✔
140
}
141

142
// we need to remove the msgCounter from the cache, if we have it cached
143
func (c *Sender) ProcessResponseForMsgCounterReference(msgCounterRef *model.MsgCounterType) {
9✔
144
        if msgCounterRef != nil &&
9✔
145
                c.hasMsgCounterInCache(*msgCounterRef) {
17✔
146
                c.muxReadCache.Lock()
8✔
147
                defer c.muxReadCache.Unlock()
8✔
148

8✔
149
                delete(c.reqMsgCache, *msgCounterRef)
8✔
150
        }
8✔
151
}
152

153
// Sends request
154
func (c *Sender) Request(cmdClassifier model.CmdClassifierType, senderAddress, destinationAddress *model.FeatureAddressType, ackRequest bool, cmd []model.CmdType) (*model.MsgCounterType, error) {
99✔
155
        // check if there is an unanswered subscribe message for this destination and cmd and return that msgCounter
99✔
156
        hash := c.hashForMessage(destinationAddress, cmd)
99✔
157
        if len(hash) > 0 {
198✔
158
                if msgCounterCache := c.msgCounterForHashFromCache(hash); msgCounterCache != nil {
107✔
159
                        return msgCounterCache, nil
8✔
160
                }
8✔
161
        }
162

163
        msgCounter := c.getMsgCounter()
91✔
164

91✔
165
        datagram := model.DatagramType{
91✔
166
                Header: model.HeaderType{
91✔
167
                        SpecificationVersion: &SpecificationVersion,
91✔
168
                        AddressSource:        senderAddress,
91✔
169
                        AddressDestination:   destinationAddress,
91✔
170
                        MsgCounter:           msgCounter,
91✔
171
                        CmdClassifier:        &cmdClassifier,
91✔
172
                },
91✔
173
                Payload: model.PayloadType{
91✔
174
                        Cmd: cmd,
91✔
175
                },
91✔
176
        }
91✔
177

91✔
178
        if ackRequest {
101✔
179
                datagram.Header.AckRequest = &ackRequest
10✔
180
        }
10✔
181

182
        err := c.sendSpineMessage(datagram)
91✔
183
        if err == nil {
182✔
184
                if len(hash) > 0 {
182✔
185
                        c.addMsgCounterHashToCache(*msgCounter, hash)
91✔
186
                }
91✔
187
        }
188

189
        return msgCounter, err
91✔
190
}
191

192
func (c *Sender) ResultSuccess(requestHeader *model.HeaderType, senderAddress *model.FeatureAddressType) error {
3✔
193
        return c.result(requestHeader, senderAddress, nil)
3✔
194
}
3✔
195

196
func (c *Sender) ResultError(requestHeader *model.HeaderType, senderAddress *model.FeatureAddressType, err *model.ErrorType) error {
6✔
197
        return c.result(requestHeader, senderAddress, err)
6✔
198
}
6✔
199

200
// sends a result for a request
201
func (c *Sender) result(requestHeader *model.HeaderType, senderAddress *model.FeatureAddressType, err *model.ErrorType) error {
9✔
202
        cmdClassifier := model.CmdClassifierTypeResult
9✔
203

9✔
204
        addressSource := *requestHeader.AddressDestination
9✔
205
        addressSource.Device = senderAddress.Device
9✔
206

9✔
207
        var resultData model.ResultDataType
9✔
208
        if err != nil {
15✔
209
                resultData = model.ResultDataType{
6✔
210
                        ErrorNumber: &err.ErrorNumber,
6✔
211
                        Description: err.Description,
6✔
212
                }
6✔
213
        } else {
9✔
214
                resultData = model.ResultDataType{
3✔
215
                        ErrorNumber: util.Ptr(model.ErrorNumberTypeNoError),
3✔
216
                }
3✔
217
        }
3✔
218

219
        cmd := model.CmdType{
9✔
220
                ResultData: &resultData,
9✔
221
        }
9✔
222

9✔
223
        datagram := model.DatagramType{
9✔
224
                Header: model.HeaderType{
9✔
225
                        SpecificationVersion: &SpecificationVersion,
9✔
226
                        AddressSource:        &addressSource,
9✔
227
                        AddressDestination:   requestHeader.AddressSource,
9✔
228
                        MsgCounter:           c.getMsgCounter(),
9✔
229
                        MsgCounterReference:  requestHeader.MsgCounter,
9✔
230
                        CmdClassifier:        &cmdClassifier,
9✔
231
                },
9✔
232
                Payload: model.PayloadType{
9✔
233
                        Cmd: []model.CmdType{cmd},
9✔
234
                },
9✔
235
        }
9✔
236

9✔
237
        return c.sendSpineMessage(datagram)
9✔
238
}
239

240
// Reply sends reply to original sender
241
func (c *Sender) Reply(requestHeader *model.HeaderType, senderAddress *model.FeatureAddressType, cmd model.CmdType) error {
6✔
242
        cmdClassifier := model.CmdClassifierTypeReply
6✔
243

6✔
244
        addressSource := *requestHeader.AddressDestination
6✔
245
        addressSource.Device = senderAddress.Device
6✔
246

6✔
247
        datagram := model.DatagramType{
6✔
248
                Header: model.HeaderType{
6✔
249
                        SpecificationVersion: &SpecificationVersion,
6✔
250
                        AddressSource:        &addressSource,
6✔
251
                        AddressDestination:   requestHeader.AddressSource,
6✔
252
                        MsgCounter:           c.getMsgCounter(),
6✔
253
                        MsgCounterReference:  requestHeader.MsgCounter,
6✔
254
                        CmdClassifier:        &cmdClassifier,
6✔
255
                },
6✔
256
                Payload: model.PayloadType{
6✔
257
                        Cmd: []model.CmdType{cmd},
6✔
258
                },
6✔
259
        }
6✔
260

6✔
261
        return c.sendSpineMessage(datagram)
6✔
262
}
6✔
263

264
// Notify sends notification to destination
265
func (c *Sender) Notify(senderAddress, destinationAddress *model.FeatureAddressType, cmd model.CmdType) (*model.MsgCounterType, error) {
15✔
266
        msgCounter := c.getMsgCounter()
15✔
267

15✔
268
        cmdClassifier := model.CmdClassifierTypeNotify
15✔
269

15✔
270
        datagram := model.DatagramType{
15✔
271
                Header: model.HeaderType{
15✔
272
                        SpecificationVersion: &SpecificationVersion,
15✔
273
                        AddressSource:        senderAddress,
15✔
274
                        AddressDestination:   destinationAddress,
15✔
275
                        MsgCounter:           msgCounter,
15✔
276
                        CmdClassifier:        &cmdClassifier,
15✔
277
                },
15✔
278
                Payload: model.PayloadType{
15✔
279
                        Cmd: []model.CmdType{cmd},
15✔
280
                },
15✔
281
        }
15✔
282

15✔
283
        c.muxNotifyCache.Lock()
15✔
284
        c.datagramNotifyCache.Put(*msgCounter, datagram)
15✔
285
        c.muxNotifyCache.Unlock()
15✔
286

15✔
287
        return msgCounter, c.sendSpineMessage(datagram)
15✔
288
}
15✔
289

290
// Write sends notification to destination
291
func (c *Sender) Write(senderAddress, destinationAddress *model.FeatureAddressType, cmd model.CmdType) (*model.MsgCounterType, error) {
2✔
292
        msgCounter := c.getMsgCounter()
2✔
293

2✔
294
        cmdClassifier := model.CmdClassifierTypeWrite
2✔
295
        ackRequest := true
2✔
296

2✔
297
        datagram := model.DatagramType{
2✔
298
                Header: model.HeaderType{
2✔
299
                        SpecificationVersion: &SpecificationVersion,
2✔
300
                        AddressSource:        senderAddress,
2✔
301
                        AddressDestination:   destinationAddress,
2✔
302
                        MsgCounter:           msgCounter,
2✔
303
                        CmdClassifier:        &cmdClassifier,
2✔
304
                        AckRequest:           &ackRequest,
2✔
305
                },
2✔
306
                Payload: model.PayloadType{
2✔
307
                        Cmd: []model.CmdType{cmd},
2✔
308
                },
2✔
309
        }
2✔
310

2✔
311
        return msgCounter, c.sendSpineMessage(datagram)
2✔
312
}
2✔
313

314
// Send a subscription request to a remote server feature
315
func (c *Sender) Subscribe(senderAddress, destinationAddress *model.FeatureAddressType, serverFeatureType model.FeatureTypeType) (*model.MsgCounterType, error) {
7✔
316
        cmd := model.CmdType{
7✔
317
                NodeManagementSubscriptionRequestCall: NewNodeManagementSubscriptionRequestCallType(senderAddress, destinationAddress, serverFeatureType),
7✔
318
        }
7✔
319

7✔
320
        // we always send it to the remote NodeManagement feature, which always is at entity:[0],feature:0
7✔
321
        localAddress := NodeManagementAddress(senderAddress.Device)
7✔
322
        remoteAddress := NodeManagementAddress(destinationAddress.Device)
7✔
323

7✔
324
        return c.Request(model.CmdClassifierTypeCall, localAddress, remoteAddress, true, []model.CmdType{cmd})
7✔
325
}
7✔
326

327
// Send a subscription deletion request to a remote server feature
328
func (c *Sender) Unsubscribe(senderAddress, destinationAddress *model.FeatureAddressType) (*model.MsgCounterType, error) {
3✔
329
        cmd := model.CmdType{
3✔
330
                NodeManagementSubscriptionDeleteCall: NewNodeManagementSubscriptionDeleteCallType(senderAddress, destinationAddress),
3✔
331
        }
3✔
332

3✔
333
        // we always send it to the remote NodeManagement feature, which always is at entity:[0],feature:0
3✔
334
        localAddress := NodeManagementAddress(senderAddress.Device)
3✔
335
        remoteAddress := NodeManagementAddress(destinationAddress.Device)
3✔
336

3✔
337
        return c.Request(model.CmdClassifierTypeCall, localAddress, remoteAddress, true, []model.CmdType{cmd})
3✔
338
}
3✔
339

340
// Send a binding request to a remote server feature
341
func (c *Sender) Bind(senderAddress, destinationAddress *model.FeatureAddressType, serverFeatureType model.FeatureTypeType) (*model.MsgCounterType, error) {
3✔
342
        cmd := model.CmdType{
3✔
343
                NodeManagementBindingRequestCall: NewNodeManagementBindingRequestCallType(senderAddress, destinationAddress, serverFeatureType),
3✔
344
        }
3✔
345

3✔
346
        // we always send it to the remote NodeManagement feature, which always is at entity:[0],feature:0
3✔
347
        localAddress := NodeManagementAddress(senderAddress.Device)
3✔
348
        remoteAddress := NodeManagementAddress(destinationAddress.Device)
3✔
349

3✔
350
        return c.Request(model.CmdClassifierTypeCall, localAddress, remoteAddress, true, []model.CmdType{cmd})
3✔
351
}
3✔
352

353
// Send a binding request to a remote server feature
354
func (c *Sender) Unbind(senderAddress, destinationAddress *model.FeatureAddressType) (*model.MsgCounterType, error) {
3✔
355
        cmd := model.CmdType{
3✔
356
                NodeManagementBindingDeleteCall: NewNodeManagementBindingDeleteCallType(senderAddress, destinationAddress),
3✔
357
        }
3✔
358

3✔
359
        // we always send it to the remote NodeManagement feature, which always is at entity:[0],feature:0
3✔
360
        localAddress := NodeManagementAddress(senderAddress.Device)
3✔
361
        remoteAddress := NodeManagementAddress(destinationAddress.Device)
3✔
362

3✔
363
        return c.Request(model.CmdClassifierTypeCall, localAddress, remoteAddress, true, []model.CmdType{cmd})
3✔
364
}
3✔
365

366
func (c *Sender) getMsgCounter() *model.MsgCounterType {
123✔
367
        // TODO:  persistence
123✔
368
        i := model.MsgCounterType(atomic.AddUint64(&c.msgNum, 1))
123✔
369
        return &i
123✔
370
}
123✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc