• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24396888249

14 Apr 2026 11:40AM UTC coverage: 20.876% (-61.5%) from 82.422%
24396888249

Pull #1018

github

web-flow
Merge d70c1c3c5 into 5d1db21b1
Pull Request #1018: feat: add outcome logging for tenant pools

625 of 3457 branches covered (18.08%)

Branch coverage included in aggregate %.

7 of 14 new or added lines in 2 files covered. (50.0%)

2865 existing lines in 97 files now uncovered.

1344 of 5975 relevant lines covered (22.49%)

4.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.3
/src/internal/pubsub/postgres.ts
1
import EventEmitter from 'node:events'
2
import { ERRORS } from '@internal/errors'
3
import { logger, logSchema } from '@internal/monitoring'
4
import createSubscriber, { Subscriber } from 'pg-listen'
5
import { PubSubAdapter } from './adapter'
6

7
/**
 * Pub/sub adapter built on a `pg-listen` subscriber.
 *
 * Callbacks are registered on the subscriber's `notifications` emitter; the
 * adapter asks the subscriber to listen to a channel when the first callback
 * for it is added and to unlisten when the last one is removed. Errors
 * reported by the subscriber are re-emitted on this instance as `'error'`.
 */
export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
  /** Set by `start()` after a successful connect; cleared by `close()` and on subscriber errors. */
  isConnected = false
  subscriber: Subscriber

  /**
   * Creates the underlying subscriber; no connection is opened until `start()`.
   *
   * @param connectionString connection string handed to `pg-listen`
   */
  constructor(connectionString: string) {
    super()
    this.subscriber = createSubscriber(
      { connectionString },
      {
        // Grows by 100 per attempt and is capped at 1000.
        retryInterval: (attempt) => Math.min(attempt * 100, 1000),
        // NOTE(review): this evaluates to 14 hours of milliseconds. The comment
        // used to read "24h"; confirm which of the two was intended.
        retryTimeout: 60 * 1000 * 60 * 14, // 14h
      }
    )

    this.subscriber.events.on('error', (e) => {
      this.isConnected = false
      this.emit('error', e)
    })
  }

  /**
   * Connects the subscriber and starts listening on every channel that
   * already has a registered callback.
   *
   * @param opts.signal when provided, aborting it closes the adapter
   * @throws `ERRORS.Aborted` if the signal is aborted before or while connecting
   */
  async start(opts?: { signal?: AbortSignal }): Promise<void> {
    const signal = opts?.signal

    if (signal?.aborted) {
      throw ERRORS.Aborted('Postgres pubsub connection aborted')
    }

    await this.subscriber.connect()

    // An abort that happened while connecting would never reach a listener
    // registered afterwards, so handle it here instead of leaking the connection.
    if (signal?.aborted) {
      await this.close()
      throw ERRORS.Aborted('Postgres pubsub connection aborted')
    }

    this.isConnected = true

    if (signal) {
      signal.addEventListener(
        'abort',
        () => {
          logSchema.info(logger, '[PubSub] Stopping', {
            type: 'pubsub',
          })
          // The listener's return value is ignored by the event target, so a
          // failed close is reported through this adapter's 'error' event
          // rather than left as an unhandled rejection.
          this.close().catch((e) => this.emit('error', e))
        },
        { once: true }
      )
    }

    const channels = this.subscriber.notifications.eventNames()
    await Promise.all(channels.map((channel) => this.subscriber.listenTo(channel as string)))
  }

  /**
   * Removes every notification callback, closes the subscriber and logs the exit.
   */
  async close(): Promise<void> {
    this.isConnected = false

    for (const event of this.subscriber.notifications.eventNames()) {
      this.subscriber.notifications.removeAllListeners(event)
    }

    await this.subscriber.close()
    logSchema.info(logger, '[PubSub] Exited', {
      type: 'pubsub',
    })
  }

  /**
   * Sends `payload` on `channel` through the subscriber's `notify`.
   */
  async publish(channel: string, payload: unknown): Promise<void> {
    await this.subscriber.notify(channel, payload)
  }

  /**
   * Registers `cb` for notifications on `channel`.
   *
   * The subscriber is only asked to listen when the adapter is connected and
   * this is the first callback for the channel; channels registered before
   * `start()` are picked up by `start()` itself.
   */
  async subscribe(channel: string, cb: (payload: any) => void): Promise<void> {
    // Count before adding so "first callback" can be detected.
    const listenerCount = this.subscriber.notifications.listenerCount(channel)
    this.subscriber.notifications.on(channel, cb)

    if (this.isConnected && listenerCount === 0) {
      await this.subscriber.listenTo(channel)
    }
  }

  /**
   * Removes `cb` from `channel` and, when connected and no callbacks remain,
   * asks the subscriber to stop listening on it.
   */
  async unsubscribe(channel: string, cb: (payload: any) => void): Promise<void> {
    this.subscriber.notifications.removeListener(channel, cb)

    const isListening = this.subscriber.notifications.listenerCount(channel) > 0

    if (!isListening && this.isConnected) {
      await this.subscriber.unlisten(channel)
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc