• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 15790028800

21 Jun 2025 12:21AM UTC coverage: 93.275% (-0.04%) from 93.318%
15790028800

Pull #192

github

web-flow
Merge dbad75840 into e141cbec5
Pull Request #192: feat: add `net.wait_until_running` sql function

39 of 39 new or added lines in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

430 of 461 relevant lines covered (93.28%)

115.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.11
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#include "pg_prelude.h"
7
#include "curl_prelude.h"
8
#include "util.h"
9
#include "errors.h"
10
#include "core.h"
11
#include "event.h"
12

13
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
14
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
15
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
16
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
17

18
PG_MODULE_MAGIC;
55✔
19

20
/*
 * Lifecycle phase of the background worker, stored in WorkerState.status
 * as a uint32 and read by backends waiting on the condition variable.
 */
typedef enum {
  WS_NOT_YET = 1, /* value written when the shared state is first initialized */
  WS_RUNNING = 2, /* published by the worker on each pass of its main loop */
  WS_EXITED  = 3, /* published by the worker right before it exits */
} WorkerStatus;
25

26
typedef struct {
27
  pg_atomic_uint32  should_restart;
28
  pg_atomic_uint32  status;
29
  Latch             latch;
30
  ConditionVariable cv;
31
} WorkerState;
32

33
WorkerState *worker_state = NULL;
34

35
static char*                    guc_ttl;
36
static int                      guc_batch_size;
37
static char*                    guc_database_name;
38
static char*                    guc_username;
39
static MemoryContext            CurlMemContext = NULL;
40
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
41
static long                     latch_timeout = 1000;
42
static volatile sig_atomic_t    got_sigterm = false;
43
static volatile sig_atomic_t    got_sighup = false;
44

45
void _PG_init(void);
46
PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
47

48
PG_FUNCTION_INFO_V1(worker_restart);
49✔
49
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
5✔
50
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
5✔
51
  pg_atomic_write_u32(&worker_state->should_restart, 1);
5✔
52
  pg_write_barrier();
5✔
53
  if (worker_state)
5✔
54
    SetLatch(&worker_state->latch);
5✔
55
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
5✔
56
}
57

58
/*
 * Block the calling backend until ws->status equals expected_status.
 *
 * The status is checked once up front so that an already-satisfied
 * condition returns without touching the condition variable. Otherwise the
 * caller sleeps on ws->cv and re-checks the status after every wakeup.
 */
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
  if (pg_atomic_read_u32(&ws->status) == expected_status)
    return; // nothing to wait for

  ConditionVariablePrepareToSleep(&ws->cv);
  for (;;) {
    if (pg_atomic_read_u32(&ws->status) == expected_status)
      break;
    ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
  }
  ConditionVariableCancelSleep();
}
68

69
PG_FUNCTION_INFO_V1(wait_until_running);
50✔
70
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
6✔
71
  wait_until_state(worker_state, WS_RUNNING);
6✔
72

73
  PG_RETURN_VOID();
6✔
74
}
75

76
static void
77
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
1✔
78
{
79
  int save_errno = errno;
1✔
80
  got_sigterm = true;
1✔
81
  if (worker_state)
1✔
82
    SetLatch(&worker_state->latch);
1✔
83
  errno = save_errno;
1✔
84
}
1✔
85

86
static void
87
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
5✔
88
{
89
  int     save_errno = errno;
5✔
90
  got_sighup = true;
5✔
91
  if (worker_state)
5✔
92
    SetLatch(&worker_state->latch);
5✔
93
  errno = save_errno;
5✔
94
}
5✔
95

96
static bool is_extension_loaded(){
59✔
97
  Oid extensionOid;
59✔
98

99
  StartTransactionCommand();
59✔
100

101
  extensionOid = get_extension_oid("pg_net", true);
59✔
102

103
  CommitTransactionCommand();
59✔
104

105
  return OidIsValid(extensionOid);
59✔
106
}
107

108
/*
 * Store a new worker status in shared memory and wake every backend
 * sleeping on the state's condition variable.
 */
static void publish_state(WorkerStatus s) {
  const uint32 new_status = (uint32) s;

  pg_atomic_write_u32(&worker_state->status, new_status);
  pg_write_barrier(); // order the status write before the wakeup
  ConditionVariableBroadcast(&worker_state->cv);
}
64✔
113

114
/*
 * Background worker entry point.
 *
 * Setup: takes ownership of the shared latch, installs the signal handlers,
 * connects to the configured database/user, and initializes libcurl plus
 * the event monitor.
 *
 * Main loop (runs until SIGTERM is flagged or a restart is requested):
 *   1. sleep on the latch (or until latch_timeout elapses);
 *   2. publish WS_RUNNING;
 *   3. skip the pass if the pg_net extension does not exist yet;
 *   4. reload the config file if SIGHUP was flagged;
 *   5. leave the loop if should_restart is set;
 *   6. delete expired responses, consume queued requests, and drive the
 *      curl multi handle until no transfers remain running.
 *
 * Teardown: clears should_restart, releases the event monitor and libcurl,
 * publishes WS_EXITED, disowns the latch and exits with a failure status so
 * that the postmaster restarts the worker.
 *
 * Never returns.
 */
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
  OwnLatch(&worker_state->latch);

  pqsignal(SIGTERM, handle_sigterm);
  pqsignal(SIGHUP, handle_sighup);
  pqsignal(SIGUSR1, procsignal_sigusr1_handler);

  BackgroundWorkerUnblockSignals();

  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity

  elog(INFO, "pg_net_worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);

  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
  if(curl_ret != CURLE_OK)
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));

  LoopState lstate = {
    .epfd = event_monitor(),
    .curl_mhandle = curl_multi_init(),
  };

  if (lstate.epfd < 0) {
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
  }

  if(!lstate.curl_mhandle)
    ereport(ERROR, errmsg("curl_multi_init()"));

  set_curl_mhandle(lstate.curl_mhandle, &lstate);

  while (!got_sigterm) {
    WaitLatch(&worker_state->latch,
          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
          latch_timeout,
          PG_WAIT_EXTENSION);
    ResetLatch(&worker_state->latch);

    publish_state(WS_RUNNING);

    CHECK_FOR_INTERRUPTS();

    if(!is_extension_loaded()){
      elog(DEBUG1, "pg_net worker: extension not yet loaded");
      continue;
    }

    if (got_sighup) {
      got_sighup = false;
      ProcessConfigFile(PGC_SIGHUP);
    }

    if (pg_atomic_read_u32(&worker_state->should_restart) == 1){
      elog(INFO, "Restarting pg_net worker");
      break;
    }

    uint64 expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);

    // uint64 is not size_t: use the matching format macro instead of %zu
    elog(DEBUG1, "Deleted " UINT64_FORMAT " expired rows", expired_responses);

    uint64 requests_consumed = consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);

    elog(DEBUG1, "Consumed " UINT64_FORMAT " request rows", requests_consumed);

    if(requests_consumed == 0)
      continue;

    int running_handles = 0;
    int maxevents = guc_batch_size + 1; // 1 extra for the timer
    event *events = palloc0(sizeof(event) * maxevents);

    // Keep driving curl while there are running handles; this prevents waiting
    // for the latch_timeout (which would cause the curl timeouts to be wrong).
    for (;;) {
      int nfds = wait_event(lstate.epfd, events, maxevents, 1000);
      if (nfds < 0) {
        int save_errno = errno;

        // The wait can be interrupted, for example when running under GDB.
        // Retry the wait itself: in a do/while, `continue` would jump to the
        // loop condition and abandon the batch while running_handles is still 0.
        if (save_errno == EINTR)
          continue;

        ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno))); // does not return
      }

      for (int i = 0; i < nfds; i++) {
        if (is_timer(events[i])) {
          EREPORT_MULTI(
            curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
          );
        } else {
          int curl_event = get_curl_event(events[i]);
          int sockfd = get_socket_fd(events[i]);

          EREPORT_MULTI(
            curl_multi_socket_action(
              lstate.curl_mhandle,
              sockfd,
              curl_event,
              &running_handles)
          );
        }

        insert_curl_responses(&lstate, CurlMemContext);
      }

      if (running_handles <= 0)
        break;
    }

    pfree(events);

    MemoryContextReset(CurlMemContext);
  }

  pg_atomic_write_u32(&worker_state->should_restart, 0);

  ev_monitor_close(&lstate);

  curl_multi_cleanup(lstate.curl_mhandle);
  curl_global_cleanup();

  publish_state(WS_EXITED);
  DisownLatch(&worker_state->latch);

  // causing a failure on exit will make the postmaster process restart the bg worker
  proc_exit(EXIT_FAILURE);
}
241

242
static void net_shmem_startup(void) {
55✔
243
  if (prev_shmem_startup_hook)
55✔
244
    prev_shmem_startup_hook();
×
245

246
  bool found;
55✔
247

248
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
55✔
249

250
  if (!found) { // only at worker initialization, once worker restarts it will be found
55✔
251
    pg_atomic_init_u32(&worker_state->should_restart, 0);
55✔
252
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
55✔
253
    InitSharedLatch(&worker_state->latch);
55✔
254
    ConditionVariableInit(&worker_state->cv);
55✔
255
  }
256
}
55✔
257

258
void _PG_init(void) {
55✔
259
  if (IsBinaryUpgrade) {
55✔
260
      return;
×
261
  }
262

263
  if (!process_shared_preload_libraries_in_progress) {
55✔
264
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
265
              errhint("Add pg_net to the shared_preload_libraries "
266
                      "configuration variable in postgresql.conf."));
267
  }
268

269
  RegisterBackgroundWorker(&(BackgroundWorker){
55✔
270
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
271
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
272
    .bgw_library_name = "pg_net",
273
    .bgw_function_name = "pg_net_worker",
274
    .bgw_name = "pg_net " EXTVERSION " worker",
275
    .bgw_restart_time = 1,
276
  });
277

278
  prev_shmem_startup_hook = shmem_startup_hook;
55✔
279
  shmem_startup_hook = net_shmem_startup;
55✔
280

281
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
55✔
282
                       "pg_net curl context",
283
                       ALLOCSET_DEFAULT_MINSIZE,
284
                       ALLOCSET_DEFAULT_INITSIZE,
285
                       ALLOCSET_DEFAULT_MAXSIZE);
286

287
  DefineCustomStringVariable("pg_net.ttl",
55✔
288
                 "time to live for request/response rows",
289
                 "should be a valid interval type",
290
                 &guc_ttl,
291
                 "6 hours",
292
                 PGC_SUSET, 0,
293
                 NULL, NULL, NULL);
294

295
  DefineCustomIntVariable("pg_net.batch_size",
55✔
296
                 "number of requests executed in one iteration of the background worker",
297
                 NULL,
298
                 &guc_batch_size,
299
                 200,
300
                 0, PG_INT16_MAX,
301
                 PGC_SUSET, 0,
302
                 NULL, NULL, NULL);
303

304
  DefineCustomStringVariable("pg_net.database_name",
55✔
305
                "Database where the worker will connect to",
306
                NULL,
307
                &guc_database_name,
308
                "postgres",
309
                PGC_SIGHUP, 0,
310
                NULL, NULL, NULL);
311

312
  DefineCustomStringVariable("pg_net.username",
55✔
313
                "Connection user for the worker",
314
                NULL,
315
                &guc_username,
316
                NULL,
317
                PGC_SIGHUP, 0,
318
                NULL, NULL, NULL);
319
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc