• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 23245841242

18 Mar 2026 12:59PM UTC coverage: 76.578% (+0.007%) from 76.571%
23245841242

Pull #917

github

web-flow
Merge 8ed98b482 into d9fb9c058
Pull Request #917: feat: support for watt process runner

4069 of 5777 branches covered (70.43%)

Branch coverage included in aggregate %.

5 of 6 new or added lines in 4 files covered. (83.33%)

2 existing lines in 1 file now uncovered.

26910 of 34677 relevant lines covered (77.6%)

186.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.56
/src/internal/pubsub/postgres.ts
1
import EventEmitter from 'node:events'
2✔
2
import { ERRORS } from '@internal/errors'
2✔
3
import { logger, logSchema } from '@internal/monitoring'
2✔
4
import createSubscriber, { Subscriber } from 'pg-listen'
2✔
5
import { PubSubAdapter } from './adapter'
2✔
6

2✔
7
export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
2✔
8
  isConnected = false
2✔
9
  subscriber: Subscriber
2✔
10

2✔
11
  constructor(connectionString: string) {
2✔
12
    super()
2✔
13
    this.subscriber = createSubscriber(
2✔
14
      { connectionString },
2✔
15
      {
2✔
16
        retryInterval: (attempt) => Math.min(attempt * 100, 1000),
2✔
17
        retryTimeout: 60 * 1000 * 60 * 14, // 14h (60 * 1000 * 60 ms = 1h, times 14) — NOTE(review): confirm 14h, not 24h, is the intended timeout
2✔
18
      }
2✔
19
    )
2✔
20

2✔
21
    this.subscriber.events.on('error', (e) => {
2✔
22
      this.isConnected = false
×
23
      this.emit('error', e)
×
24
    })
2✔
25
  }
2✔
26

2✔
27
  async start(opts?: { signal?: AbortSignal }): Promise<void> {
2✔
28
    if (opts?.signal?.aborted) {
2!
29
      throw ERRORS.Aborted('Postgres pubsub connection aborted')
×
30
    }
×
31

2✔
32
    await this.subscriber.connect()
2✔
33
    this.isConnected = true
2✔
34

2✔
35
    if (opts?.signal) {
2!
36
      opts.signal.addEventListener(
×
37
        'abort',
×
38
        async () => {
×
39
          logSchema.info(logger, '[PubSub] Stopping', {
×
40
            type: 'pubsub',
×
41
          })
×
42
          await this.close()
×
43
        },
×
44
        { once: true }
×
45
      )
×
46
    }
×
47

2✔
48
    await Promise.all(
2✔
49
      this.subscriber.notifications.eventNames().map(async (channel) => {
2✔
50
        return this.subscriber.listenTo(channel as string)
×
51
      })
2✔
52
    )
2✔
53
  }
2✔
54

2✔
55
  async close(): Promise<void> {
2✔
56
    this.isConnected = false
2✔
57
    this.subscriber.notifications.eventNames().forEach((event) => {
2✔
58
      this.subscriber.notifications.removeAllListeners(event)
6✔
59
    })
2✔
60
    await this.subscriber.close()
2✔
61
    logSchema.info(logger, '[PubSub] Exited', {
2✔
62
      type: 'pubsub',
2✔
63
    })
2✔
64
  }
2✔
65

2✔
66
  async publish(channel: string, payload: unknown): Promise<void> {
2✔
67
    await this.subscriber.notify(channel, payload)
×
68
  }
×
69

2✔
70
  async subscribe(channel: string, cb: (payload: any) => void): Promise<void> {
2✔
71
    const listenerCount = this.subscriber.notifications.listenerCount(channel)
266✔
72
    this.subscriber.notifications.on(channel, cb)
266✔
73

266✔
74
    if (this.isConnected && listenerCount === 0) {
266!
75
      await this.subscriber.listenTo(channel)
×
76
    }
×
77
  }
266✔
78

2✔
79
  async unsubscribe(channel: string, cb: (payload: any) => void): Promise<void> {
2✔
80
    this.subscriber.notifications.removeListener(channel, cb)
266✔
81

266✔
82
    const isListening = this.subscriber.notifications.listenerCount(channel) > 0
266✔
83

266✔
84
    if (!isListening && this.isConnected) {
266!
UNCOV
85
      await this.subscriber.unlisten(channel)
×
UNCOV
86
    }
×
87
  }
266✔
88
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc