• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supabase-swift / 16195026786

10 Jul 2025 12:24PM UTC coverage: 76.606% (-0.04%) from 76.649%
16195026786

Pull #745

github

web-flow
Merge 4da9d184a into 227c559ed
Pull Request #745: fix: drop Swift 5.9

5308 of 6929 relevant lines covered (76.61%)

21.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.4
/Sources/Realtime/RealtimeChannelV2.swift
1
import ConcurrencyExtras
2
import Foundation
3
import HTTPTypes
4
import IssueReporting
5

6
#if canImport(FoundationNetworking)
7
  import FoundationNetworking
8

9
  extension HTTPURLResponse {
10
    convenience init() {
11
      self.init(
12
        url: URL(string: "http://127.0.0.1")!,
13
        statusCode: 200,
14
        httpVersion: nil,
15
        headerFields: nil
16
      )!
17
    }
18
  }
19
#endif
20

21
public struct RealtimeChannelConfig: Sendable {
22
  public var broadcast: BroadcastJoinConfig
23
  public var presence: PresenceJoinConfig
24
  public var isPrivate: Bool
25
}
26

27
public final class RealtimeChannelV2: Sendable {
28
  struct MutableState {
29
    var clientChanges: [PostgresJoinConfig] = []
8✔
30
    var joinRef: String?
31
    var pushes: [String: PushV2] = [:]
8✔
32
  }
33

34
  @MainActor
35
  private var mutableState = MutableState()
8✔
36

37
  let topic: String
38

39
  @MainActor var config: RealtimeChannelConfig
40

41
  let logger: (any SupabaseLogger)?
42
  let socket: RealtimeClientV2
43

44
  @MainActor var joinRef: String? { mutableState.joinRef }
5✔
45

46
  let callbackManager = CallbackManager()
8✔
47
  private let statusSubject = AsyncValueSubject<RealtimeChannelStatus>(.unsubscribed)
8✔
48

49
  public private(set) var status: RealtimeChannelStatus {
50
    get { statusSubject.value }
13✔
51
    set { statusSubject.yield(newValue) }
7✔
52
  }
53

54
  public var statusChange: AsyncStream<RealtimeChannelStatus> {
4✔
55
    statusSubject.values
4✔
56
  }
4✔
57

58
  /// Listen for connection status changes.
59
  /// - Parameter listener: Closure that will be called when connection status changes.
60
  /// - Returns: An observation handle that can be used to stop listening.
61
  ///
62
  /// - Note: Use ``statusChange`` if you prefer to use Async/Await.
63
  public func onStatusChange(
64
    _ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void
65
  ) -> RealtimeSubscription {
1✔
66
    let task = statusSubject.onChange { listener($0) }
3✔
67
    return RealtimeSubscription { task.cancel() }
1✔
68
  }
1✔
69

70
  init(
71
    topic: String,
72
    config: RealtimeChannelConfig,
73
    socket: RealtimeClientV2,
74
    logger: (any SupabaseLogger)?
75
  ) {
8✔
76
    self.topic = topic
8✔
77
    self.config = config
8✔
78
    self.logger = logger
8✔
79
    self.socket = socket
8✔
80
  }
8✔
81

82
  deinit {
2✔
83
    callbackManager.reset()
2✔
84
  }
2✔
85

86
  /// Subscribes to the channel
87
  @MainActor
88
  public func subscribe() async {
4✔
89
    if socket.status != .connected {
4✔
90
      if socket.options.connectOnSubscribe != true {
×
91
        reportIssue(
×
92
          "You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
×
93
        )
×
94
        return
×
95
      }
×
96
      await socket.connect()
×
97
    }
4✔
98

4✔
99
    status = .subscribing
4✔
100
    logger?.debug("Subscribing to channel \(topic)")
4✔
101

4✔
102
    config.presence.enabled = callbackManager.callbacks.contains(where: { $0.isPresence })
4✔
103

4✔
104
    let joinConfig = RealtimeJoinConfig(
4✔
105
      broadcast: config.broadcast,
4✔
106
      presence: config.presence,
4✔
107
      postgresChanges: mutableState.clientChanges,
4✔
108
      isPrivate: config.isPrivate
4✔
109
    )
4✔
110

4✔
111
    let payload = RealtimeJoinPayload(
4✔
112
      config: joinConfig,
4✔
113
      accessToken: await socket._getAccessToken(),
4✔
114
      version: socket.options.headers[.xClientInfo]
4✔
115
    )
4✔
116

4✔
117
    let joinRef = socket.makeRef()
4✔
118
    mutableState.joinRef = joinRef
4✔
119

4✔
120
    logger?.debug("Subscribing to channel with body: \(joinConfig)")
4✔
121

4✔
122
    await push(
4✔
123
      ChannelEvent.join,
4✔
124
      ref: joinRef,
4✔
125
      payload: try! JSONObject(payload)
4✔
126
    )
4✔
127

4✔
128
    do {
4✔
129
      try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
4✔
130
        _ = await statusChange.first { @Sendable in $0 == .subscribed }
7✔
131
      }
4✔
132
    } catch {
3✔
133
      if error is TimeoutError {
1✔
134
        logger?.debug("Subscribe timed out.")
1✔
135
        await subscribe()
1✔
136
      } else {
1✔
137
        logger?.error("Subscribe failed: \(error)")
×
138
      }
×
139
    }
4✔
140
  }
4✔
141

142
  public func unsubscribe() async {
1✔
143
    status = .unsubscribing
1✔
144
    logger?.debug("Unsubscribing from channel \(topic)")
1✔
145

1✔
146
    await push(ChannelEvent.leave)
1✔
147
  }
1✔
148

149
  @available(
150
    *,
151
    deprecated,
152
    message:
153
      "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
154
  )
155
  public func updateAuth(jwt: String?) async {
×
156
    logger?.debug("Updating auth token for channel \(topic)")
×
157
    await push(
×
158
      ChannelEvent.accessToken,
×
159
      payload: ["access_token": jwt.map { .string($0) } ?? .null]
×
160
    )
×
161
  }
×
162

163
  /// Send a broadcast message with `event` and a `Codable` payload.
164
  /// - Parameters:
165
  ///   - event: Broadcast message event.
166
  ///   - message: Message payload.
167
  public func broadcast(event: String, message: some Codable) async throws {
1✔
168
    try await broadcast(event: event, message: JSONObject(message))
1✔
169
  }
1✔
170

171
  /// Send a broadcast message with `event` and a raw `JSON` payload.
172
  /// - Parameters:
173
  ///   - event: Broadcast message event.
174
  ///   - message: Message payload.
175
  @MainActor
176
  public func broadcast(event: String, message: JSONObject) async {
1✔
177
    if status != .subscribed {
1✔
178
      struct Message: Encodable {
1✔
179
        let topic: String
1✔
180
        let event: String
1✔
181
        let payload: JSONObject
1✔
182
        let `private`: Bool
1✔
183
      }
1✔
184

1✔
185
      var headers: HTTPFields = [.contentType: "application/json"]
1✔
186
      if let apiKey = socket.options.apikey {
1✔
187
        headers[.apiKey] = apiKey
1✔
188
      }
1✔
189
      if let accessToken = await socket._getAccessToken() {
1✔
190
        headers[.authorization] = "Bearer \(accessToken)"
1✔
191
      }
1✔
192

1✔
193
      let task = Task { [headers] in
1✔
194
        _ = try? await socket.http.send(
1✔
195
          HTTPRequest(
1✔
196
            url: socket.broadcastURL,
1✔
197
            method: .post,
1✔
198
            headers: headers,
1✔
199
            body: JSONEncoder().encode(
1✔
200
              [
1✔
201
                "messages": [
1✔
202
                  Message(
1✔
203
                    topic: topic,
1✔
204
                    event: event,
1✔
205
                    payload: message,
1✔
206
                    private: config.isPrivate
1✔
207
                  )
1✔
208
                ]
1✔
209
              ]
1✔
210
            )
1✔
211
          )
1✔
212
        )
1✔
213
      }
1✔
214

1✔
215
      if config.broadcast.acknowledgeBroadcasts {
1✔
216
        try? await withTimeout(interval: socket.options.timeoutInterval) {
1✔
217
          await task.value
1✔
218
        }
1✔
219
      }
1✔
220
    } else {
1✔
221
      await push(
×
222
        ChannelEvent.broadcast,
×
223
        payload: [
×
224
          "type": "broadcast",
×
225
          "event": .string(event),
×
226
          "payload": .object(message),
×
227
        ]
×
228
      )
×
229
    }
×
230
  }
1✔
231

232
  /// Tracks the given state in the channel.
233
  /// - Parameter state: The state to be tracked, conforming to `Codable`.
234
  /// - Throws: An error if the tracking fails.
235
  public func track(_ state: some Codable) async throws {
×
236
    try await track(state: JSONObject(state))
×
237
  }
×
238

239
  /// Tracks the given state in the channel.
240
  /// - Parameter state: The state to be tracked as a `JSONObject`.
241
  public func track(state: JSONObject) async {
×
242
    if status != .subscribed {
×
243
      reportIssue(
×
244
        "You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
×
245
      )
×
246
    }
×
247

×
248
    await push(
×
249
      ChannelEvent.presence,
×
250
      payload: [
×
251
        "type": "presence",
×
252
        "event": "track",
×
253
        "payload": .object(state),
×
254
      ]
×
255
    )
×
256
  }
×
257

258
  /// Stops tracking the current state in the channel.
259
  public func untrack() async {
×
260
    await push(
×
261
      ChannelEvent.presence,
×
262
      payload: [
×
263
        "type": "presence",
×
264
        "event": "untrack",
×
265
      ]
×
266
    )
×
267
  }
×
268

269
  func onMessage(_ message: RealtimeMessageV2) async {
2✔
270
    do {
2✔
271
      guard let eventType = message._eventType else {
2✔
272
        logger?.debug("Received message without event type: \(message)")
×
273
        return
×
274
      }
2✔
275

2✔
276
      switch eventType {
2✔
277
      case .tokenExpired:
2✔
278
        // deprecated type
×
279
        break
×
280

2✔
281
      case .system:
2✔
282
        if message.status == .ok {
×
283
          logger?.debug("Subscribed to channel \(message.topic)")
×
284
          status = .subscribed
×
285
        } else {
×
286
          logger?.debug(
×
287
            "Failed to subscribe to channel \(message.topic): \(message.payload)"
×
288
          )
×
289
        }
×
290

×
291
        callbackManager.triggerSystem(message: message)
×
292

2✔
293
      case .reply:
2✔
294
        guard
2✔
295
          let ref = message.ref,
2✔
296
          let status = message.payload["status"]?.stringValue
2✔
297
        else {
2✔
298
          throw RealtimeError("Received a reply with unexpected payload: \(message)")
×
299
        }
2✔
300

2✔
301
        await didReceiveReply(ref: ref, status: status)
2✔
302

2✔
303
        if message.payload["response"]?.objectValue?.keys
2✔
304
          .contains(ChannelEvent.postgresChanges) == true
2✔
305
        {
2✔
306
          let serverPostgresChanges = try message.payload["response"]?
2✔
307
            .objectValue?["postgres_changes"]?
2✔
308
            .decode(as: [PostgresJoinConfig].self)
2✔
309

2✔
310
          callbackManager.setServerChanges(changes: serverPostgresChanges ?? [])
2✔
311

2✔
312
          if self.status != .subscribed {
2✔
313
            self.status = .subscribed
2✔
314
            logger?.debug("Subscribed to channel \(message.topic)")
2✔
315
          }
2✔
316
        }
2✔
317

2✔
318
      case .postgresChanges:
2✔
319
        guard let data = message.payload["data"] else {
×
320
          logger?.debug("Expected \"data\" key in message payload.")
×
321
          return
×
322
        }
×
323

×
324
        let ids = message.payload["ids"]?.arrayValue?.compactMap(\.intValue) ?? []
×
325

×
326
        let postgresActions = try data.decode(as: PostgresActionData.self)
×
327

×
328
        let action: AnyAction
×
329
        switch postgresActions.type {
×
330
        case "UPDATE":
×
331
          action = .update(
×
332
            UpdateAction(
×
333
              columns: postgresActions.columns,
×
334
              commitTimestamp: postgresActions.commitTimestamp,
×
335
              record: postgresActions.record ?? [:],
×
336
              oldRecord: postgresActions.oldRecord ?? [:],
×
337
              rawMessage: message
×
338
            )
×
339
          )
×
340

×
341
        case "DELETE":
×
342
          action = .delete(
×
343
            DeleteAction(
×
344
              columns: postgresActions.columns,
×
345
              commitTimestamp: postgresActions.commitTimestamp,
×
346
              oldRecord: postgresActions.oldRecord ?? [:],
×
347
              rawMessage: message
×
348
            )
×
349
          )
×
350

×
351
        case "INSERT":
×
352
          action = .insert(
×
353
            InsertAction(
×
354
              columns: postgresActions.columns,
×
355
              commitTimestamp: postgresActions.commitTimestamp,
×
356
              record: postgresActions.record ?? [:],
×
357
              rawMessage: message
×
358
            )
×
359
          )
×
360

×
361
        default:
×
362
          throw RealtimeError("Unknown event type: \(postgresActions.type)")
×
363
        }
×
364

×
365
        callbackManager.triggerPostgresChanges(ids: ids, data: action)
×
366

2✔
367
      case .broadcast:
2✔
368
        let payload = message.payload
×
369

×
370
        guard let event = payload["event"]?.stringValue else {
×
371
          throw RealtimeError("Expected 'event' key in 'payload' for broadcast event.")
×
372
        }
×
373

×
374
        callbackManager.triggerBroadcast(event: event, json: payload)
×
375

2✔
376
      case .close:
2✔
377
        socket._remove(self)
×
378
        logger?.debug("Unsubscribed from channel \(message.topic)")
×
379
        status = .unsubscribed
×
380

2✔
381
      case .error:
2✔
382
        logger?.error(
×
383
          "Received an error in channel \(message.topic). That could be as a result of an invalid access token"
×
384
        )
×
385

2✔
386
      case .presenceDiff:
2✔
387
        let joins = try message.payload["joins"]?.decode(as: [String: PresenceV2].self) ?? [:]
×
388
        let leaves = try message.payload["leaves"]?.decode(as: [String: PresenceV2].self) ?? [:]
×
389
        callbackManager.triggerPresenceDiffs(joins: joins, leaves: leaves, rawMessage: message)
×
390

2✔
391
      case .presenceState:
2✔
392
        let joins = try message.payload.decode(as: [String: PresenceV2].self)
×
393
        callbackManager.triggerPresenceDiffs(joins: joins, leaves: [:], rawMessage: message)
×
394
      }
2✔
395
    } catch {
2✔
396
      logger?.debug("Failed: \(error)")
×
397
    }
2✔
398
  }
2✔
399

400
  /// Listen for clients joining / leaving the channel using presences.
401
  public func onPresenceChange(
402
    _ callback: @escaping @Sendable (any PresenceAction) -> Void
403
  ) -> RealtimeSubscription {
2✔
404
    if status == .subscribed {
2✔
405
      logger?.debug(
×
406
        "Resubscribe to \(self.topic) due to change in presence callback on joined channel."
×
407
      )
×
408
      Task {
×
409
        await unsubscribe()
×
410
        await subscribe()
×
411
      }
×
412
    }
×
413

2✔
414
    let id = callbackManager.addPresenceCallback(callback: callback)
2✔
415

2✔
416
    return RealtimeSubscription { [weak callbackManager, logger] in
2✔
417
      logger?.debug("Removing presence callback with id: \(id)")
2✔
418
      callbackManager?.removeCallback(id: id)
2✔
419
    }
2✔
420
  }
2✔
421

422
  /// Listen for postgres changes in a channel.
423
  public func onPostgresChange(
424
    _: AnyAction.Type,
425
    schema: String = "public",
426
    table: String? = nil,
427
    filter: String? = nil,
428
    callback: @escaping @Sendable (AnyAction) -> Void
429
  ) -> RealtimeSubscription {
1✔
430
    _onPostgresChange(
1✔
431
      event: .all,
1✔
432
      schema: schema,
1✔
433
      table: table,
1✔
434
      filter: filter
1✔
435
    ) {
1✔
436
      callback($0)
×
437
    }
×
438
  }
1✔
439

440
  /// Listen for postgres changes in a channel.
441
  public func onPostgresChange(
442
    _: InsertAction.Type,
443
    schema: String = "public",
444
    table: String? = nil,
445
    filter: String? = nil,
446
    callback: @escaping @Sendable (InsertAction) -> Void
447
  ) -> RealtimeSubscription {
2✔
448
    _onPostgresChange(
2✔
449
      event: .insert,
2✔
450
      schema: schema,
2✔
451
      table: table,
2✔
452
      filter: filter
2✔
453
    ) {
2✔
454
      guard case let .insert(action) = $0 else { return }
×
455
      callback(action)
×
456
    }
×
457
  }
2✔
458

459
  /// Listen for postgres changes in a channel.
460
  public func onPostgresChange(
461
    _: UpdateAction.Type,
462
    schema: String = "public",
463
    table: String? = nil,
464
    filter: String? = nil,
465
    callback: @escaping @Sendable (UpdateAction) -> Void
466
  ) -> RealtimeSubscription {
2✔
467
    _onPostgresChange(
2✔
468
      event: .update,
2✔
469
      schema: schema,
2✔
470
      table: table,
2✔
471
      filter: filter
2✔
472
    ) {
2✔
473
      guard case let .update(action) = $0 else { return }
×
474
      callback(action)
×
475
    }
×
476
  }
2✔
477

478
  /// Listen for postgres changes in a channel.
479
  public func onPostgresChange(
480
    _: DeleteAction.Type,
481
    schema: String = "public",
482
    table: String? = nil,
483
    filter: String? = nil,
484
    callback: @escaping @Sendable (DeleteAction) -> Void
485
  ) -> RealtimeSubscription {
2✔
486
    _onPostgresChange(
2✔
487
      event: .delete,
2✔
488
      schema: schema,
2✔
489
      table: table,
2✔
490
      filter: filter
2✔
491
    ) {
2✔
492
      guard case let .delete(action) = $0 else { return }
×
493
      callback(action)
×
494
    }
×
495
  }
2✔
496

497
  func _onPostgresChange(
498
    event: PostgresChangeEvent,
499
    schema: String,
500
    table: String?,
501
    filter: String?,
502
    callback: @escaping @Sendable (AnyAction) -> Void
503
  ) -> RealtimeSubscription {
7✔
504
    guard status != .subscribed else {
7✔
505
      reportIssue(
×
506
        "You cannot call postgresChange after joining the channel, this won't work as expected."
×
507
      )
×
508
      return RealtimeSubscription {}
×
509
    }
7✔
510

7✔
511
    let config = PostgresJoinConfig(
7✔
512
      event: event,
7✔
513
      schema: schema,
7✔
514
      table: table,
7✔
515
      filter: filter
7✔
516
    )
7✔
517

7✔
518
    Task { @MainActor in
7✔
519
      mutableState.clientChanges.append(config)
7✔
520
    }
7✔
521

7✔
522
    let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
7✔
523
    return RealtimeSubscription { [weak callbackManager, logger] in
7✔
524
      logger?.debug("Removing postgres callback with id: \(id)")
7✔
525
      callbackManager?.removeCallback(id: id)
7✔
526
    }
7✔
527
  }
7✔
528

529
  /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
530
  public func onBroadcast(
531
    event: String,
532
    callback: @escaping @Sendable (JSONObject) -> Void
533
  ) -> RealtimeSubscription {
2✔
534
    let id = callbackManager.addBroadcastCallback(event: event, callback: callback)
2✔
535
    return RealtimeSubscription { [weak callbackManager, logger] in
2✔
536
      logger?.debug("Removing broadcast callback with id: \(id)")
2✔
537
      callbackManager?.removeCallback(id: id)
2✔
538
    }
2✔
539
  }
2✔
540

541
  /// Listen for `system` event.
542
  public func onSystem(
543
    callback: @escaping @Sendable (RealtimeMessageV2) -> Void
544
  ) -> RealtimeSubscription {
1✔
545
    let id = callbackManager.addSystemCallback(callback: callback)
1✔
546
    return RealtimeSubscription { [weak callbackManager, logger] in
1✔
547
      logger?.debug("Removing system callback with id: \(id)")
1✔
548
      callbackManager?.removeCallback(id: id)
1✔
549
    }
1✔
550
  }
1✔
551

552
  /// Listen for `system` event.
553
  public func onSystem(
554
    callback: @escaping @Sendable () -> Void
555
  ) -> RealtimeSubscription {
1✔
556
    self.onSystem { _ in callback() }
1✔
557
  }
1✔
558

559
  @MainActor
560
  @discardableResult
561
  func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
5✔
562
    let message = RealtimeMessageV2(
5✔
563
      joinRef: joinRef,
5✔
564
      ref: ref ?? socket.makeRef(),
5✔
565
      topic: self.topic,
5✔
566
      event: event,
5✔
567
      payload: payload
5✔
568
    )
5✔
569

5✔
570
    let push = PushV2(channel: self, message: message)
5✔
571
    if let ref = message.ref {
5✔
572
      mutableState.pushes[ref] = push
5✔
573
    }
5✔
574

5✔
575
    return await push.send()
5✔
576
  }
5✔
577

578
  @MainActor
579
  private func didReceiveReply(ref: String, status: String) {
2✔
580
    let push = mutableState.pushes.removeValue(forKey: ref)
2✔
581
    push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
2✔
582
  }
2✔
583
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc