• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 19149954872

06 Nov 2025 09:10PM UTC coverage: 93.071% (+1.0%) from 92.081%
19149954872

push

github

steve-chavez
refactor: use clang-format style

You can run this with:

```bash
net-style
```

The style is also checked on CI.

220 of 243 new or added lines in 5 files covered. (90.53%)

3 existing lines in 2 files now uncovered.

497 of 534 relevant lines covered (93.07%)

192.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/src/worker.c
1
#include <errno.h>
2
#include <inttypes.h>
3
#include <string.h>
4
#include <unistd.h>
5

6
#define PG_PRELUDE_IMPL
7
#include "pg_prelude.h"
8

9
#include "curl_prelude.h"
10

11
#include "core.h"
12
#include "errors.h"
13
#include "event.h"
14
#include "util.h"
15

16
// Build-time libcurl version gate: both static assertions below report REQUIRED_LIBCURL_ERR_MSG
// when the libcurl headers are older than MIN_LIBCURL_VERSION_NUM.
#define MIN_LIBCURL_VERSION_NUM                                                                    \
17
  0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
18
#define REQUIRED_LIBCURL_ERR_MSG                                                                   \
19
  "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this "       \
20
  "version"
21
_Static_assert(LIBCURL_VERSION_NUM,
22
               REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have
23
                                          // LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
24
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
25

26
// PostgreSQL module magic block for this loadable library
PG_MODULE_MAGIC;
95✔
27

28
// Selects how wait_while_processing_interrupts() blocks on the worker latch.
typedef enum {
29
  // wait on the latch with no timeout
  WORKER_WAIT_NO_TIMEOUT,
30
  // wait on the latch, giving up after 1000 ms
  WORKER_WAIT_ONE_SECOND,
31
} WorkerWait;
32

33
// Worker state living in shared memory; assigned in net_shmem_startup()
static WorkerState *worker_state = NULL;
34

35
// timeout passed to wait_event() in the curl event loop
static const int    curl_handle_event_timeout_ms = 1000;
36
// bgw_restart_time used when registering the background worker
static const int    net_worker_restart_time_sec  = 1;
37
// WaitLatch() timeout value used for WORKER_WAIT_NO_TIMEOUT
static const long   no_timeout                   = -1L;
38
// per-backend flag: set by wake(), cleared by wake_at_commit()
static bool         wake_commit_cb_active        = false;
39
// set through wait_while_processing_interrupts(); ends the worker main loop
static bool         worker_should_restart        = false;
40
// number of extension tables locked by is_extension_locked()
static const size_t total_extension_tables       = 2;
41

42
// storage for the pg_net.* settings defined in _PG_init()
static char *guc_ttl;
43
static int   guc_batch_size;
44
static char *guc_database_name;
45
static char *guc_username;
46

47
#if PG15_GTE
48
// previously installed hook, chained from net_shmem_request()
static shmem_request_hook_type prev_shmem_request_hook = NULL;
49
#endif
50

51
// previously installed hook, chained from net_shmem_startup()
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
52
// set by handle_sighup(); consumed in wait_while_processing_interrupts()
static volatile sig_atomic_t   got_sighup              = false;
53

54
// Forward declarations of the entry points defined further down in this file.
void _PG_init(void);
55

56
// The worker entry point is declared as non-returning; the annotation is spelled differently
// depending on PG_VERSION_NUM.
#if PG_VERSION_NUM >= 180000
57
PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg);
58
#else
59
PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
60
#endif
61

62
PG_FUNCTION_INFO_V1(worker_restart);
66✔
63
Datum worker_restart(__attribute__((unused)) PG_FUNCTION_ARGS) {
17✔
64
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum)NULL)); // reload the config
17✔
65
  pg_atomic_write_u32(&worker_state->got_restart, 1);
17✔
66
  pg_write_barrier();
17✔
67
  if (worker_state->shared_latch) SetLatch(worker_state->shared_latch);
17✔
68
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain
17✔
69
                          // backward compatibility
70
}
71

72
/*
 * Blocks the caller until ws->status equals expected_status.
 *
 * The status is re-read after every wakeup of the condition variable ws->cv.
 */
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status) {
  // fast return without sleeping, in case the condition is already fulfilled
  if (pg_atomic_read_u32(&ws->status) == expected_status) {
    return;
  }

  ConditionVariablePrepareToSleep(&ws->cv);

  for (;;) {
    if (pg_atomic_read_u32(&ws->status) == expected_status) {
      break;
    }
    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
  }

  ConditionVariableCancelSleep();
}
83

84
PG_FUNCTION_INFO_V1(wait_until_running);
67✔
85
Datum wait_until_running(__attribute__((unused)) PG_FUNCTION_ARGS) {
18✔
86
  wait_until_state(worker_state, WS_RUNNING);
18✔
87

88
  PG_RETURN_VOID();
18✔
89
}
90

91
// only wake at commit time to prevent excessive and unnecessary wakes.
92
// e.g only one wake when doing `select
93
// net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);`
94
static void wake_at_commit(XactEvent event, __attribute__((unused)) void *arg) {
292✔
95
  elog(DEBUG2, "pg_net xact callback received: %s", xact_event_name(event));
292✔
96

97
  switch (event) {
292✔
98
  case XACT_EVENT_COMMIT:
120✔
99
  case XACT_EVENT_PARALLEL_COMMIT:
100
    if (wake_commit_cb_active) {
120✔
101
      uint32 expected = 0;
48✔
102
      bool   success  = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 1);
48✔
103
      pg_write_barrier();
48✔
104

105
      if (success) // only wake the worker on first put, so if many concurrent wakes come we only
48✔
106
                   // wake once
107
        SetLatch(worker_state->shared_latch);
21✔
108

109
      wake_commit_cb_active = false;
48✔
110
    }
111
    break;
112
  // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the
113
  // worker automatically, they require a manual `net.wake()` These are disabled by default and
114
  // rarely used, see `max_prepared_transactions`
115
  // https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS
116
  case XACT_EVENT_PREPARE:
52✔
117
  // abort the callback on rollback
118
  case XACT_EVENT_ABORT:
119
  case XACT_EVENT_PARALLEL_ABORT: wake_commit_cb_active = false; break;
52✔
120
  default                       : break;
121
  }
122
}
292✔
123

124
PG_FUNCTION_INFO_V1(wake);
100✔
125
Datum wake(__attribute__((unused)) PG_FUNCTION_ARGS) {
433✔
126
  if (!wake_commit_cb_active) { // register only one callback per transaction
433✔
127
    RegisterXactCallback(wake_at_commit, NULL);
54✔
128
    wake_commit_cb_active = true;
54✔
129
  }
130

131
  PG_RETURN_VOID();
433✔
132
}
133

134
/*
 * SIGTERM handler: raises the shared restart flag and wakes the worker latch
 * (when one is published). errno is preserved across the handler.
 */
static void handle_sigterm(__attribute__((unused)) SIGNAL_ARGS) {
  const int errno_on_entry = errno;

  pg_atomic_write_u32(&worker_state->got_restart, 1);
  pg_write_barrier();

  if (worker_state->shared_latch) {
    SetLatch(worker_state->shared_latch);
  }

  errno = errno_on_entry;
}
2✔
141

142
/*
 * SIGHUP handler: records that a config reload was requested (got_sighup) and
 * wakes the worker latch (when one is published). errno is preserved across
 * the handler.
 */
static void handle_sighup(__attribute__((unused)) SIGNAL_ARGS) {
  const int errno_on_entry = errno;

  got_sighup = true;

  if (worker_state->shared_latch) {
    SetLatch(worker_state->shared_latch);
  }

  errno = errno_on_entry;
}
18✔
148

149
/*
150
 *We have to handle sigusr1 explicitly because the default
151
 *procsignal_sigusr1_handler doesn't `SetLatch`, this would prevent
152
 *DROP DATATABASE from finishing since our worker would be sleeping and not reach
153
 *CHECK_FOR_INTERRUPTS()
154
 */
155
static void handle_sigusr1(SIGNAL_ARGS) {
4✔
156
  int save_errno = errno;
4✔
157
  if (worker_state->shared_latch) SetLatch(worker_state->shared_latch);
4✔
158
  errno = save_errno;
4✔
159
  procsignal_sigusr1_handler(postgres_signal_arg);
4✔
160
}
4✔
161

162
/*
 * Stores status `s` in the shared worker state and broadcasts on the condition
 * variable so that waiters (see wait_until_state) re-check it.
 */
static void publish_state(WorkerStatus s) {
  const uint32 new_status = (uint32)s;

  pg_atomic_write_u32(&worker_state->status, new_status);
  pg_write_barrier();

  ConditionVariableBroadcast(&worker_state->cv);
}
38✔
167

168
static void net_on_exit(__attribute__((unused)) int code, __attribute__((unused)) Datum arg) {
19✔
169
  worker_should_restart = false;
19✔
170
  pg_atomic_write_u32(&worker_state->should_wake,
19✔
171
                      1); // ensure the remaining work will continue since we'll restart
172

173
  worker_state->shared_latch = NULL;
19✔
174

175
  ev_monitor_close(worker_state);
19✔
176

177
  curl_multi_cleanup(worker_state->curl_mhandle);
19✔
178
  curl_global_cleanup();
19✔
179
}
19✔
180

181
// wait according to the wait type while ensuring interrupts are processed while waiting
182
static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart) {
107✔
183
  switch (ww) {
107✔
184
  case WORKER_WAIT_NO_TIMEOUT:
28✔
185
    WaitLatch(worker_state->shared_latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, no_timeout,
28✔
186
              PG_WAIT_EXTENSION);
187
    ResetLatch(worker_state->shared_latch);
28✔
188
    break;
28✔
189
  case WORKER_WAIT_ONE_SECOND:
79✔
190
    WaitLatch(worker_state->shared_latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000,
79✔
191
              PG_WAIT_EXTENSION);
192
    ResetLatch(worker_state->shared_latch);
79✔
193
    break;
79✔
194
  }
195

196
  CHECK_FOR_INTERRUPTS();
107✔
197

198
  if (got_sighup) {
107✔
199
    got_sighup = false;
3✔
200
    ProcessConfigFile(PGC_SIGHUP);
3✔
201
  }
202

203
  if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)) {
107✔
204
    *should_restart = true;
19✔
205
  }
206
}
107✔
207

208
/*
 * Tries to take an AccessShareLock on both extension tables
 * (net.http_request_queue and net._http_response) without blocking.
 *
 * ext_table_oids - output; filled with the OIDs of the two locked tables
 *                  (queue first, response second) only when true is returned
 *
 * Returns true when both locks were acquired. Returns false when the `net`
 * schema or either table does not exist, or when either lock could not be
 * obtained immediately; in that case no lock taken here is left held.
 */
static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]) {
  Oid net_oid = get_namespace_oid("net", true);

  if (!OidIsValid(net_oid)) {
    return false;
  }

  Oid queue_oid = get_relname_relid("http_request_queue", net_oid);
  Oid resp_oid  = get_relname_relid("_http_response", net_oid);

  // the schema can exist without the tables; don't try to lock an invalid OID
  if (!OidIsValid(queue_oid) || !OidIsValid(resp_oid)) {
    return false;
  }

  if (!ConditionalLockRelationOid(queue_oid, AccessShareLock)) {
    return false;
  }

  if (!ConditionalLockRelationOid(resp_oid, AccessShareLock)) {
    // don't leave the first table locked when the second lock is unavailable
    UnlockRelationOid(queue_oid, AccessShareLock);
    return false;
  }

  ext_table_oids[0] = queue_oid;
  ext_table_oids[1] = resp_oid;

  return true;
}
228

229
/*
 * Releases the AccessShareLock held on every table listed in ext_table_oids
 * (as filled in by is_extension_locked), in array order.
 */
static void unlock_extension(Oid ext_table_oids[static total_extension_tables]) {
  for (size_t idx = 0; idx < total_extension_tables; idx++) {
    UnlockRelationOid(ext_table_oids[idx], AccessShareLock);
  }
}
79✔
233

234
/*
 * Entry point of the pg_net background worker.
 *
 * Set-up: publishes the process latch in shared memory, installs the exit
 * callback and signal handlers, connects to the configured database and
 * initializes libcurl and the event monitor, then publishes WS_RUNNING.
 *
 * Main loop: sleeps until should_wake is set, then repeatedly (one transaction
 * per iteration) deletes expired responses, consumes a batch of queued
 * requests, drives them to completion through the curl multi handle and
 * inserts the responses. The inner loop keeps going while there was work and
 * no restart was requested.
 *
 * Never returns: once a restart is requested it publishes WS_EXITED and exits
 * with a failure code so the postmaster restarts the worker.
 */
void pg_net_worker(__attribute__((unused)) Datum main_arg) {
  worker_state->shared_latch = &MyProc->procLatch;
  on_proc_exit(net_on_exit, 0);

  BackgroundWorkerUnblockSignals();
  pqsignal(SIGTERM, handle_sigterm);
  pqsignal(SIGHUP, handle_sighup);
  pqsignal(SIGUSR1, handle_sigusr1);

  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity

  elog(INFO,
       "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, "
       "pg_net.username=%s, pg_net.database_name=%s",
       guc_ttl, guc_batch_size, guc_username, guc_database_name);

  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
  if (curl_ret != CURLE_OK)
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));

  worker_state->epfd = event_monitor();

  if (worker_state->epfd < 0) {
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
  }

  worker_state->curl_mhandle = curl_multi_init();
  if (!worker_state->curl_mhandle) ereport(ERROR, errmsg("curl_multi_init()"));

  set_curl_mhandle(worker_state);

  publish_state(WS_RUNNING);

  do {

    // consume the wake request; when there is none, sleep until woken
    uint32 expected = 1;
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)) {
      elog(DEBUG1, "pg_net worker waiting for wake");
      wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart);
      continue;
    }

    uint64 requests_consumed = 0;
    uint64 expired_responses = 0;

    do {
      SetCurrentStatementStartTimestamp();
      StartTransactionCommand();
      PushActiveSnapshot(GetTransactionSnapshot());

      Oid ext_table_oids[total_extension_tables];

      if (!is_extension_locked(ext_table_oids)) {
        elog(DEBUG1, "pg_net extension not loaded");
        PopActiveSnapshot();
        AbortCurrentTransaction();
        break;
      }

      SPI_connect();

      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);

      elog(DEBUG1, "Deleted " UINT64_FORMAT " expired rows", expired_responses);

      requests_consumed = consume_request_queue(guc_batch_size);

      elog(DEBUG1, "Consumed " UINT64_FORMAT " request rows", requests_consumed);

      if (requests_consumed > 0) {
        CurlHandle *handles = palloc(mul_size(sizeof(CurlHandle), requests_consumed));

        // initialize curl handles
        for (size_t j = 0; j < requests_consumed; j++) {
          init_curl_handle(&handles[j],
                           get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc));

          EREPORT_MULTI(curl_multi_add_handle(worker_state->curl_mhandle, handles[j].ez_handle));
        }

        // start curl event loop
        int   running_handles = 0;
        int   maxevents       = requests_consumed + 1; // 1 extra for the timer
        event events[maxevents];

        // Set when wait_event() was interrupted. `continue` in a do/while jumps to the loop
        // condition, and running_handles is still 0 until curl has been driven at least once, so
        // without this flag an early EINTR would end the loop and drop the whole batch.
        bool wait_interrupted = false;

        do {
          wait_interrupted = false;

          int nfds =
              wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);

          if (nfds < 0) {
            int save_errno = errno;
            if (save_errno == EINTR) { // can happen when the wait is interrupted, for example when
                                       // running under GDB. Just wait again in this case.
              elog(DEBUG1, "wait_event() got %s, continuing", strerror(save_errno));
              wait_interrupted = true;
              continue;
            }

            ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
          }

          for (int i = 0; i < nfds; i++) {
            if (is_timer(events[i])) {
              EREPORT_MULTI(curl_multi_socket_action(worker_state->curl_mhandle,
                                                     CURL_SOCKET_TIMEOUT, 0, &running_handles));
            } else {
              int curl_event = get_curl_event(events[i]);
              int sockfd     = get_socket_fd(events[i]);

              EREPORT_MULTI(curl_multi_socket_action(worker_state->curl_mhandle, sockfd, curl_event,
                                                     &running_handles));
            }
          }

          // insert finished responses
          CURLMsg *msg       = NULL;
          int      msgs_left = 0;
          while ((msg = curl_multi_info_read(worker_state->curl_mhandle, &msgs_left))) {
            if (msg->msg == CURLMSG_DONE) {
              CurlHandle *handle = NULL;
              EREPORT_CURL_GETINFO(msg->easy_handle, CURLINFO_PRIVATE, &handle);
              insert_response(handle, msg->data.result);
            } else {
              ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg));
            }
          }

          elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
          // run while there are curl handles, some won't finish in a single iteration since they
          // could be slow and waiting for a timeout
        } while (wait_interrupted || running_handles > 0);

        // cleanup
        for (uint64 i = 0; i < requests_consumed; i++) {
          EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle));

          curl_easy_cleanup(handles[i].ez_handle);

          pfree_handle(&handles[i]);
        }

        pfree(handles);
      }

      SPI_finish();

      unlock_extension(ext_table_oids);

      PopActiveSnapshot();
      CommitTransactionCommand();

      // slow down queue processing to avoid using too much CPU
      wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);

    } while (!worker_should_restart && (requests_consumed > 0 || expired_responses > 0));

  } while (!worker_should_restart);

  publish_state(WS_EXITED);

  // causing a failure on exit will make the postmaster process restart the bg worker
  proc_exit(EXIT_FAILURE);
}
398

399
static Size net_memsize(void) {
400
  return MAXALIGN(sizeof(WorkerState));
401
}
402

403
#if PG15_GTE
404
static void net_shmem_request(void) {
95✔
405
  if (prev_shmem_request_hook) prev_shmem_request_hook();
95✔
406

407
  RequestAddinShmemSpace(net_memsize());
95✔
408
}
95✔
409
#endif
410

411
static void net_shmem_startup(void) {
95✔
412
  if (prev_shmem_startup_hook) prev_shmem_startup_hook();
95✔
413

414
  bool found;
95✔
415

416
  LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
95✔
417

418
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
95✔
419

420
  if (!found) {
95✔
421
    pg_atomic_init_u32(&worker_state->got_restart, 0);
95✔
422
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
95✔
423
    pg_atomic_init_u32(&worker_state->should_wake, 1);
95✔
424
    worker_state->shared_latch = NULL;
95✔
425

426
    ConditionVariableInit(&worker_state->cv);
95✔
427
    worker_state->epfd         = 0;
95✔
428
    worker_state->curl_mhandle = NULL;
95✔
429
  }
430

431
  LWLockRelease(AddinShmemInitLock);
95✔
432
}
95✔
433

434
void _PG_init(void) {
95✔
435
  if (IsBinaryUpgrade) {
95✔
NEW
436
    return;
×
437
  }
438

439
  if (!process_shared_preload_libraries_in_progress) {
95✔
NEW
440
    ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
441
            errhint("Add pg_net to the shared_preload_libraries "
442
                    "configuration variable in postgresql.conf."));
443
  }
444

445
  RegisterBackgroundWorker(&(BackgroundWorker){
95✔
446
    .bgw_flags         = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
447
    .bgw_start_time    = BgWorkerStart_RecoveryFinished,
448
    .bgw_library_name  = "pg_net",
449
    .bgw_function_name = "pg_net_worker",
450
    .bgw_name          = "pg_net " EXTVERSION " worker",
451
    .bgw_restart_time  = net_worker_restart_time_sec,
452
  });
453

454
#if PG15_GTE
455
  prev_shmem_request_hook = shmem_request_hook;
95✔
456
  shmem_request_hook      = net_shmem_request;
95✔
457
#else
458
  RequestAddinShmemSpace(net_memsize());
459
#endif
460

461
  prev_shmem_startup_hook = shmem_startup_hook;
95✔
462
  shmem_startup_hook      = net_shmem_startup;
95✔
463

464
  DefineCustomStringVariable("pg_net.ttl", "time to live for request/response rows",
95✔
465
                             "should be a valid interval type", &guc_ttl, "6 hours", PGC_SIGHUP, 0,
466
                             NULL, NULL, NULL);
467

468
  DefineCustomIntVariable(
95✔
469
      "pg_net.batch_size", "number of requests executed in one iteration of the background worker",
470
      NULL, &guc_batch_size, 200, 0, PG_INT16_MAX, PGC_SIGHUP, 0, NULL, NULL, NULL);
471

472
  DefineCustomStringVariable("pg_net.database_name", "Database where the worker will connect to",
95✔
473
                             NULL, &guc_database_name, "postgres", PGC_SU_BACKEND, 0, NULL, NULL,
474
                             NULL);
475

476
  DefineCustomStringVariable("pg_net.username", "Connection user for the worker", NULL,
95✔
477
                             &guc_username, NULL, PGC_SU_BACKEND, 0, NULL, NULL, NULL);
478
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc