• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 15910199448

26 Jun 2025 06:57PM UTC coverage: 94.008% (+0.7%) from 93.275%
15910199448

Pull #199

github

web-flow
Merge c1c0c5e19 into 1e7398c63
Pull Request #199: feat: process the queue on demand

68 of 72 new or added lines in 1 file covered. (94.44%)

2 existing lines in 1 file now uncovered.

455 of 484 relevant lines covered (94.01%)

252.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.67
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#include "pg_prelude.h"
7
#include "curl_prelude.h"
8
#include "util.h"
9
#include "errors.h"
10
#include "core.h"
11
#include "event.h"
12

13
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
14
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
15
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
16
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
17

18
PG_MODULE_MAGIC;
122✔
19

20
typedef enum {
21
  WS_NOT_YET = 1,
22
  WS_RUNNING,
23
  WS_EXITED,
24
} WorkerStatus;
25

26
typedef struct {
27
  pg_atomic_uint32  should_restart;
28
  pg_atomic_uint32  should_work;
29
  pg_atomic_uint32  status;
30
  Latch             latch;
31
  ConditionVariable cv;
32
} WorkerState;
33

34
WorkerState *worker_state = NULL;
35

36
static const int                curl_handle_event_timeout_ms = 1000;
37
static const long               queue_processing_wait_timeout_ms = 1000;
38
static const int                net_worker_restart_time_sec = 1;
39
static const long               no_timeout = -1L;
40

41
static char*                    guc_ttl;
42
static int                      guc_batch_size;
43
static char*                    guc_database_name;
44
static char*                    guc_username;
45
static MemoryContext            CurlMemContext = NULL;
46
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
47
static volatile sig_atomic_t    got_sighup = false;
48

49
void _PG_init(void);
50
PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
51

52
PG_FUNCTION_INFO_V1(worker_restart);
104✔
53
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
18✔
54
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
18✔
55
  pg_atomic_write_u32(&worker_state->should_restart, 1);
18✔
56
  pg_write_barrier();
18✔
57
  if (worker_state)
18✔
58
    SetLatch(&worker_state->latch);
18✔
59
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
18✔
60
}
61

62
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
22✔
63
  if (pg_atomic_read_u32(&ws->status) == expected_status) // fast return without sleeping, in case condition is fulfilled
22✔
64
    return;
65

66
  ConditionVariablePrepareToSleep(&ws->cv);
16✔
67
  while (pg_atomic_read_u32(&ws->status) != expected_status) {
32✔
68
    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
16✔
69
  }
70
  ConditionVariableCancelSleep();
16✔
71
}
72

73
PG_FUNCTION_INFO_V1(wait_until_running);
108✔
74
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
22✔
75
  wait_until_state(worker_state, WS_RUNNING);
22✔
76

77
  PG_RETURN_VOID();
22✔
78
}
79

80
PG_FUNCTION_INFO_V1(wake);
166✔
81
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
382✔
82
  uint32 expected = 0;
382✔
83

84
  bool success = pg_atomic_compare_exchange_u32(&worker_state->should_work, &expected, 1);
382✔
85
  pg_write_barrier();
382✔
86

87
  if (success) // only wake the worker on first put
382✔
88
    SetLatch(&worker_state->latch);
67✔
89

90
  PG_RETURN_VOID();
382✔
91
}
92

93
static void
94
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
2✔
95
{
96
  int save_errno = errno;
2✔
97
  pg_atomic_write_u32(&worker_state->should_restart, 1);
2✔
98
  pg_write_barrier();
2✔
99
  if (worker_state)
2✔
100
    SetLatch(&worker_state->latch);
2✔
101
  errno = save_errno;
2✔
102
}
2✔
103

104
static void
105
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
16✔
106
{
107
  int     save_errno = errno;
16✔
108
  got_sighup = true;
16✔
109
  if (worker_state)
16✔
110
    SetLatch(&worker_state->latch);
16✔
111
  errno = save_errno;
16✔
112
}
16✔
113

114
static void publish_state(WorkerStatus s) {
139✔
115
  pg_atomic_write_u32(&worker_state->status, (uint32)s);
139✔
116
  pg_write_barrier();
139✔
117
  ConditionVariableBroadcast(&worker_state->cv);
139✔
118
}
139✔
119

120
static bool process_interrupts(){
134✔
121
  bool restart = false;
134✔
122

123
  CHECK_FOR_INTERRUPTS();
134✔
124

125
  if (got_sighup) {
134✔
126
    got_sighup = false;
2✔
127
    ProcessConfigFile(PGC_SIGHUP);
2✔
128
  }
129

130
  if (pg_atomic_read_u32(&worker_state->should_restart) == 1){ // if a restart is issued, make sure we do it again after waiting
134✔
131
    restart = true;
20✔
132
  }
133

134
  return restart;
134✔
135
}
136

137
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
20✔
138
  OwnLatch(&worker_state->latch);
20✔
139

140
  pqsignal(SIGTERM, handle_sigterm);
20✔
141
  pqsignal(SIGHUP, handle_sighup);
20✔
142
  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
20✔
143

144
  BackgroundWorkerUnblockSignals();
20✔
145

146
  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
20✔
147
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity
20✔
148

149
  elog(INFO, "pg_net_worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);
20✔
150

151
  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
20✔
152
  if(curl_ret != CURLE_OK)
20✔
153
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));
×
154

155
  LoopState lstate = {
60✔
156
    .epfd = event_monitor(),
20✔
157
    .curl_mhandle = curl_multi_init(),
20✔
158
  };
159

160
  if (lstate.epfd < 0) {
20✔
UNCOV
161
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
×
162
  }
163

164
  if(!lstate.curl_mhandle)
20✔
UNCOV
165
    ereport(ERROR, errmsg("curl_multi_init()"));
×
166

167
  set_curl_mhandle(lstate.curl_mhandle, &lstate);
20✔
168

169
  while (true) {
119✔
170
    publish_state(WS_RUNNING);
119✔
171

172
    uint32 expected = 1;
119✔
173
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_work, &expected, 0)){
119✔
174
      elog(DEBUG1, "pg_net_worker waiting for wake");
54✔
175
      // this will also wait for the `create extension net` to load, since the signal can only come from the request functions inside the `net` schema
176
      WaitLatch(&worker_state->latch,
54✔
177
                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
178
                no_timeout,
179
                PG_WAIT_EXTENSION);
180
      ResetLatch(&worker_state->latch);
54✔
181
      if(process_interrupts())
54✔
182
        goto restart;
12✔
183

184
      continue;
42✔
185
    }
186

187
    uint64 requests_consumed = 0;
139✔
188
    uint64 expired_responses = 0;
139✔
189

190
    do {
139✔
191
      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);
139✔
192

193
      elog(DEBUG1, "Deleted %zu expired rows", expired_responses);
139✔
194

195
      requests_consumed = consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);
139✔
196

197
      elog(DEBUG1, "Consumed %zu request rows", requests_consumed);
139✔
198

199
      if(requests_consumed == 0)
139✔
200
        continue;
59✔
201

202
      int running_handles = 0;
80✔
203
      int maxevents = guc_batch_size + 1; // 1 extra for the timer
80✔
204
      event *events = palloc0(sizeof(event) * maxevents);
80✔
205

206
      do {
253✔
207
        int nfds = wait_event(lstate.epfd, events, maxevents, curl_handle_event_timeout_ms);
253✔
208
        if (nfds < 0) {
253✔
NEW
209
          int save_errno = errno;
×
NEW
210
          if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
×
NEW
211
            continue;
×
212
          }
213
          else {
NEW
214
            ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
×
215
            break;
216
          }
217
        }
218

219
        for (int i = 0; i < nfds; i++) {
1,178✔
220
          if (is_timer(events[i])) {
925✔
221
            EREPORT_MULTI(
106✔
222
              curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
223
            );
224
          } else {
225
            int curl_event = get_curl_event(events[i]);
819✔
226
            int sockfd = get_socket_fd(events[i]);
819✔
227

228
            EREPORT_MULTI(
819✔
229
              curl_multi_socket_action(
230
                lstate.curl_mhandle,
231
                sockfd,
232
                curl_event,
233
                &running_handles)
234
            );
235
          }
236

237
          insert_curl_responses(&lstate, CurlMemContext);
925✔
238
        }
239

240
        elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
253✔
241
      } while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
253✔
242

243
      pfree(events);
80✔
244

245
      MemoryContextReset(CurlMemContext);
80✔
246

247
      // slow down queue processing to avoid using too much CPU
248
      WaitLatch(&worker_state->latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, queue_processing_wait_timeout_ms, PG_WAIT_EXTENSION);
80✔
249
      ResetLatch(&worker_state->latch);
80✔
250
      if(process_interrupts()){
80✔
251
        if(requests_consumed > 0 || expired_responses > 0){ // if we have to restart, ensure the remaining work will continue
8✔
252
          pg_atomic_write_u32(&worker_state->should_work, 1);
8✔
253
          pg_write_barrier();
8✔
254
        }
255

256
        goto restart;
8✔
257
      }
258

259
    } while (requests_consumed > 0 || expired_responses > 0);
131✔
260
  }
261

262
restart:
20✔
263

264
  publish_state(WS_EXITED);
20✔
265

266
  elog(DEBUG1, "pg_net_worker restarting, will it process the queue when started: %s", pg_atomic_read_u32(&worker_state->should_work)?"Yes":"No");
32✔
267

268
  pg_atomic_write_u32(&worker_state->should_restart, 0);
20✔
269

270
  ev_monitor_close(&lstate);
20✔
271

272
  curl_multi_cleanup(lstate.curl_mhandle);
20✔
273
  curl_global_cleanup();
20✔
274

275
  DisownLatch(&worker_state->latch);
20✔
276

277
  // causing a failure on exit will make the postmaster process restart the bg worker
278
  proc_exit(EXIT_FAILURE);
20✔
279
}
280

281
static void net_shmem_startup(void) {
122✔
282
  if (prev_shmem_startup_hook)
122✔
283
    prev_shmem_startup_hook();
×
284

285
  bool found;
122✔
286

287
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
122✔
288

289
  if (!found) { // only at worker initialization, once worker restarts it will be found
122✔
290
    pg_atomic_init_u32(&worker_state->should_restart, 0);
122✔
291
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
122✔
292
    pg_atomic_init_u32(&worker_state->should_work, 0);
122✔
293
    InitSharedLatch(&worker_state->latch);
122✔
294
    ConditionVariableInit(&worker_state->cv);
122✔
295
  }
296
}
122✔
297

298
void _PG_init(void) {
122✔
299
  if (IsBinaryUpgrade) {
122✔
300
      return;
×
301
  }
302

303
  if (!process_shared_preload_libraries_in_progress) {
122✔
304
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
305
              errhint("Add pg_net to the shared_preload_libraries "
306
                      "configuration variable in postgresql.conf."));
307
  }
308

309
  RegisterBackgroundWorker(&(BackgroundWorker){
122✔
310
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
311
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
312
    .bgw_library_name = "pg_net",
313
    .bgw_function_name = "pg_net_worker",
314
    .bgw_name = "pg_net " EXTVERSION " worker",
315
    .bgw_restart_time = net_worker_restart_time_sec,
316
  });
317

318
  prev_shmem_startup_hook = shmem_startup_hook;
122✔
319
  shmem_startup_hook = net_shmem_startup;
122✔
320

321
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
122✔
322
                       "pg_net curl context",
323
                       ALLOCSET_DEFAULT_MINSIZE,
324
                       ALLOCSET_DEFAULT_INITSIZE,
325
                       ALLOCSET_DEFAULT_MAXSIZE);
326

327
  DefineCustomStringVariable("pg_net.ttl",
122✔
328
                 "time to live for request/response rows",
329
                 "should be a valid interval type",
330
                 &guc_ttl,
331
                 "6 hours",
332
                 PGC_SIGHUP, 0,
333
                 NULL, NULL, NULL);
334

335
  DefineCustomIntVariable("pg_net.batch_size",
122✔
336
                 "number of requests executed in one iteration of the background worker",
337
                 NULL,
338
                 &guc_batch_size,
339
                 200,
340
                 0, PG_INT16_MAX,
341
                 PGC_SIGHUP, 0,
342
                 NULL, NULL, NULL);
343

344
  DefineCustomStringVariable("pg_net.database_name",
122✔
345
                "Database where the worker will connect to",
346
                NULL,
347
                &guc_database_name,
348
                "postgres",
349
                PGC_SU_BACKEND, 0,
350
                NULL, NULL, NULL);
351

352
  DefineCustomStringVariable("pg_net.username",
122✔
353
                "Connection user for the worker",
354
                NULL,
355
                &guc_username,
356
                NULL,
357
                PGC_SU_BACKEND, 0,
358
                NULL, NULL, NULL);
359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc