• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geniusrabbit / eventstream / 3953294224

pending completion
3953294224

push

github

Dmitry Ponomarev
Configs fixes and improvements

147 of 219 new or added lines in 13 files covered. (67.12%)

20 existing lines in 1 file now uncovered.

718 of 1495 relevant lines covered (48.03%)

1.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/storage/sql/stream.go
1
//
2
// @project geniusrabbit::eventstream 2017 - 2023
3
// @author Dmitry Ponomarev <demdxx@gmail.com> 2017 - 2023
4
//
5

6
// TODO: Store all messages if not sended yet
7

8
package sql
9

10
import (
11
        "context"
12
        "database/sql"
13
        "errors"
14
        "sync/atomic"
15
        "time"
16

17
        "go.uber.org/zap"
18

19
        "github.com/geniusrabbit/eventstream"
20
        "github.com/geniusrabbit/eventstream/internal/message"
21
)
22

23
var (
24
        errInvalidQueryObject = errors.New("[sql] invalid query object")
25
)
26

27
// Connector to DB
28
type Connector interface {
29
        Connection() (*sql.DB, error)
30
}
31

32
// StreamSQL stream
33
type StreamSQL struct {
34
        isWriting int32
35

36
        // Debug mode of the stream
37
        debug bool
38

39
        // ID of the stream
40
        id string
41

42
        // Connector interface
43
        connector Connector
44

45
        buffer        chan eventstream.Message
46
        blockSize     int // size of the block suitable to save into DB
47
        flushInterval time.Duration
48
        writeLastTime time.Time
49

50
        // Query prepared data formater object
51
        query *Query
52

53
        // Time ticker to pereodic data flush
54
        processTimer *time.Ticker
55

56
        // Logger object of log writing
57
        logger *zap.Logger
58
}
59

60
// NewStreamSQL creates streamer object for SQL based stream integration
61
func NewStreamSQL(id string, connector Connector, options ...Option) (eventstream.Streamer, error) {
×
62
        var opts Options
×
63
        for _, opt := range options {
×
64
                if err := opt(&opts); err != nil {
×
65
                        return nil, err
×
66
                }
×
67
        }
68
        if opts.QueryBuilder == nil {
×
69
                return nil, errInvalidQueryObject
×
70
        }
×
71
        return &StreamSQL{
×
72
                debug:         opts.Debug,
×
73
                id:            id,
×
74
                connector:     connector,
×
75
                blockSize:     opts.getBlockSize(),
×
76
                flushInterval: opts.getFlushInterval(),
×
77
                buffer:        make(chan eventstream.Message, opts.getBlockSize()*2),
×
78
                query:         opts.QueryBuilder,
×
79
                logger:        opts.getLogger(),
×
80
        }, nil
×
81
}
82

83
// ID returns unical stream identificator
84
func (s *StreamSQL) ID() string {
×
85
        return s.id
×
86
}
×
87

88
// Put message to stream
89
func (s *StreamSQL) Put(ctx context.Context, msg eventstream.Message) error {
×
90
        if s.debug {
×
91
                s.logger.Debug(`put-message`, zap.Any(`message`, msg))
×
92
        }
×
93
        s.buffer <- msg
×
94
        return nil
×
95
}
96

97
// Run SQL writer daemon
98
func (s *StreamSQL) Run(ctx context.Context) error {
×
99
        if s.processTimer != nil {
×
100
                s.processTimer.Stop()
×
101
        }
×
102

103
        s.writeLastTime = time.Now()
×
104
        s.processTimer = time.NewTicker(time.Millisecond * 50)
×
105
        ch := s.processTimer.C
×
106

×
107
        for _, ok := <-ch; ok; {
×
108
                if lastMsg, err := s.writeBuffer(false); err != nil {
×
109
                        s.logger.Error(`write-buffer`,
×
110
                                zap.String(`last_message`, lastMsg.JSON()),
×
111
                                zap.Error(err))
×
112
                }
×
113
                time.Sleep(time.Millisecond * 50)
×
114
        }
115
        return nil
×
116
}
117

118
// Check message value
119
func (s *StreamSQL) Check(ctx context.Context, msg eventstream.Message) bool {
×
120
        return true
×
121
}
×
122

123
// Close implementation
124
func (s *StreamSQL) Close() (err error) {
×
125
        if s.processTimer != nil {
×
126
                s.processTimer.Stop()
×
127
                s.processTimer = nil
×
128
        }
×
129
        var lastMsg message.Message
×
130
        if lastMsg, err = s.writeBuffer(true); err != nil {
×
131
                s.logger.Error(`Close::write-buffer`,
×
132
                        zap.String(`last_message`, lastMsg.JSON()),
×
133
                        zap.Error(err))
×
134
        }
×
135
        close(s.buffer)
×
136
        return err
×
137
}
138

139
// writeBuffer all data
140
func (s *StreamSQL) writeBuffer(flush bool) (msg message.Message, err error) {
×
141
        if !atomic.CompareAndSwapInt32(&s.isWriting, 0, 1) {
×
142
                return nil, nil
×
143
        }
×
144

145
        var (
×
146
                tx       *sql.Tx
×
147
                stmt     *sql.Stmt
×
148
                stop     = false
×
149
                conn     *sql.DB
×
150
                now      = time.Now()
×
151
                interval = now.Sub(s.writeLastTime)
×
152
        )
×
153

×
154
        defer func() {
×
155
                if rec := recover(); rec != nil {
×
156
                        s.logger.Error(`write-buffer`,
×
157
                                zap.String(`last_message`, msg.JSON()),
×
158
                                zap.Any(`error`, rec))
×
159
                        if tx != nil {
×
160
                                _ = tx.Rollback()
×
161
                        }
×
162
                }
163
                atomic.StoreInt32(&s.isWriting, 0)
×
164
        }()
165

166
        if !flush {
×
167
                if c := len(s.buffer); c < 1 || (s.blockSize > c && interval < s.flushInterval) {
×
168
                        return nil, err
×
169
                }
×
170
        }
171
        if conn, err = s.connector.Connection(); err != nil {
×
172
                return nil, err
×
173
        }
×
174

175
        if s.debug {
×
176
                s.logger.Debug(`write-buffer`,
×
177
                        zap.Bool(`hardflush`, flush),
×
178
                        zap.Duration(`interval`, interval),
×
179
                        zap.String(`query`, s.query.QueryString()),
×
180
                )
×
181
        }
×
182

183
        if tx, err = conn.Begin(); err != nil {
×
184
                return nil, err
×
185
        }
×
186
        if stmt, err = tx.Prepare(s.query.QueryString()); err != nil {
×
187
                _ = tx.Rollback()
×
188
                return nil, err
×
189
        }
×
190

191
        // Writing loop of prepared requests
192
        for !stop {
×
193
                select {
×
194
                case msg = <-s.buffer:
×
195
                        if s.debug {
×
196
                                s.logger.Debug(`write-message`, zap.Any(`message`, msg))
×
197
                        }
×
NEW
198
                        listParams := s.query.ParamsBy(msg)
×
NEW
199
                        for _, params := range listParams {
×
NEW
200
                                if _, err = stmt.Exec(params...); err != nil {
×
NEW
201
                                        stop = true
×
NEW
202
                                }
×
203
                        }
NEW
204
                        listParams.release()
×
205
                default:
×
206
                        stop = true
×
207
                }
208
        }
209

210
        if err == nil {
×
211
                _, _ = stmt.Exec()
×
212
                err = tx.Commit()
×
213
        } else {
×
214
                _ = tx.Rollback()
×
215
        }
×
216

217
        s.writeLastTime = time.Now()
×
218
        return msg, err
×
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc