• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 16302681875

15 Jul 2025 07:37PM UTC coverage: 92.07% (-0.3%) from 92.322%
16302681875

push

github

steve-chavez
bump to 0.19.2

476 of 517 relevant lines covered (92.07%)

209.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#define PG_PRELUDE_IMPL
7
#include "pg_prelude.h"
8
#include "curl_prelude.h"
9
#include "util.h"
10
#include "errors.h"
11
#include "core.h"
12
#include "event.h"
13

14
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
15
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
16
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
17
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
18

19
PG_MODULE_MAGIC;
93✔
20

21
typedef enum {
22
  WORKER_WAIT_NO_TIMEOUT,
23
  WORKER_WAIT_ONE_SECOND,
24
} WorkerWait;
25

26
static WorkerState *worker_state = NULL;
27

28
static const int                curl_handle_event_timeout_ms = 1000;
29
static const int                net_worker_restart_time_sec = 1;
30
static const long               no_timeout = -1L;
31
static bool                     wake_commit_cb_active = false;
32
static bool                     worker_should_restart = false;
33
static const size_t             total_extension_tables = 2;
34

35
static char*                    guc_ttl;
36
static int                      guc_batch_size;
37
static char*                    guc_database_name;
38
static char*                    guc_username;
39
static MemoryContext            CurlMemContext = NULL;
40
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
41
static volatile sig_atomic_t    got_sighup = false;
42

43
void _PG_init(void);
44

45
#if PG_VERSION_NUM >= 180000
46
  PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg);
47
#else
48
  PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
49
#endif
50

51
PG_FUNCTION_INFO_V1(worker_restart);
65✔
52
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
17✔
53
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
17✔
54
  pg_atomic_write_u32(&worker_state->got_restart, 1);
17✔
55
  pg_write_barrier();
17✔
56
  if (worker_state)
17✔
57
    SetLatch(&worker_state->latch);
17✔
58
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
17✔
59
}
60

61
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
18✔
62
  if (pg_atomic_read_u32(&ws->status) == expected_status) // fast return without sleeping, in case condition is fulfilled
18✔
63
    return;
64

65
  ConditionVariablePrepareToSleep(&ws->cv);
15✔
66
  while (pg_atomic_read_u32(&ws->status) != expected_status) {
30✔
67
    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
15✔
68
  }
69
  ConditionVariableCancelSleep();
15✔
70
}
71

72
PG_FUNCTION_INFO_V1(wait_until_running);
66✔
73
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
18✔
74
  wait_until_state(worker_state, WS_RUNNING);
18✔
75

76
  PG_RETURN_VOID();
18✔
77
}
78

79
// only wake at commit time to prevent excessive and unnecessary wakes.
80
// e.g only one wake when doing `select net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);`
81
static void wake_at_commit(XactEvent event, __attribute__ ((unused)) void *arg){
293✔
82
  elog(DEBUG2, "pg_net xact callback received: %s", xact_event_name(event));
293✔
83

84
  switch(event){
293✔
85
    case XACT_EVENT_COMMIT:
121✔
86
    case XACT_EVENT_PARALLEL_COMMIT:
87
      if(wake_commit_cb_active){
121✔
88
        uint32 expected = 0;
47✔
89
        bool success = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 1);
47✔
90
        pg_write_barrier();
47✔
91

92
        if (success) // only wake the worker on first put, so if many concurrent wakes come we only wake once
47✔
93
          SetLatch(&worker_state->latch);
25✔
94

95
        wake_commit_cb_active = false;
47✔
96
      }
97
      break;
98
    // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the worker automatically, they require a manual `net.wake()`
99
    // These are disabled by default and rarely used, see `max_prepared_transactions` https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS
100
    case XACT_EVENT_PREPARE:
51✔
101
    // abort the callback on rollback
102
    case XACT_EVENT_ABORT:
103
    case XACT_EVENT_PARALLEL_ABORT:
104
      wake_commit_cb_active = false;
51✔
105
      break;
51✔
106
    default:
107
      break;
108
  }
109
}
293✔
110

111
PG_FUNCTION_INFO_V1(wake);
98✔
112
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
427✔
113
  if (!wake_commit_cb_active) { // register only one callback per transaction
427✔
114
    RegisterXactCallback(wake_at_commit, NULL);
53✔
115
    wake_commit_cb_active = true;
53✔
116
  }
117

118
  PG_RETURN_VOID();
427✔
119
}
120

121
static void
122
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
2✔
123
{
124
  int save_errno = errno;
2✔
125
  pg_atomic_write_u32(&worker_state->got_restart, 1);
2✔
126
  pg_write_barrier();
2✔
127
  if (worker_state)
2✔
128
    SetLatch(&worker_state->latch);
2✔
129
  errno = save_errno;
2✔
130
}
2✔
131

132
static void
133
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
18✔
134
{
135
  int     save_errno = errno;
18✔
136
  got_sighup = true;
18✔
137
  if (worker_state)
18✔
138
    SetLatch(&worker_state->latch);
18✔
139
  errno = save_errno;
18✔
140
}
18✔
141

142
static void publish_state(WorkerStatus s) {
38✔
143
  pg_atomic_write_u32(&worker_state->status, (uint32)s);
38✔
144
  pg_write_barrier();
38✔
145
  ConditionVariableBroadcast(&worker_state->cv);
38✔
146
}
38✔
147

148
static void
149
net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum arg){
19✔
150
  worker_should_restart = false;
19✔
151
  pg_atomic_write_u32(&worker_state->should_wake, 1); // ensure the remaining work will continue since we'll restart
19✔
152

153
  DisownLatch(&worker_state->latch);
19✔
154

155
  ev_monitor_close(worker_state);
19✔
156

157
  curl_multi_cleanup(worker_state->curl_mhandle);
19✔
158
  curl_global_cleanup();
19✔
159
}
19✔
160

161
// wait according to the wait type while ensuring interrupts are processed while waiting
162
static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart){
105✔
163
  switch(ww){
105✔
164
    case WORKER_WAIT_NO_TIMEOUT:
24✔
165
      WaitLatch(&worker_state->latch,
24✔
166
                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
167
                no_timeout,
168
                PG_WAIT_EXTENSION);
169
      ResetLatch(&worker_state->latch);
24✔
170
      break;
24✔
171
    case WORKER_WAIT_ONE_SECOND:
81✔
172
      WaitLatch(&worker_state->latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000, PG_WAIT_EXTENSION);
81✔
173
      ResetLatch(&worker_state->latch);
81✔
174
      break;
81✔
175
  }
176

177
  CHECK_FOR_INTERRUPTS();
105✔
178

179
  if (got_sighup) {
105✔
180
    got_sighup = false;
3✔
181
    ProcessConfigFile(PGC_SIGHUP);
3✔
182
  }
183

184
  if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)){
105✔
185
    *should_restart = true;
19✔
186
  }
187
}
105✔
188

189
static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]){
87✔
190
  Oid net_oid = get_namespace_oid("net", true);
87✔
191

192
  if(!OidIsValid(net_oid)){
87✔
193
    return false;
194
  }
195

196
  Oid queue_oid = get_relname_relid("http_request_queue", net_oid);
83✔
197
  Oid resp_oid = get_relname_relid("_http_response", net_oid);
83✔
198

199
  bool is_locked = ConditionalLockRelationOid(queue_oid, AccessShareLock) && ConditionalLockRelationOid(resp_oid, AccessShareLock);
83✔
200

201
  if (is_locked) {
81✔
202
    ext_table_oids[0] = queue_oid;
81✔
203
    ext_table_oids[1] = resp_oid;
81✔
204
  }
205

206
  return is_locked;
207
}
208

209
static void unlock_extension(Oid ext_table_oids[static total_extension_tables]){
81✔
210
  UnlockRelationOid(ext_table_oids[0], AccessShareLock);
81✔
211
  UnlockRelationOid(ext_table_oids[1], AccessShareLock);
81✔
212
}
81✔
213

214
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
19✔
215
  on_proc_exit(net_on_exit, 0);
19✔
216

217
  OwnLatch(&worker_state->latch);
19✔
218

219
  BackgroundWorkerUnblockSignals();
19✔
220
  pqsignal(SIGTERM, handle_sigterm);
19✔
221
  pqsignal(SIGHUP, handle_sighup);
19✔
222
  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
19✔
223

224
  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
19✔
225
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity
19✔
226

227
  elog(INFO, "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);
19✔
228

229
  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
19✔
230
  if(curl_ret != CURLE_OK)
19✔
231
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));
×
232

233
  worker_state->epfd = event_monitor();
19✔
234

235
  if (worker_state->epfd < 0) {
19✔
236
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
×
237
  }
238

239
  worker_state->curl_mhandle = curl_multi_init();
19✔
240
  if(!worker_state->curl_mhandle)
19✔
241
    ereport(ERROR, errmsg("curl_multi_init()"));
×
242

243
  set_curl_mhandle(worker_state);
19✔
244

245
  publish_state(WS_RUNNING);
19✔
246

247
  do {
65✔
248

249
    uint32 expected = 1;
65✔
250
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)){
65✔
251
      elog(DEBUG1, "pg_net worker waiting for wake");
24✔
252
      wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart);
24✔
253
      continue;
24✔
254
    }
255

256
    uint64 requests_consumed = 0;
87✔
257
    uint64 expired_responses = 0;
87✔
258

259
    do {
87✔
260
      SetCurrentStatementStartTimestamp();
87✔
261
      StartTransactionCommand();
87✔
262
      PushActiveSnapshot(GetTransactionSnapshot());
87✔
263

264
      Oid ext_table_oids[total_extension_tables];
87✔
265

266
      if(!is_extension_locked(ext_table_oids)){
87✔
267
        elog(DEBUG1, "pg_net extension not loaded");
6✔
268
        PopActiveSnapshot();
6✔
269
        AbortCurrentTransaction();
6✔
270
        break;
6✔
271
      }
272

273
      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);
81✔
274

275
      elog(DEBUG1, "Deleted "UINT64_FORMAT" expired rows", expired_responses);
81✔
276

277
      requests_consumed = consume_request_queue(worker_state->curl_mhandle, guc_batch_size, CurlMemContext);
81✔
278

279
      elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed);
81✔
280

281
      if(requests_consumed > 0){
81✔
282
        int running_handles = 0;
50✔
283
        int maxevents = guc_batch_size + 1; // 1 extra for the timer
50✔
284
        event events[maxevents];
50✔
285

286
        do {
169✔
287
          int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);
169✔
288
          if (nfds < 0) {
169✔
289
            int save_errno = errno;
×
290
            if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
×
291
              elog(DEBUG1, "wait_event() got %s, continuing", strerror(save_errno));
×
292
              continue;
×
293
            }
294
            else {
295
              ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
×
296
              break;
297
            }
298
          }
299

300
          for (int i = 0; i < nfds; i++) {
1,025✔
301
            if (is_timer(events[i])) {
856✔
302
              EREPORT_MULTI(
61✔
303
                curl_multi_socket_action(worker_state->curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
304
              );
305
            } else {
306
              int curl_event = get_curl_event(events[i]);
795✔
307
              int sockfd = get_socket_fd(events[i]);
795✔
308

309
              EREPORT_MULTI(
795✔
310
                curl_multi_socket_action(
311
                  worker_state->curl_mhandle,
312
                  sockfd,
313
                  curl_event,
314
                  &running_handles)
315
              );
316
            }
317

318
            insert_curl_responses(worker_state, CurlMemContext);
856✔
319
          }
320

321
          elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
169✔
322
        } while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
169✔
323
      }
324

325
      unlock_extension(ext_table_oids);
81✔
326

327
      PopActiveSnapshot();
81✔
328
      CommitTransactionCommand();
81✔
329

330
      MemoryContextReset(CurlMemContext);
81✔
331

332
      // slow down queue processing to avoid using too much CPU
333
      wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);
81✔
334

335
    } while (!worker_should_restart && (requests_consumed > 0 || expired_responses > 0));
81✔
336

337
  } while (!worker_should_restart);
65✔
338

339
  publish_state(WS_EXITED);
19✔
340

341
  // causing a failure on exit will make the postmaster process restart the bg worker
342
  proc_exit(EXIT_FAILURE);
19✔
343
}
344

345
static void net_shmem_startup(void) {
93✔
346
  if (prev_shmem_startup_hook)
93✔
347
    prev_shmem_startup_hook();
×
348

349
  bool found;
93✔
350

351
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
93✔
352

353
  if (!found) {
93✔
354
    pg_atomic_init_u32(&worker_state->got_restart, 0);
93✔
355
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
93✔
356
    pg_atomic_init_u32(&worker_state->should_wake, 1);
93✔
357
    InitSharedLatch(&worker_state->latch);
93✔
358

359
    ConditionVariableInit(&worker_state->cv);
93✔
360
    worker_state->epfd = 0;
93✔
361
    worker_state->curl_mhandle = NULL;
93✔
362
  }
363
}
93✔
364

365
void _PG_init(void) {
93✔
366
  if (IsBinaryUpgrade) {
93✔
367
      return;
×
368
  }
369

370
  if (!process_shared_preload_libraries_in_progress) {
93✔
371
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
372
              errhint("Add pg_net to the shared_preload_libraries "
373
                      "configuration variable in postgresql.conf."));
374
  }
375

376
  RegisterBackgroundWorker(&(BackgroundWorker){
93✔
377
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
378
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
379
    .bgw_library_name = "pg_net",
380
    .bgw_function_name = "pg_net_worker",
381
    .bgw_name = "pg_net " EXTVERSION " worker",
382
    .bgw_restart_time = net_worker_restart_time_sec,
383
  });
384

385
  prev_shmem_startup_hook = shmem_startup_hook;
93✔
386
  shmem_startup_hook = net_shmem_startup;
93✔
387

388
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
93✔
389
                       "pg_net curl context",
390
                       ALLOCSET_DEFAULT_MINSIZE,
391
                       ALLOCSET_DEFAULT_INITSIZE,
392
                       ALLOCSET_DEFAULT_MAXSIZE);
393

394
  DefineCustomStringVariable("pg_net.ttl",
93✔
395
                 "time to live for request/response rows",
396
                 "should be a valid interval type",
397
                 &guc_ttl,
398
                 "6 hours",
399
                 PGC_SIGHUP, 0,
400
                 NULL, NULL, NULL);
401

402
  DefineCustomIntVariable("pg_net.batch_size",
93✔
403
                 "number of requests executed in one iteration of the background worker",
404
                 NULL,
405
                 &guc_batch_size,
406
                 200,
407
                 0, PG_INT16_MAX,
408
                 PGC_SIGHUP, 0,
409
                 NULL, NULL, NULL);
410

411
  DefineCustomStringVariable("pg_net.database_name",
93✔
412
                "Database where the worker will connect to",
413
                NULL,
414
                &guc_database_name,
415
                "postgres",
416
                PGC_SU_BACKEND, 0,
417
                NULL, NULL, NULL);
418

419
  DefineCustomStringVariable("pg_net.username",
93✔
420
                "Connection user for the worker",
421
                NULL,
422
                &guc_username,
423
                NULL,
424
                PGC_SU_BACKEND, 0,
425
                NULL, NULL, NULL);
426
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc