• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 23638991080

27 Mar 2026 09:06AM UTC coverage: 79.332% (+0.001%) from 79.331%
23638991080

push

github

web-flow
feat: support for watt process runner (#917)

5047 of 6664 branches covered (75.74%)

Branch coverage included in aggregate %.

14 of 18 new or added lines in 5 files covered. (77.78%)

3 existing lines in 2 files now uncovered.

28742 of 35928 relevant lines covered (80.0%)

587.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.27
/src/internal/pubsub/postgres.ts
1
import EventEmitter from 'node:events'
2✔
2
import { ERRORS } from '@internal/errors'
2✔
3
import { logger, logSchema } from '@internal/monitoring'
2✔
4
import createSubscriber, { Subscriber } from 'pg-listen'
2✔
5
import { PubSubAdapter } from './adapter'
2✔
6

2✔
7
export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
2✔
8
  isConnected = false
2✔
9
  subscriber: Subscriber
2✔
10

2✔
11
  constructor(connectionString: string) {
2✔
12
    super()
60✔
13
    this.subscriber = createSubscriber(
60✔
14
      { connectionString },
60✔
15
      {
60✔
16
        retryInterval: (attempt) => Math.min(attempt * 100, 1000),
60✔
17
        retryTimeout: 60 * 1000 * 60 * 14, // 24h
60✔
18
      }
60✔
19
    )
60✔
20

60✔
21
    this.subscriber.events.on('error', (e) => {
60✔
22
      this.isConnected = false
×
23
      this.emit('error', e)
×
24
    })
60✔
25
  }
60✔
26

2✔
27
  async start(opts?: { signal?: AbortSignal }): Promise<void> {
2✔
28
    if (opts?.signal?.aborted) {
4!
29
      throw ERRORS.Aborted('Postgres pubsub connection aborted')
×
30
    }
×
31

4✔
32
    await this.subscriber.connect()
4✔
33
    this.isConnected = true
4✔
34

4✔
35
    if (opts?.signal) {
4!
36
      opts.signal.addEventListener(
×
37
        'abort',
×
38
        async () => {
×
39
          logSchema.info(logger, '[PubSub] Stopping', {
×
40
            type: 'pubsub',
×
41
          })
×
42
          await this.close()
×
43
        },
×
44
        { once: true }
×
45
      )
×
46
    }
×
47

4✔
48
    await Promise.all(
4✔
49
      this.subscriber.notifications.eventNames().map(async (channel) => {
4✔
50
        return this.subscriber.listenTo(channel as string)
×
51
      })
4✔
52
    )
4✔
53
  }
4✔
54

2✔
55
  async close(): Promise<void> {
2✔
56
    this.isConnected = false
4✔
57
    this.subscriber.notifications.eventNames().forEach((event) => {
4✔
58
      this.subscriber.notifications.removeAllListeners(event)
12✔
59
    })
4✔
60
    await this.subscriber.close()
4✔
61
    logSchema.info(logger, '[PubSub] Exited', {
4✔
62
      type: 'pubsub',
4✔
63
    })
4✔
64
  }
4✔
65

2✔
66
  async publish(channel: string, payload: unknown): Promise<void> {
2✔
67
    await this.subscriber.notify(channel, payload)
×
68
  }
×
69

2✔
70
  async subscribe(channel: string, cb: (payload: any) => void): Promise<void> {
2✔
71
    const listenerCount = this.subscriber.notifications.listenerCount(channel)
506✔
72
    this.subscriber.notifications.on(channel, cb)
506✔
73

506✔
74
    if (this.isConnected && listenerCount === 0) {
506✔
75
      await this.subscriber.listenTo(channel)
12✔
76
    }
12✔
77
  }
506✔
78

2✔
79
  async unsubscribe(channel: string, cb: (payload: any) => void): Promise<void> {
2✔
80
    this.subscriber.notifications.removeListener(channel, cb)
494✔
81

494✔
82
    const isListening = this.subscriber.notifications.listenerCount(channel) > 0
494✔
83

494✔
84
    if (!isListening && this.isConnected) {
494!
UNCOV
85
      await this.subscriber.unlisten(channel)
×
UNCOV
86
    }
×
87
  }
494✔
88
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc