• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 17311608596

29 Aug 2025 12:52AM UTC coverage: 92.514% (+0.1%) from 92.365%
17311608596

Pull #232

github

web-flow
Merge ee7408156 into 314d00e34
Pull Request #232: refactor: remove unnecessary memory context switches

78 of 82 new or added lines in 3 files covered. (95.12%)

6 existing lines in 2 files now uncovered.

482 of 521 relevant lines covered (92.51%)

193.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.71
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#define PG_PRELUDE_IMPL
7
#include "pg_prelude.h"
8
#include "curl_prelude.h"
9
#include "util.h"
10
#include "errors.h"
11
#include "core.h"
12
#include "event.h"
13

14
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
15
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
16
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
17
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
18

19
PG_MODULE_MAGIC;
95✔
20

21
typedef enum {
22
  WORKER_WAIT_NO_TIMEOUT,
23
  WORKER_WAIT_ONE_SECOND,
24
} WorkerWait;
25

26
static WorkerState *worker_state = NULL;
27

28
static const int                curl_handle_event_timeout_ms = 1000;
29
static const int                net_worker_restart_time_sec = 1;
30
static const long               no_timeout = -1L;
31
static bool                     wake_commit_cb_active = false;
32
static bool                     worker_should_restart = false;
33
static const size_t             total_extension_tables = 2;
34

35
static char*                    guc_ttl;
36
static int                      guc_batch_size;
37
static char*                    guc_database_name;
38
static char*                    guc_username;
39

40
#if PG15_GTE
41
static shmem_request_hook_type  prev_shmem_request_hook = NULL;
42
#endif
43

44
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
45
static volatile sig_atomic_t    got_sighup = false;
46

47
void _PG_init(void);
48

49
#if PG_VERSION_NUM >= 180000
50
  PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg);
51
#else
52
  PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
53
#endif
54

55
PG_FUNCTION_INFO_V1(worker_restart);
66✔
56
/*
 * SQL-callable function: reloads the server configuration and then flags the
 * background worker for a restart.
 *
 * The restart request is published through the shared `got_restart` atomic;
 * the worker's latch (when one is currently published) is set so a sleeping
 * worker notices the request.
 *
 * Returns the boolean produced by pg_reload_conf().
 */
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
  // reload the config
  Datum reload_datum = DirectFunctionCall1(pg_reload_conf, (Datum) NULL);
  bool  reload_ok    = DatumGetBool(reload_datum);

  // publish the restart request before waking the worker
  pg_atomic_write_u32(&worker_state->got_restart, 1);
  pg_write_barrier();

  // the latch is NULL while no worker process has published one
  if (worker_state->shared_latch != NULL) {
    SetLatch(worker_state->shared_latch);
  }

  // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
  PG_RETURN_BOOL(reload_ok);
}
64

65
/*
 * Blocks the caller until `ws->status` equals `expected_status`.
 *
 * The status is checked once up front so that no condition-variable sleep is
 * prepared when the condition already holds; otherwise the caller sleeps on
 * `ws->cv` and re-checks the status after every wakeup.
 */
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
  // fast path: nothing to wait for
  if (pg_atomic_read_u32(&ws->status) != expected_status) {
    ConditionVariablePrepareToSleep(&ws->cv);

    for (;;) {
      if (pg_atomic_read_u32(&ws->status) == expected_status)
        break;

      ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
    }

    ConditionVariableCancelSleep();
  }
}
75

76
PG_FUNCTION_INFO_V1(wait_until_running);
67✔
77
/*
 * SQL-callable function: returns only once the shared worker status is
 * WS_RUNNING. Takes no arguments and returns void.
 */
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
  const WorkerStatus wanted = WS_RUNNING;

  wait_until_state(worker_state, wanted);

  PG_RETURN_VOID();
}
82

83
// only wake at commit time to prevent excessive and unnecessary wakes.
84
// e.g only one wake when doing `select net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);`
85
/*
 * Transaction callback: wakes the worker when a transaction that requested a
 * wake (wake_commit_cb_active == true) commits, and disarms the request when
 * the transaction is prepared or rolled back. Every other event is ignored.
 */
static void wake_at_commit(XactEvent event, __attribute__ ((unused)) void *arg){
  elog(DEBUG2, "pg_net xact callback received: %s", xact_event_name(event));

  const bool committed = (event == XACT_EVENT_COMMIT ||
                          event == XACT_EVENT_PARALLEL_COMMIT);

  // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the worker automatically, they require a manual `net.wake()`
  // These are disabled by default and rarely used, see `max_prepared_transactions` https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS
  const bool discarded = (event == XACT_EVENT_PREPARE ||
                          event == XACT_EVENT_ABORT   ||
                          event == XACT_EVENT_PARALLEL_ABORT);

  if (committed) {
    if (!wake_commit_cb_active)
      return; // this transaction never asked for a wake

    uint32 not_woken  = 0;
    bool   first_wake = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &not_woken, 1);
    pg_write_barrier();

    // only wake the worker on first put, so if many concurrent wakes come we only wake once
    if (first_wake)
      SetLatch(worker_state->shared_latch);

    wake_commit_cb_active = false;
  }
  else if (discarded) {
    // abort the callback on rollback (and on prepare)
    wake_commit_cb_active = false;
  }
}
292✔
114

115
PG_FUNCTION_INFO_V1(wake);
100✔
116
/*
 * SQL-callable function: requests that the worker be woken when the current
 * transaction commits (see wake_at_commit). Takes no arguments, returns void.
 *
 * RegisterXactCallback() adds an entry that stays registered until it is
 * explicitly unregistered, so the callback is installed only once per backend.
 * Registering it again in every transaction would grow the callback list
 * without bound and invoke wake_at_commit once per past registration.
 * The per-transaction arming is carried by `wake_commit_cb_active`, which
 * wake_at_commit clears on commit, prepare and abort.
 */
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
  static bool xact_callback_registered = false; // per-backend, never reset

  if (!xact_callback_registered) {
    RegisterXactCallback(wake_at_commit, NULL);
    xact_callback_registered = true;
  }

  // arm the wake for the current transaction (idempotent within a transaction)
  wake_commit_cb_active = true;

  PG_RETURN_VOID();
}
124

125
static void
126
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
2✔
127
{
128
  int save_errno = errno;
2✔
129
  pg_atomic_write_u32(&worker_state->got_restart, 1);
2✔
130
  pg_write_barrier();
2✔
131
  if(worker_state->shared_latch)
2✔
132
    SetLatch(worker_state->shared_latch);
2✔
133
  errno = save_errno;
2✔
134
}
2✔
135

136
static void
137
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
18✔
138
{
139
  int     save_errno = errno;
18✔
140
  got_sighup = true;
18✔
141
  if(worker_state->shared_latch)
18✔
142
    SetLatch(worker_state->shared_latch);
3✔
143
  errno = save_errno;
18✔
144
}
18✔
145

146
/*
147
 *We have to handle sigusr1 explicitly because the default
148
 *procsignal_sigusr1_handler doesn't `SetLatch`, this would prevent
149
 *DROP DATABASE from finishing since our worker would be sleeping and not reach
150
 *CHECK_FOR_INTERRUPTS()
151
 */
152
static void
153
handle_sigusr1(SIGNAL_ARGS)
4✔
154
{
155
  int     save_errno = errno;
4✔
156
  if(worker_state->shared_latch)
4✔
157
    SetLatch(worker_state->shared_latch);
4✔
158
  errno = save_errno;
4✔
159
  procsignal_sigusr1_handler(postgres_signal_arg);
4✔
160
}
4✔
161

162
/*
 * Stores the new worker status in shared state and wakes every process
 * sleeping on the shared condition variable (see wait_until_state).
 */
static void publish_state(WorkerStatus s) {
  const uint32 status_value = (uint32)s;

  pg_atomic_write_u32(&worker_state->status, status_value);
  pg_write_barrier(); // issued between the status store and the broadcast

  ConditionVariableBroadcast(&worker_state->cv);
}
38✔
167

168
static void
169
net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum arg){
19✔
170
  worker_should_restart = false;
19✔
171
  pg_atomic_write_u32(&worker_state->should_wake, 1); // ensure the remaining work will continue since we'll restart
19✔
172

173
  worker_state->shared_latch = NULL;
19✔
174

175
  ev_monitor_close(worker_state);
19✔
176

177
  curl_multi_cleanup(worker_state->curl_mhandle);
19✔
178
  curl_global_cleanup();
19✔
179
}
19✔
180

181
// wait according to the wait type while ensuring interrupts are processed while waiting
182
/*
 * Sleeps on the worker latch according to `ww`, then handles everything that
 * may have arrived in the meantime:
 *   - pending interrupts (CHECK_FOR_INTERRUPTS),
 *   - a configuration reload requested through SIGHUP,
 *   - a restart request (the shared `got_restart` flag is consumed and
 *     `*should_restart` is set to true; it is never set back to false here).
 *
 * ww:             WORKER_WAIT_NO_TIMEOUT waits until the latch is set;
 *                 WORKER_WAIT_ONE_SECOND waits at most 1000 ms.
 * should_restart: out-parameter, only written when a restart was requested.
 */
static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart){
  if (ww == WORKER_WAIT_NO_TIMEOUT) {
    WaitLatch(worker_state->shared_latch,
              WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
              no_timeout,
              PG_WAIT_EXTENSION);
    ResetLatch(worker_state->shared_latch);
  }
  else if (ww == WORKER_WAIT_ONE_SECOND) {
    WaitLatch(worker_state->shared_latch,
              WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
              1000,
              PG_WAIT_EXTENSION);
    ResetLatch(worker_state->shared_latch);
  }

  CHECK_FOR_INTERRUPTS();

  // reload the configuration if a SIGHUP was received while waiting
  if (got_sighup) {
    got_sighup = false;
    ProcessConfigFile(PGC_SIGHUP);
  }

  // read and clear the restart request in a single atomic step
  uint32 restart_requested = pg_atomic_exchange_u32(&worker_state->got_restart, 0);
  if (restart_requested != 0) {
    *should_restart = true;
  }
}
107✔
208

209
/*
 * Tries to take AccessShareLock on the two extension tables
 * (net.http_request_queue and net._http_response) without blocking.
 *
 * ext_table_oids: out-array of `total_extension_tables` entries; filled with
 *                 the queue and response table OIDs only when both locks were
 *                 obtained.
 *
 * Returns true when both tables exist and both locks are held, false when the
 * `net` schema or either table cannot be resolved, or when a lock could not be
 * obtained immediately. On a false return a lock obtained on the first table
 * is not released here.
 * NOTE(review): the visible caller aborts the transaction on false, which is
 * presumably what releases it — confirm.
 */
static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]){
  Oid net_oid = get_namespace_oid("net", true);

  if(!OidIsValid(net_oid)){
    return false;
  }

  Oid queue_oid = get_relname_relid("http_request_queue", net_oid);
  Oid resp_oid = get_relname_relid("_http_response", net_oid);

  // the schema can exist without the tables; never try to lock an invalid OID
  if(!OidIsValid(queue_oid) || !OidIsValid(resp_oid)){
    return false;
  }

  bool is_locked = ConditionalLockRelationOid(queue_oid, AccessShareLock) && ConditionalLockRelationOid(resp_oid, AccessShareLock);

  if (is_locked) {
    ext_table_oids[0] = queue_oid;
    ext_table_oids[1] = resp_oid;
  }

  return is_locked;
}
228

229
/*
 * Releases the AccessShareLock held on each extension table, in the order the
 * OIDs are stored in `ext_table_oids` (as filled by is_extension_locked).
 */
static void unlock_extension(Oid ext_table_oids[static total_extension_tables]){
  for (size_t idx = 0; idx < total_extension_tables; idx++) {
    UnlockRelationOid(ext_table_oids[idx], AccessShareLock);
  }
}
79✔
233

234
/*
 * Entry point of the pg_net background worker. Never returns: it always ends
 * through proc_exit().
 *
 * Outline:
 *   1. Publish this process' latch, register the exit callback and install
 *      the signal handlers.
 *   2. Connect to the configured database and initialise libcurl, the event
 *      monitor and the curl multi handle; then publish WS_RUNNING.
 *   3. Main loop: sleep until a wake is requested, then repeatedly (one
 *      transaction per batch) delete expired responses, consume queued
 *      requests, drive them to completion through the curl multi interface
 *      and insert the responses, until there is no more work or a restart
 *      was requested.
 *   4. Publish WS_EXITED and exit with a failure code.
 */
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
  worker_state->shared_latch = &MyProc->procLatch;
  on_proc_exit(net_on_exit, 0);

  BackgroundWorkerUnblockSignals();
  pqsignal(SIGTERM, handle_sigterm);
  pqsignal(SIGHUP, handle_sighup);
  pqsignal(SIGUSR1, handle_sigusr1);

  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity

  elog(INFO, "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);

  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
  if(curl_ret != CURLE_OK)
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));

  worker_state->epfd = event_monitor();

  if (worker_state->epfd < 0) {
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
  }

  worker_state->curl_mhandle = curl_multi_init();
  if(!worker_state->curl_mhandle)
    ereport(ERROR, errmsg("curl_multi_init()"));

  set_curl_mhandle(worker_state);

  publish_state(WS_RUNNING);

  do {

    // consume a pending wake request (1 -> 0); if there is none, sleep until woken
    uint32 expected = 1;
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)){
      elog(DEBUG1, "pg_net worker waiting for wake");
      wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart);
      continue;
    }

    uint64 requests_consumed = 0;
    uint64 expired_responses = 0;

    do {
      SetCurrentStatementStartTimestamp();
      StartTransactionCommand();
      PushActiveSnapshot(GetTransactionSnapshot());

      Oid ext_table_oids[total_extension_tables];

      if(!is_extension_locked(ext_table_oids)){
        elog(DEBUG1, "pg_net extension not loaded");
        PopActiveSnapshot();
        AbortCurrentTransaction();
        break;
      }

      SPI_connect();

      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);

      elog(DEBUG1, "Deleted "UINT64_FORMAT" expired rows", expired_responses);

      requests_consumed = consume_request_queue(guc_batch_size);

      elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed);

      if(requests_consumed > 0){
        CurlData *cdatas = palloc(mul_size(sizeof(CurlData), requests_consumed));

        // initialize curl handles
        for (size_t j = 0; j < requests_consumed; j++) {
          init_curl_handle(&cdatas[j], get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc));

          EREPORT_MULTI(
            curl_multi_add_handle(worker_state->curl_mhandle, cdatas[j].ez_handle)
          );
        }

        // start curl event loop
        int running_handles = 0;
        int maxevents = guc_batch_size + 1; // 1 extra for the timer
        event events[maxevents];

        // Set when wait_event() was interrupted. `continue` inside a do/while
        // jumps to the loop condition, and running_handles is still 0 until
        // the first curl_multi_socket_action() call, so without this flag an
        // early EINTR would end the loop before any transfer was driven.
        bool wait_interrupted = false;

        do {
          wait_interrupted = false;

          int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);
          if (nfds < 0) {
            int save_errno = errno;
            if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
              elog(DEBUG1, "wait_event() got %s, continuing", strerror(save_errno));
              wait_interrupted = true;
              continue;
            }

            ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
          }

          for (int i = 0; i < nfds; i++) {
            if (is_timer(events[i])) {
              EREPORT_MULTI(
                curl_multi_socket_action(worker_state->curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
              );
            } else {
              int curl_event = get_curl_event(events[i]);
              int sockfd = get_socket_fd(events[i]);

              EREPORT_MULTI(
                curl_multi_socket_action(
                  worker_state->curl_mhandle,
                  sockfd,
                  curl_event,
                  &running_handles)
              );
            }
          }

          // insert finished responses
          CURLMsg *msg = NULL; int msgs_left=0;
          while ((msg = curl_multi_info_read(worker_state->curl_mhandle, &msgs_left))) {
            if (msg->msg == CURLMSG_DONE) {
              insert_response(msg->easy_handle, msg->data.result);
            } else {
              ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg));
            }
          }

          elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
        } while (running_handles > 0 || wait_interrupted); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout

        // cleanup
        for(uint64 i = 0; i < requests_consumed; i++){
          EREPORT_MULTI(
            curl_multi_remove_handle(worker_state->curl_mhandle, cdatas[i].ez_handle)
          );

          curl_easy_cleanup(cdatas[i].ez_handle);

          pfree_curl_data(&cdatas[i]);
        }

        pfree(cdatas);
      }

      SPI_finish();

      unlock_extension(ext_table_oids);

      PopActiveSnapshot();
      CommitTransactionCommand();

      // slow down queue processing to avoid using too much CPU
      wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);

    } while (!worker_should_restart && (requests_consumed > 0 || expired_responses > 0));

  } while (!worker_should_restart);

  publish_state(WS_EXITED);

  // causing a failure on exit will make the postmaster process restart the bg worker
  proc_exit(EXIT_FAILURE);
}
398

399
static Size net_memsize(void) {
400
  return MAXALIGN(sizeof(WorkerState));
401
}
402

403
#if PG15_GTE
404
static void net_shmem_request(void) {
95✔
405
  if (prev_shmem_request_hook)
95✔
UNCOV
406
    prev_shmem_request_hook();
×
407

408
  RequestAddinShmemSpace(net_memsize());
95✔
409
}
95✔
410
#endif
411

412
static void net_shmem_startup(void) {
95✔
413
  if (prev_shmem_startup_hook)
95✔
UNCOV
414
    prev_shmem_startup_hook();
×
415

416
  bool found;
95✔
417

418
  LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
95✔
419

420
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
95✔
421

422
  if (!found) {
95✔
423
    pg_atomic_init_u32(&worker_state->got_restart, 0);
95✔
424
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
95✔
425
    pg_atomic_init_u32(&worker_state->should_wake, 1);
95✔
426
    worker_state->shared_latch = NULL;
95✔
427

428
    ConditionVariableInit(&worker_state->cv);
95✔
429
    worker_state->epfd = 0;
95✔
430
    worker_state->curl_mhandle = NULL;
95✔
431
  }
432

433
  LWLockRelease(AddinShmemInitLock);
95✔
434
}
95✔
435

436
void _PG_init(void) {
95✔
437
  if (IsBinaryUpgrade) {
95✔
UNCOV
438
      return;
×
439
  }
440

441
  if (!process_shared_preload_libraries_in_progress) {
95✔
UNCOV
442
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
443
              errhint("Add pg_net to the shared_preload_libraries "
444
                      "configuration variable in postgresql.conf."));
445
  }
446

447
  RegisterBackgroundWorker(&(BackgroundWorker){
95✔
448
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
449
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
450
    .bgw_library_name = "pg_net",
451
    .bgw_function_name = "pg_net_worker",
452
    .bgw_name = "pg_net " EXTVERSION " worker",
453
    .bgw_restart_time = net_worker_restart_time_sec,
454
  });
455

456
#if PG15_GTE
457
  prev_shmem_request_hook = shmem_request_hook;
95✔
458
  shmem_request_hook = net_shmem_request;
95✔
459
#else
460
  RequestAddinShmemSpace(net_memsize());
461
#endif
462

463
  prev_shmem_startup_hook = shmem_startup_hook;
95✔
464
  shmem_startup_hook = net_shmem_startup;
95✔
465

466
  DefineCustomStringVariable("pg_net.ttl",
95✔
467
                 "time to live for request/response rows",
468
                 "should be a valid interval type",
469
                 &guc_ttl,
470
                 "6 hours",
471
                 PGC_SIGHUP, 0,
472
                 NULL, NULL, NULL);
473

474
  DefineCustomIntVariable("pg_net.batch_size",
95✔
475
                 "number of requests executed in one iteration of the background worker",
476
                 NULL,
477
                 &guc_batch_size,
478
                 200,
479
                 0, PG_INT16_MAX,
480
                 PGC_SIGHUP, 0,
481
                 NULL, NULL, NULL);
482

483
  DefineCustomStringVariable("pg_net.database_name",
95✔
484
                "Database where the worker will connect to",
485
                NULL,
486
                &guc_database_name,
487
                "postgres",
488
                PGC_SU_BACKEND, 0,
489
                NULL, NULL, NULL);
490

491
  DefineCustomStringVariable("pg_net.username",
95✔
492
                "Connection user for the worker",
493
                NULL,
494
                &guc_username,
495
                NULL,
496
                PGC_SU_BACKEND, 0,
497
                NULL, NULL, NULL);
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc