• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 15789677728

20 Jun 2025 11:44PM UTC coverage: 93.261% (-0.06%) from 93.318%
15789677728

Pull #192

github

web-flow
Merge 2a3fa1b55 into e141cbec5
Pull Request #192: test: worker restart without timing

35 of 35 new or added lines in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

429 of 460 relevant lines covered (93.26%)

115.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.04
/src/worker.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#include "pg_prelude.h"
7
#include "curl_prelude.h"
8
#include "util.h"
9
#include "errors.h"
10
#include "core.h"
11
#include "event.h"
12

13
#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h
14
#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version"
15
_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5).
16
_Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG);
17

18
PG_MODULE_MAGIC;
55✔
19

20
/*
 * Lifecycle states of the background worker. The current state is stored in
 * the `status` atomic of WorkerState. Numbering starts at 1.
 */
typedef enum {
  WS_NOT_YET = 1, /* value `status` is initialized with in net_shmem_startup() */
  WS_RUNNING = 2, /* published on every iteration of the worker loop */
  WS_EXITED  = 3, /* published right before the worker process exits */
} WorkerStatus;
25

26
typedef struct {
27
  pg_atomic_uint32  should_restart;
28
  pg_atomic_uint32  status;
29
  Latch             latch;
30
  ConditionVariable cv;
31
} WorkerState;
32

33
WorkerState *worker_state = NULL;
34

35
static char*                    guc_ttl;
36
static int                      guc_batch_size;
37
static char*                    guc_database_name;
38
static char*                    guc_username;
39
static MemoryContext            CurlMemContext = NULL;
40
static shmem_startup_hook_type  prev_shmem_startup_hook = NULL;
41
static long                     latch_timeout = 1000;
42
static volatile sig_atomic_t    got_sigterm = false;
43
static volatile sig_atomic_t    got_sighup = false;
44

45
void _PG_init(void);
46
PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn();
47

48
PG_FUNCTION_INFO_V1(worker_restart);
49✔
49
Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
5✔
50
  bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config
5✔
51
  pg_atomic_write_u32(&worker_state->should_restart, 1);
5✔
52
  if (worker_state)
5✔
53
    SetLatch(&worker_state->latch);
5✔
54
  PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility
5✔
55
}
56

57
/*
 * Blocks the caller until ws->status equals expected_status.
 *
 * Returns immediately when the status already matches; otherwise sleeps on
 * ws->cv and re-checks the status after every wake-up.
 */
static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){
  bool already_there = pg_atomic_read_u32(&ws->status) == expected_status;

  if (!already_there) {
    ConditionVariablePrepareToSleep(&ws->cv);

    for (;;) {
      if (pg_atomic_read_u32(&ws->status) == expected_status)
        break;

      ConditionVariableSleep(&ws->cv, PG_WAIT_EXTENSION);
    }

    ConditionVariableCancelSleep();
  }
}
67

68
PG_FUNCTION_INFO_V1(wait_until_running);
50✔
69
Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
6✔
70
  wait_until_state(worker_state, WS_RUNNING);
6✔
71

72
  PG_RETURN_VOID();
6✔
73
}
74

75
static void
76
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
1✔
77
{
78
  int save_errno = errno;
1✔
79
  got_sigterm = true;
1✔
80
  if (worker_state)
1✔
81
    SetLatch(&worker_state->latch);
1✔
82
  errno = save_errno;
1✔
83
}
1✔
84

85
static void
86
handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
5✔
87
{
88
  int     save_errno = errno;
5✔
89
  got_sighup = true;
5✔
90
  if (worker_state)
5✔
91
    SetLatch(&worker_state->latch);
5✔
92
  errno = save_errno;
5✔
93
}
5✔
94

95
static bool is_extension_loaded(){
60✔
96
  Oid extensionOid;
60✔
97

98
  StartTransactionCommand();
60✔
99

100
  extensionOid = get_extension_oid("pg_net", true);
60✔
101

102
  CommitTransactionCommand();
60✔
103

104
  return OidIsValid(extensionOid);
60✔
105
}
106

107
/*
 * Stores the given status in the shared worker state and wakes every process
 * sleeping on the state's condition variable. A write barrier is issued
 * between the store and the broadcast.
 */
static void publish_state(WorkerStatus s) {
  WorkerState *shared = worker_state;
  uint32 new_status = (uint32)s;

  pg_atomic_write_u32(&shared->status, new_status);
  pg_write_barrier();
  ConditionVariableBroadcast(&shared->cv);
}
65✔
112

113
/*
 * Entry point of the pg_net background worker.
 *
 * Takes ownership of the shared latch, installs the signal handlers, connects
 * to the configured database and initializes libcurl. It then loops until
 * SIGTERM is received or a restart is requested through
 * worker_state->should_restart. Each iteration:
 *   - waits on the latch (at most latch_timeout) and publishes WS_RUNNING,
 *   - skips the rest while the pg_net extension does not exist,
 *   - reloads the configuration if SIGHUP was received,
 *   - deletes expired responses and consumes queued requests,
 *   - drives the curl multi handle until no transfers are running.
 *
 * Never returns: after cleanup it publishes WS_EXITED and exits with
 * EXIT_FAILURE so that the postmaster restarts the worker.
 */
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
  OwnLatch(&worker_state->latch);

  pqsignal(SIGTERM, handle_sigterm);
  pqsignal(SIGHUP, handle_sighup);
  pqsignal(SIGUSR1, procsignal_sigusr1_handler);

  BackgroundWorkerUnblockSignals();

  BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0);
  pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity

  elog(INFO, "pg_net_worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name);

  int curl_ret = curl_global_init(CURL_GLOBAL_ALL);
  if(curl_ret != CURLE_OK)
    ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret)));

  LoopState lstate = {
    .epfd = event_monitor(),
    .curl_mhandle = curl_multi_init(),
  };

  if (lstate.epfd < 0) {
    ereport(ERROR, errmsg("Failed to create event monitor file descriptor"));
  }

  if(!lstate.curl_mhandle)
    ereport(ERROR, errmsg("curl_multi_init()"));

  set_curl_mhandle(lstate.curl_mhandle, &lstate);

  while (!got_sigterm) {
    WaitLatch(&worker_state->latch,
          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
          latch_timeout,
          PG_WAIT_EXTENSION);
    ResetLatch(&worker_state->latch);

    publish_state(WS_RUNNING);

    CHECK_FOR_INTERRUPTS();

    if(!is_extension_loaded()){
      elog(DEBUG1, "pg_net worker: extension not yet loaded");
      continue;
    }

    if (got_sighup) {
      got_sighup = false;
      ProcessConfigFile(PGC_SIGHUP);
    }

    if (pg_atomic_read_u32(&worker_state->should_restart) == 1){
      elog(INFO, "Restarting pg_net worker");
      break;
    }

    uint64 expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);

    // uint64 is not size_t: print through unsigned long long so the format matches the argument type
    elog(DEBUG1, "Deleted %llu expired rows", (unsigned long long) expired_responses);

    uint64 requests_consumed = consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);

    elog(DEBUG1, "Consumed %llu request rows", (unsigned long long) requests_consumed);

    if(requests_consumed == 0)
      continue;

    int running_handles = 0;
    int maxevents = guc_batch_size + 1; // 1 extra for the timer
    event *events = palloc0(sizeof(event) * maxevents);
    bool wait_interrupted;

    do {
      wait_interrupted = false;

      int nfds = wait_event(lstate.epfd, events, maxevents, 1000);
      if (nfds < 0) {
        int save_errno = errno;

        // ereport(ERROR) does not return, so nothing after it is needed
        if (save_errno != EINTR)
          ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));

        // EINTR can happen when the wait is interrupted, for example when running under GDB.
        // `continue` in a do-while jumps to the loop condition; running_handles is still 0
        // before the first curl_multi_socket_action() call, so without this flag an
        // interrupted first wait would leave the loop instead of retrying.
        // NOTE(review): assumes the requests consumed above are still pending at that point — confirm against consume_request_queue().
        wait_interrupted = true;
        continue;
      }

      for (int i = 0; i < nfds; i++) {
        if (is_timer(events[i])) {
          EREPORT_MULTI(
            curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
          );
        } else {
          int curl_event = get_curl_event(events[i]);
          int sockfd = get_socket_fd(events[i]);

          EREPORT_MULTI(
            curl_multi_socket_action(
              lstate.curl_mhandle,
              sockfd,
              curl_event,
              &running_handles)
          );
        }

        insert_curl_responses(&lstate, CurlMemContext);
      }

    } while (wait_interrupted || running_handles > 0); // run again while there are curl handles, this will prevent waiting for the latch_timeout (which will cause the curl timeouts to be wrong)

    pfree(events);

    MemoryContextReset(CurlMemContext);
  }

  // clear the restart request so the next worker instance does not exit right away
  pg_atomic_write_u32(&worker_state->should_restart, 0);

  ev_monitor_close(&lstate);

  curl_multi_cleanup(lstate.curl_mhandle);
  curl_global_cleanup();

  publish_state(WS_EXITED);
  DisownLatch(&worker_state->latch);

  // causing a failure on exit will make the postmaster process restart the bg worker
  proc_exit(EXIT_FAILURE);
}
240

241
static void net_shmem_startup(void) {
55✔
242
  if (prev_shmem_startup_hook)
55✔
243
    prev_shmem_startup_hook();
×
244

245
  bool found;
55✔
246

247
  worker_state = ShmemInitStruct("pg_net worker state", sizeof(WorkerState), &found);
55✔
248

249
  if (!found) { // only at worker initialization, once worker restarts it will be found
55✔
250
    pg_atomic_init_u32(&worker_state->should_restart, 0);
55✔
251
    pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
55✔
252
    InitSharedLatch(&worker_state->latch);
55✔
253
    ConditionVariableInit(&worker_state->cv);
55✔
254
  }
255
}
55✔
256

257
void _PG_init(void) {
55✔
258
  if (IsBinaryUpgrade) {
55✔
259
      return;
×
260
  }
261

262
  if (!process_shared_preload_libraries_in_progress) {
55✔
263
      ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"),
×
264
              errhint("Add pg_net to the shared_preload_libraries "
265
                      "configuration variable in postgresql.conf."));
266
  }
267

268
  RegisterBackgroundWorker(&(BackgroundWorker){
55✔
269
    .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
270
    .bgw_start_time = BgWorkerStart_RecoveryFinished,
271
    .bgw_library_name = "pg_net",
272
    .bgw_function_name = "pg_net_worker",
273
    .bgw_name = "pg_net " EXTVERSION " worker",
274
    .bgw_restart_time = 1,
275
  });
276

277
  prev_shmem_startup_hook = shmem_startup_hook;
55✔
278
  shmem_startup_hook = net_shmem_startup;
55✔
279

280
  CurlMemContext = AllocSetContextCreate(TopMemoryContext,
55✔
281
                       "pg_net curl context",
282
                       ALLOCSET_DEFAULT_MINSIZE,
283
                       ALLOCSET_DEFAULT_INITSIZE,
284
                       ALLOCSET_DEFAULT_MAXSIZE);
285

286
  DefineCustomStringVariable("pg_net.ttl",
55✔
287
                 "time to live for request/response rows",
288
                 "should be a valid interval type",
289
                 &guc_ttl,
290
                 "6 hours",
291
                 PGC_SUSET, 0,
292
                 NULL, NULL, NULL);
293

294
  DefineCustomIntVariable("pg_net.batch_size",
55✔
295
                 "number of requests executed in one iteration of the background worker",
296
                 NULL,
297
                 &guc_batch_size,
298
                 200,
299
                 0, PG_INT16_MAX,
300
                 PGC_SUSET, 0,
301
                 NULL, NULL, NULL);
302

303
  DefineCustomStringVariable("pg_net.database_name",
55✔
304
                "Database where the worker will connect to",
305
                NULL,
306
                &guc_database_name,
307
                "postgres",
308
                PGC_SIGHUP, 0,
309
                NULL, NULL, NULL);
310

311
  DefineCustomStringVariable("pg_net.username",
55✔
312
                "Connection user for the worker",
313
                NULL,
314
                &guc_username,
315
                NULL,
316
                PGC_SIGHUP, 0,
317
                NULL, NULL, NULL);
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc