• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fogfish / swarm / 11113666559

30 Sep 2024 08:30PM UTC coverage: 59.94% (-0.5%) from 60.391%
11113666559

push

github

web-flow
kernel v0.20.1 with minor improvements (#94)

18 of 53 new or added lines in 8 files covered. (33.96%)

3 existing lines in 3 files now uncovered.

998 of 1665 relevant lines covered (59.94%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.91
/kernel/cathode.go
1
//
2
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
3
//
4
// This file may be modified and distributed under the terms
5
// of the Apache License Version 2.0. See the LICENSE file for details.
6
// https://github.com/fogfish/swarm
7
//
8

9
package kernel
10

11
import (
12
        "context"
13
        "fmt"
14
        "log/slog"
15
        "sync"
16
        "time"
17

18
        "github.com/fogfish/swarm"
19
)
20

21
// Cathode defines on-the-wire protocol for [swarm.Bag], covering the ingress.
//
// The Dequeuer drives it as follows: Ask is polled for the next batch of
// bags, Ack is called with the digest of a message that was handled without
// error, and Err is called with the digest and the failure otherwise.
type Cathode interface {
	// Ack acknowledges the message identified by digest.
	Ack(ctx context.Context, digest string) error
	// Err reports the failure err for the message identified by digest.
	Err(ctx context.Context, digest string, err error) error
	// Ask returns the next batch of bags to be routed.
	Ask(ctx context.Context) ([]swarm.Bag, error)
}
27

28
// Decoder decodes a message of type T from its wire format.
type Decoder[T any] interface{ Decode([]byte) (T, error) }
30

31
// Router delivers a received [swarm.Bag] to its destination.
// The Dequeuer keeps one Router per message category.
type Router = interface {
	Route(context.Context, swarm.Bag) error
}
34

35
// Dequeuer is the reader side of the kernel. It polls the Cathode for bags
// and hands each one to the Router registered for the bag's category.
type Dequeuer struct {
	// WaitGroup tracks the goroutines started by the receive loop and by
	// Dequeue; Close and Await wait on it.
	sync.WaitGroup
	// RWMutex guards the router map.
	sync.RWMutex

	// Control-plane stop channel used by go routines to stop I/O on data channels
	context context.Context
	cancel  context.CancelFunc

	// Kernel configuration
	Config swarm.Config

	// event router, binds category with destination channel
	router map[string]Router

	// Cathode is the reader port on message broker
	Cathode Cathode
}
52

53
// Creates instance of broker reader
54
func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
1✔
55
        ctx, can := context.WithCancel(context.Background())
1✔
56

1✔
57
        return &Dequeuer{
1✔
58
                Config:  config,
1✔
59
                context: ctx,
1✔
60
                cancel:  can,
1✔
61
                router:  make(map[string]Router),
1✔
62
                Cathode: cathode,
1✔
63
        }
1✔
64
}
1✔
65

66
// Closes broker reader, gracefully shutdowns all I/O
67
func (k *Dequeuer) Close() {
1✔
68
        k.cancel()
1✔
69
        k.WaitGroup.Wait()
1✔
70
}
1✔
71

72
// Await reader to complete
73
func (k *Dequeuer) Await() {
1✔
74
        if spawner, ok := k.Cathode.(interface{ Run() }); ok {
2✔
75
                go spawner.Run()
1✔
76
        }
1✔
77

78
        k.receive()
1✔
79
        <-k.context.Done()
1✔
80
        k.WaitGroup.Wait()
1✔
81
}
82

83
// internal infinite receive loop.
84
// waiting for message from event buses and queues and schedules it for delivery.
85
func (k *Dequeuer) receive() {
1✔
86
        asker := func() {
2✔
87
                seq, err := k.Cathode.Ask(k.context)
1✔
88
                if k.Config.StdErr != nil && err != nil {
1✔
89
                        k.Config.StdErr <- err
×
90
                        return
×
91
                }
×
92

93
                for i := 0; i < len(seq); i++ {
2✔
94
                        bag := seq[i]
1✔
95

1✔
96
                        k.RWMutex.RLock()
1✔
97
                        r, has := k.router[bag.Category]
1✔
98
                        k.RWMutex.RUnlock()
1✔
99

1✔
100
                        if has {
2✔
101
                                err := r.Route(k.context, bag)
1✔
102
                                if k.Config.StdErr != nil && err != nil {
1✔
103
                                        k.Config.StdErr <- err
×
104
                                        return
×
105
                                }
×
NEW
106
                        } else {
×
NEW
107
                                if k.Config.FailOnUnknownCategory {
×
NEW
108
                                        slog.Error("Unknown category", "cat", bag.Category, "kernel", k.Config.Source)
×
NEW
109
                                        k.Cathode.Err(k.context, bag.Digest, swarm.ErrDequeue.New(fmt.Errorf("unknown category %s ", bag.Category)))
×
NEW
110
                                } else {
×
NEW
111
                                        slog.Warn("Unknown category", "cat", bag.Category, "kernel", k.Config.Source)
×
NEW
112
                                }
×
113
                        }
114
                }
115
        }
116

117
        k.WaitGroup.Add(1)
1✔
118
        go func() {
2✔
119
                slog.Debug("kernel receive loop started")
1✔
120

1✔
121
        exit:
1✔
122
                for {
2✔
123
                        select {
1✔
124
                        case <-k.context.Done():
1✔
125
                                break exit
1✔
126
                        default:
1✔
127
                        }
128

129
                        select {
1✔
130
                        case <-k.context.Done():
1✔
131
                                break exit
1✔
132
                        case <-time.After(k.Config.PollFrequency):
1✔
133
                                asker()
1✔
134
                        }
135
                }
136

137
                k.WaitGroup.Done()
1✔
138
                slog.Debug("kernel receive loop stopped")
1✔
139
        }()
140
}
141

142
// Dequeue creates pair of channels within kernel to enqueue messages
143
func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) {
1✔
144
        rcv := make(chan swarm.Msg[T], k.Config.CapRcv)
1✔
145
        ack := make(chan swarm.Msg[T], k.Config.CapAck)
1✔
146

1✔
147
        k.RWMutex.Lock()
1✔
148
        k.router[cat] = router[T]{ch: rcv, codec: codec}
1✔
149
        k.RWMutex.Unlock()
1✔
150

1✔
151
        // emitter routine
1✔
152
        acks := func(msg swarm.Msg[T]) {
2✔
153
                if msg.Error == nil {
2✔
154
                        err := k.Cathode.Ack(k.context, msg.Digest)
1✔
155
                        if k.Config.StdErr != nil && err != nil {
1✔
156
                                k.Config.StdErr <- err
×
157
                        }
×
158
                } else {
1✔
159
                        err := k.Cathode.Err(k.context, msg.Digest, msg.Error)
1✔
160
                        if k.Config.StdErr != nil && err != nil {
1✔
161
                                k.Config.StdErr <- err
×
162
                        }
×
163
                }
164
        }
165

166
        k.WaitGroup.Add(1)
1✔
167
        go func() {
2✔
168
                slog.Debug("kernel dequeue started", "cat", cat)
1✔
169

1✔
170
        exit:
1✔
171
                for {
2✔
172
                        // The try-receive operation here is to
1✔
173
                        // try to exit the sender goroutine as
1✔
174
                        // early as possible. Try-receive and
1✔
175
                        // try-send select blocks are specially
1✔
176
                        // optimized by the standard Go
1✔
177
                        // compiler, so they are very efficient.
1✔
178
                        select {
1✔
179
                        case <-k.context.Done():
1✔
180
                                break exit
1✔
181
                        default:
1✔
182
                        }
183

184
                        select {
1✔
185
                        case <-k.context.Done():
1✔
186
                                break exit
1✔
187
                        case msg := <-ack:
1✔
188
                                acks(msg)
1✔
189
                        }
190
                }
191

192
                backlog := len(ack)
1✔
193
                close(ack)
1✔
194

1✔
195
                if backlog != 0 {
1✔
196
                        for msg := range ack {
×
197
                                acks(msg)
×
198
                        }
×
199
                }
200

201
                k.WaitGroup.Done()
1✔
202
                slog.Debug("kernel dequeue stopped", "cat", cat)
1✔
203
        }()
204

205
        return rcv, ack
1✔
206
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc