• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

welovemedia / ffmate / 18175958211

01 Oct 2025 09:19PM UTC coverage: 64.344% (-0.6%) from 64.927%
18175958211

push

github

YoSev
fix: make static handler in goyave enforce unix paths to serve the ui

2207 of 3430 relevant lines covered (64.34%)

13.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.84
/internal/service/websocket/websocket.go
1
package websocket
2

3
import (
4
        "bytes"
5
        "encoding/base64"
6
        "encoding/json"
7
        "fmt"
8
        "io"
9
        "sync"
10
        "time"
11

12
        "github.com/andybalholm/brotli"
13
        "github.com/lib/pq"
14
        "github.com/welovemedia/ffmate/v2/internal/cfg"
15
        "github.com/welovemedia/ffmate/v2/internal/debug"
16
        "github.com/welovemedia/ffmate/v2/internal/metrics"
17
        "github.com/welovemedia/ffmate/v2/internal/service"
18
        "gorm.io/gorm"
19
        "goyave.dev/goyave/v5/websocket"
20
)
21

22
// WebsocketSubject names the kind of event carried by a websocket message.
// It is an alias for string, so subjects and plain strings are interchangeable.
type WebsocketSubject = string

// Subjects sent to websocket clients, written as "<resource>:<action>".
const (
	// Task events.
	TaskCreated WebsocketSubject = "task:created"
	TaskUpdated WebsocketSubject = "task:updated"
	TaskDeleted WebsocketSubject = "task:deleted"

	// Preset events.
	PresetCreated WebsocketSubject = "preset:created"
	PresetUpdated WebsocketSubject = "preset:updated"
	PresetDeleted WebsocketSubject = "preset:deleted"

	// Watchfolder events.
	WatchfolderCreated WebsocketSubject = "watchfolder:created"
	WatchfolderUpdated WebsocketSubject = "watchfolder:updated"
	WatchfolderDeleted WebsocketSubject = "watchfolder:deleted"

	// Webhook events.
	WebhookCreated WebsocketSubject = "webhook:created"
	WebhookUpdated WebsocketSubject = "webhook:updated"
	WebhookDeleted WebsocketSubject = "webhook:deleted"

	// WebhookExecutionCreated is the only webhook-execution event.
	WebhookExecutionCreated WebsocketSubject = "webhookExecution:created"

	// SettingsUpdated is the only settings event.
	SettingsUpdated WebsocketSubject = "settings:updated"

	// ClientUpdated payloads get their "self" key removed when they arrive
	// from another cluster instance.
	ClientUpdated WebsocketSubject = "client:updated"

	// Log is delivered to local clients only; Broadcast never forwards it to
	// the cluster.
	Log WebsocketSubject = "log:created"
)
49

50
var (
51
        connections = make(map[string]*websocket.Conn)
52
        mu          = sync.Mutex{}
53
)
54

55
type Service struct {
56
        db *gorm.DB
57
}
58

59
func NewService(db *gorm.DB) *Service {
38✔
60
        s := &Service{
38✔
61
                db: db,
38✔
62
        }
38✔
63

38✔
64
        // process broadcast queue
38✔
65
        go s.processBroadcastQueue()
38✔
66

38✔
67
        return s
38✔
68
}
38✔
69

70
func (s *Service) Add(uuid string, c *websocket.Conn) {
×
71
        mu.Lock()
×
72
        defer mu.Unlock()
×
73
        connections[uuid] = c
×
74
        metrics.Gauge("websocket.connect").Inc()
×
75
}
×
76

77
func (s *Service) Remove(uuid string) {
×
78
        mu.Lock()
×
79
        defer mu.Unlock()
×
80
        delete(connections, uuid)
×
81
        metrics.Gauge("websocket.disconnect").Inc()
×
82
}
×
83

84
/**
85
 * Broadcast
86
 */
87

88
type broadcastMessage struct {
89
        msg     any
90
        subject WebsocketSubject
91
}
92

93
var broadcastQueue = make(chan broadcastMessage, 1000)
94

95
func (s *Service) Broadcast(subject WebsocketSubject, msg any) {
77✔
96
        select {
77✔
97
        case broadcastQueue <- broadcastMessage{msg, subject}:
77✔
98
        default:
×
99
                debug.Websocket.Debug("dropped local broadcast due to blocked channel (full)")
×
100
        }
101

102
        if subject != Log && isCluster {
77✔
103
                select {
×
104
                case notifyQueue <- &ClusterUpdate{Subject: subject, Payload: msg, Client: session}:
×
105
                default:
×
106
                        debug.Websocket.Debug("dropped cluster broadcast due to blocked channel (full)")
×
107
                }
108
        }
109
}
110

111
func (s *Service) processBroadcastQueue() {
38✔
112
        for b := range broadcastQueue {
115✔
113
                s.broadcastLocal(b.subject, b.msg)
77✔
114
        }
77✔
115
}
116

117
func (s *Service) broadcastLocal(subject WebsocketSubject, msg any) {
77✔
118
        mu.Lock()
77✔
119
        defer mu.Unlock()
77✔
120
        for _, c := range connections {
77✔
121
                _ = c.WriteJSON(map[string]any{"subject": subject, "payload": msg})
×
122
                metrics.Gauge("websocket.broadcast").Inc()
×
123
        }
×
124
}
125

126
/**
127
 * Cluster broadcasting
128
 */
129

130
type ClusterUpdate struct {
131
        Subject WebsocketSubject `json:"subject"`
132
        Payload any              `json:"payload"`
133
        Client  string           `json:"client"`
134
}
135

136
var notifyQueue = make(chan *ClusterUpdate, 1000)
137
var isCluster = false
138
var session = ""
139

140
func (s *Service) InitCluster() {
×
141
        session = cfg.GetString("ffmate.session")
×
142
        isCluster = true
×
143
        go s.listenCluster()
×
144
        go s.notifyCluster()
×
145
}
×
146

147
func (s *Service) listenCluster() {
×
148
        listener := pq.NewListener(cfg.GetString("ffmate.database"), 10*time.Second, time.Minute, func(_ pq.ListenerEventType, err error) {
×
149
                if err != nil {
×
150
                        debug.Websocket.Error("listener error:", err)
×
151
                }
×
152
        })
153

154
        // Listen on the same channel used by notifyCluster
155
        if err := listener.Listen("ffmate"); err != nil {
×
156
                debug.Websocket.Error("failed to listen on channel:", err)
×
157
                return
×
158
        }
×
159

160
        debug.Log.Info("cluster listener started")
×
161
        for {
×
162
                select {
×
163
                case n := <-listener.Notify:
×
164
                        if n == nil {
×
165
                                continue
×
166
                        }
167

168
                        compressed, err := base64.StdEncoding.DecodeString(n.Extra)
×
169
                        if err != nil {
×
170
                                fmt.Println("failed to decode notification:", err)
×
171
                                continue
×
172
                        }
173
                        decompressed, err := s.decompressPayload(compressed)
×
174
                        if err != nil {
×
175
                                fmt.Println("failed to decompress notification:", err)
×
176
                                continue
×
177
                        }
178

179
                        var payload ClusterUpdate
×
180
                        if err := json.Unmarshal(decompressed, &payload); err != nil {
×
181
                                fmt.Println("failed to parse notification:", err)
×
182
                                continue
×
183
                        }
184

185
                        if payload.Client != session {
×
186
                                debug.Websocket.Debug("> %s from %s (size: %db)", payload.Subject, payload.Client, len(n.Extra))
×
187

×
188
                                // remove self from external clients
×
189
                                if payload.Subject == string(ClientUpdated) {
×
190
                                        delete(payload.Payload.(map[string]any), "self")
×
191
                                }
×
192

193
                                select {
×
194
                                case broadcastQueue <- broadcastMessage{payload.Payload, payload.Subject}:
×
195
                                default:
×
196
                                        debug.Websocket.Warn("dropped local broadcast due to blocked channel (full)")
×
197
                                }
198
                        }
199

200
                case <-time.After(90 * time.Second):
×
201
                        go listener.Ping() // nolint:errcheck
×
202
                }
203
        }
204
}
205

206
func (s *Service) notifyCluster() {
×
207
        debug.Log.Info("cluster notifier started")
×
208
        go func() {
×
209
                for update := range notifyQueue {
×
210
                        payloadBytes, err := json.Marshal(update)
×
211
                        if err != nil {
×
212
                                debug.Websocket.Error("failed to marshal message:", err)
×
213
                                return
×
214
                        }
×
215

216
                        compressed, err := s.compressPayload(payloadBytes)
×
217
                        if err != nil {
×
218
                                debug.Websocket.Error("failed to compress message:", err)
×
219
                                return
×
220
                        }
×
221

222
                        encoded := base64.StdEncoding.EncodeToString(compressed)
×
223

×
224
                        sql := `SELECT pg_notify('ffmate', ?)`
×
225
                        if err := s.db.Exec(sql, encoded).Error; err != nil {
×
226
                                fmt.Println("failed to send cluster notification:", err)
×
227
                                return
×
228
                        }
×
229

230
                        debug.Websocket.Debug("< %s from %s (size: %db/%d%s)", update.Subject, update.Client, len(encoded), len(compressed)*100/len(payloadBytes), "%")
×
231
                }
232
        }()
233
}
234

235
func (s *Service) compressPayload(data []byte) ([]byte, error) {
×
236
        var buf bytes.Buffer
×
237
        w := brotli.NewWriter(&buf)
×
238
        if _, err := w.Write(data); err != nil {
×
239
                return nil, err
×
240
        }
×
241
        _ = w.Close()
×
242
        return buf.Bytes(), nil
×
243
}
244

245
func (s *Service) decompressPayload(data []byte) ([]byte, error) {
×
246
        r := brotli.NewReader(bytes.NewReader(data))
×
247
        return io.ReadAll(r)
×
248
}
×
249

250
func (s *Service) Name() string {
38✔
251
        return service.Websocket
38✔
252
}
38✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc