• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

didil / inhooks / 4981123626

15 May 2023 01:44PM UTC coverage: 74.435% (-0.7%) from 75.116%
4981123626

Pull #33

github

didil
test: add ingest no flow test
Pull Request #33: Move stuck processing messages to ready periodically

134 of 134 new or added lines in 6 files covered. (100.0%)

725 of 974 relevant lines covered (74.44%)

2.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.92
/pkg/supervisor/processing.go
1
package supervisor
2

3
import (
4
        "context"
5
        "time"
6

7
        "github.com/didil/inhooks/pkg/models"
8
        "go.uber.org/zap"
9
)
10

11
// move stuck messages from processing to ready queue periodically
12
func (s *Supervisor) HandleProcessingQueue(ctx context.Context, f *models.Flow, sink *models.Sink) {
×
13
        logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID))
×
14
        for {
×
15
                movedMessageIds, err := s.MoveProcessingToReady(ctx, f, sink)
×
16
                if err != nil {
×
17
                        logger.Error("failed to move processing to ready", zap.Error(err))
×
18
                }
×
19
                if len(movedMessageIds) > 0 {
×
20
                        logger.Info("moved stuck messages from processing to ready", zap.Strings("messageIDs", movedMessageIds))
×
21
                }
×
22

23
                // wait before next check
24
                timer := time.NewTimer(s.appConf.Supervisor.ProcessingRecoveryInterval)
×
25

×
26
                select {
×
27
                case <-s.ctx.Done():
×
28
                        return
×
29
                case <-timer.C:
×
30
                        continue
×
31
                }
32
        }
33
}
34

35
func (s *Supervisor) MoveProcessingToReady(ctx context.Context, f *models.Flow, sink *models.Sink) ([]string, error) {
1✔
36
        // cache keys for twice the processing recovery interval
1✔
37
        // this avoids the recovery process from interfering with legitimate retry attempts
1✔
38
        ttl := 2 * s.appConf.Supervisor.ProcessingRecoveryInterval
1✔
39
        movedMessageIds, err := s.processingRecoverySvc.MoveProcessingToReady(ctx, f, sink, ttl)
1✔
40
        if err != nil {
1✔
41
                return nil, err
×
42
        }
×
43

44
        return movedMessageIds, nil
1✔
45
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc