• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24215194021

09 Apr 2026 09:53PM UTC coverage: 63.259% (-17.6%) from 80.843%
24215194021

Pull #1000

github

web-flow
Merge 434cd89fa into 41cfe7541
Pull Request #1000: feat: intro vitest & split units

3465 of 7093 branches covered (48.85%)

Branch coverage included in aggregate %.

2 of 12 new or added lines in 1 file covered. (16.67%)

2655 existing lines in 92 files now uncovered.

13417 of 19594 relevant lines covered (68.48%)

72.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.14
/src/internal/pubsub/postgres.ts
1
import EventEmitter from 'node:events'
2
import { ERRORS } from '@internal/errors'
3
import { logger, logSchema } from '@internal/monitoring'
4
import createSubscriber, { Subscriber } from 'pg-listen'
5
import { PubSubAdapter } from './adapter'
6

7
export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
8
  isConnected = false
1✔
9
  subscriber: Subscriber
10

11
  constructor(connectionString: string) {
40✔
12
    super()
1✔
13
    this.subscriber = createSubscriber(
1✔
14
      { connectionString },
15
      {
UNCOV
16
        retryInterval: (attempt) => Math.min(attempt * 100, 1000),
×
17
        retryTimeout: 60 * 1000 * 60 * 14, // 24h
18
      }
19
    )
20

21
    this.subscriber.events.on('error', (e) => {
1✔
22
      this.isConnected = false
×
23
      this.emit('error', e)
×
24
    })
25
  }
26

27
  async start(opts?: { signal?: AbortSignal }): Promise<void> {
2✔
UNCOV
28
    if (opts?.signal?.aborted) {
×
29
      throw ERRORS.Aborted('Postgres pubsub connection aborted')
×
30
    }
31

UNCOV
32
    await this.subscriber.connect()
×
UNCOV
33
    this.isConnected = true
×
34

UNCOV
35
    if (opts?.signal) {
×
36
      opts.signal.addEventListener(
×
37
        'abort',
38
        async () => {
39
          logSchema.info(logger, '[PubSub] Stopping', {
×
40
            type: 'pubsub',
41
          })
42
          await this.close()
×
43
        },
44
        { once: true }
45
      )
46
    }
47

UNCOV
48
    await Promise.all(
×
49
      this.subscriber.notifications.eventNames().map(async (channel) => {
50
        return this.subscriber.listenTo(channel as string)
×
51
      })
52
    )
53
  }
54

55
  async close(): Promise<void> {
2✔
UNCOV
56
    this.isConnected = false
×
UNCOV
57
    this.subscriber.notifications.eventNames().forEach((event) => {
✔
UNCOV
58
      this.subscriber.notifications.removeAllListeners(event)
×
59
    })
UNCOV
60
    await this.subscriber.close()
×
UNCOV
61
    logSchema.info(logger, '[PubSub] Exited', {
×
62
      type: 'pubsub',
63
    })
64
  }
65

66
  async publish(channel: string, payload: unknown): Promise<void> {
67
    await this.subscriber.notify(channel, payload)
×
68
  }
69

70
  async subscribe(channel: string, cb: (payload: any) => void): Promise<void> {
263✔
UNCOV
71
    const listenerCount = this.subscriber.notifications.listenerCount(channel)
×
UNCOV
72
    this.subscriber.notifications.on(channel, cb)
×
73

UNCOV
74
    if (this.isConnected && listenerCount === 0) {
×
UNCOV
75
      await this.subscriber.listenTo(channel)
×
76
    }
77
  }
78

79
  async unsubscribe(channel: string, cb: (payload: any) => void): Promise<void> {
257✔
UNCOV
80
    this.subscriber.notifications.removeListener(channel, cb)
×
81

UNCOV
82
    const isListening = this.subscriber.notifications.listenerCount(channel) > 0
×
83

UNCOV
84
    if (!isListening && this.isConnected) {
×
85
      await this.subscriber.unlisten(channel)
×
86
    }
87
  }
88
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc