• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fogfish / swarm / 11110439377

30 Sep 2024 04:40PM UTC coverage: 60.391% (+10.0%) from 50.381%
11110439377

push

github

web-flow
Modules `queue` and `qtest` are not needed within the new kernel (#93)

988 of 1636 relevant lines covered (60.39%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.18
/kernel/cathode.go
1
//
2
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
3
//
4
// This file may be modified and distributed under the terms
5
// of the Apache License Version 2.0. See the LICENSE file for details.
6
// https://github.com/fogfish/swarm
7
//
8

9
package kernel
10

11
import (
12
        "context"
13
        "log/slog"
14
        "sync"
15
        "time"
16

17
        "github.com/fogfish/swarm"
18
)
19

20
// Cathode defines on-the-wire protocol for [swarm.Bag], covering the ingress.
21
type Cathode interface {
22
        Ack(ctx context.Context, digest string) error
23
        Err(ctx context.Context, digest string, err error) error
24
        Ask(ctx context.Context) ([]swarm.Bag, error)
25
}
26

27
// Decoder converts a message from its wire format into a value of type T.
type Decoder[T any] interface {
	Decode(data []byte) (T, error)
}
29

30
type Router = interface {
31
        Route(context.Context, swarm.Bag) error
32
}
33

34
type Dequeuer struct {
35
        sync.WaitGroup
36
        sync.RWMutex
37

38
        // Control-plane stop channel used by go routines to stop I/O on data channels
39
        context context.Context
40
        cancel  context.CancelFunc
41

42
        // Kernel configuration
43
        Config swarm.Config
44

45
        // event router, binds category with destination channel
46
        router map[string]Router
47

48
        // Cathode is the reader port on message broker
49
        Cathode Cathode
50
}
51

52
// Creates instance of broker reader
53
func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
1✔
54
        ctx, can := context.WithCancel(context.Background())
1✔
55

1✔
56
        return &Dequeuer{
1✔
57
                Config:  config,
1✔
58
                context: ctx,
1✔
59
                cancel:  can,
1✔
60
                router:  make(map[string]Router),
1✔
61
                Cathode: cathode,
1✔
62
        }
1✔
63
}
1✔
64

65
// Closes broker reader, gracefully shutdowns all I/O
66
func (k *Dequeuer) Close() {
1✔
67
        k.cancel()
1✔
68
        k.WaitGroup.Wait()
1✔
69
}
1✔
70

71
// Await reader to complete
72
func (k *Dequeuer) Await() {
1✔
73
        if spawner, ok := k.Cathode.(interface{ Run() }); ok {
2✔
74
                go spawner.Run()
1✔
75
        }
1✔
76

77
        k.receive()
1✔
78
        <-k.context.Done()
1✔
79
        k.WaitGroup.Wait()
1✔
80
}
81

82
// internal infinite receive loop.
83
// waiting for message from event buses and queues and schedules it for delivery.
84
func (k *Dequeuer) receive() {
1✔
85
        asker := func() {
2✔
86
                seq, err := k.Cathode.Ask(k.context)
1✔
87
                if k.Config.StdErr != nil && err != nil {
1✔
88
                        k.Config.StdErr <- err
×
89
                        return
×
90
                }
×
91

92
                for i := 0; i < len(seq); i++ {
2✔
93
                        bag := seq[i]
1✔
94

1✔
95
                        k.RWMutex.RLock()
1✔
96
                        r, has := k.router[bag.Category]
1✔
97
                        k.RWMutex.RUnlock()
1✔
98

1✔
99
                        if has {
2✔
100
                                err := r.Route(k.context, bag)
1✔
101
                                if k.Config.StdErr != nil && err != nil {
1✔
102
                                        k.Config.StdErr <- err
×
103
                                        return
×
104
                                }
×
105
                        }
106
                }
107
        }
108

109
        k.WaitGroup.Add(1)
1✔
110
        go func() {
2✔
111
                slog.Debug("kernel receive loop started")
1✔
112

1✔
113
        exit:
1✔
114
                for {
2✔
115
                        select {
1✔
116
                        case <-k.context.Done():
1✔
117
                                break exit
1✔
118
                        default:
1✔
119
                        }
120

121
                        select {
1✔
122
                        case <-k.context.Done():
1✔
123
                                break exit
1✔
124
                        case <-time.After(k.Config.PollFrequency):
1✔
125
                                asker()
1✔
126
                        }
127
                }
128

129
                k.WaitGroup.Done()
1✔
130
                slog.Debug("kernel receive loop stopped")
1✔
131
        }()
132
}
133

134
// Dequeue creates pair of channels within kernel to enqueue messages
135
func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) {
1✔
136
        rcv := make(chan swarm.Msg[T], k.Config.CapRcv)
1✔
137
        ack := make(chan swarm.Msg[T], k.Config.CapAck)
1✔
138

1✔
139
        k.RWMutex.Lock()
1✔
140
        k.router[cat] = router[T]{ch: rcv, codec: codec}
1✔
141
        k.RWMutex.Unlock()
1✔
142

1✔
143
        // emitter routine
1✔
144
        acks := func(msg swarm.Msg[T]) {
2✔
145
                if msg.Error == nil {
2✔
146
                        err := k.Cathode.Ack(k.context, msg.Digest)
1✔
147
                        if k.Config.StdErr != nil && err != nil {
1✔
148
                                k.Config.StdErr <- err
×
149
                        }
×
150
                } else {
1✔
151
                        err := k.Cathode.Err(k.context, msg.Digest, msg.Error)
1✔
152
                        if k.Config.StdErr != nil && err != nil {
1✔
153
                                k.Config.StdErr <- err
×
154
                        }
×
155
                }
156
        }
157

158
        k.WaitGroup.Add(1)
1✔
159
        go func() {
2✔
160
                slog.Debug("kernel dequeue started", "cat", cat)
1✔
161

1✔
162
        exit:
1✔
163
                for {
2✔
164
                        // The try-receive operation here is to
1✔
165
                        // try to exit the sender goroutine as
1✔
166
                        // early as possible. Try-receive and
1✔
167
                        // try-send select blocks are specially
1✔
168
                        // optimized by the standard Go
1✔
169
                        // compiler, so they are very efficient.
1✔
170
                        select {
1✔
171
                        case <-k.context.Done():
1✔
172
                                break exit
1✔
173
                        default:
1✔
174
                        }
175

176
                        select {
1✔
177
                        case <-k.context.Done():
1✔
178
                                break exit
1✔
179
                        case msg := <-ack:
1✔
180
                                acks(msg)
1✔
181
                        }
182
                }
183

184
                backlog := len(ack)
1✔
185
                close(ack)
1✔
186

1✔
187
                if backlog != 0 {
1✔
188
                        for msg := range ack {
×
189
                                acks(msg)
×
190
                        }
×
191
                }
192

193
                k.WaitGroup.Done()
1✔
194
                slog.Debug("kernel dequeue stopped", "cat", cat)
1✔
195
        }()
196

197
        return rcv, ack
1✔
198
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc