• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supabase-swift / 16528215778

25 Jul 2025 05:44PM UTC coverage: 75.872% (-0.7%) from 76.606%
16528215778

Pull #752

github

web-flow
Merge 91538a241 into 8ac8c4a3d
Pull Request #752: fix(realtime): implement event buffering for URLSessionWebSocket

0 of 101 new or added lines in 1 file covered. (0.0%)

3 existing lines in 1 file now uncovered.

5308 of 6996 relevant lines covered (75.87%)

21.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/Sources/Realtime/WebSocket/URLSessionWebSocket.swift
1
import ConcurrencyExtras
2
import Foundation
3

4
#if canImport(FoundationNetworking)
5
  import FoundationNetworking
6
#endif
7

8
/// A WebSocket connection implementation using `URLSession`.
9
///
10
/// This class provides a WebSocket connection built on top of `URLSessionWebSocketTask`.
11
/// It handles connection lifecycle, message sending/receiving, and proper cleanup.
12
///
13
/// ## Thread Safety
14
/// This class is thread-safe and can be used from multiple concurrent contexts.
15
/// All operations are protected by internal synchronization mechanisms.
16
///
17
/// ## Connection Management
18
/// The connection is established asynchronously using the `connect(to:protocols:configuration:)` method.
19
/// Once connected, you can send text/binary messages and listen for events through the `onEvent` callback.
20
///
21
/// ## Error Handling
22
/// Network errors are automatically handled and converted to appropriate WebSocket close codes.
23
/// The connection will be closed gracefully when errors occur, with proper cleanup of resources.
24
final class URLSessionWebSocket: WebSocket {
25
  /// Wraps an already-opened `URLSessionWebSocketTask`.
  ///
  /// Private on purpose: instances are only created from within this type
  /// (see `connect(to:protocols:configuration:)`).
  ///
  /// - Parameters:
  ///   - _task: The socket task that backs this connection.
  ///   - _protocol: The subprotocol agreed with the peer, or `""` when there is none.
  private init(
    _task: URLSessionWebSocketTask,
    _protocol: String
  ) {
    self._protocol = _protocol
    self._task = _task

    // Begin listening for incoming messages as soon as the instance exists.
    _scheduleReceive()
  }
×
38

39
  /// Creates and establishes a new WebSocket connection.
  ///
  /// A dedicated `URLSession` is created for the connection, a WebSocket task is
  /// started on it, and the call suspends until the session delegate reports that
  /// the socket opened or that the task completed.
  ///
  /// - Parameters:
  ///   - url: The WebSocket URL to connect to. Must use the `ws` or `wss` scheme;
  ///          any other scheme is a programmer error and traps.
  ///   - protocols: Optional array of WebSocket subprotocols to negotiate with the server.
  ///   - configuration: Optional `URLSessionConfiguration` for customizing the connection.
  ///                   Defaults to `.default` if not provided.
  /// - Returns: A connected `URLSessionWebSocket` instance.
  /// - Throws: `WebSocketError.connection` if the task completes before the socket opened.
  static func connect(
    to url: URL,
    protocols: [String]? = nil,
    configuration: URLSessionConfiguration? = nil
  ) async throws -> URLSessionWebSocket {
    guard url.scheme == "ws" || url.scheme == "wss" else {
      preconditionFailure("only ws: and wss: schemes are supported")
    }

    struct MutableState {
      /// Pending continuation of the `connect` call; set to `nil` once resumed so
      /// that it can never be resumed twice.
      var continuation: CheckedContinuation<URLSessionWebSocket, any Error>?
      /// The socket handed to the caller, available once the connection opened.
      var webSocket: URLSessionWebSocket?
    }

    let mutableState = LockIsolated(MutableState())

    let session = URLSession.sessionWithConfiguration(
      configuration ?? .default,
      onComplete: { session, _, error in
        mutableState.withValue { state in
          if let webSocket = state.webSocket {
            // There are three possibilities here:
            // 1. the peer sent a close frame, `onWebSocketTaskClosed` was already
            //    called and `_connectionClosed` is a no-op.
            // 2. we sent a close frame (through `close()`) and `_connectionClosed`
            //    is a no-op.
            // 3. an error occurred (e.g. network failure) and `_connectionClosed`
            //    will signal that through a close event.
            webSocket._connectionClosed(
              code: 1006,
              reason: Data("abnormal close".utf8)
            )
          } else {
            if error == nil {
              // `onWebSocketTaskOpened` should have been called and resumed the
              // continuation, so completing without an error is a logic error.
              assertionFailure(
                "expected an error or `onWebSocketTaskOpened` to have been called first"
              )
            }

            // Always resume the caller (even in the unexpected no-error case that
            // only asserts in debug builds); otherwise `connect` would never return.
            let underlying: any Error =
              error ?? NSError(domain: NSURLErrorDomain, code: NSURLErrorUnknown)
            state.continuation?.resume(
              throwing: WebSocketError.connection(
                message: "connection ended unexpectedly",
                error: underlying
              )
            )
            state.continuation = nil
          }
        }

        // The session keeps a strong reference to its delegate until it is
        // invalidated. Its only task has completed, so release it now.
        session.finishTasksAndInvalidate()
      },
      onWebSocketTaskOpened: { _, task, `protocol` in
        mutableState.withValue { state in
          let webSocket = URLSessionWebSocket(_task: task, _protocol: `protocol` ?? "")
          state.webSocket = webSocket
          state.continuation?.resume(returning: webSocket)
          state.continuation = nil
        }
      },
      onWebSocketTaskClosed: { _, _, code, reason in
        mutableState.withValue { state in
          assert(state.webSocket != nil, "connection should exist by this time")
          state.webSocket?._connectionClosed(code: code, reason: reason)
        }
      }
    )

    let task = session.webSocketTask(with: url, protocols: protocols ?? [])
    return try await withCheckedThrowingContinuation { continuation in
      mutableState.withValue { $0.continuation = continuation }
      // Start the task only after the continuation is stored, so that no delegate
      // callback can run before there is a continuation to resume.
      task.resume()
    }
  }
×
120

121
  /// The `URLSessionWebSocketTask` that carries this connection's traffic.
  let _task: URLSessionWebSocketTask
  /// The subprotocol negotiated for this connection (exposed through `protocol`).
  let _protocol: String

  /// Mutable connection state; always accessed through `mutableState`.
  struct MutableState {
    /// `true` once a close event has been processed.
    var isClosed: Bool = false
    /// The consumer's event handler, if one is currently attached.
    var onEvent: (@Sendable (WebSocketEvent) -> Void)? = nil
    /// Events that arrived while no handler was attached.
    var eventBuffer = [WebSocketEvent]()
    /// Close code recorded together with the close event.
    var closeCode: Int? = nil
    /// Close reason recorded together with the close event.
    var closeReason: String? = nil
  }

  /// Lock-isolated storage for `MutableState`.
  let mutableState = LockIsolated(MutableState())
×
142

143
  /// The close code recorded when the connection closed, or `nil` if none was recorded.
  var closeCode: Int? {
    mutableState.withValue { $0.closeCode }
  }
×
147

148
  /// The close reason recorded when the connection closed, or `nil` if none was recorded.
  var closeReason: String? {
    mutableState.withValue { $0.closeReason }
  }
×
152

153
  /// `true` once a close event has been processed for this connection.
  var isClosed: Bool {
    mutableState.withValue { $0.isClosed }
  }
×
157

158
  /// Turns a received message into a `WebSocketEvent`, publishes it, and re-arms the receive loop.
  ///
  /// Messages arriving after the connection closed are ignored. A message kind this
  /// code does not know about closes the connection instead of being forwarded.
  ///
  /// - Parameter value: The message received from the WebSocket task.
  private func _handleMessage(_ value: URLSessionWebSocketTask.Message) {
    if isClosed { return }

    switch value {
    case let .string(text):
      _trigger(.text(text))
    case let .data(data):
      _trigger(.binary(data))
    @unknown default:
      // Unknown message kinds cannot be represented as an event: close the
      // connection and do not schedule another receive.
      let underlying = NSError(
        domain: "WebSocketError",
        code: 1002,
        userInfo: [NSLocalizedDescriptionKey: "Unsupported message type"]
      )
      _closeConnectionWithError(
        WebSocketError.connection(
          message: "Received unsupported message type",
          error: underlying
        )
      )
      return
    }

    _scheduleReceive()
  }
×
186

187
  /// Starts a task that waits for a single incoming message.
  ///
  /// A received message is passed to `_handleMessage(_:)`, which schedules the next
  /// receive; a receive failure closes the connection via `_closeConnectionWithError(_:)`.
  private func _scheduleReceive() {
    Task {
      do {
        let message = try await _task.receive()
        _handleMessage(message)
      } catch {
        _closeConnectionWithError(error)
      }
    }
  }
×
200

201
  /// Cancels the task because of `error` and reports a close event with a matching close code.
  ///
  /// Mapping applied to the error's `NSError` domain and code:
  /// - POSIX 57 (socket is not connected): nothing is done here; the session
  ///   delegate callbacks are left to report the closure.
  /// - POSIX 100 (protocol error): close code 1002 with the error's description.
  /// - URL errors "timed out", "connection lost" and "not connected to internet":
  ///   close code 1006 with a fixed reason string.
  /// - Anything else: close code 1006 with the error's description.
  ///
  /// - Parameter error: The error that caused the connection to close.
  private func _closeConnectionWithError(_ error: any Error) {
    let failure = error as NSError
    let domain = failure.domain
    let errorCode = failure.code

    // Socket is not connected.
    // onWebSocketTaskClosed/onComplete will be invoked and may indicate a close code.
    guard !(domain == NSPOSIXErrorDomain && errorCode == 57) else { return }

    let code: Int
    let reason: String
    if domain == NSPOSIXErrorDomain && errorCode == 100 {
      // Network protocol error.
      code = 1002
      reason = failure.localizedDescription
    } else {
      // Everything else is reported as an abnormal closure.
      code = 1006
      if domain == NSURLErrorDomain {
        switch errorCode {
        case NSURLErrorTimedOut:
          reason = "Connection timed out"
        case NSURLErrorNetworkConnectionLost:
          reason = "Network connection lost"
        case NSURLErrorNotConnectedToInternet:
          reason = "No internet connection"
        default:
          reason = failure.localizedDescription
        }
      } else {
        reason = failure.localizedDescription
      }
    }

    _task.cancel()
    _connectionClosed(code: code, reason: Data(reason.utf8))
  }
×
237

238
  /// Publishes a `.close` event unless the connection is already marked closed.
  ///
  /// - Parameters:
  ///   - code: The WebSocket close code, if available.
  ///   - reason: The close reason as UTF-8 bytes; `nil` is reported as an empty string.
  private func _connectionClosed(code: Int?, reason: Data?) {
    if isClosed { return }

    let reasonText: String
    if let reason {
      reasonText = String(decoding: reason, as: UTF8.self)
    } else {
      reasonText = ""
    }

    _trigger(.close(code: code, reason: reasonText))
  }
×
248

249
  /// Sends a text message to the connected peer.
  ///
  /// Returns immediately; the send happens in a separate task. Nothing is sent when
  /// the connection is already closed. A failed send closes the connection through
  /// `_closeConnectionWithError(_:)`.
  ///
  /// - Parameter text: The text message to send.
  func send(_ text: String) {
    if isClosed { return }

    Task {
      do {
        let outgoing = URLSessionWebSocketTask.Message.string(text)
        try await _task.send(outgoing)
      } catch {
        _closeConnectionWithError(error)
      }
    }
  }
×
268

269
  /// Callback for handling WebSocket events.
  ///
  /// Set this property to receive notifications about WebSocket events including:
  /// - `.text(String)`: Text messages received from the peer
  /// - `.binary(Data)`: Binary messages received from the peer
  /// - `.close(code: Int?, reason: String)`: Connection closed events
  ///
  /// When a non-nil callback is assigned, events buffered while no callback was
  /// attached are replayed to it immediately, in the order they were received.
  /// Replay happens synchronously inside the setter, while the internal state lock
  /// is held.
  ///
  /// If the connection is already closed, the callback is not retained after the
  /// replay: no further events can arrive, so the getter returns `nil` again.
  ///
  /// The callback is called on an arbitrary queue and should be thread-safe.
  var onEvent: (@Sendable (WebSocketEvent) -> Void)? {
    get { mutableState.value.onEvent }
    set {
      mutableState.withValue { state in
        state.onEvent = newValue

        guard let handler = newValue else { return }

        // Drain the buffer before invoking the handler so each buffered event is
        // delivered exactly once.
        let pending = state.eventBuffer
        state.eventBuffer.removeAll()
        for event in pending {
          handler(event)
        }

        // A closed connection produces no further events. Drop the handler, as is
        // done when a close event is delivered, so it is not kept alive for nothing.
        if state.isClosed {
          state.onEvent = nil
        }
      }
    }
  }
298

299
  /// Delivers `event` to the attached handler, or buffers it when there is none.
  ///
  /// - Events arriving after the connection has been marked closed are ignored. The
  ///   check is made under the state lock, so only the first close event is recorded
  ///   and its code/reason cannot be overwritten by a racing second close.
  /// - Without a handler, the event is appended to a buffer capped at the 100 most
  ///   recent events. The buffer is kept when the connection closes, so a handler
  ///   attached later still receives the pending events, including the close event.
  /// - A `.close` event marks the connection closed, records code and reason, and
  ///   releases the handler.
  ///
  /// - Parameter event: The event to trigger.
  private func _trigger(_ event: WebSocketEvent) {
    mutableState.withValue { state in
      // Re-check under the lock: callers test `isClosed` before taking it.
      guard !state.isClosed else { return }

      if let onEvent = state.onEvent {
        // Deliver immediately when a handler is attached.
        onEvent(event)
      } else {
        // Buffer until a handler is attached, dropping the oldest event once the
        // cap is exceeded to bound memory use.
        state.eventBuffer.append(event)
        if state.eventBuffer.count > 100 {
          state.eventBuffer.removeFirst()
        }
      }

      // Update state when the connection closes.
      if case .close(let code, let reason) = event {
        state.onEvent = nil
        state.isClosed = true
        state.closeCode = code
        state.closeReason = reason
      }
    }
  }
×
326

327
  /// Sends binary data to the connected peer.
  ///
  /// Returns immediately; the send happens in a separate task. Nothing is sent when
  /// the connection is already closed. A failed send closes the connection through
  /// `_closeConnectionWithError(_:)`.
  ///
  /// - Parameter binary: The binary data to send.
  func send(_ binary: Data) {
    if isClosed { return }

    Task {
      do {
        let outgoing = URLSessionWebSocketTask.Message.data(binary)
        try await _task.send(outgoing)
      } catch {
        _closeConnectionWithError(error)
      }
    }
  }
×
346

347
  /// Closes the WebSocket connection.
  ///
  /// With a `code`, the task is cancelled with that close code and the given reason
  /// (an empty reason when `reason` is `nil`). Without a `code`, the task is simply
  /// cancelled and `reason` is not used.
  ///
  /// Invalid arguments are programmer errors and trap.
  ///
  /// - Parameters:
  ///   - code: Optional close code. Must be 1000 or in range 3000-4999.
  ///   - reason: Optional reason string. Must be ≤ 123 bytes when UTF-8 encoded.
  ///
  /// - Note: If the connection is already closed, this method has no effect
  ///   (arguments are not validated in that case either).
  func close(code: Int?, reason: String?) {
    guard !isClosed else {
      return
    }

    // Validate close code per RFC 6455.
    if let code, code != 1000, !(3000...4999).contains(code) {
      preconditionFailure(
        "Invalid close code: \(code). Must be 1000 or in range 3000-4999"
      )
    }

    // Validate reason length per RFC 6455.
    if let reason, reason.utf8.count > 123 {
      preconditionFailure("Close reason must be ≤ 123 bytes when UTF-8 encoded")
    }

    mutableState.withValue { state in
      // Re-check under the lock in case the connection closed in the meantime.
      guard !state.isClosed else { return }

      // `CloseCode(rawValue:)` is failable, and application codes (3000-4999) are
      // not declared cases of the enum. Fall back to a plain cancel instead of
      // trapping if the platform's Foundation rejects the raw value.
      // NOTE(review): presumably only non-Darwin Foundation returns nil here — confirm.
      if let code, let closeCode = URLSessionWebSocketTask.CloseCode(rawValue: code) {
        _task.cancel(with: closeCode, reason: Data((reason ?? "").utf8))
      } else {
        _task.cancel()
      }
    }
  }
×
388

389
  /// The WebSocket subprotocol negotiated with the peer.
  ///
  /// An empty string means no subprotocol was negotiated during the handshake.
  /// See [RFC 6455 Section 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9) for details.
  var `protocol`: String {
    return _protocol
  }
×
394
}
395

396
// MARK: - URLSession Extension
397

398
extension URLSession {
  /// Builds a `URLSession` whose delegate forwards task and WebSocket lifecycle events to closures.
  ///
  /// When at least one callback is supplied, the session is created with a `_Delegate`
  /// and a dedicated delegate queue limited to one concurrent operation, so the
  /// callbacks are delivered one at a time. When no callback is supplied, a plain
  /// session without a delegate is returned.
  ///
  /// - Parameters:
  ///   - configuration: The URLSession configuration to use.
  ///   - onComplete: Optional callback when a task completes (with or without error).
  ///   - onWebSocketTaskOpened: Optional callback when a WebSocket connection opens successfully.
  ///   - onWebSocketTaskClosed: Optional callback when a WebSocket connection closes.
  /// - Returns: A configured URLSession instance.
  static func sessionWithConfiguration(
    _ configuration: URLSessionConfiguration,
    onComplete: (@Sendable (URLSession, URLSessionTask, (any Error)?) -> Void)? = nil,
    onWebSocketTaskOpened: (@Sendable (URLSession, URLSessionWebSocketTask, String?) -> Void)? =
      nil,
    onWebSocketTaskClosed: (@Sendable (URLSession, URLSessionWebSocketTask, Int?, Data?) -> Void)? =
      nil
  ) -> URLSession {
    let noCallbacks =
      onComplete == nil && onWebSocketTaskOpened == nil && onWebSocketTaskClosed == nil

    // Nothing to forward: no delegate (and no delegate queue) is needed.
    guard !noCallbacks else {
      return URLSession(configuration: configuration)
    }

    let delegateQueue = OperationQueue()
    delegateQueue.maxConcurrentOperationCount = 1

    let delegate = _Delegate(
      onComplete: onComplete,
      onWebSocketTaskOpened: onWebSocketTaskOpened,
      onWebSocketTaskClosed: onWebSocketTaskClosed
    )

    return URLSession(
      configuration: configuration,
      delegate: delegate,
      delegateQueue: delegateQueue
    )
  }
}
440

441
// MARK: - Private Delegate
442

443
/// URLSession delegate that forwards task and WebSocket lifecycle callbacks to closures.
///
/// Each delegate method calls the matching closure supplied at initialization,
/// if there is one, and does nothing else.
final class _Delegate: NSObject, URLSessionDelegate, URLSessionDataDelegate, URLSessionTaskDelegate,
  URLSessionWebSocketDelegate
{
  /// Signature of the task-completion callback.
  typealias CompletionCallback = @Sendable (URLSession, URLSessionTask, (any Error)?) -> Void
  /// Signature of the socket-opened callback.
  typealias OpenedCallback = @Sendable (URLSession, URLSessionWebSocketTask, String?) -> Void
  /// Signature of the socket-closed callback.
  typealias ClosedCallback = @Sendable (URLSession, URLSessionWebSocketTask, Int?, Data?) -> Void

  /// Invoked when a task completes, with or without an error.
  let onComplete: CompletionCallback?
  /// Invoked when a WebSocket task reports that it opened.
  let onWebSocketTaskOpened: OpenedCallback?
  /// Invoked when a WebSocket task reports that it closed.
  let onWebSocketTaskClosed: ClosedCallback?

  /// Stores the callbacks to forward to; any of them may be `nil`.
  init(
    onComplete: (@Sendable (URLSession, URLSessionTask, (any Error)?) -> Void)?,
    onWebSocketTaskOpened: (
      @Sendable (URLSession, URLSessionWebSocketTask, String?) -> Void
    )?,
    onWebSocketTaskClosed: (
      @Sendable (URLSession, URLSessionWebSocketTask, Int?, Data?) -> Void
    )?
  ) {
    self.onWebSocketTaskClosed = onWebSocketTaskClosed
    self.onWebSocketTaskOpened = onWebSocketTaskOpened
    self.onComplete = onComplete
  }

  /// Forwards task completion (with or without error) to `onComplete`.
  func urlSession(
    _ session: URLSession,
    task: URLSessionTask,
    didCompleteWithError error: (any Error)?
  ) {
    guard let onComplete else { return }
    onComplete(session, task, error)
  }

  /// Forwards the socket-opened notification and negotiated protocol to `onWebSocketTaskOpened`.
  func urlSession(
    _ session: URLSession,
    webSocketTask: URLSessionWebSocketTask,
    didOpenWithProtocol protocol: String?
  ) {
    guard let onWebSocketTaskOpened else { return }
    onWebSocketTaskOpened(session, webSocketTask, `protocol`)
  }

  /// Forwards the socket-closed notification to `onWebSocketTaskClosed`, passing the close code's raw value.
  func urlSession(
    _ session: URLSession,
    webSocketTask: URLSessionWebSocketTask,
    didCloseWith closeCode: URLSessionWebSocketTask.CloseCode,
    reason: Data?
  ) {
    guard let onWebSocketTaskClosed else { return }
    onWebSocketTaskClosed(session, webSocketTask, closeCode.rawValue, reason)
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc