• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

enbility / ship-go / 11201256463

06 Oct 2024 10:57AM UTC coverage: 92.791% (-0.04%) from 92.828%
11201256463

push

github

DerAndereAndi
Fix type naming

2 of 2 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

2523 of 2719 relevant lines covered (92.79%)

13.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.4
/mdns/avahi.go
1
package mdns
2

3
import (
4
        "fmt"
5
        "net"
6
        "sync"
7
        "time"
8

9
        "github.com/enbility/go-avahi"
10
        "github.com/enbility/ship-go/api"
11
        "github.com/enbility/ship-go/logging"
12
)
13

14
type mdnsServiceData struct {
15
        // the service name
16
        Name string
17
        // the service port
18
        Port int
19
        // the service txt
20
        Txt []string
21
}
22

23
type AvahiProvider struct {
24
        ifaceIndexes []int32
25

26
        avServer     avahi.ServerInterface
27
        avEntryGroup avahi.EntryGroupInterface
28
        avBrowser    avahi.ServiceBrowserInterface
29

30
        autoReconnect   bool
31
        manualShutdown  bool
32
        setupSuccessful bool
33
        listenerRunning bool
34

35
        mdnsServiceData *mdnsServiceData
36

37
        resolveCB api.MdnsResolveCB
38

39
        // Used to store the service elements for each service, so that we can recall them when a service is removed
40
        serviceElements map[string]map[string]string
41

42
        shutdownChan                      chan struct{}
43
        addServiceChan, removeServiceChan chan avahi.Service
44

45
        mux   sync.Mutex
46
        muxEl sync.RWMutex // used for serviceElements
47
}
48

49
func NewAvahiProvider(ifaceIndexes []int32) *AvahiProvider {
11✔
50
        return &AvahiProvider{
11✔
51
                avServer:        avahi.ServerNew(),
11✔
52
                setupSuccessful: false,
11✔
53
                ifaceIndexes:    ifaceIndexes,
11✔
54
                serviceElements: make(map[string]map[string]string),
11✔
55
        }
11✔
56
}
11✔
57

58
var _ api.MdnsProviderInterface = (*AvahiProvider)(nil)
59

60
func (a *AvahiProvider) Start(autoReconnect bool, cb api.MdnsResolveCB) bool {
14✔
61
        a.mux.Lock()
14✔
62
        defer a.mux.Unlock()
14✔
63

14✔
64
        a.autoReconnect = autoReconnect
14✔
65
        a.resolveCB = cb
14✔
66
        a.manualShutdown = false
14✔
67

14✔
68
        err := a.avServer.Setup(a.avahiCallback)
14✔
69
        if err != nil {
15✔
70
                return false
1✔
71
        }
1✔
72
        a.setupSuccessful = true
13✔
73
        if a.shutdownChan == nil {
23✔
74
                a.shutdownChan = make(chan struct{})
10✔
75
        }
10✔
76
        if a.addServiceChan == nil {
23✔
77
                a.addServiceChan = make(chan avahi.Service)
10✔
78
        }
10✔
79
        if a.removeServiceChan == nil {
23✔
80
                a.removeServiceChan = make(chan avahi.Service)
10✔
81
        }
10✔
82

83
        a.avServer.Start()
13✔
84

13✔
85
        if _, err := a.avServer.GetAPIVersion(); err != nil {
20✔
86
                a.avServer.Shutdown()
7✔
87
                return false
7✔
88
        }
7✔
89

90
        // instead of limiting search on specific allowed interfaces, we allow all and filter the results
91
        avBrowser, err := a.avServer.ServiceBrowserNew(a.addServiceChan, a.removeServiceChan, avahi.InterfaceUnspec, avahi.ProtoUnspec, shipZeroConfServiceType, shipZeroConfDomain, 0)
6✔
92
        if err != nil || avBrowser == nil {
7✔
93
                a.avServer.Shutdown()
1✔
94
                return false
1✔
95
        }
1✔
96

97
        a.avBrowser = avBrowser
5✔
98

5✔
99
        // autoReconnect is only called with false if the systems does not know if
5✔
100
        // avahi should be used in the first place.
5✔
101
        // but if it was found and therefor being used, it should automatically reconnect once disconnected
5✔
102
        if !autoReconnect {
6✔
103
                a.autoReconnect = true
1✔
104
        }
1✔
105

106
        if !a.listenerRunning {
9✔
107
                a.listenerRunning = true
4✔
108
                go a.chanListener(cb)
4✔
109
        }
4✔
110

111
        return true
5✔
112
}
113

114
func (a *AvahiProvider) Shutdown() {
12✔
115
        a.mux.Lock()
12✔
116
        a.manualShutdown = true
12✔
117

12✔
118
        if !a.setupSuccessful {
13✔
119
                a.mux.Unlock()
1✔
120
                return
1✔
121
        }
1✔
122

123
        // when shutting down on purpose, do not try to reconnect
124
        a.autoReconnect = false
11✔
125
        if a.avBrowser != nil {
15✔
126
                a.avServer.ServiceBrowserFree(a.avBrowser)
4✔
127
                a.avBrowser = nil
4✔
128

4✔
129
                if a.listenerRunning {
8✔
130
                        // stop the currently running resolve
4✔
131
                        a.shutdownChan <- struct{}{}
4✔
132
                }
4✔
133
        }
134
        a.listenerRunning = false
11✔
135
        if a.shutdownChan != nil {
21✔
136
                close(a.shutdownChan)
10✔
137
                a.shutdownChan = nil
10✔
138
        }
10✔
139
        if a.addServiceChan != nil {
21✔
140
                close(a.addServiceChan)
10✔
141
                a.addServiceChan = nil
10✔
142
        }
10✔
143
        if a.removeServiceChan != nil {
21✔
144
                close(a.removeServiceChan)
10✔
145
                a.removeServiceChan = nil
10✔
146
        }
10✔
147
        a.mux.Unlock()
11✔
148

11✔
149
        // Unannounce the service
11✔
150
        a.Unannounce()
11✔
151

11✔
152
        a.mux.Lock()
11✔
153
        defer a.mux.Unlock()
11✔
154

11✔
155
        a.avServer.Shutdown()
11✔
156
        a.avEntryGroup = nil
11✔
157
}
158

159
func (a *AvahiProvider) Announce(serviceName string, port int, txt []string) error {
9✔
160
        a.mux.Lock()
9✔
161
        defer a.mux.Unlock()
9✔
162

9✔
163
        // store the data for reconnection
9✔
164
        a.mdnsServiceData = &mdnsServiceData{
9✔
165
                Name: serviceName,
9✔
166
                Port: port,
9✔
167
                Txt:  txt,
9✔
168
        }
9✔
169

9✔
170
        logging.Log().Debug("mdns: using avahi")
9✔
171

9✔
172
        var btxt [][]byte
9✔
173
        for _, t := range txt {
36✔
174
                btxt = append(btxt, []byte(t))
27✔
175
        }
27✔
176

177
        entryGroup, err := a.avServer.EntryGroupNew()
9✔
178
        if err != nil {
12✔
179
                return err
3✔
180
        }
3✔
181

182
        for _, iface := range a.ifaceIndexes {
12✔
183
                // conversion is safe, as port values are always positive
6✔
184
                err = entryGroup.AddService(iface, avahi.ProtoUnspec, 0, serviceName, shipZeroConfServiceType, shipZeroConfDomain, "", uint16(port), btxt) // #nosec G115
6✔
185
                if err != nil {
7✔
186
                        return err
1✔
187
                }
1✔
188
        }
189

190
        err = entryGroup.Commit()
5✔
191
        if err != nil {
6✔
192
                return err
1✔
193
        }
1✔
194

195
        a.avEntryGroup = entryGroup
4✔
196

4✔
197
        return nil
4✔
198
}
199

200
func (a *AvahiProvider) Unannounce() {
14✔
201
        a.mux.Lock()
14✔
202
        defer a.mux.Unlock()
14✔
203

14✔
204
        // clean up the reconnection data
14✔
205
        a.mdnsServiceData = nil
14✔
206

14✔
207
        if a.avEntryGroup == nil {
25✔
208
                return
11✔
209
        }
11✔
210

211
        a.avServer.EntryGroupFree(a.avEntryGroup)
3✔
212
        a.avEntryGroup = nil
3✔
213
}
214

215
func (a *AvahiProvider) avahiCallback(event avahi.Event) {
7✔
216
        a.mux.Lock()
7✔
217
        // if there is a manual shutdown, we do not want to reconnect
7✔
218
        if a.manualShutdown || !a.autoReconnect || event != avahi.Disconnected {
11✔
219
                a.mux.Unlock()
4✔
220
                return
4✔
221
        }
4✔
222

223
        logging.Log().Debug("mdns: avahi - disconnected")
3✔
224

3✔
225
        // the server was shutdown, set it to nil so we don't try to call free functions
3✔
226
        // on shutting down a currently running resolve
3✔
227
        cb := a.resolveCB
3✔
228
        var serviceData *mdnsServiceData
3✔
229
        if a.mdnsServiceData != nil {
5✔
230
                serviceData = a.mdnsServiceData
2✔
231
        }
2✔
232
        a.mux.Unlock()
3✔
233

3✔
234
        // try to reconnect until successull
3✔
235
        go a.attemptReconnect(cb, serviceData)
3✔
236
}
237

238
// attempt to reconnect to the avahi daemon endlessly
239
func (a *AvahiProvider) attemptReconnect(cb api.MdnsResolveCB, serviceData *mdnsServiceData) {
3✔
240
        for {
6✔
241
                a.mux.Lock()
3✔
242
                isManualShutdown := a.manualShutdown
3✔
243
                a.mux.Unlock()
3✔
244
                if isManualShutdown {
5✔
245
                        return
2✔
246
                }
2✔
247

248
                <-time.After(time.Second)
1✔
249

1✔
250
                if !a.Start(true, cb) {
1✔
UNCOV
251
                        continue
×
252
                }
253

254
                logging.Log().Debug("mdns: avahi - reconnected")
1✔
255

1✔
256
                if serviceData != nil {
2✔
257
                        if err := a.Announce(serviceData.Name, serviceData.Port, serviceData.Txt); err != nil {
1✔
258
                                logging.Log().Debug("mdns: avahi - error re-announcing service:", err)
×
259
                        }
×
260
                }
261

262
                return
1✔
263
        }
264
}
265

266
// listen to service changes and shutdown
267
func (a *AvahiProvider) chanListener(cb api.MdnsResolveCB) {
4✔
268
        for {
10✔
269
                select {
6✔
270
                case <-a.shutdownChan:
4✔
271
                        return
4✔
272
                case service := <-a.addServiceChan:
1✔
273
                        if err := a.processService(service, false, cb); err != nil {
1✔
274
                                logging.Log().Debug("mdns: avahi -", err)
×
275
                        }
×
276
                case service := <-a.removeServiceChan:
1✔
277
                        if err := a.processService(service, true, cb); err != nil {
1✔
278
                                logging.Log().Debug("mdns: avahi -", err)
×
279
                        }
×
280
                }
281
        }
282
}
283

284
// process an avahi mDNS service
285
// as avahi returns a service per interface, we need to combine them
286
func (a *AvahiProvider) processService(service avahi.Service, remove bool, cb api.MdnsResolveCB) error {
6✔
287
        // check if the service is within the allowed list
6✔
288
        allow := false
6✔
289
        if len(a.ifaceIndexes) == 1 && a.ifaceIndexes[0] == avahi.InterfaceUnspec {
7✔
290
                allow = true
1✔
291
        } else {
6✔
292
                for _, iface := range a.ifaceIndexes {
10✔
293
                        if service.Interface == iface {
9✔
294
                                allow = true
4✔
295
                                break
4✔
296
                        }
297
                }
298
        }
299

300
        if !allow {
7✔
301
                return fmt.Errorf("ignoring service as its interface is not in the allowed list: %s", service.Name)
1✔
302
        }
1✔
303

304
        if remove {
7✔
305
                return a.processRemovedService(service, cb)
2✔
306
        }
2✔
307

308
        // resolve the new service
309
        resolved, err := a.avServer.ResolveService(service.Interface, service.Protocol, service.Name, service.Type, service.Domain, avahi.ProtoUnspec, 0)
3✔
310
        if err != nil {
3✔
311
                return fmt.Errorf("error resolving service: %s error: %w", service.Name, err)
×
312
        }
×
313

314
        return a.processAddedService(resolved, cb)
3✔
315
}
316

317
func (a *AvahiProvider) processRemovedService(service avahi.Service, cb api.MdnsResolveCB) error {
3✔
318
        logging.Log().Tracef("mdns: avahi - process remove service: %v", service)
3✔
319

3✔
320
        // get the elements for the service
3✔
321
        a.muxEl.RLock()
3✔
322
        elements := a.serviceElements[getServiceUniqueKey(service)]
3✔
323
        a.muxEl.RUnlock()
3✔
324

3✔
325
        cb(elements, service.Name, service.Host, nil, -1, true)
3✔
326

3✔
327
        return nil
3✔
328
}
3✔
329

330
func (a *AvahiProvider) processAddedService(service avahi.Service, cb api.MdnsResolveCB) error {
7✔
331
        // convert [][]byte to []string manually
7✔
332
        var txt []string
7✔
333
        for _, element := range service.Txt {
8✔
334
                txt = append(txt, string(element))
1✔
335
        }
1✔
336
        elements := parseTxt(txt)
7✔
337

7✔
338
        logging.Log().Trace("mdns: avahi - process add service:", service.Name, service.Type, service.Domain, service.Host, service.Address, service.Port, elements)
7✔
339

7✔
340
        address := net.ParseIP(service.Address)
7✔
341
        // if the address can not be used, ignore the entry
7✔
342
        if address == nil || address.IsUnspecified() {
10✔
343
                return fmt.Errorf("service provides unusable address: %s", service.Name)
3✔
344
        }
3✔
345

346
        // add the elements to the map
347
        a.muxEl.Lock()
4✔
348
        a.serviceElements[getServiceUniqueKey(service)] = elements
4✔
349
        a.muxEl.Unlock()
4✔
350

4✔
351
        cb(elements, service.Name, service.Host, []net.IP{address}, int(service.Port), false)
4✔
352

4✔
353
        return nil
4✔
354
}
355

356
// Create a unique key for a ship service
357
func getServiceUniqueKey(service avahi.Service) string {
8✔
358
        return fmt.Sprintf("%s-%s-%s-%d-%d", service.Name, service.Type, service.Domain, service.Protocol, service.Interface)
8✔
359
}
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc