• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 16633900030

30 Jul 2025 09:13PM UTC coverage: 92.176% (+0.1%) from 92.07%
16633900030

push

github

steve-chavez
bump to 0.19.4

483 of 524 relevant lines covered (92.18%)

207.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.63
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#define PG_PRELUDE_IMPL
7
#include "pg_prelude.h"
8
#include "curl_prelude.h"
9
#include "util.h"
10
#include "errors.h"
11
#include "core.h"
12
#include "event.h"
13

14
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
15
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
16
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
17
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
18

19
PG_MODULE_MAGIC;
95✔
20

21
typedef enum {
22
  WORKER_WAIT_NO_TIMEOUT,
23
  WORKER_WAIT_ONE_SECOND,
24
} WorkerWait;
25

26
static WorkerState *worker_state = NULL;
27

28
static const int                curl_handle_event_timeout_ms = 1000;
29
static const int                net_worker_restart_time_sec = 1;
30
static const long               no_timeout = -1L;
31
static bool                     wake_commit_cb_active = false;
32
static bool                     worker_should_restart = false;
33
static const size_t             total_extension_tables = 2;
34

35
static char*                    guc_ttl;
36
static int                      guc_batch_size;
37
static char*                    guc_database_name;
38
static char*                    guc_username;
39
static MemoryContext            CurlMemContext = NULL;
40
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
41
static volatile sig_atomic_t    got_sighup = false;
42

43
void _PG_init(void);
44

45
#if PG_VERSION_NUM >= 180000
46
  PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg);
47
#else
48
  PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
49
#endif
50

51
PG_FUNCTION_INFO_V1(worker_restart);
66✔
52
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
17✔
53
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
17✔
54
  pg_atomic_write_u32(&worker_state->got_restart, 1);
17✔
55
  pg_write_barrier();
17✔
56
  if (worker_state)
17✔
57
    SetLatch(&worker_state->latch);
17✔
58
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
17✔
59
}
60

61
/*
 * Blocks the caller until ws->status equals expected_status, sleeping on
 * ws->cv between checks.
 */
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
  // fast path: already in the requested state, no need to touch the condition variable
  if (pg_atomic_read_u32(&ws->status) == expected_status)
    return;

  ConditionVariablePrepareToSleep(&ws->cv);

  for (;;) {
    if (pg_atomic_read_u32(&ws->status) == expected_status)
      break;

    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
  }

  ConditionVariableCancelSleep();
}
71

72
PG_FUNCTION_INFO_V1(wait_until_running);
67✔
73
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
18✔
74
  wait_until_state(worker_state, WS_RUNNING);
18✔
75

76
  PG_RETURN_VOID();
18✔
77
}
78

79
// only wake at commit time to prevent excessive and unnecessary wakes.
80
// e.g only one wake when doing `select net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);`
81
static void wake_at_commit(XactEvent event, __attribute__ ((unused)) void *arg){
298✔
82
  elog(DEBUG2, "pg_net xact callback received: %s", xact_event_name(event));
298✔
83

84
  switch(event){
298✔
85
    case XACT_EVENT_COMMIT:
123✔
86
    case XACT_EVENT_PARALLEL_COMMIT:
87
      if(wake_commit_cb_active){
123✔
88
        uint32 expected = 0;
48✔
89
        bool success = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 1);
48✔
90
        pg_write_barrier();
48✔
91

92
        if (success) // only wake the worker on first put, so if many concurrent wakes come we only wake once
48✔
93
          SetLatch(&worker_state->latch);
22✔
94

95
        wake_commit_cb_active = false;
48✔
96
      }
97
      break;
98
    // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the worker automatically, they require a manual `net.wake()`
99
    // These are disabled by default and rarely used, see `max_prepared_transactions` https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS
100
    case XACT_EVENT_PREPARE:
52✔
101
    // abort the callback on rollback
102
    case XACT_EVENT_ABORT:
103
    case XACT_EVENT_PARALLEL_ABORT:
104
      wake_commit_cb_active = false;
52✔
105
      break;
52✔
106
    default:
107
      break;
108
  }
109
}
298✔
110

111
PG_FUNCTION_INFO_V1(wake);
100✔
112
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
428✔
113
  if (!wake_commit_cb_active) { // register only one callback per transaction
428✔
114
    RegisterXactCallback(wake_at_commit, NULL);
54✔
115
    wake_commit_cb_active = true;
54✔
116
  }
117

118
  PG_RETURN_VOID();
428✔
119
}
120

121
static void
122
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
2✔
123
{
124
  int save_errno = errno;
2✔
125
  pg_atomic_write_u32(&worker_state->got_restart, 1);
2✔
126
  pg_write_barrier();
2✔
127
  if (worker_state)
2✔
128
    SetLatch(&worker_state->latch);
2✔
129
  errno = save_errno;
2✔
130
}
2✔
131

132
static void
133
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
18✔
134
{
135
  int     save_errno = errno;
18✔
136
  got_sighup = true;
18✔
137
  if (worker_state)
18✔
138
    SetLatch(&worker_state->latch);
18✔
139
  errno = save_errno;
18✔
140
}
18✔
141

142
/*
143
 *We have to handle sigusr1 explicitly because the default
144
 *procsignal_sigusr1_handler doesn't `SetLatch`, this would prevent
145
 *DROP DATATABASE from finishing since our worker would be sleeping and not reach
146
 *CHECK_FOR_INTERRUPTS()
147
 */
148
static void
149
handle_sigusr1(SIGNAL_ARGS)
3✔
150
{
151
  int     save_errno = errno;
3✔
152
  if (worker_state)
3✔
153
    SetLatch(&worker_state->latch);
3✔
154
  errno = save_errno;
3✔
155
  procsignal_sigusr1_handler(postgres_signal_arg);
3✔
156
}
3✔
157

158
/*
 * Stores the new worker status in shared memory and wakes every backend
 * sleeping on the state condition variable (see wait_until_state()).
 */
static void
publish_state(WorkerStatus s)
{
  WorkerState *ws = worker_state;

  pg_atomic_write_u32(&ws->status, (uint32)s);
  pg_write_barrier();
  ConditionVariableBroadcast(&ws->cv);
}
38✔
163

164
static void
165
net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum arg){
19✔
166
  worker_should_restart = false;
19✔
167
  pg_atomic_write_u32(&worker_state->should_wake, 1); // ensure the remaining work will continue since we'll restart
19✔
168

169
  DisownLatch(&worker_state->latch);
19✔
170

171
  ev_monitor_close(worker_state);
19✔
172

173
  curl_multi_cleanup(worker_state->curl_mhandle);
19✔
174
  curl_global_cleanup();
19✔
175
}
19✔
176

177
// wait according to the wait type while ensuring interrupts are processed while waiting
178
static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart){
106✔
179
  switch(ww){
106✔
180
    case WORKER_WAIT_NO_TIMEOUT:
24✔
181
      WaitLatch(&worker_state->latch,
24✔
182
                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
183
                no_timeout,
184
                PG_WAIT_EXTENSION);
185
      ResetLatch(&worker_state->latch);
24✔
186
      break;
24✔
187
    case WORKER_WAIT_ONE_SECOND:
82✔
188
      WaitLatch(&worker_state->latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000, PG_WAIT_EXTENSION);
82✔
189
      ResetLatch(&worker_state->latch);
82✔
190
      break;
82✔
191
  }
192

193
  CHECK_FOR_INTERRUPTS();
106✔
194

195
  if (got_sighup) {
106✔
196
    got_sighup = false;
3✔
197
    ProcessConfigFile(PGC_SIGHUP);
3✔
198
  }
199

200
  if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)){
106✔
201
    *should_restart = true;
19✔
202
  }
203
}
106✔
204

205
/*
 * Tries to take AccessShareLock on both extension tables
 * (net.http_request_queue and net._http_response) without blocking.
 *
 * Returns true and fills ext_table_oids (queue first, response second) when
 * both locks were acquired. Returns false when the `net` schema or either
 * table does not exist, or when a lock could not be taken immediately; in
 * that case ext_table_oids is left untouched.
 *
 * If only the first lock was acquired it is not released here; the caller
 * aborts the transaction on a false result, which drops it.
 */
static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]){
  Oid net_oid = get_namespace_oid("net", true);

  if(!OidIsValid(net_oid)){
    return false;
  }

  Oid queue_oid = get_relname_relid("http_request_queue", net_oid);
  Oid resp_oid = get_relname_relid("_http_response", net_oid);

  // get_relname_relid() yields InvalidOid for a missing relation: never lock that, report "not loaded" instead
  if(!OidIsValid(queue_oid) || !OidIsValid(resp_oid)){
    return false;
  }

  bool is_locked = ConditionalLockRelationOid(queue_oid, AccessShareLock) && ConditionalLockRelationOid(resp_oid, AccessShareLock);

  if (is_locked) {
    ext_table_oids[0] = queue_oid;
    ext_table_oids[1] = resp_oid;
  }

  return is_locked;
}
224

225
/*
 * Releases the AccessShareLock taken by is_extension_locked() on every
 * extension table, in the order the OIDs were stored.
 */
static void
unlock_extension(Oid ext_table_oids[static total_extension_tables])
{
  for (size_t i = 0; i < total_extension_tables; i++)
    UnlockRelationOid(ext_table_oids[i], AccessShareLock);
}
82✔
229

230
/*
 * Entry point of the pg_net background worker.
 *
 * Setup: registers the exit callback, takes ownership of the shared latch,
 * installs the signal handlers, connects to the configured database,
 * initializes libcurl and the event monitor and publishes WS_RUNNING.
 *
 * Main loop: sleeps until should_wake is raised; then, one transaction per
 * iteration, deletes expired responses, consumes up to pg_net.batch_size
 * queued requests and drives their curl handles until none is running,
 * inserting responses as they complete. Iterations repeat (with a one second
 * pause in between) while there was work to do and no restart was requested.
 *
 * Never returns: once a restart is requested it publishes WS_EXITED and
 * exits with a failure status so the postmaster restarts the worker.
 */
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
  on_proc_exit(net_on_exit, 0);

  OwnLatch(&worker_state->latch);

  BackgroundWorkerUnblockSignals();
  pqsignal(SIGTERM, handle_sigterm);
  pqsignal(SIGHUP, handle_sighup);
  pqsignal(SIGUSR1, handle_sigusr1);

  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity

  elog(INFO, "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);

  CURLcode curl_ret = curl_global_init(CURL_GLOBAL_ALL);
  if(curl_ret != CURLE_OK)
    ereport(ERROR, errmsg("curl_global_init() returned %s", curl_easy_strerror(curl_ret))); // no trailing newline in errmsg, per the PostgreSQL message style guide

  worker_state->epfd = event_monitor();

  if (worker_state->epfd < 0) {
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
  }

  worker_state->curl_mhandle = curl_multi_init();
  if(!worker_state->curl_mhandle)
    ereport(ERROR, errmsg("curl_multi_init()"));

  set_curl_mhandle(worker_state);

  publish_state(WS_RUNNING);

  do {

    // consume a pending wake (1 -> 0); if there is none, sleep until the latch is set
    uint32 expected = 1;
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)){
      elog(DEBUG1, "pg_net worker waiting for wake");
      wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart);
      continue;
    }

    uint64 requests_consumed = 0;
    uint64 expired_responses = 0;

    do {
      SetCurrentStatementStartTimestamp();
      StartTransactionCommand();
      PushActiveSnapshot(GetTransactionSnapshot());

      Oid ext_table_oids[total_extension_tables];

      if(!is_extension_locked(ext_table_oids)){
        elog(DEBUG1, "pg_net extension not loaded");
        PopActiveSnapshot();
        AbortCurrentTransaction();
        break;
      }

      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);

      elog(DEBUG1, "Deleted "UINT64_FORMAT" expired rows", expired_responses);

      requests_consumed = consume_request_queue(worker_state->curl_mhandle, guc_batch_size, CurlMemContext);

      elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed);

      if(requests_consumed > 0){
        int running_handles = 0;
        int maxevents = guc_batch_size + 1; // 1 extra for the timer
        event events[maxevents];

        do {
          int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);
          if (nfds < 0) {
            int save_errno = errno; // capture before any other call can overwrite it

            // anything other than an interrupted wait is fatal; ereport(ERROR) does not return
            if(save_errno != EINTR)
              ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));

            // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
            // NOTE(review): `continue` jumps to the `running_handles > 0` test below, so an EINTR before the
            // first curl_multi_socket_action() call leaves this loop instead of retrying - confirm this is intended.
            elog(DEBUG1, "wait_event() got %s, continuing", strerror(save_errno));
            continue;
          }

          for (int i = 0; i < nfds; i++) {
            if (is_timer(events[i])) {
              EREPORT_MULTI(
                curl_multi_socket_action(worker_state->curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
              );
            } else {
              int curl_event = get_curl_event(events[i]);
              int sockfd = get_socket_fd(events[i]);

              EREPORT_MULTI(
                curl_multi_socket_action(
                  worker_state->curl_mhandle,
                  sockfd,
                  curl_event,
                  &running_handles)
              );
            }

            insert_curl_responses(worker_state, CurlMemContext);
          }

          elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
        } while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
      }

      unlock_extension(ext_table_oids);

      PopActiveSnapshot();
      CommitTransactionCommand();

      MemoryContextReset(CurlMemContext);

      // slow down queue processing to avoid using too much CPU
      wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);

    } while (!worker_should_restart && (requests_consumed > 0 || expired_responses > 0));

  } while (!worker_should_restart);

  publish_state(WS_EXITED);

  // causing a failure on exit will make the postmaster process restart the bg worker
  proc_exit(EXIT_FAILURE);
}
360

361
static void net_shmem_startup(void) {
95✔
362
  if (prev_shmem_startup_hook)
95✔
363
    prev_shmem_startup_hook();
×
364

365
  bool found;
95✔
366

367
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
95✔
368

369
  if (!found) {
95✔
370
    pg_atomic_init_u32(&worker_state->got_restart, 0);
95✔
371
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
95✔
372
    pg_atomic_init_u32(&worker_state->should_wake, 1);
95✔
373
    InitSharedLatch(&worker_state->latch);
95✔
374

375
    ConditionVariableInit(&worker_state->cv);
95✔
376
    worker_state->epfd = 0;
95✔
377
    worker_state->curl_mhandle = NULL;
95✔
378
  }
379
}
95✔
380

381
void _PG_init(void) {
95✔
382
  if (IsBinaryUpgrade) {
95✔
383
      return;
×
384
  }
385

386
  if (!process_shared_preload_libraries_in_progress) {
95✔
387
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
388
              errhint("Add pg_net to the shared_preload_libraries "
389
                      "configuration variable in postgresql.conf."));
390
  }
391

392
  RegisterBackgroundWorker(&(BackgroundWorker){
95✔
393
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
394
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
395
    .bgw_library_name = "pg_net",
396
    .bgw_function_name = "pg_net_worker",
397
    .bgw_name = "pg_net " EXTVERSION " worker",
398
    .bgw_restart_time = net_worker_restart_time_sec,
399
  });
400

401
  prev_shmem_startup_hook = shmem_startup_hook;
95✔
402
  shmem_startup_hook = net_shmem_startup;
95✔
403

404
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
95✔
405
                       "pg_net curl context",
406
                       ALLOCSET_DEFAULT_MINSIZE,
407
                       ALLOCSET_DEFAULT_INITSIZE,
408
                       ALLOCSET_DEFAULT_MAXSIZE);
409

410
  DefineCustomStringVariable("pg_net.ttl",
95✔
411
                 "time to live for request/response rows",
412
                 "should be a valid interval type",
413
                 &guc_ttl,
414
                 "6 hours",
415
                 PGC_SIGHUP, 0,
416
                 NULL, NULL, NULL);
417

418
  DefineCustomIntVariable("pg_net.batch_size",
95✔
419
                 "number of requests executed in one iteration of the background worker",
420
                 NULL,
421
                 &guc_batch_size,
422
                 200,
423
                 0, PG_INT16_MAX,
424
                 PGC_SIGHUP, 0,
425
                 NULL, NULL, NULL);
426

427
  DefineCustomStringVariable("pg_net.database_name",
95✔
428
                "Database where the worker will connect to",
429
                NULL,
430
                &guc_database_name,
431
                "postgres",
432
                PGC_SU_BACKEND, 0,
433
                NULL, NULL, NULL);
434

435
  DefineCustomStringVariable("pg_net.username",
95✔
436
                "Connection user for the worker",
437
                NULL,
438
                &guc_username,
439
                NULL,
440
                PGC_SU_BACKEND, 0,
441
                NULL, NULL, NULL);
442
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc