• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

topfreegames / pitaya / 15070819794

16 May 2025 01:13PM UTC coverage: 62.699% (+0.5%) from 62.235%
15070819794

Pull #452

github

Felippe Durán
Add e2e test to validate unique session doesn't fail if kick fails
Pull Request #452: Draft: Fix unique session kick failure preventing clients to connect

78 of 83 new or added lines in 4 files covered. (93.98%)

20 existing lines in 3 files now uncovered.

5152 of 8217 relevant lines covered (62.7%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.73
/app.go
1
// Copyright (c) nano Author and TFG Co. All Rights Reserved.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package pitaya
22

23
import (
24
        "context"
25
        "os"
26
        "os/signal"
27
        "reflect"
28
        "strings"
29
        "syscall"
30

31
        "time"
32

33
        "github.com/golang/protobuf/proto"
34
        opentracing "github.com/opentracing/opentracing-go"
35
        "github.com/topfreegames/pitaya/v2/acceptor"
36
        "github.com/topfreegames/pitaya/v2/cluster"
37
        "github.com/topfreegames/pitaya/v2/component"
38
        "github.com/topfreegames/pitaya/v2/config"
39
        "github.com/topfreegames/pitaya/v2/conn/message"
40
        "github.com/topfreegames/pitaya/v2/constants"
41
        pcontext "github.com/topfreegames/pitaya/v2/context"
42
        "github.com/topfreegames/pitaya/v2/docgenerator"
43
        "github.com/topfreegames/pitaya/v2/errors"
44
        "github.com/topfreegames/pitaya/v2/groups"
45
        "github.com/topfreegames/pitaya/v2/interfaces"
46
        "github.com/topfreegames/pitaya/v2/logger"
47
        logging "github.com/topfreegames/pitaya/v2/logger/interfaces"
48
        "github.com/topfreegames/pitaya/v2/metrics"
49
        mods "github.com/topfreegames/pitaya/v2/modules"
50
        "github.com/topfreegames/pitaya/v2/remote"
51
        "github.com/topfreegames/pitaya/v2/router"
52
        "github.com/topfreegames/pitaya/v2/serialize"
53
        "github.com/topfreegames/pitaya/v2/service"
54
        "github.com/topfreegames/pitaya/v2/session"
55
        "github.com/topfreegames/pitaya/v2/timer"
56
        "github.com/topfreegames/pitaya/v2/tracing"
57
        "github.com/topfreegames/pitaya/v2/worker"
58
)
59

60
// ServerMode represents a server mode
61
type ServerMode byte
62

63
const (
64
        _ ServerMode = iota
65
        // Cluster represents a server running with connection to other servers
66
        Cluster
67
        // Standalone represents a server running without connection to other servers
68
        Standalone
69
)
70

71
// Pitaya App interface
72
type Pitaya interface {
73
        GetDieChan() chan bool
74
        SetDebug(debug bool)
75
        SetHeartbeatTime(interval time.Duration)
76
        GetServerID() string
77
        GetMetricsReporters() []metrics.Reporter
78
        GetServer() *cluster.Server
79
        GetServerByID(id string) (*cluster.Server, error)
80
        GetServersByType(t string) (map[string]*cluster.Server, error)
81
        GetServers() []*cluster.Server
82
        GetSessionFromCtx(ctx context.Context) session.Session
83
        Start()
84
        SetDictionary(dict map[string]uint16) error
85
        AddRoute(serverType string, routingFunction router.RoutingFunc) error
86
        Shutdown()
87
        StartWorker()
88
        RegisterRPCJob(rpcJob worker.RPCJob) error
89
        Documentation(getPtrNames bool) (map[string]interface{}, error)
90
        IsRunning() bool
91

92
        RPC(ctx context.Context, routeStr string, reply proto.Message, arg proto.Message) error
93
        RPCTo(ctx context.Context, serverID, routeStr string, reply proto.Message, arg proto.Message) error
94
        ReliableRPC(
95
                routeStr string,
96
                metadata map[string]interface{},
97
                reply, arg proto.Message,
98
        ) (jid string, err error)
99
        ReliableRPCWithOptions(
100
                routeStr string,
101
                metadata map[string]interface{},
102
                reply, arg proto.Message,
103
                opts *config.EnqueueOpts,
104
        ) (jid string, err error)
105

106
        SendPushToUsers(route string, v interface{}, uids []string, frontendType string) ([]string, error)
107
        SendKickToUsers(uids []string, frontendType string) ([]string, error)
108

109
        GroupCreate(ctx context.Context, groupName string) error
110
        GroupCreateWithTTL(ctx context.Context, groupName string, ttlTime time.Duration) error
111
        GroupMembers(ctx context.Context, groupName string) ([]string, error)
112
        GroupBroadcast(ctx context.Context, frontendType, groupName, route string, v interface{}) error
113
        GroupContainsMember(ctx context.Context, groupName, uid string) (bool, error)
114
        GroupAddMember(ctx context.Context, groupName, uid string) error
115
        GroupRemoveMember(ctx context.Context, groupName, uid string) error
116
        GroupRemoveAll(ctx context.Context, groupName string) error
117
        GroupCountMembers(ctx context.Context, groupName string) (int, error)
118
        GroupRenewTTL(ctx context.Context, groupName string) error
119
        GroupDelete(ctx context.Context, groupName string) error
120

121
        Register(c component.Component, options ...component.Option)
122
        RegisterRemote(c component.Component, options ...component.Option)
123

124
        RegisterModule(module interfaces.Module, name string) error
125
        RegisterModuleAfter(module interfaces.Module, name string) error
126
        RegisterModuleBefore(module interfaces.Module, name string) error
127
        GetModule(name string) (interfaces.Module, error)
128

129
        GetNumberOfConnectedClients() int64
130
}
131

132
// App is the base app struct
133
type App struct {
134
        acceptors         []acceptor.Acceptor
135
        config            config.PitayaConfig
136
        debug             bool
137
        dieChan           chan bool
138
        heartbeat         time.Duration
139
        onSessionBind     func(session.Session)
140
        router            *router.Router
141
        rpcClient         cluster.RPCClient
142
        rpcServer         cluster.RPCServer
143
        metricsReporters  []metrics.Reporter
144
        running           bool
145
        serializer        serialize.Serializer
146
        server            *cluster.Server
147
        serverMode        ServerMode
148
        serviceDiscovery  cluster.ServiceDiscovery
149
        startAt           time.Time
150
        worker            *worker.Worker
151
        remoteService     *service.RemoteService
152
        handlerService    *service.HandlerService
153
        handlerComp       []regComp
154
        remoteComp        []regComp
155
        modulesMap        map[string]interfaces.Module
156
        modulesArr        []moduleWrapper
157
        sessionModulesArr []sessionModuleWrapper
158
        groups            groups.GroupService
159
        sessionPool       session.SessionPool
160
}
161

162
// NewApp is the base constructor for a pitaya app instance
163
func NewApp(
164
        serverMode ServerMode,
165
        serializer serialize.Serializer,
166
        acceptors []acceptor.Acceptor,
167
        dieChan chan bool,
168
        router *router.Router,
169
        server *cluster.Server,
170
        rpcClient cluster.RPCClient,
171
        rpcServer cluster.RPCServer,
172
        worker *worker.Worker,
173
        serviceDiscovery cluster.ServiceDiscovery,
174
        remoteService *service.RemoteService,
175
        handlerService *service.HandlerService,
176
        groups groups.GroupService,
177
        sessionPool session.SessionPool,
178
        metricsReporters []metrics.Reporter,
179
        config config.PitayaConfig,
180
) *App {
181
        app := &App{
182
                server:            server,
183
                config:            config,
1✔
184
                rpcClient:         rpcClient,
1✔
185
                rpcServer:         rpcServer,
1✔
186
                worker:            worker,
1✔
187
                serviceDiscovery:  serviceDiscovery,
1✔
188
                remoteService:     remoteService,
1✔
189
                handlerService:    handlerService,
1✔
190
                groups:            groups,
1✔
191
                debug:             false,
1✔
192
                startAt:           time.Now(),
1✔
193
                dieChan:           dieChan,
1✔
194
                acceptors:         acceptors,
1✔
195
                metricsReporters:  metricsReporters,
1✔
196
                serverMode:        serverMode,
1✔
197
                running:           false,
1✔
198
                serializer:        serializer,
1✔
199
                router:            router,
1✔
200
                handlerComp:       make([]regComp, 0),
1✔
201
                remoteComp:        make([]regComp, 0),
1✔
202
                modulesMap:        make(map[string]interfaces.Module),
1✔
203
                modulesArr:        []moduleWrapper{},
1✔
204
                sessionModulesArr: []sessionModuleWrapper{},
1✔
205
                sessionPool:       sessionPool,
1✔
206
        }
1✔
207
        if app.heartbeat == time.Duration(0) {
1✔
208
                app.heartbeat = config.Heartbeat.Interval
1✔
209
        }
1✔
210

1✔
211
        app.initSysRemotes()
1✔
212
        return app
2✔
213
}
1✔
214

1✔
215
// GetDieChan gets the channel that the app sinalizes when its going to die
216
func (app *App) GetDieChan() chan bool {
1✔
217
        return app.dieChan
1✔
218
}
219

220
// SetDebug toggles debug on/off
221
func (app *App) SetDebug(debug bool) {
1✔
222
        app.debug = debug
1✔
223
}
1✔
224

225
// SetHeartbeatTime sets the heartbeat time
226
func (app *App) SetHeartbeatTime(interval time.Duration) {
1✔
227
        app.heartbeat = interval
1✔
228
}
1✔
229

230
// GetServerID returns the generated server id
231
func (app *App) GetServerID() string {
1✔
232
        return app.server.ID
1✔
233
}
1✔
234

235
// GetMetricsReporters gets registered metrics reporters
236
func (app *App) GetMetricsReporters() []metrics.Reporter {
×
237
        return app.metricsReporters
×
238
}
×
239

240
// GetServer gets the local server instance
241
func (app *App) GetServer() *cluster.Server {
1✔
242
        return app.server
1✔
243
}
1✔
244

245
// GetServerByID returns the server with the specified id
246
func (app *App) GetServerByID(id string) (*cluster.Server, error) {
1✔
247
        return app.serviceDiscovery.GetServer(id)
1✔
248
}
1✔
249

250
// GetServersByType get all servers of type
251
func (app *App) GetServersByType(t string) (map[string]*cluster.Server, error) {
1✔
252
        return app.serviceDiscovery.GetServersByType(t)
1✔
253
}
1✔
254

255
// GetServers get all servers
256
func (app *App) GetServers() []*cluster.Server {
1✔
257
        return app.serviceDiscovery.GetServers()
1✔
258
}
1✔
259

260
// IsRunning indicates if the Pitaya app has been initialized. Note: This
261
// doesn't cover acceptors, only the pitaya internal registration and modules
×
262
// initialization.
×
263
func (app *App) IsRunning() bool {
×
264
        return app.running
265
}
266

267
// SetLogger logger setter
268
func SetLogger(l logging.Logger) {
×
269
        logger.Log = l
×
270
}
×
271

272
func (app *App) initSysRemotes() {
273
        sys := remote.NewSys(app.sessionPool)
1✔
274
        app.RegisterRemote(sys,
1✔
275
                component.WithName("sys"),
1✔
276
                component.WithNameFunc(strings.ToLower),
277
        )
1✔
278
}
1✔
279

1✔
280
func (app *App) periodicMetrics() {
1✔
281
        period := app.config.Metrics.Period
1✔
282
        go metrics.ReportSysMetrics(app.metricsReporters, period)
1✔
283

1✔
284
        if app.worker.Started() {
285
                go worker.Report(app.metricsReporters, period)
1✔
286
        }
1✔
287
}
1✔
288

1✔
289
// Start starts the app
1✔
290
func (app *App) Start() {
×
291
        if !app.server.Frontend && len(app.acceptors) > 0 {
×
292
                logger.Log.Fatal("acceptors are not allowed on backend servers")
293
        }
294

295
        if app.server.Frontend && len(app.acceptors) == 0 {
1✔
296
                logger.Log.Fatal("frontend servers should have at least one configured acceptor")
1✔
297
        }
×
298

×
299
        if app.serverMode == Cluster {
300
                if reflect.TypeOf(app.rpcClient) == reflect.TypeOf(&cluster.GRPCClient{}) {
1✔
301
                        app.serviceDiscovery.AddListener(app.rpcClient.(*cluster.GRPCClient))
×
302
                }
×
303

304
                if err := app.RegisterModuleBefore(app.rpcServer, "rpcServer"); err != nil {
1✔
305
                        logger.Log.Fatal("failed to register rpc server module: %s", err.Error())
×
306
                }
×
307
                if err := app.RegisterModuleBefore(app.rpcClient, "rpcClient"); err != nil {
×
308
                        logger.Log.Fatal("failed to register rpc client module: %s", err.Error())
309
                }
×
310
                // set the service discovery as the last module to be started to ensure
×
311
                // all modules have been properly initialized before the server starts
×
312
                // receiving requests from other pitaya servers
×
313
                if err := app.RegisterModuleAfter(app.serviceDiscovery, "serviceDiscovery"); err != nil {
×
314
                        logger.Log.Fatal("failed to register service discovery module: %s", err.Error())
×
315
                }
316
        }
317

318
        app.periodicMetrics()
×
319

×
320
        app.listen()
×
321

322
        defer func() {
323
                timer.GlobalTicker.Stop()
1✔
324
                app.running = false
1✔
325
        }()
1✔
326

1✔
327
        sg := make(chan os.Signal, 1)
2✔
328
        signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
1✔
329

1✔
330
        maxSessionCount := func() int64 {
1✔
331
                count := app.sessionPool.GetSessionCount()
332
                mc := app.maxModuleSessionCount()
1✔
333
                if mc > count {
1✔
334
                        count = mc
1✔
UNCOV
335
                }
×
UNCOV
336
                return count
×
UNCOV
337
        }
×
338

×
339
        // stop server
×
UNCOV
340
        select {
×
341
        case <-app.dieChan:
342
                logger.Log.Warn("the app will shutdown in a few seconds")
343
        case s := <-sg:
344
                logger.Log.Warn("got signal: ", s, ", shutting down...")
1✔
345
                if app.config.Session.Drain.Enabled && s == syscall.SIGTERM {
1✔
346
                        logger.Log.Info("Session drain is enabled, draining all sessions before shutting down")
1✔
347
                        timeoutTimer := time.NewTimer(app.config.Session.Drain.Timeout)
1✔
348
                        app.startModuleSessionDraining()
1✔
349
                loop:
1✔
UNCOV
350
                        for {
×
UNCOV
351
                                if maxSessionCount() == 0 {
×
UNCOV
352
                                        logger.Log.Info("All sessions drained")
×
UNCOV
353
                                        break loop
×
UNCOV
354
                                }
×
UNCOV
355
                                select {
×
UNCOV
356
                                case s := <-sg:
×
UNCOV
357
                                        logger.Log.Warn("got signal: ", s)
×
358
                                        if s == syscall.SIGINT {
359
                                                logger.Log.Warnf("Bypassing session draing due to SIGINT. %d sessions will be immediately terminated", maxSessionCount())
×
360
                                        }
×
361
                                        break loop
×
362
                                case <-timeoutTimer.C:
×
363
                                        logger.Log.Warnf("Session drain has reached maximum timeout. %d sessions will be immediately terminated", maxSessionCount())
×
364
                                        break loop
×
365
                                case <-time.After(app.config.Session.Drain.Period):
×
366
                                        logger.Log.Infof("Waiting for all sessions to finish: %d sessions remaining...", maxSessionCount())
×
367
                                }
×
368
                        }
×
369
                }
×
370
                close(app.dieChan)
×
371
        }
372

373
        logger.Log.Warn("server is stopping...")
374

375
        app.sessionPool.CloseAll()
376
        app.shutdownModules()
1✔
377
        app.shutdownComponents()
1✔
378
}
1✔
379

1✔
380
func (app *App) listen() {
1✔
381
        app.startupComponents()
1✔
382
        // create global ticker instance, timer precision could be customized
1✔
383
        // by SetTimerPrecision
1✔
384
        timer.GlobalTicker = time.NewTicker(timer.Precision)
1✔
385

386
        logger.Log.Infof("starting server %s:%s", app.server.Type, app.server.ID)
387
        for i := 0; i < app.config.Concurrency.Handler.Dispatch; i++ {
1✔
388
                go app.handlerService.Dispatch(i)
1✔
389
        }
1✔
390
        for _, acc := range app.acceptors {
1✔
391
                a := acc
1✔
392
                go func() {
1✔
393
                        for conn := range a.GetConnChan() {
1✔
394
                                go app.handlerService.Handle(conn)
2✔
395
                        }
1✔
396
                }()
1✔
397
                if app.config.Acceptor.ProxyProtocol {
2✔
398
                        logger.Log.Info("Enabling PROXY protocol for inbound connections")
1✔
399
                        a.EnableProxyProtocol()
2✔
400
                } else {
2✔
401
                        logger.Log.Debug("PROXY protocol is disabled for inbound connections")
1✔
402
                }
1✔
403
                go func() {
404
                        a.ListenAndServe()
1✔
405
                }()
×
406
                logger.Log.Infof("Waiting for Acceptor %s to start on addr %s", reflect.TypeOf(a), a.GetConfiguredAddress())
×
407

1✔
408
                for !a.IsRunning() {
1✔
409
                }
1✔
410

2✔
411
                logger.Log.Infof("Acceptor %s on addr %s is now accepting connections", reflect.TypeOf(a), a.GetAddr())
1✔
412
        }
1✔
413

1✔
414
        if app.serverMode == Cluster && app.server.Frontend && app.config.Session.Unique {
1✔
415
                unique := mods.NewUniqueSession(app.server, app.rpcServer, app.rpcClient, app.sessionPool)
2✔
416
                app.remoteService.AddRemoteBindingListener(unique)
1✔
417
                app.RegisterModule(unique, "uniqueSession")
418
        }
1✔
419

420
        app.startModules()
421

1✔
422
        logger.Log.Info("all modules started!")
×
423

×
424
        app.running = true
×
425
}
×
426

427
// SetDictionary sets routes map
1✔
428
func (app *App) SetDictionary(dict map[string]uint16) error {
1✔
429
        if app.running {
1✔
430
                return constants.ErrChangeDictionaryWhileRunning
1✔
431
        }
1✔
432
        return message.SetDictionary(dict)
433
}
434

435
// AddRoute adds a routing function to a server type
1✔
436
func (app *App) AddRoute(
2✔
437
        serverType string,
1✔
438
        routingFunction router.RoutingFunc,
1✔
439
) error {
1✔
440
        if app.router != nil {
441
                if app.running {
442
                        return constants.ErrChangeRouteWhileRunning
443
                }
444
                app.router.AddRoute(serverType, routingFunction)
445
        } else {
446
                return constants.ErrRouterNotInitialized
1✔
447
        }
2✔
448
        return nil
2✔
449
}
1✔
450

1✔
451
// Shutdown send a signal to let 'pitaya' shutdown itself.
1✔
452
func (app *App) Shutdown() {
1✔
453
        select {
1✔
454
        case <-app.dieChan: // prevent closing closed channel
1✔
455
        default:
1✔
456
                close(app.dieChan)
457
        }
458
}
459

1✔
460
// Error creates a new error with a code, message and metadata
1✔
461
func Error(err error, code string, metadata ...map[string]string) *errors.Error {
1✔
462
        return errors.NewError(err, code, metadata...)
1✔
463
}
1✔
464

465
// GetSessionFromCtx retrieves a session from a given context
466
func (app *App) GetSessionFromCtx(ctx context.Context) session.Session {
467
        sessionVal := ctx.Value(constants.SessionCtxKey)
468
        if sessionVal == nil {
1✔
469
                logger.Log.Debug("ctx doesn't contain a session, are you calling GetSessionFromCtx from inside a remote?")
1✔
470
                return nil
1✔
471
        }
472
        return sessionVal.(session.Session)
473
}
1✔
474

1✔
475
// GetDefaultLoggerFromCtx returns the default logger from the given context
1✔
476
func GetDefaultLoggerFromCtx(ctx context.Context) logging.Logger {
×
477
        l := ctx.Value(constants.LoggerCtxKey)
×
478
        if l == nil {
×
479
                return logger.Log
1✔
480
        }
481

482
        return l.(logging.Logger)
483
}
×
484

×
485
// AddMetricTagsToPropagateCtx adds a key and metric tags that will
×
486
// be propagated through RPC calls. Use the same tags that are at
×
487
// 'pitaya.metrics.additionalLabels' config
×
488
func AddMetricTagsToPropagateCtx(
489
        ctx context.Context,
×
490
        tags map[string]string,
491
) context.Context {
492
        return pcontext.AddToPropagateCtx(ctx, constants.MetricTagsKey, tags)
493
}
494

495
// AddToPropagateCtx adds a key and value that will be propagated through RPC calls
496
func AddToPropagateCtx(ctx context.Context, key string, val interface{}) context.Context {
497
        return pcontext.AddToPropagateCtx(ctx, key, val)
498
}
1✔
499

1✔
500
// GetFromPropagateCtx adds a key and value that came through RPC calls
1✔
501
func GetFromPropagateCtx(ctx context.Context, key string) interface{} {
502
        return pcontext.GetFromPropagateCtx(ctx, key)
503
}
1✔
504

1✔
505
// ExtractSpan retrieves an opentracing span context from the given context
1✔
506
// The span context can be received directly or via an RPC call
507
func ExtractSpan(ctx context.Context) (opentracing.SpanContext, error) {
508
        return tracing.ExtractSpan(ctx)
1✔
509
}
1✔
510

1✔
511
// Documentation returns handler and remotes documentacion
512
func (app *App) Documentation(getPtrNames bool) (map[string]interface{}, error) {
513
        handlerDocs, err := app.handlerService.Docs(getPtrNames)
514
        if err != nil {
1✔
515
                return nil, err
1✔
516
        }
1✔
517
        remoteDocs, err := app.remoteService.Docs(getPtrNames)
518
        if err != nil {
519
                return nil, err
1✔
520
        }
1✔
521
        return map[string]interface{}{
1✔
522
                "handlers": handlerDocs,
×
523
                "remotes":  remoteDocs,
×
524
        }, nil
1✔
525
}
1✔
526

×
527
// AddGRPCInfoToMetadata adds host, external host and
×
528
// port into metadata
1✔
529
func AddGRPCInfoToMetadata(
1✔
530
        metadata map[string]string,
1✔
531
        region string,
1✔
532
        host, port string,
533
        externalHost, externalPort string,
534
) map[string]string {
535
        metadata[constants.GRPCHostKey] = host
536
        metadata[constants.GRPCPortKey] = port
537
        metadata[constants.GRPCExternalHostKey] = externalHost
538
        metadata[constants.GRPCExternalPortKey] = externalPort
539
        metadata[constants.RegionKey] = region
540
        return metadata
541
}
1✔
542

1✔
543
// Descriptor returns the protobuf message descriptor for a given message name
1✔
544
func Descriptor(protoName string) ([]byte, error) {
1✔
545
        return docgenerator.ProtoDescriptors(protoName)
1✔
546
}
1✔
547

1✔
548
// StartWorker configures, starts and returns pitaya worker
1✔
549
func (app *App) StartWorker() {
550
        app.worker.Start()
551
}
1✔
552

1✔
553
// RegisterRPCJob registers rpc job to execute jobs with retries
1✔
554
func (app *App) RegisterRPCJob(rpcJob worker.RPCJob) error {
555
        err := app.worker.RegisterRPCJob(rpcJob)
556
        return err
1✔
557
}
1✔
558

1✔
559
// GetNumberOfConnectedClients returns the number of connected clients
560
func (app *App) GetNumberOfConnectedClients() int64 {
561
        return app.sessionPool.GetSessionCount()
1✔
562
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc