• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 16282592267

15 Jul 2025 02:31AM UTC coverage: 92.495%. Remained the same
16282592267

Pull #213

github

web-flow
Merge 251541a8b into a8c23a9b8
Pull Request #213: chore: compile and test on darwin

2 of 3 new or added lines in 1 file covered. (66.67%)

4 existing lines in 1 file now uncovered.

493 of 533 relevant lines covered (92.5%)

210.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.22
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#define PG_PRELUDE_IMPL
7
#include "pg_prelude.h"
8
#include "curl_prelude.h"
9
#include "util.h"
10
#include "errors.h"
11
#include "core.h"
12
#include "event.h"
13

14
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
15
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
16
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
17
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
18

19
PG_MODULE_MAGIC;
94✔
20

21
typedef enum {
22
  WORKER_WAIT_NO_TIMEOUT,
23
  WORKER_WAIT_ONE_SECOND,
24
} WorkerWait;
25

26
static WorkerState *worker_state = NULL;
27

28
static const int                curl_handle_event_timeout_ms = 1000;
29
static const int                net_worker_restart_time_sec = 1;
30
static const long               no_timeout = -1L;
31
static bool                     extension_locked = false;
32
static bool                     wake_commit_cb_active = false;
33
static bool                     worker_should_restart = false;
34

35
static char*                    guc_ttl;
36
static int                      guc_batch_size;
37
static char*                    guc_database_name;
38
static char*                    guc_username;
39
static MemoryContext            CurlMemContext = NULL;
40
static LockRelId                queue_table_lock;
41
static LockRelId                response_table_lock;
42
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
43
static volatile sig_atomic_t    got_sighup = false;
44

45
void _PG_init(void);
46

47
#if PG_VERSION_NUM >= 180000
48
  PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg);
49
#else
50
  PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
51
#endif
52

53
PG_FUNCTION_INFO_V1(worker_restart);
65✔
54
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
17✔
55
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
17✔
56
  pg_atomic_write_u32(&worker_state->got_restart, 1);
17✔
57
  pg_write_barrier();
58
  if (worker_state)
17✔
59
    SetLatch(&worker_state->latch);
17✔
60
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
17✔
61
}
62

63
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
18✔
64
  if (pg_atomic_read_u32(&ws->status) == expected_status) // fast return without sleeping, in case condition is fulfilled
18✔
65
    return;
66

67
  ConditionVariablePrepareToSleep(&ws->cv);
16✔
68
  while (pg_atomic_read_u32(&ws->status) != expected_status) {
32✔
69
    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
16✔
70
  }
71
  ConditionVariableCancelSleep();
16✔
72
}
73

74
PG_FUNCTION_INFO_V1(wait_until_running);
66✔
75
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
18✔
76
  wait_until_state(worker_state, WS_RUNNING);
18✔
77

78
  PG_RETURN_VOID();
18✔
79
}
80

81
// only wake at commit time to prevent excessive and unnecessary wakes.
82
// e.g only one wake when doing `select net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);`
83
static void wake_at_commit(XactEvent event, __attribute__ ((unused)) void *arg){
290✔
84
  elog(DEBUG2, "pg_net xact callback received: %s", xact_event_name(event));
290✔
85

86
  switch(event){
290✔
87
    case XACT_EVENT_COMMIT:
120✔
88
    case XACT_EVENT_PARALLEL_COMMIT:
89
      if(wake_commit_cb_active){
120✔
90
        uint32 expected = 0;
47✔
91
        bool success = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 1);
47✔
92
        pg_write_barrier();
47✔
93

94
        if (success) // only wake the worker on first put, so if many concurrent wakes come we only wake once
47✔
95
          SetLatch(&worker_state->latch);
42✔
96

97
        wake_commit_cb_active = false;
47✔
98
      }
99
      break;
100
    // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the worker automatically, they require a manual `net.wake()`
101
    // These are disabled by default and rarely used, see `max_prepared_transactions` https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS
102
    case XACT_EVENT_PREPARE:
50✔
103
    // abort the callback on rollback
104
    case XACT_EVENT_ABORT:
105
    case XACT_EVENT_PARALLEL_ABORT:
106
      wake_commit_cb_active = false;
50✔
107
      break;
50✔
108
    default:
109
      break;
110
  }
111
}
290✔
112

113
PG_FUNCTION_INFO_V1(wake);
98✔
114
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
427✔
115
  if (!wake_commit_cb_active) { // register only one callback per transaction
427✔
116
    RegisterXactCallback(wake_at_commit, NULL);
53✔
117
    wake_commit_cb_active = true;
53✔
118
  }
119

120
  PG_RETURN_VOID();
427✔
121
}
122

123
static void
124
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
2✔
125
{
126
  int save_errno = errno;
2✔
127
  pg_atomic_write_u32(&worker_state->got_restart, 1);
2✔
128
  pg_write_barrier();
2✔
129
  if (worker_state)
2✔
130
    SetLatch(&worker_state->latch);
2✔
131
  errno = save_errno;
2✔
132
}
2✔
133

134
static void
135
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
18✔
136
{
137
  int     save_errno = errno;
18✔
138
  got_sighup = true;
18✔
139
  if (worker_state)
18✔
140
    SetLatch(&worker_state->latch);
18✔
141
  errno = save_errno;
18✔
142
}
18✔
143

144
static void publish_state(WorkerStatus s) {
38✔
145
  pg_atomic_write_u32(&worker_state->status, (uint32)s);
38✔
146
  pg_write_barrier();
38✔
147
  ConditionVariableBroadcast(&worker_state->cv);
38✔
148
}
38✔
149

150
static bool is_extension_loaded(){
109✔
151
  StartTransactionCommand();
109✔
152

153
  bool extension_exists = OidIsValid(get_extension_oid("pg_net", true));
109✔
154

155
  if(extension_exists && !extension_locked){
109✔
156
    Oid db_oid = get_database_oid(guc_database_name, false);
55✔
157

158
    Oid net_oid = get_namespace_oid("net", false);
55✔
159

160
    queue_table_lock.dbId = db_oid;
55✔
161
    queue_table_lock.relId = get_relname_relid("http_request_queue", net_oid);
55✔
162

163
    response_table_lock.dbId = db_oid;
55✔
164
    response_table_lock.relId = get_relname_relid("_http_response", net_oid);
55✔
165
  }
166

167
  CommitTransactionCommand();
109✔
168

169
  return extension_exists;
109✔
170
}
171

172
static inline void lock_extension(){
106✔
173
  if(!extension_locked){
106✔
174
    elog(DEBUG1, "pg_net worker locking extension tables");
55✔
175
    LockRelationIdForSession(&queue_table_lock, AccessShareLock);
55✔
176
    LockRelationIdForSession(&response_table_lock, AccessShareLock);
55✔
177
    extension_locked = true;
55✔
178
  }
179
}
106✔
180

181
static inline void unlock_extension(){
84✔
182
  if(extension_locked){
84✔
183
    elog(DEBUG1, "pg_net worker unlocking extension tables");
55✔
184
    UnlockRelationIdForSession(&queue_table_lock, AccessShareLock);
55✔
185
    UnlockRelationIdForSession(&response_table_lock, AccessShareLock);
55✔
186
    extension_locked = false;
55✔
187
  }
188
}
84✔
189

190
static void
191
net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum arg){
19✔
192
  worker_should_restart = false;
19✔
193
  pg_atomic_write_u32(&worker_state->should_wake, 1); // ensure the remaining work will continue since we'll restart
19✔
194

195
  // ensure unlock happens in case of error
196
  unlock_extension();
19✔
197

198
  DisownLatch(&worker_state->latch);
19✔
199

200
  ev_monitor_close(worker_state);
19✔
201

202
  curl_multi_cleanup(worker_state->curl_mhandle);
19✔
203
  curl_global_cleanup();
19✔
204
}
19✔
205

206
// wait according to the wait type while ensuring interrupts are processed while waiting
207
static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart){
161✔
208
  switch(ww){
161✔
209
    case WORKER_WAIT_NO_TIMEOUT:
46✔
210
      WaitLatch(&worker_state->latch,
46✔
211
                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
212
                no_timeout,
213
                PG_WAIT_EXTENSION);
214
      ResetLatch(&worker_state->latch);
46✔
215
      break;
46✔
216
    case WORKER_WAIT_ONE_SECOND:
115✔
217
      WaitLatch(&worker_state->latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000, PG_WAIT_EXTENSION);
115✔
218
      ResetLatch(&worker_state->latch);
115✔
219
      break;
115✔
220
  }
221

222
  CHECK_FOR_INTERRUPTS();
161✔
223

224
  if (got_sighup) {
161✔
225
    got_sighup = false;
2✔
226
    ProcessConfigFile(PGC_SIGHUP);
2✔
227
  }
228

229
  if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)){
161✔
230
    *should_restart = true;
19✔
231
  }
232
}
161✔
233

234
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
19✔
235
  on_proc_exit(net_on_exit, 0);
19✔
236

237
  OwnLatch(&worker_state->latch);
19✔
238

239
  BackgroundWorkerUnblockSignals();
19✔
240
  pqsignal(SIGTERM, handle_sigterm);
19✔
241
  pqsignal(SIGHUP, handle_sighup);
19✔
242
  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
19✔
243

244
  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
19✔
245
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity
19✔
246

247
  elog(INFO, "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);
19✔
248

249
  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
19✔
250
  if(curl_ret != CURLE_OK)
19✔
251
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));
×
252

253
  worker_state->epfd = event_monitor();
19✔
254

255
  if (worker_state->epfd < 0) {
19✔
256
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
×
257
  }
258

259
  worker_state->curl_mhandle = curl_multi_init();
19✔
260
  if(!worker_state->curl_mhandle)
19✔
261
    ereport(ERROR, errmsg("curl_multi_init()"));
×
262

263
  set_curl_mhandle(worker_state);
19✔
264

265
  publish_state(WS_RUNNING);
19✔
266

267
  do {
109✔
268

269
    if(!is_extension_loaded()){
109✔
270
      elog(DEBUG1, "pg_net worker waiting for extension to load");
3✔
271
      wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);
3✔
272
      continue;
3✔
273
    }
274

275
    lock_extension(); // lock the extension immediately after it's loaded
106✔
276

277
    uint32 expected = 1;
106✔
278
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)){
106✔
279
      unlock_extension();
46✔
280
      elog(DEBUG1, "pg_net worker waiting for wake");
46✔
281
      wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart);
46✔
282
      continue;
46✔
283
    }
284

285
    uint64 requests_consumed = 0;
112✔
286
    uint64 expired_responses = 0;
112✔
287

288
    do {
112✔
289
      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);
112✔
290

291
      elog(DEBUG1, "Deleted "UINT64_FORMAT" expired rows", expired_responses);
112✔
292

293
      StartTransactionCommand();
112✔
294
      PushActiveSnapshot(GetTransactionSnapshot());
112✔
295

296
      requests_consumed = consume_request_queue(worker_state->curl_mhandle, guc_batch_size, CurlMemContext);
112✔
297

298
      elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed);
112✔
299

300
      if(requests_consumed > 0){
112✔
301
        int running_handles = 0;
54✔
302
        int maxevents = guc_batch_size + 1; // 1 extra for the timer
54✔
303
        event events[maxevents];
54✔
304

305
        do {
173✔
306
          int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);
173✔
307
          if (nfds < 0) {
173✔
308
            int save_errno = errno;
×
309
            if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
×
NEW
310
              continue;
×
311
            }
312
            else {
UNCOV
313
              ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
×
314
              break;
315
            }
316
          }
317

318
          for (int i = 0; i < nfds; i++) {
1,035✔
319
            if (is_timer(events[i])) {
862✔
320
              EREPORT_MULTI(
64✔
321
                curl_multi_socket_action(worker_state->curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
322
              );
323
            } else {
324
              int curl_event = get_curl_event(events[i]);
798✔
325
              int sockfd = get_socket_fd(events[i]);
798✔
326

327
              EREPORT_MULTI(
798✔
328
                curl_multi_socket_action(
329
                  worker_state->curl_mhandle,
330
                  sockfd,
331
                  curl_event,
332
                  &running_handles)
333
              );
334
            }
335

336
            insert_curl_responses(worker_state, CurlMemContext);
862✔
337
          }
338

339
          elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
173✔
340
        } while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
173✔
341
      }
342

343
      PopActiveSnapshot();
112✔
344
      CommitTransactionCommand();
112✔
345

346
      MemoryContextReset(CurlMemContext);
112✔
347

348
      // slow down queue processing to avoid using too much CPU
349
      wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);
112✔
350

351
    } while (!worker_should_restart && (requests_consumed > 0 || expired_responses > 0));
112✔
352

353
  } while (!worker_should_restart);
109✔
354

355
  unlock_extension();
19✔
356

357
  publish_state(WS_EXITED);
19✔
358

359
  // causing a failure on exit will make the postmaster process restart the bg worker
360
  proc_exit(EXIT_FAILURE);
19✔
361
}
362

363
static void net_shmem_startup(void) {
94✔
364
  if (prev_shmem_startup_hook)
94✔
UNCOV
365
    prev_shmem_startup_hook();
×
366

367
  bool found;
94✔
368

369
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
94✔
370

371
  if (!found) {
94✔
372
    pg_atomic_init_u32(&worker_state->got_restart, 0);
94✔
373
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
94✔
374
    pg_atomic_init_u32(&worker_state->should_wake, 1);
94✔
375
    InitSharedLatch(&worker_state->latch);
94✔
376

377
    ConditionVariableInit(&worker_state->cv);
94✔
378
    worker_state->epfd = 0;
94✔
379
    worker_state->curl_mhandle = NULL;
94✔
380
  }
381
}
94✔
382

383
void _PG_init(void) {
94✔
384
  if (IsBinaryUpgrade) {
94✔
UNCOV
385
      return;
×
386
  }
387

388
  if (!process_shared_preload_libraries_in_progress) {
94✔
UNCOV
389
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
390
              errhint("Add pg_net to the shared_preload_libraries "
391
                      "configuration variable in postgresql.conf."));
392
  }
393

394
  RegisterBackgroundWorker(&(BackgroundWorker){
94✔
395
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
396
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
397
    .bgw_library_name = "pg_net",
398
    .bgw_function_name = "pg_net_worker",
399
    .bgw_name = "pg_net " EXTVERSION " worker",
400
    .bgw_restart_time = net_worker_restart_time_sec,
401
  });
402

403
  prev_shmem_startup_hook = shmem_startup_hook;
94✔
404
  shmem_startup_hook = net_shmem_startup;
94✔
405

406
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
94✔
407
                       "pg_net curl context",
408
                       ALLOCSET_DEFAULT_MINSIZE,
409
                       ALLOCSET_DEFAULT_INITSIZE,
410
                       ALLOCSET_DEFAULT_MAXSIZE);
411

412
  DefineCustomStringVariable("pg_net.ttl",
94✔
413
                 "time to live for request/response rows",
414
                 "should be a valid interval type",
415
                 &guc_ttl,
416
                 "6 hours",
417
                 PGC_SIGHUP, 0,
418
                 NULL, NULL, NULL);
419

420
  DefineCustomIntVariable("pg_net.batch_size",
94✔
421
                 "number of requests executed in one iteration of the background worker",
422
                 NULL,
423
                 &guc_batch_size,
424
                 200,
425
                 0, PG_INT16_MAX,
426
                 PGC_SIGHUP, 0,
427
                 NULL, NULL, NULL);
428

429
  DefineCustomStringVariable("pg_net.database_name",
94✔
430
                "Database where the worker will connect to",
431
                NULL,
432
                &guc_database_name,
433
                "postgres",
434
                PGC_SU_BACKEND, 0,
435
                NULL, NULL, NULL);
436

437
  DefineCustomStringVariable("pg_net.username",
94✔
438
                "Connection user for the worker",
439
                NULL,
440
                &guc_username,
441
                NULL,
442
                PGC_SU_BACKEND, 0,
443
                NULL, NULL, NULL);
444
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc