• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 11609503192

31 Oct 2024 09:37AM UTC coverage: 82.814% (+0.05%) from 82.764%
11609503192

push

github

ravisuhag
feat: added sink batch size config for sink concurrency

1 of 1 new or added line in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

6809 of 8222 relevant lines covered (82.81%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.75
/agent/stream.go
1
package agent
2

3
import (
4
        "fmt"
5
        "sync"
6

7
        "github.com/raystack/meteor/models"
8
)
9

10
type (
11
        streamMiddleware func(src models.Record) (dst models.Record, err error)
12
        subscriber       struct {
13
                callback  func([]models.Record) error
14
                channel   chan models.Record
15
                batchSize int
16
        }
17
)
18

19
type stream struct {
20
        middlewares []streamMiddleware
21
        subscribers []*subscriber
22
        onCloses    []func()
23
        mu          sync.Mutex
24
        shutdown    bool
25
        closed      bool
26
        err         error
27
}
28

29
func newStream() *stream {
1✔
30
        return &stream{}
1✔
31
}
1✔
32

33
// subscribe() will register callback with a batch size to the emitter.
34
// Calling this will not start listening yet, use broadcast() to start sending data to subscriber.
35
func (s *stream) subscribe(callback func(batch []models.Record) error, batchSize int) *stream {
1✔
36
        s.subscribers = append(s.subscribers, &subscriber{
1✔
37
                callback:  callback,
1✔
38
                batchSize: batchSize,
1✔
39
                channel:   make(chan models.Record),
1✔
40
        })
1✔
41

1✔
42
        return s
1✔
43
}
1✔
44

45
// onClose() is used to register callback for after stream is closed.
46
func (s *stream) onClose(callback func()) *stream {
1✔
47
        s.onCloses = append(s.onCloses, callback)
1✔
48

1✔
49
        return s
1✔
50
}
1✔
51

52
// broadcast() will start listening to emitter for any pushed data.
53
// This process is blocking, so most times you would want to call this inside a goroutine.
54
func (s *stream) broadcast() error {
1✔
55
        var wg sync.WaitGroup
1✔
56
        wg.Add(len(s.subscribers))
1✔
57
        for _, l := range s.subscribers {
2✔
58
                go func(l *subscriber) {
2✔
59
                        defer func() {
2✔
60
                                if r := recover(); r != nil {
1✔
61
                                        s.closeWithError(fmt.Errorf("%s", r))
×
62
                                }
×
63
                                wg.Done()
1✔
64
                        }()
65

66
                        batch := newBatch(l.batchSize)
1✔
67
                        // listen to channel and emit data to subscriber callback if batch is full
1✔
68
                        for d := range l.channel {
2✔
69
                                if err := batch.add(d); err != nil {
1✔
70
                                        s.closeWithError(err)
×
71
                                }
×
72
                                if batch.isFull() {
2✔
73
                                        if err := l.callback(batch.flush()); err != nil {
1✔
UNCOV
74
                                                s.closeWithError(err)
×
UNCOV
75
                                        }
×
76
                                }
77
                        }
78

79
                        // emit leftover data in the batch if any after channel is closed
80
                        if !batch.isEmpty() {
2✔
81
                                if err := l.callback(batch.flush()); err != nil {
2✔
82
                                        s.closeWithError(err)
1✔
83
                                }
1✔
84
                        }
85
                }(l)
86
        }
87

88
        wg.Wait()
1✔
89

1✔
90
        return s.err
1✔
91
}
92

93
// push() will run the record through all the registered middleware
94
// and emit the record to all registered subscribers.
95
func (s *stream) push(data models.Record) {
1✔
96
        data, err := s.runMiddlewares(data)
1✔
97
        if err != nil {
2✔
98
                s.closeWithError(fmt.Errorf("emitter: error running middleware: %w", err))
1✔
99
                return
1✔
100
        }
1✔
101

102
        for _, l := range s.subscribers {
2✔
103
                l.channel <- data
1✔
104
        }
1✔
105
}
106

107
// setMiddleware registers a middleware that will be used to
108
// process given record before broadcasting.
109
func (s *stream) setMiddleware(m streamMiddleware) *stream {
1✔
110
        s.middlewares = append(s.middlewares, m)
1✔
111
        return s
1✔
112
}
1✔
113

114
func (s *stream) closeWithError(err error) {
1✔
115
        s.mu.Lock()
1✔
116
        s.err = err
1✔
117
        s.mu.Unlock()
1✔
118
        s.Close()
1✔
119
}
1✔
120

121
func (s *stream) Shutdown() {
1✔
122
        s.mu.Lock()
1✔
123
        defer s.mu.Unlock()
1✔
124

1✔
125
        if s.shutdown {
2✔
126
                return
1✔
127
        }
1✔
128

129
        for _, l := range s.subscribers {
2✔
130
                close(l.channel)
1✔
131
        }
1✔
132
        s.shutdown = true
1✔
133
}
134

135
// Close the emitter and signalling all subscriber of the event.
136
func (s *stream) Close() {
1✔
137
        s.Shutdown()
1✔
138

1✔
139
        s.mu.Lock()
1✔
140
        defer s.mu.Unlock()
1✔
141

1✔
142
        if s.closed {
2✔
143
                return
1✔
144
        }
1✔
145

146
        for _, onClose := range s.onCloses {
2✔
147
                onClose()
1✔
148
        }
1✔
149
        s.closed = true
1✔
150
}
151

152
func (s *stream) runMiddlewares(d models.Record) (models.Record, error) {
1✔
153
        res := d
1✔
154
        for _, middleware := range s.middlewares {
2✔
155
                var err error
1✔
156
                res, err = middleware(res)
1✔
157
                if err != nil {
2✔
158
                        return models.Record{}, err
1✔
159
                }
1✔
160
        }
161

162
        return res, nil
1✔
163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc