• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 15910366097

26 Jun 2025 07:06PM UTC coverage: 93.996% (+0.7%) from 93.275%
15910366097

Pull #199

github

web-flow
Merge f04dacbf3 into 1e7398c63
Pull Request #199: feat: process the queue on demand

67 of 71 new or added lines in 1 file covered. (94.37%)

3 existing lines in 1 file now uncovered.

454 of 483 relevant lines covered (94.0%)

126.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.63
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#include "pg_prelude.h"
7
#include "curl_prelude.h"
8
#include "util.h"
9
#include "errors.h"
10
#include "core.h"
11
#include "event.h"
12

13
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
14
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
15
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
16
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
17

18
PG_MODULE_MAGIC;
61✔
19

20
typedef enum {
21
  WS_NOT_YET = 1,
22
  WS_RUNNING,
23
  WS_EXITED,
24
} WorkerStatus;
25

26
typedef struct {
27
  pg_atomic_uint32  should_restart;
28
  pg_atomic_uint32  should_work;
29
  pg_atomic_uint32  status;
30
  Latch             latch;
31
  ConditionVariable cv;
32
} WorkerState;
33

34
WorkerState *worker_state = NULL;
35

36
static const int                curl_handle_event_timeout_ms = 1000;
37
static const long               queue_processing_wait_timeout_ms = 1000;
38
static const int                net_worker_restart_time_sec = 1;
39
static const long               no_timeout = -1L;
40

41
static char*                    guc_ttl;
42
static int                      guc_batch_size;
43
static char*                    guc_database_name;
44
static char*                    guc_username;
45
static MemoryContext            CurlMemContext = NULL;
46
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
47
static volatile sig_atomic_t    got_sighup = false;
48

49
void _PG_init(void);
50
PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
51

52
PG_FUNCTION_INFO_V1(worker_restart);
52✔
53
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
9✔
54
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
9✔
55
  pg_atomic_write_u32(&worker_state->should_restart, 1);
9✔
56
  pg_write_barrier();
9✔
57
  if (worker_state)
9✔
58
    SetLatch(&worker_state->latch);
9✔
59
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
9✔
60
}
61

62
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
11✔
63
  if (pg_atomic_read_u32(&ws->status) == expected_status) // fast return without sleeping, in case condition is fulfilled
11✔
64
    return;
65

66
  ConditionVariablePrepareToSleep(&ws->cv);
8✔
67
  while (pg_atomic_read_u32(&ws->status) != expected_status) {
16✔
68
    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
8✔
69
  }
70
  ConditionVariableCancelSleep();
8✔
71
}
72

73
PG_FUNCTION_INFO_V1(wait_until_running);
54✔
74
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
11✔
75
  wait_until_state(worker_state, WS_RUNNING);
11✔
76

77
  PG_RETURN_VOID();
11✔
78
}
79

80
PG_FUNCTION_INFO_V1(wake);
83✔
81
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
191✔
82
  uint32 expected = 0;
191✔
83

84
  bool success = pg_atomic_compare_exchange_u32(&worker_state->should_work, &expected, 1);
191✔
85
  pg_write_barrier();
191✔
86

87
  if (success) // only wake the worker on first put
191✔
88
    SetLatch(&worker_state->latch);
34✔
89

90
  PG_RETURN_VOID();
191✔
91
}
92

93
static void
94
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
1✔
95
{
96
  int save_errno = errno;
1✔
97
  pg_atomic_write_u32(&worker_state->should_restart, 1);
1✔
98
  pg_write_barrier();
1✔
99
  if (worker_state)
1✔
100
    SetLatch(&worker_state->latch);
1✔
101
  errno = save_errno;
1✔
102
}
1✔
103

104
static void
105
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
8✔
106
{
107
  int     save_errno = errno;
8✔
108
  got_sighup = true;
8✔
109
  if (worker_state)
8✔
110
    SetLatch(&worker_state->latch);
8✔
111
  errno = save_errno;
8✔
112
}
8✔
113

114
static void publish_state(WorkerStatus s) {
70✔
115
  pg_atomic_write_u32(&worker_state->status, (uint32)s);
70✔
116
  pg_write_barrier();
70✔
117
  ConditionVariableBroadcast(&worker_state->cv);
70✔
118
}
70✔
119

120
static bool process_interrupts(){
67✔
121
  bool restart = false;
67✔
122

123
  CHECK_FOR_INTERRUPTS();
67✔
124

125
  if (got_sighup) {
67✔
126
    got_sighup = false;
1✔
127
    ProcessConfigFile(PGC_SIGHUP);
1✔
128
  }
129

130
  if (pg_atomic_read_u32(&worker_state->should_restart) == 1){ // if a restart is issued, make sure we do it again after waiting
67✔
131
    restart = true;
10✔
132
  }
133

134
  return restart;
67✔
135
}
136

137
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
10✔
138
  OwnLatch(&worker_state->latch);
10✔
139

140
  pqsignal(SIGTERM, handle_sigterm);
10✔
141
  pqsignal(SIGHUP, handle_sighup);
10✔
142
  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
10✔
143

144
  BackgroundWorkerUnblockSignals();
10✔
145

146
  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
10✔
147
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity
10✔
148

149
  elog(INFO, "pg_net_worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);
10✔
150

151
  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
10✔
152
  if(curl_ret != CURLE_OK)
10✔
153
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));
×
154

155
  LoopState lstate = {
30✔
156
    .epfd = event_monitor(),
10✔
157
    .curl_mhandle = curl_multi_init(),
10✔
158
  };
159

160
  if (lstate.epfd < 0) {
10✔
UNCOV
161
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
×
162
  }
163

164
  if(!lstate.curl_mhandle)
10✔
UNCOV
165
    ereport(ERROR, errmsg("curl_multi_init()"));
×
166

167
  set_curl_mhandle(lstate.curl_mhandle, &lstate);
10✔
168

169
  while (true) {
60✔
170
    publish_state(WS_RUNNING);
60✔
171

172
    uint32 expected = 1;
60✔
173
    if (!pg_atomic_compare_exchange_u32(&worker_state->should_work, &expected, 0)){
60✔
174
      elog(DEBUG1, "pg_net_worker waiting for wake");
27✔
175
      // this will also wait for the `create extension net` to load, since the signal can only come from the request functions inside the `net` schema
176
      WaitLatch(&worker_state->latch,
27✔
177
                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
178
                no_timeout,
179
                PG_WAIT_EXTENSION);
180
      ResetLatch(&worker_state->latch);
27✔
181
      if(process_interrupts())
27✔
182
        goto restart;
6✔
183

184
      continue;
21✔
185
    }
186

187
    uint64 requests_consumed = 0;
70✔
188
    uint64 expired_responses = 0;
70✔
189

190
    do {
70✔
191
      expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);
70✔
192

193
      elog(DEBUG1, "Deleted %zu expired rows", expired_responses);
70✔
194

195
      requests_consumed = consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);
70✔
196

197
      elog(DEBUG1, "Consumed %zu request rows", requests_consumed);
70✔
198

199
      if(requests_consumed == 0)
70✔
200
        continue;
30✔
201

202
      int running_handles = 0;
40✔
203
      int maxevents = guc_batch_size + 1; // 1 extra for the timer
40✔
204
      event *events = palloc0(sizeof(event) * maxevents);
40✔
205

206
      do {
124✔
207
        int nfds = wait_event(lstate.epfd, events, maxevents, curl_handle_event_timeout_ms);
124✔
208
        if (nfds < 0) {
124✔
NEW
209
          int save_errno = errno;
×
NEW
210
          if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
×
NEW
211
            continue;
×
212
          }
213
          else {
NEW
214
            ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
×
215
            break;
216
          }
217
        }
218

219
        for (int i = 0; i < nfds; i++) {
585✔
220
          if (is_timer(events[i])) {
461✔
221
            EREPORT_MULTI(
52✔
222
              curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
223
            );
224
          } else {
225
            int curl_event = get_curl_event(events[i]);
409✔
226
            int sockfd = get_socket_fd(events[i]);
409✔
227

228
            EREPORT_MULTI(
409✔
229
              curl_multi_socket_action(
230
                lstate.curl_mhandle,
231
                sockfd,
232
                curl_event,
233
                &running_handles)
234
            );
235
          }
236

237
          insert_curl_responses(&lstate, CurlMemContext);
461✔
238
        }
239

240
        elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
124✔
241
      } while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
124✔
242

243
      pfree(events);
40✔
244

245
      MemoryContextReset(CurlMemContext);
40✔
246

247
      // slow down queue processing to avoid using too much CPU
248
      WaitLatch(&worker_state->latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, queue_processing_wait_timeout_ms, PG_WAIT_EXTENSION);
40✔
249
      ResetLatch(&worker_state->latch);
40✔
250
      if(process_interrupts()){
40✔
251
        if(requests_consumed > 0 || expired_responses > 0){ // if we have to restart, ensure the remaining work will continue
4✔
252
          pg_atomic_write_u32(&worker_state->should_work, 1);
4✔
253
        }
254

255
        goto restart;
4✔
256
      }
257

258
    } while (requests_consumed > 0 || expired_responses > 0);
66✔
259
  }
260

261
restart:
10✔
262

263
  publish_state(WS_EXITED);
10✔
264

265
  elog(DEBUG1, "pg_net_worker restarting, will it process the queue when started: %s", pg_atomic_read_u32(&worker_state->should_work)?"Yes":"No");
16✔
266

267
  pg_atomic_write_u32(&worker_state->should_restart, 0);
10✔
268

269
  ev_monitor_close(&lstate);
10✔
270

271
  curl_multi_cleanup(lstate.curl_mhandle);
10✔
272
  curl_global_cleanup();
10✔
273

274
  DisownLatch(&worker_state->latch);
10✔
275

276
  // causing a failure on exit will make the postmaster process restart the bg worker
277
  proc_exit(EXIT_FAILURE);
10✔
278
}
279

280
static void net_shmem_startup(void) {
61✔
281
  if (prev_shmem_startup_hook)
61✔
UNCOV
282
    prev_shmem_startup_hook();
×
283

284
  bool found;
61✔
285

286
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
61✔
287

288
  if (!found) { // only at worker initialization, once worker restarts it will be found
61✔
289
    pg_atomic_init_u32(&worker_state->should_restart, 0);
61✔
290
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
61✔
291
    pg_atomic_init_u32(&worker_state->should_work, 0);
61✔
292
    InitSharedLatch(&worker_state->latch);
61✔
293
    ConditionVariableInit(&worker_state->cv);
61✔
294
  }
295
}
61✔
296

297
void _PG_init(void) {
61✔
298
  if (IsBinaryUpgrade) {
61✔
299
      return;
×
300
  }
301

302
  if (!process_shared_preload_libraries_in_progress) {
61✔
303
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
304
              errhint("Add pg_net to the shared_preload_libraries "
305
                      "configuration variable in postgresql.conf."));
306
  }
307

308
  RegisterBackgroundWorker(&(BackgroundWorker){
61✔
309
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
310
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
311
    .bgw_library_name = "pg_net",
312
    .bgw_function_name = "pg_net_worker",
313
    .bgw_name = "pg_net " EXTVERSION " worker",
314
    .bgw_restart_time = net_worker_restart_time_sec,
315
  });
316

317
  prev_shmem_startup_hook = shmem_startup_hook;
61✔
318
  shmem_startup_hook = net_shmem_startup;
61✔
319

320
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
61✔
321
                       "pg_net curl context",
322
                       ALLOCSET_DEFAULT_MINSIZE,
323
                       ALLOCSET_DEFAULT_INITSIZE,
324
                       ALLOCSET_DEFAULT_MAXSIZE);
325

326
  DefineCustomStringVariable("pg_net.ttl",
61✔
327
                 "time to live for request/response rows",
328
                 "should be a valid interval type",
329
                 &guc_ttl,
330
                 "6 hours",
331
                 PGC_SIGHUP, 0,
332
                 NULL, NULL, NULL);
333

334
  DefineCustomIntVariable("pg_net.batch_size",
61✔
335
                 "number of requests executed in one iteration of the background worker",
336
                 NULL,
337
                 &guc_batch_size,
338
                 200,
339
                 0, PG_INT16_MAX,
340
                 PGC_SIGHUP, 0,
341
                 NULL, NULL, NULL);
342

343
  DefineCustomStringVariable("pg_net.database_name",
61✔
344
                "Database where the worker will connect to",
345
                NULL,
346
                &guc_database_name,
347
                "postgres",
348
                PGC_SU_BACKEND, 0,
349
                NULL, NULL, NULL);
350

351
  DefineCustomStringVariable("pg_net.username",
61✔
352
                "Connection user for the worker",
353
                NULL,
354
                &guc_username,
355
                NULL,
356
                PGC_SU_BACKEND, 0,
357
                NULL, NULL, NULL);
358
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc