• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

anycable / anycable-go / 4737922678

pending completion
4737922678

push

github

Vladimir Dementyev
version: 1.4.0

1 of 1 new or added line in 1 file covered. (100.0%)

350 existing lines in 9 files now uncovered.

5018 of 6405 relevant lines covered (78.35%)

7424.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.09
/cli/cli.go
1
package cli
2

3
import (
4
        "fmt"
5
        "net/http"
6
        "os"
7
        "os/signal"
8
        "runtime"
9
        "strconv"
10
        "strings"
11
        "syscall"
12

13
        "github.com/anycable/anycable-go/broadcast"
14
        "github.com/anycable/anycable-go/broker"
15
        "github.com/anycable/anycable-go/common"
16
        "github.com/anycable/anycable-go/config"
17
        "github.com/anycable/anycable-go/enats"
18
        "github.com/anycable/anycable-go/identity"
19
        metricspkg "github.com/anycable/anycable-go/metrics"
20
        "github.com/anycable/anycable-go/mrb"
21
        "github.com/anycable/anycable-go/node"
22
        "github.com/anycable/anycable-go/pubsub"
23
        "github.com/anycable/anycable-go/rails"
24
        "github.com/anycable/anycable-go/router"
25
        "github.com/anycable/anycable-go/server"
26
        "github.com/anycable/anycable-go/utils"
27
        "github.com/anycable/anycable-go/version"
28
        "github.com/anycable/anycable-go/ws"
29
        "github.com/apex/log"
30
        "github.com/gorilla/websocket"
31
        "github.com/joomcode/errorx"
32
        "github.com/syossan27/tebata"
33

34
        "go.uber.org/automaxprocs/maxprocs"
35
)
36

37
type controllerFactory = func(*metricspkg.Metrics, *config.Config) (node.Controller, error)
38
type disconnectorFactory = func(*node.Node, *config.Config) (node.Disconnector, error)
39
type broadcasterFactory = func(broadcast.Handler, *config.Config) (broadcast.Broadcaster, error)
40
type brokerFactory = func(broker.Broadcaster, *config.Config) (broker.Broker, error)
41
type subscriberFactory = func(pubsub.Handler, *config.Config) (pubsub.Subscriber, error)
42
type websocketHandler = func(*node.Node, *config.Config) (http.Handler, error)
43

44
// Shutdownable is implemented by components that Runner registers for
// shutdown when a termination signal is handled.
type Shutdownable interface {
	// Shutdown stops the component and returns an error on failure.
	Shutdown() error
}
47

48
type Runner struct {
49
        options []Option
50

51
        name   string
52
        config *config.Config
53
        log    *log.Entry
54

55
        controllerFactory       controllerFactory
56
        disconnectorFactory     disconnectorFactory
57
        subscriberFactory       subscriberFactory
58
        brokerFactory           brokerFactory
59
        websocketHandlerFactory websocketHandler
60

61
        broadcasters       []broadcasterFactory
62
        websocketEndpoints map[string]websocketHandler
63

64
        router *router.RouterController
65

66
        errChan       chan error
67
        shutdownables []Shutdownable
68
}
69

70
// NewRunner creates returns new Runner structure
71
func NewRunner(c *config.Config, options []Option) (*Runner, error) {
23 all except 4737922678.1 and 4737922678.2 ✔
72
        r := &Runner{
23 all except 4737922678.1 and 4737922678.2 ✔
73
                options:            options,
23 all except 4737922678.1 and 4737922678.2 ✔
74
                config:             c,
23 all except 4737922678.1 and 4737922678.2 ✔
75
                shutdownables:      []Shutdownable{},
23 all except 4737922678.1 and 4737922678.2 ✔
76
                websocketEndpoints: make(map[string]websocketHandler),
23 all except 4737922678.1 and 4737922678.2 ✔
77
                errChan:            make(chan error),
23 all except 4737922678.1 and 4737922678.2 ✔
78
        }
23 all except 4737922678.1 and 4737922678.2 ✔
79

23 all except 4737922678.1 and 4737922678.2 ✔
80
        err := r.checkAndSetDefaults()
23 all except 4737922678.1 and 4737922678.2 ✔
81
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
82
                return nil, err
×
UNCOV
83
        }
×
84

85
        return r, nil
23 all except 4737922678.1 and 4737922678.2 ✔
86
}
87

88
// checkAndSetDefaults applies passed options and checks that all required fields are set
89
func (r *Runner) checkAndSetDefaults() error {
23 all except 4737922678.1 and 4737922678.2 ✔
90
        for _, o := range r.options {
138 all except 4737922678.1 and 4737922678.2 ✔
91
                err := o(r)
115 all except 4737922678.1 and 4737922678.2 ✔
92
                if err != nil {
115 all except 4737922678.1 and 4737922678.2 ✔
93
                        return err
×
94
                }
×
95
        }
96

97
        err := utils.InitLogger(r.config.LogFormat, r.config.LogLevel)
23 all except 4737922678.1 and 4737922678.2 ✔
98
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
99
                return errorx.Decorate(err, "!!! Failed to initialize logger !!!")
×
UNCOV
100
        }
×
101

102
        r.log = log.WithFields(log.Fields{"context": "main"})
23 all except 4737922678.1 and 4737922678.2 ✔
103

23 all except 4737922678.1 and 4737922678.2 ✔
104
        err = r.config.LoadPresets()
23 all except 4737922678.1 and 4737922678.2 ✔
105

23 all except 4737922678.1 and 4737922678.2 ✔
106
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
107
                return errorx.Decorate(err, "!!! Failed to load configuration presets !!!")
×
UNCOV
108
        }
×
109

110
        server.SSL = &r.config.SSL
23 all except 4737922678.1 and 4737922678.2 ✔
111
        server.Host = r.config.Host
23 all except 4737922678.1 and 4737922678.2 ✔
112
        server.MaxConn = r.config.MaxConn
23 all except 4737922678.1 and 4737922678.2 ✔
113

23 all except 4737922678.1 and 4737922678.2 ✔
114
        if r.name == "" {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
115
                return errorx.AssertionFailed.New("Name is blank, specify WithName()")
×
UNCOV
116
        }
×
117

118
        if r.controllerFactory == nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
119
                return errorx.AssertionFailed.New("Controller is blank, specify WithController()")
×
UNCOV
120
        }
×
121

122
        if r.brokerFactory == nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
123
                return errorx.AssertionFailed.New("Broker is blank, specify WithBroker()")
×
UNCOV
124
        }
×
125

126
        if r.subscriberFactory == nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
127
                return errorx.AssertionFailed.New("Subscriber is blank, specify WithSubscriber()")
×
UNCOV
128
        }
×
129

130
        if r.disconnectorFactory == nil {
46 all except 4737922678.1 and 4737922678.2 ✔
131
                r.disconnectorFactory = r.defaultDisconnector
23 all except 4737922678.1 and 4737922678.2 ✔
132
        }
23 all except 4737922678.1 and 4737922678.2 ✔
133

134
        if r.websocketHandlerFactory == nil {
46 all except 4737922678.1 and 4737922678.2 ✔
135
                r.websocketHandlerFactory = r.defaultWebSocketHandler
23 all except 4737922678.1 and 4737922678.2 ✔
136
        }
23 all except 4737922678.1 and 4737922678.2 ✔
137

138
        return nil
23 all except 4737922678.1 and 4737922678.2 ✔
139
}
140

141
// Run starts the instance
142
func (r *Runner) Run() error {
23 all except 4737922678.1 and 4737922678.2 ✔
143
        numProcs := r.setMaxProcs()
23 all except 4737922678.1 and 4737922678.2 ✔
144
        r.announceDebugMode()
23 all except 4737922678.1 and 4737922678.2 ✔
145

23 all except 4737922678.1 and 4737922678.2 ✔
146
        mrubySupport := r.initMRuby()
23 all except 4737922678.1 and 4737922678.2 ✔
147

23 all except 4737922678.1 and 4737922678.2 ✔
148
        r.log.Infof("Starting %s %s%s (pid: %d, open file limit: %s, gomaxprocs: %d)", r.name, version.Version(), mrubySupport, os.Getpid(), utils.OpenFileLimit(), numProcs)
23 all except 4737922678.1 and 4737922678.2 ✔
149

23 all except 4737922678.1 and 4737922678.2 ✔
150
        metrics, err := r.initMetrics(&r.config.Metrics)
23 all except 4737922678.1 and 4737922678.2 ✔
151
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
152
                return errorx.Decorate(err, "!!! Failed to initialize metrics writer !!!")
×
UNCOV
153
        }
×
154

155
        r.shutdownables = append(r.shutdownables, metrics)
23 all except 4737922678.1 and 4737922678.2 ✔
156

23 all except 4737922678.1 and 4737922678.2 ✔
157
        controller, err := r.newController(metrics)
23 all except 4737922678.1 and 4737922678.2 ✔
158
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
159
                return err
×
160
        }
×
161

162
        appNode := node.NewNode(controller, metrics, &r.config.App)
23 all except 4737922678.1 and 4737922678.2 ✔
163

23 all except 4737922678.1 and 4737922678.2 ✔
164
        subscriber, err := r.subscriberFactory(appNode, r.config)
23 all except 4737922678.1 and 4737922678.2 ✔
165

23 all except 4737922678.1 and 4737922678.2 ✔
166
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
167
                return errorx.Decorate(err, "couldn't configure pub/sub")
×
168
        }
×
169

170
        appBroker, err := r.brokerFactory(subscriber, r.config)
23 all except 4737922678.1 and 4737922678.2 ✔
171
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
172
                return errorx.Decorate(err, "!!! Failed to initialize broker !!!")
×
UNCOV
173
        }
×
174

175
        if appBroker != nil {
46 all except 4737922678.1 and 4737922678.2 ✔
176
                r.log.Infof(appBroker.Announce())
23 all except 4737922678.1 and 4737922678.2 ✔
177
                appNode.SetBroker(appBroker)
23 all except 4737922678.1 and 4737922678.2 ✔
178
        }
23 all except 4737922678.1 and 4737922678.2 ✔
179

180
        disconnector, err := r.disconnectorFactory(appNode, r.config)
23 all except 4737922678.1 and 4737922678.2 ✔
181
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
182
                return errorx.Decorate(err, "!!! Failed to initialize disconnector !!!")
×
UNCOV
183
        }
×
184

185
        go disconnector.Run() // nolint:errcheck
23 all except 4737922678.1 and 4737922678.2 ✔
186
        appNode.SetDisconnector(disconnector)
23 all except 4737922678.1 and 4737922678.2 ✔
187

23 all except 4737922678.1 and 4737922678.2 ✔
188
        err = appNode.Start()
23 all except 4737922678.1 and 4737922678.2 ✔
189

23 all except 4737922678.1 and 4737922678.2 ✔
190
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
191
                return errorx.Decorate(err, "!!! Failed to initialize application !!!")
×
UNCOV
192
        }
×
193

194
        if r.config.EmbedNats {
30 all except 4737922678.1 and 4737922678.2 ✔
195
                service, enatsErr := r.embedNATS(&r.config.EmbeddedNats)
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
196

7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
197
                if enatsErr != nil {
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
UNCOV
198
                        return errorx.Decorate(enatsErr, "failed to start embedded NATS server")
×
UNCOV
199
                }
×
200

201
                desc := service.Description()
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
202

7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
203
                if desc != "" {
10 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
204
                        desc = fmt.Sprintf(" (%s)", desc)
3 only 4737922678.3 ✔
205
                }
3 only 4737922678.3 ✔
206

207
                r.log.Infof("Embedded NATS server started: %s%s", r.config.EmbeddedNats.ServiceAddr, desc)
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
208

7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
209
                r.shutdownables = append(r.shutdownables, service)
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
210
        }
211

212
        err = subscriber.Start(r.errChan)
23 all except 4737922678.1 and 4737922678.2 ✔
213
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
214
                return errorx.Decorate(err, "!!! Subscriber failed !!!")
×
215
        }
×
216

217
        r.shutdownables = append(r.shutdownables, subscriber)
23 all except 4737922678.1 and 4737922678.2 ✔
218

23 all except 4737922678.1 and 4737922678.2 ✔
219
        if r.broadcasters != nil {
46 all except 4737922678.1 and 4737922678.2 ✔
220
                for _, broadcasterFactory := range r.broadcasters {
46 all except 4737922678.1 and 4737922678.2 ✔
221
                        broadcaster, berr := broadcasterFactory(appNode, r.config)
23 all except 4737922678.1 and 4737922678.2 ✔
222

23 all except 4737922678.1 and 4737922678.2 ✔
223
                        if berr != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
224
                                return errorx.Decorate(err, "couldn't configure broadcaster")
×
UNCOV
225
                        }
×
226

227
                        if broadcaster.IsFanout() && subscriber.IsMultiNode() {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
228
                                r.log.Warnf("Using pub/sub with a distributed broadcaster has no effect")
×
UNCOV
229
                        }
×
230

231
                        if !broadcaster.IsFanout() && !subscriber.IsMultiNode() {
28 all except 4737922678.1 and 4737922678.2 ✔
232
                                r.log.Warnf("Using a non-distributed broadcaster without a pub/sub enabled; each broadcasted message is only processed by a single node")
5 only 4737922678.7, 4737922678.3, and 4737922678.10 ✔
233
                        }
5 only 4737922678.7, 4737922678.3, and 4737922678.10 ✔
234

235
                        err = broadcaster.Start(r.errChan)
23 all except 4737922678.1 and 4737922678.2 ✔
236
                        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
237
                                return errorx.Decorate(err, "!!! Broadcaster failed !!!")
×
UNCOV
238
                        }
×
239

240
                        r.shutdownables = append(r.shutdownables, broadcaster)
23 all except 4737922678.1 and 4737922678.2 ✔
241
                }
242
        }
243

244
        err = controller.Start()
23 all except 4737922678.1 and 4737922678.2 ✔
245
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
246
                return errorx.Decorate(err, "!!! RPC failed !!!")
×
UNCOV
247
        }
×
248

249
        wsServer, err := server.ForPort(strconv.Itoa(r.config.Port))
23 all except 4737922678.1 and 4737922678.2 ✔
250
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
251
                return errorx.Decorate(err, "!!! Failed to initialize WebSocket server at %s:%d !!!", r.config.Host, r.config.Port)
×
UNCOV
252
        }
×
253

254
        wsHandler, err := r.websocketHandlerFactory(appNode, r.config)
23 all except 4737922678.1 and 4737922678.2 ✔
255
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
256
                return errorx.Decorate(err, "!!! Failed to initialize WebSocket handler !!!")
×
UNCOV
257
        }
×
258

259
        for _, path := range r.config.Path {
46 all except 4737922678.1 and 4737922678.2 ✔
260
                wsServer.SetupHandler(path, wsHandler)
23 all except 4737922678.1 and 4737922678.2 ✔
261
                r.log.Infof("Handle WebSocket connections at %s%s", wsServer.Address(), path)
23 all except 4737922678.1 and 4737922678.2 ✔
262
        }
23 all except 4737922678.1 and 4737922678.2 ✔
263

264
        for path, handlerFactory := range r.websocketEndpoints {
23 all except 4737922678.1 and 4737922678.2 ✔
265
                handler, err := handlerFactory(appNode, r.config)
×
UNCOV
266
                if err != nil {
×
UNCOV
267
                        return errorx.Decorate(err, "!!! Failed to initialize WebSocket handler for %s !!!", path)
×
UNCOV
268
                }
×
UNCOV
269
                wsServer.SetupHandler(path, handler)
×
270
        }
271

272
        wsServer.SetupHandler(r.config.HealthPath, http.HandlerFunc(server.HealthHandler))
23 all except 4737922678.1 and 4737922678.2 ✔
273
        r.log.Infof("Handle health requests at %s%s", wsServer.Address(), r.config.HealthPath)
23 all except 4737922678.1 and 4737922678.2 ✔
274

23 all except 4737922678.1 and 4737922678.2 ✔
275
        go r.startWSServer(wsServer)
23 all except 4737922678.1 and 4737922678.2 ✔
276
        go r.startMetrics(metrics)
23 all except 4737922678.1 and 4737922678.2 ✔
277

23 all except 4737922678.1 and 4737922678.2 ✔
278
        r.shutdownables = append(r.shutdownables, appBroker, appNode)
23 all except 4737922678.1 and 4737922678.2 ✔
279

23 all except 4737922678.1 and 4737922678.2 ✔
280
        r.announceGoPools()
23 all except 4737922678.1 and 4737922678.2 ✔
281
        r.setupSignalHandlers()
23 all except 4737922678.1 and 4737922678.2 ✔
282

23 all except 4737922678.1 and 4737922678.2 ✔
283
        // Wait for an error (or none)
23 all except 4737922678.1 and 4737922678.2 ✔
284
        return <-r.errChan
23 all except 4737922678.1 and 4737922678.2 ✔
285
}
286

287
func (r *Runner) setMaxProcs() int {
23 all except 4737922678.1 and 4737922678.2 ✔
288
        // See https://github.com/uber-go/automaxprocs/issues/18
23 all except 4737922678.1 and 4737922678.2 ✔
289
        nopLog := func(string, ...interface{}) {}
46 all except 4737922678.1 and 4737922678.2 ✔
290
        maxprocs.Set(maxprocs.Logger(nopLog)) // nolint:errcheck
23 all except 4737922678.1 and 4737922678.2 ✔
291

23 all except 4737922678.1 and 4737922678.2 ✔
292
        return runtime.GOMAXPROCS(0)
23 all except 4737922678.1 and 4737922678.2 ✔
293
}
294

295
func (r *Runner) announceDebugMode() {
23 all except 4737922678.1 and 4737922678.2 ✔
296
        if r.config.Debug {
46 all except 4737922678.1 and 4737922678.2 ✔
297
                r.log.Debug("🔧 🔧 🔧 Debug mode is on 🔧 🔧 🔧")
23 all except 4737922678.1 and 4737922678.2 ✔
298
        }
23 all except 4737922678.1 and 4737922678.2 ✔
299
}
300

301
func (r *Runner) initMetrics(c *metricspkg.Config) (*metricspkg.Metrics, error) {
23 all except 4737922678.1 and 4737922678.2 ✔
302
        m, err := metricspkg.NewFromConfig(c)
23 all except 4737922678.1 and 4737922678.2 ✔
303

23 all except 4737922678.1 and 4737922678.2 ✔
304
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
305
                return nil, err
×
306
        }
×
307

308
        if c.Statsd.Enabled() {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
309
                sw := metricspkg.NewStatsdWriter(c.Statsd, c.Tags)
×
UNCOV
310
                m.RegisterWriter(sw)
×
UNCOV
311
        }
×
312

313
        return m, nil
23 all except 4737922678.1 and 4737922678.2 ✔
314
}
315

316
func (r *Runner) newController(metrics *metricspkg.Metrics) (node.Controller, error) {
23 all except 4737922678.1 and 4737922678.2 ✔
317
        controller, err := r.controllerFactory(metrics, r.config)
23 all except 4737922678.1 and 4737922678.2 ✔
318
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
319
                return nil, errorx.Decorate(err, "!!! Failed to initialize controller !!!")
×
UNCOV
320
        }
×
321

322
        if r.config.JWT.Enabled() {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
323
                identifier := identity.NewJWTIdentifier(&r.config.JWT)
×
UNCOV
324
                controller = identity.NewIdentifiableController(controller, identifier)
×
325
                r.log.Infof("JWT identification is enabled (param: %s, enforced: %v)", r.config.JWT.Param, r.config.JWT.Force)
×
326
        }
×
327

328
        if !r.Router().Empty() {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
329
                r.Router().SetDefault(controller)
×
UNCOV
330
                controller = r.Router()
×
UNCOV
331
                r.log.Infof("Using channels router: %s", strings.Join(r.Router().Routes(), ", "))
×
UNCOV
332
        }
×
333

334
        return controller, nil
23 all except 4737922678.1 and 4737922678.2 ✔
335
}
336

337
func (r *Runner) startWSServer(wsServer *server.HTTPServer) {
23 all except 4737922678.1 and 4737922678.2 ✔
338
        go func() {
46 all except 4737922678.1 and 4737922678.2 ✔
339
                err := wsServer.StartAndAnnounce("WebSocket server")
23 all except 4737922678.1 and 4737922678.2 ✔
340
                if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
341
                        if !wsServer.Stopped() {
×
UNCOV
342
                                r.errChan <- fmt.Errorf("WebSocket server at %s stopped: %v", wsServer.Address(), err)
×
343
                        }
×
344
                }
345
        }()
346
}
347

348
func (r *Runner) startMetrics(metrics *metricspkg.Metrics) {
23 all except 4737922678.1 and 4737922678.2 ✔
349
        err := metrics.Run()
23 all except 4737922678.1 and 4737922678.2 ✔
350
        if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
351
                r.errChan <- fmt.Errorf("!!! Metrics module failed to start !!!\n%v", err)
×
UNCOV
352
        }
×
353
}
354

355
func (r *Runner) defaultDisconnector(n *node.Node, c *config.Config) (node.Disconnector, error) {
23 all except 4737922678.1 and 4737922678.2 ✔
356
        if c.DisconnectorDisabled {
24 all except 4737922678.1 and 4737922678.2 ✔
357
                return node.NewNoopDisconnector(), nil
1 only 4737922678.3 ✔
358
        }
1 only 4737922678.3 ✔
359
        return node.NewDisconnectQueue(n, &c.DisconnectQueue), nil
22 all except 4737922678.1 and 4737922678.2 ✔
360
}
361

362
func (r *Runner) defaultWebSocketHandler(n *node.Node, c *config.Config) (http.Handler, error) {
23 all except 4737922678.1 and 4737922678.2 ✔
363
        extractor := ws.DefaultHeadersExtractor{Headers: c.Headers, Cookies: c.Cookies}
23 all except 4737922678.1 and 4737922678.2 ✔
364
        return ws.WebsocketHandler(common.ActionCableProtocols(), &extractor, &c.WS, func(wsc *websocket.Conn, info *ws.RequestInfo, callback func()) error {
2,347 all except 4737922678.1 and 4737922678.2 ✔
365
                wrappedConn := ws.NewConnection(wsc)
2,324 all except 4737922678.1 and 4737922678.2 ✔
366
                session := node.NewSession(n, wrappedConn, info.URL, info.Headers, info.UID)
2,324 all except 4737922678.1 and 4737922678.2 ✔
367

2,324 all except 4737922678.1 and 4737922678.2 ✔
368
                _, err := n.Authenticate(session)
2,324 all except 4737922678.1 and 4737922678.2 ✔
369

2,324 all except 4737922678.1 and 4737922678.2 ✔
370
                if err != nil {
2,324 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
371
                        return err
×
UNCOV
372
                }
×
373

374
                return session.Serve(callback)
2,324 all except 4737922678.1 and 4737922678.2 ✔
375
        }), nil
376
}
377

378
func (r *Runner) initMRuby() string {
23 all except 4737922678.1 and 4737922678.2 ✔
379
        if mrb.Supported() {
46 all except 4737922678.1 and 4737922678.2 ✔
380
                var mrbv string
23 all except 4737922678.1 and 4737922678.2 ✔
381
                mrbv, err := mrb.Version()
23 all except 4737922678.1 and 4737922678.2 ✔
382
                if err != nil {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
383
                        log.Errorf("mruby failed to initialize: %v", err)
×
384
                } else {
23 all except 4737922678.1 and 4737922678.2 ✔
385
                        return " (with " + mrbv + ")"
23 all except 4737922678.1 and 4737922678.2 ✔
386
                }
23 all except 4737922678.1 and 4737922678.2 ✔
387
        }
388

UNCOV
389
        return ""
×
390
}
391

392
func (r *Runner) Router() *router.RouterController {
23 all except 4737922678.1 and 4737922678.2 ✔
393
        if r.router == nil {
46 all except 4737922678.1 and 4737922678.2 ✔
394
                r.SetRouter(r.defaultRouter())
23 all except 4737922678.1 and 4737922678.2 ✔
395
        }
23 all except 4737922678.1 and 4737922678.2 ✔
396

397
        return r.router
23 all except 4737922678.1 and 4737922678.2 ✔
398
}
399

400
func (r *Runner) SetRouter(router *router.RouterController) {
23 all except 4737922678.1 and 4737922678.2 ✔
401
        r.router = router
23 all except 4737922678.1 and 4737922678.2 ✔
402
}
23 all except 4737922678.1 and 4737922678.2 ✔
403

404
func (r *Runner) defaultRouter() *router.RouterController {
23 all except 4737922678.1 and 4737922678.2 ✔
405
        router := router.NewRouterController(nil)
23 all except 4737922678.1 and 4737922678.2 ✔
406

23 all except 4737922678.1 and 4737922678.2 ✔
407
        if r.config.Rails.TurboRailsKey != "" {
23 all except 4737922678.1 and 4737922678.2 ✔
UNCOV
408
                turboController := rails.NewTurboController(r.config.Rails.TurboRailsKey)
×
UNCOV
409
                router.Route("Turbo::StreamsChannel", turboController) // nolint:errcheck
×
UNCOV
410
        }
×
411

412
        if r.config.Rails.CableReadyKey != "" {
23 all except 4737922678.1 and 4737922678.2 ✔
413
                crController := rails.NewCableReadyController(r.config.Rails.CableReadyKey)
×
UNCOV
414
                router.Route("CableReady::Stream", crController) // nolint:errcheck
×
UNCOV
415
        }
×
416

417
        return router
23 all except 4737922678.1 and 4737922678.2 ✔
418
}
419

420
func (r *Runner) announceGoPools() {
23 all except 4737922678.1 and 4737922678.2 ✔
421
        configs := make([]string, 0)
23 all except 4737922678.1 and 4737922678.2 ✔
422
        pools := utils.AllPools()
23 all except 4737922678.1 and 4737922678.2 ✔
423

23 all except 4737922678.1 and 4737922678.2 ✔
424
        for _, pool := range pools {
46 all except 4737922678.1 and 4737922678.2 ✔
425
                configs = append(configs, fmt.Sprintf("%s: %d", pool.Name(), pool.Size()))
23 all except 4737922678.1 and 4737922678.2 ✔
426
        }
23 all except 4737922678.1 and 4737922678.2 ✔
427

428
        log.WithField("context", "main").Debugf("Go pools initialized (%s)", strings.Join(configs, ", "))
23 all except 4737922678.1 and 4737922678.2 ✔
429
}
430

431
func (r *Runner) setupSignalHandlers() {
23 all except 4737922678.1 and 4737922678.2 ✔
432
        t := tebata.New(syscall.SIGINT, syscall.SIGTERM)
23 all except 4737922678.1 and 4737922678.2 ✔
433

23 all except 4737922678.1 and 4737922678.2 ✔
434
        t.Reserve(func() { // nolint:errcheck
46 all except 4737922678.1 and 4737922678.2 ✔
435
                log.Infof("Shutting down... (hit Ctrl-C to stop immediately)")
23 all except 4737922678.1 and 4737922678.2 ✔
436
                go func() {
46 all except 4737922678.1 and 4737922678.2 ✔
437
                        termSig := make(chan os.Signal, 1)
23 all except 4737922678.1 and 4737922678.2 ✔
438
                        signal.Notify(termSig, syscall.SIGINT, syscall.SIGTERM)
23 all except 4737922678.1 and 4737922678.2 ✔
439
                        <-termSig
23 all except 4737922678.1 and 4737922678.2 ✔
440
                        log.Warnf("Immediate termination requested. Stopped")
23 all except 4737922678.1 and 4737922678.2 ✔
441
                        r.errChan <- nil
23 all except 4737922678.1 and 4737922678.2 ✔
442
                }()
23 all except 4737922678.1 and 4737922678.2 ✔
443
        })
444

445
        for _, shutdownable := range r.shutdownables {
145 all except 4737922678.1 and 4737922678.2 ✔
446
                t.Reserve(shutdownable.Shutdown) // nolint:errcheck
122 all except 4737922678.1 and 4737922678.2 ✔
447
        }
122 all except 4737922678.1 and 4737922678.2 ✔
448

449
        t.Reserve(func() { r.errChan <- nil }) // nolint:errcheck
46 all except 4737922678.1 and 4737922678.2 ✔
450
}
451

452
func (r *Runner) embedNATS(c *enats.Config) (*enats.Service, error) {
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
453
        service := enats.NewService(c)
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
454

7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
455
        err := service.Start()
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
456

7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
457
        if err != nil {
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
UNCOV
458
                return nil, err
×
UNCOV
459
        }
×
460

461
        return service, nil
7 only 4737922678.3, 4737922678.9, and 4737922678.11 ✔
462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc