• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 16725696051

04 Aug 2025 02:17PM UTC coverage: 92.176%. Remained the same
16725696051

push

github

steve-chavez
test: avoid sleeping in truncate test to flakiness

We only need to ensure the truncate succeeds fast, not necessary to
sleep.

Also adjust sleep time of test_http_responses_will_delete_despite_restart.

483 of 524 relevant lines covered (92.18%)

207.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.63
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#define PG_PRELUDE_IMPL
7
#include "pg_prelude.h"
8
#include "curl_prelude.h"
9
#include "util.h"
10
#include "errors.h"
11
#include "core.h"
12
#include "event.h"
13

14
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
15
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
16
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
17
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
18

19
PG_MODULE_MAGIC;
95✔
20

21
typedef enum {
22
  WORKER_WAIT_NO_TIMEOUT,
23
  WORKER_WAIT_ONE_SECOND,
24
} WorkerWait;
25

26
static WorkerState *worker_state = NULL;
27

28
static const int                curl_handle_event_timeout_ms = 1000;
29
static const int                net_worker_restart_time_sec = 1;
30
static const long               no_timeout = -1L;
31
static bool                     wake_commit_cb_active = false;
32
static bool                     worker_should_restart = false;
33
static const size_t             total_extension_tables = 2;
34

35
static char*                    guc_ttl;
36
static int                      guc_batch_size;
37
static char*                    guc_database_name;
38
static char*                    guc_username;
39
static MemoryContext            CurlMemContext = NULL;
40
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
41
static volatile sig_atomic_t    got_sighup = false;
42

43
void _PG_init(void);
44

45
#if PG_VERSION_NUM >= 180000
46
  PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg);
47
#else
48
  PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
49
#endif
50

51
PG_FUNCTION_INFO_V1(worker_restart);
66✔
52
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
17✔
53
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
17✔
54
  pg_atomic_write_u32(&worker_state->got_restart, 1);
17✔
55
  pg_write_barrier();
17✔
56
  if(worker_state->shared_latch)
17✔
57
    SetLatch(worker_state->shared_latch);
16✔
58
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
17✔
59
}
60

61
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
18✔
62
  if (pg_atomic_read_u32(&ws->status) == expected_status) // fast return without sleeping, in case condition is fulfilled
18✔
63
    return;
64

65
  ConditionVariablePrepareToSleep(&ws->cv);
16✔
66
  while (pg_atomic_read_u32(&ws->status) != expected_status) {
32✔
67
    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
16✔
68
  }
69
  ConditionVariableCancelSleep();
16✔
70
}
71

72
PG_FUNCTION_INFO_V1(wait_until_running);
67✔
73
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
18✔
74
  wait_until_state(worker_state, WS_RUNNING);
18✔
75

76
  PG_RETURN_VOID();
18✔
77
}
78

79
// only wake at commit time to prevent excessive and unnecessary wakes.
80
// e.g only one wake when doing `select net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);`
81
static void wake_at_commit(XactEvent event, __attribute__ ((unused)) void *arg){
292✔
82
  elog(DEBUG2, "pg_net xact callback received: %s", xact_event_name(event));
292✔
83

84
  switch(event){
292✔
85
    case XACT_EVENT_COMMIT:
120✔
86
    case XACT_EVENT_PARALLEL_COMMIT:
87
      if(wake_commit_cb_active){
120✔
88
        uint32 expected = 0;
48✔
89
        bool success = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 1);
48✔
90
        pg_write_barrier();
48✔
91

92
        if (success) // only wake the worker on first put, so if many concurrent wakes come we only wake once
48✔
93
          SetLatch(worker_state->shared_latch);
22✔
94

95
        wake_commit_cb_active = false;
48✔
96
      }
97
      break;
98
    // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the worker automatically, they require a manual `net.wake()`
99
    // These are disabled by default and rarely used, see `max_prepared_transactions` https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS
100
    case XACT_EVENT_PREPARE:
52✔
101
    // abort the callback on rollback
102
    case XACT_EVENT_ABORT:
103
    case XACT_EVENT_PARALLEL_ABORT:
104
      wake_commit_cb_active = false;
52✔
105
      break;
52✔
106
    default:
107
      break;
108
  }
109
}
292✔
110

111
PG_FUNCTION_INFO_V1(wake);
100✔
112
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
433✔
113
  if (!wake_commit_cb_active) { // register only one callback per transaction
433✔
114
    RegisterXactCallback(wake_at_commit, NULL);
54✔
115
    wake_commit_cb_active = true;
54✔
116
  }
117

118
  PG_RETURN_VOID();
433✔
119
}
120

121
static void
122
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
2✔
123
{
124
  int save_errno = errno;
2✔
125
  pg_atomic_write_u32(&worker_state->got_restart, 1);
2✔
126
  pg_write_barrier();
2✔
127
  if(worker_state->shared_latch)
2✔
128
    SetLatch(worker_state->shared_latch);
2✔
129
  errno = save_errno;
2✔
130
}
2✔
131

132
static void
133
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
18✔
134
{
135
  int     save_errno = errno;
18✔
136
  got_sighup = true;
18✔
137
  if(worker_state->shared_latch)
18✔
138
    SetLatch(worker_state->shared_latch);
3✔
139
  errno = save_errno;
18✔
140
}
18✔
141

142
/*
143
 *We have to handle sigusr1 explicitly because the default
144
 *procsignal_sigusr1_handler doesn't `SetLatch`, this would prevent
145
 *DROP DATATABASE from finishing since our worker would be sleeping and not reach
146
 *CHECK_FOR_INTERRUPTS()
147
 */
148
static void
149
handle_sigusr1(SIGNAL_ARGS)
4✔
150
{
151
  int     save_errno = errno;
4✔
152
  if(worker_state->shared_latch)
4✔
153
    SetLatch(worker_state->shared_latch);
4✔
154
  errno = save_errno;
4✔
155
  procsignal_sigusr1_handler(postgres_signal_arg);
4✔
156
}
4✔
157

158
static void publish_state(WorkerStatus s) {
38✔
159
  pg_atomic_write_u32(&worker_state->status, (uint32)s);
38✔
160
  pg_write_barrier();
38✔
161
  ConditionVariableBroadcast(&worker_state->cv);
38✔
162
}
38✔
163

164
static void
165
net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum arg){
19✔
166
  worker_should_restart = false;
19✔
167
  pg_atomic_write_u32(&worker_state->should_wake, 1); // ensure the remaining work will continue since we'll restart
19✔
168

169
  worker_state->shared_latch = NULL;
19✔
170

171
  ev_monitor_close(worker_state);
19✔
172

173
  curl_multi_cleanup(worker_state->curl_mhandle);
19✔
174
  curl_global_cleanup();
19✔
175
}
19✔
176

177
// wait according to the wait type while ensuring interrupts are processed while waiting
178
static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart){
108✔
179
  switch(ww){
108✔
180
    case WORKER_WAIT_NO_TIMEOUT:
28✔
181
      WaitLatch(worker_state->shared_latch,
28✔
182
                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
183
                no_timeout,
184
                PG_WAIT_EXTENSION);
185
      ResetLatch(worker_state->shared_latch);
28✔
186
      break;
28✔
187
    case WORKER_WAIT_ONE_SECOND:
80✔
188
      WaitLatch(worker_state->shared_latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000, PG_WAIT_EXTENSION);
80✔
189
      ResetLatch(worker_state->shared_latch);
80✔
190
      break;
80✔
191
  }
192

193
  CHECK_FOR_INTERRUPTS();
108✔
194

195
  if (got_sighup) {
108✔
196
    got_sighup = false;
3✔
197
    ProcessConfigFile(PGC_SIGHUP);
3✔
198
  }
199

200
  if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)){
108✔
201
    *should_restart = true;
19✔
202
  }
203
}
108✔
204

205
static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]){
84✔
206
  Oid net_oid = get_namespace_oid("net", true);
84✔
207

208
  if(!OidIsValid(net_oid)){
84✔
209
    return false;
210
  }
211

212
  Oid queue_oid = get_relname_relid("http_request_queue", net_oid);
81✔
213
  Oid resp_oid = get_relname_relid("_http_response", net_oid);
81✔
214

215
  bool is_locked = ConditionalLockRelationOid(queue_oid, AccessShareLock) && ConditionalLockRelationOid(resp_oid, AccessShareLock);
81✔
216

217
  if (is_locked) {
80✔
218
    ext_table_oids[0] = queue_oid;
80✔
219
    ext_table_oids[1] = resp_oid;
80✔
220
  }
221

222
  return is_locked;
223
}
224

225
static void unlock_extension(Oid ext_table_oids[static total_extension_tables]){
80✔
226
  UnlockRelationOid(ext_table_oids[0], AccessShareLock);
80✔
227
  UnlockRelationOid(ext_table_oids[1], AccessShareLock);
80✔
228
}
80✔
229

230
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
19✔
231
  worker_state->shared_latch = &MyProc->procLatch;
19✔
232
  on_proc_exit(net_on_exit, 0);
19✔
233

234
  BackgroundWorkerUnblockSignals();
19✔
235
  pqsignal(SIGTERM, handle_sigterm);
19✔
236
  pqsignal(SIGHUP, handle_sighup);
19✔
237
  pqsignal(SIGUSR1, handle_sigusr1);
19✔
238

239
  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
19✔
240
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity
19✔
241

242
  elog(INFO, "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);
19✔
243

244
  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
19✔
245
  if(curl_ret != CURLE_OK)
19✔
246
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));
×
247

248
  worker_state->epfd = event_monitor();
19✔
249

250
  if (worker_state->epfd < 0) {
19✔
251
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
×
252
  }
253

254
  worker_state->curl_mhandle = curl_multi_init();
19✔
255
  if(!worker_state->curl_mhandle)
19✔
256
    ereport(ERROR, errmsg("curl_multi_init()"));
×
257

258
  set_curl_mhandle(worker_state);
19✔
259

260
  publish_state(WS_RUNNING);
19✔
261

262
  do {
65✔
263

264
    uint32 expected = 1;
65✔
265
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)){
65✔
266
      elog(DEBUG1, "pg_net worker waiting for wake");
28✔
267
      wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart);
28✔
268
      continue;
28✔
269
    }
270

271
    uint64 requests_consumed = 0;
84✔
272
    uint64 expired_responses = 0;
84✔
273

274
    do {
84✔
275
      SetCurrentStatementStartTimestamp();
84✔
276
      StartTransactionCommand();
84✔
277
      PushActiveSnapshot(GetTransactionSnapshot());
84✔
278

279
      Oid ext_table_oids[total_extension_tables];
84✔
280

281
      if(!is_extension_locked(ext_table_oids)){
84✔
282
        elog(DEBUG1, "pg_net extension not loaded");
4✔
283
        PopActiveSnapshot();
4✔
284
        AbortCurrentTransaction();
4✔
285
        break;
4✔
286
      }
287

288
      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);
80✔
289

290
      elog(DEBUG1, "Deleted "UINT64_FORMAT" expired rows", expired_responses);
80✔
291

292
      requests_consumed = consume_request_queue(worker_state->curl_mhandle, guc_batch_size, CurlMemContext);
80✔
293

294
      elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed);
80✔
295

296
      if(requests_consumed > 0){
80✔
297
        int running_handles = 0;
51✔
298
        int maxevents = guc_batch_size + 1; // 1 extra for the timer
51✔
299
        event events[maxevents];
51✔
300

301
        do {
169✔
302
          int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);
169✔
303
          if (nfds < 0) {
169✔
304
            int save_errno = errno;
×
305
            if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
×
306
              elog(DEBUG1, "wait_event() got %s, continuing", strerror(save_errno));
×
307
              continue;
×
308
            }
309
            else {
310
              ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
×
311
              break;
312
            }
313
          }
314

315
          for (int i = 0; i < nfds; i++) {
1,027✔
316
            if (is_timer(events[i])) {
858✔
317
              EREPORT_MULTI(
62✔
318
                curl_multi_socket_action(worker_state->curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
319
              );
320
            } else {
321
              int curl_event = get_curl_event(events[i]);
796✔
322
              int sockfd = get_socket_fd(events[i]);
796✔
323

324
              EREPORT_MULTI(
796✔
325
                curl_multi_socket_action(
326
                  worker_state->curl_mhandle,
327
                  sockfd,
328
                  curl_event,
329
                  &running_handles)
330
              );
331
            }
332

333
            insert_curl_responses(worker_state, CurlMemContext);
858✔
334
          }
335

336
          elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
169✔
337
        } while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
169✔
338
      }
339

340
      unlock_extension(ext_table_oids);
80✔
341

342
      PopActiveSnapshot();
80✔
343
      CommitTransactionCommand();
80✔
344

345
      MemoryContextReset(CurlMemContext);
80✔
346

347
      // slow down queue processing to avoid using too much CPU
348
      wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);
80✔
349

350
    } while (!worker_should_restart && (requests_consumed > 0 || expired_responses > 0));
80✔
351

352
  } while (!worker_should_restart);
65✔
353

354
  publish_state(WS_EXITED);
19✔
355

356
  // causing a failure on exit will make the postmaster process restart the bg worker
357
  proc_exit(EXIT_FAILURE);
19✔
358
}
359

360
static void net_shmem_startup(void) {
95✔
361
  if (prev_shmem_startup_hook)
95✔
362
    prev_shmem_startup_hook();
×
363

364
  bool found;
95✔
365

366
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
95✔
367

368
  if (!found) {
95✔
369
    pg_atomic_init_u32(&worker_state->got_restart, 0);
95✔
370
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
95✔
371
    pg_atomic_init_u32(&worker_state->should_wake, 1);
95✔
372
    worker_state->shared_latch = NULL;
95✔
373

374
    ConditionVariableInit(&worker_state->cv);
95✔
375
    worker_state->epfd = 0;
95✔
376
    worker_state->curl_mhandle = NULL;
95✔
377
  }
378
}
95✔
379

380
void _PG_init(void) {
95✔
381
  if (IsBinaryUpgrade) {
95✔
382
      return;
×
383
  }
384

385
  if (!process_shared_preload_libraries_in_progress) {
95✔
386
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
387
              errhint("Add pg_net to the shared_preload_libraries "
388
                      "configuration variable in postgresql.conf."));
389
  }
390

391
  RegisterBackgroundWorker(&(BackgroundWorker){
95✔
392
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
393
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
394
    .bgw_library_name = "pg_net",
395
    .bgw_function_name = "pg_net_worker",
396
    .bgw_name = "pg_net " EXTVERSION " worker",
397
    .bgw_restart_time = net_worker_restart_time_sec,
398
  });
399

400
  prev_shmem_startup_hook = shmem_startup_hook;
95✔
401
  shmem_startup_hook = net_shmem_startup;
95✔
402

403
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
95✔
404
                       "pg_net curl context",
405
                       ALLOCSET_DEFAULT_MINSIZE,
406
                       ALLOCSET_DEFAULT_INITSIZE,
407
                       ALLOCSET_DEFAULT_MAXSIZE);
408

409
  DefineCustomStringVariable("pg_net.ttl",
95✔
410
                 "time to live for request/response rows",
411
                 "should be a valid interval type",
412
                 &guc_ttl,
413
                 "6 hours",
414
                 PGC_SIGHUP, 0,
415
                 NULL, NULL, NULL);
416

417
  DefineCustomIntVariable("pg_net.batch_size",
95✔
418
                 "number of requests executed in one iteration of the background worker",
419
                 NULL,
420
                 &guc_batch_size,
421
                 200,
422
                 0, PG_INT16_MAX,
423
                 PGC_SIGHUP, 0,
424
                 NULL, NULL, NULL);
425

426
  DefineCustomStringVariable("pg_net.database_name",
95✔
427
                "Database where the worker will connect to",
428
                NULL,
429
                &guc_database_name,
430
                "postgres",
431
                PGC_SU_BACKEND, 0,
432
                NULL, NULL, NULL);
433

434
  DefineCustomStringVariable("pg_net.username",
95✔
435
                "Connection user for the worker",
436
                NULL,
437
                &guc_username,
438
                NULL,
439
                PGC_SU_BACKEND, 0,
440
                NULL, NULL, NULL);
441
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc