• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 19383742840

15 Nov 2025 03:28AM UTC coverage: 93.421% (+0.05%) from 93.371%
19383742840

push

github

steve-chavez
bump to 0.20.2

497 of 532 relevant lines covered (93.42%)

201.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.35
/src/core.c
1
#include <errno.h>
2
#include <inttypes.h>
3
#include <string.h>
4
#include <unistd.h>
5

6
#include "pg_prelude.h"
7

8
#include "curl_prelude.h"
9

10
#include "core.h"
11
#include "errors.h"
12
#include "event.h"
13

14
static SPIPlanPtr del_response_plan     = NULL;
15
static SPIPlanPtr del_return_queue_plan = NULL;
16
static SPIPlanPtr ins_response_plan     = NULL;
17

18
static size_t body_cb(void *contents, size_t size, size_t nmemb, void *userp) {
24✔
19
  CurlHandle *handle   = (CurlHandle *)userp;
24✔
20
  size_t      realsize = size * nmemb;
24✔
21
  appendBinaryStringInfo(handle->body, (const char *)contents, (int)realsize);
24✔
22
  return realsize;
24✔
23
}
24

25
static struct curl_slist *pg_text_array_to_slist(ArrayType *array, struct curl_slist *headers) {
308✔
26
  ArrayIterator iterator;
308✔
27
  Datum         value;
308✔
28
  bool          isnull;
308✔
29
  char         *hdr;
308✔
30

31
  iterator = array_create_iterator(array, 0, NULL);
308✔
32

33
  while (array_iterate(iterator, &value, &isnull)) {
318✔
34
    if (isnull) {
10✔
35
      continue;
×
36
    }
37

38
    hdr = TextDatumGetCString(value);
10✔
39
    EREPORT_CURL_SLIST_APPEND(headers, hdr);
10✔
40
    pfree(hdr);
10✔
41
  }
42
  array_free_iterator(iterator);
308✔
43

44
  return headers;
308✔
45
}
46

47
void init_curl_handle(CurlHandle *handle, RequestQueueRow row) {
308✔
48
  handle->id        = row.id;
308✔
49
  handle->body      = makeStringInfo();
308✔
50
  handle->ez_handle = curl_easy_init();
308✔
51

52
  handle->timeout_milliseconds = row.timeout_milliseconds;
308✔
53

54
  if (!row.headersBin.isnull) {
308✔
55
    ArrayType         *pgHeaders       = DatumGetArrayTypeP(row.headersBin.value);
308✔
56
    struct curl_slist *request_headers = NULL;
308✔
57

58
    request_headers = pg_text_array_to_slist(pgHeaders, request_headers);
308✔
59

60
    EREPORT_CURL_SLIST_APPEND(request_headers, "User-Agent: pg_net/" EXTVERSION);
308✔
61

62
    handle->request_headers = request_headers;
308✔
63
  }
64

65
  handle->url = TextDatumGetCString(row.url);
308✔
66

67
  handle->req_body = !row.bodyBin.isnull ? TextDatumGetCString(row.bodyBin.value) : NULL;
308✔
68

69
  handle->method = TextDatumGetCString(row.method);
308✔
70

71
  if (strcasecmp(handle->method, "GET") != 0 && strcasecmp(handle->method, "POST") != 0 &&
308✔
72
      strcasecmp(handle->method, "DELETE") != 0) {
6✔
73
    ereport(ERROR, errmsg("Unsupported request method %s", handle->method));
×
74
  }
75

76
  if (strcasecmp(handle->method, "GET") == 0) {
308✔
77
    if (handle->req_body) {
298✔
78
      EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDS, handle->req_body);
×
79
      EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_CUSTOMREQUEST, "GET");
×
80
    }
81
  }
82

83
  if (strcasecmp(handle->method, "POST") == 0) {
308✔
84
    if (handle->req_body) {
4✔
85
      EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDS, handle->req_body);
3✔
86
    } else {
87
      EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POST, 1L);
1✔
88
      EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDSIZE, 0L);
1✔
89
    }
90
  }
91

92
  if (strcasecmp(handle->method, "DELETE") == 0) {
308✔
93
    EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_CUSTOMREQUEST, "DELETE");
6✔
94
    if (handle->req_body) {
6✔
95
      EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDS, handle->req_body);
1✔
96
    }
97
  }
98

99
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_WRITEFUNCTION, body_cb);
308✔
100
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_WRITEDATA, handle);
308✔
101
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_HEADER, 0L);
308✔
102
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_URL, handle->url);
308✔
103
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_HTTPHEADER, handle->request_headers);
308✔
104
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_TIMEOUT_MS, (long)handle->timeout_milliseconds);
308✔
105
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PRIVATE, handle);
308✔
106
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_FOLLOWLOCATION, (long)true);
308✔
107
  if (log_min_messages <= DEBUG2) EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_VERBOSE, 1L);
308✔
108
#if LIBCURL_VERSION_NUM >= 0x075500 /* libcurl 7.85.0 */
109
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PROTOCOLS_STR, "http,https");
308✔
110
#else
111
  EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);
112
#endif
113
}
308✔
114

115
void set_curl_mhandle(WorkerState *wstate) {
19✔
116
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_SOCKETFUNCTION, multi_socket_cb);
19✔
117
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_SOCKETDATA, wstate);
19✔
118
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
19✔
119
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_TIMERDATA, wstate);
19✔
120
}
19✔
121

122
uint64 delete_expired_responses(char *ttl, int batch_size) {
80✔
123
  if (del_response_plan == NULL) {
80✔
124
    SPIPlanPtr tmp = SPI_prepare("\
36✔
125
        WITH\
126
        rows AS (\
127
          SELECT ctid\
128
          FROM net._http_response\
129
          WHERE created < now() - $1\
130
          ORDER BY created\
131
          LIMIT $2\
132
        )\
133
        DELETE FROM net._http_response r\
134
        USING rows WHERE r.ctid = rows.ctid",
135
                                 2, (Oid[]){INTERVALOID, INT4OID});
18✔
136
    if (tmp == NULL)
18✔
137
      ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result)));
×
138

139
    del_response_plan = SPI_saveplan(tmp);
18✔
140
    if (del_response_plan == NULL) ereport(ERROR, errmsg("SPI_saveplan failed"));
18✔
141
  }
142

143
  int ret_code = SPI_execute_plan(
240✔
144
      del_response_plan,
145
      (Datum[]){DirectFunctionCall3(interval_in, CStringGetDatum(ttl), ObjectIdGetDatum(InvalidOid),
80✔
146
                                    Int32GetDatum(-1)),
147
                Int32GetDatum(batch_size)},
80✔
148
      NULL, false, 0);
149

150
  uint64 affected_rows = SPI_processed;
80✔
151

152
  if (ret_code != SPI_OK_DELETE) {
80✔
153
    ereport(ERROR,
×
154
            errmsg("Error expiring response table rows: %s", SPI_result_code_string(ret_code)));
155
  }
156

157
  return affected_rows;
80✔
158
}
159

160
uint64 consume_request_queue(const int batch_size) {
80✔
161
  if (del_return_queue_plan == NULL) {
80✔
162
    SPIPlanPtr tmp = SPI_prepare("\
36✔
163
        WITH\
164
        rows AS (\
165
          SELECT id\
166
          FROM net.http_request_queue\
167
          ORDER BY id\
168
          LIMIT $1\
169
        )\
170
        DELETE FROM net.http_request_queue q\
171
        USING rows WHERE q.id = rows.id\
172
        RETURNING q.id, q.method, q.url, timeout_milliseconds, array(select key || ': ' || value from jsonb_each_text(q.headers)), q.body",
173
                                 1, (Oid[]){INT4OID});
18✔
174

175
    if (tmp == NULL)
18✔
176
      ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result)));
×
177

178
    del_return_queue_plan = SPI_saveplan(tmp);
18✔
179
    if (del_return_queue_plan == NULL) ereport(ERROR, errmsg("SPI_saveplan failed"));
18✔
180
  }
181

182
  int ret_code =
80✔
183
      SPI_execute_plan(del_return_queue_plan, (Datum[]){Int32GetDatum(batch_size)}, NULL, false, 0);
80✔
184

185
  if (ret_code != SPI_OK_DELETE_RETURNING)
80✔
186
    ereport(ERROR,
×
187
            errmsg("Error getting http request queue: %s", SPI_result_code_string(ret_code)));
188

189
  return SPI_processed;
80✔
190
}
191

192
// This has an implicit dependency on the execution of
193
// delete_return_request_queue, unfortunately we're not able to make this
194
// dependency explicit due to the design of SPI (which uses global variables)
195
RequestQueueRow get_request_queue_row(HeapTuple spi_tupval, TupleDesc spi_tupdesc) {
308✔
196
  bool tupIsNull = false;
308✔
197

198
  int64 id = DatumGetInt64(SPI_getbinval(spi_tupval, spi_tupdesc, 1, &tupIsNull));
308✔
199
  EREPORT_NULL_ATTR(tupIsNull, id);
308✔
200

201
  Datum method = SPI_getbinval(spi_tupval, spi_tupdesc, 2, &tupIsNull);
308✔
202
  EREPORT_NULL_ATTR(tupIsNull, method);
308✔
203

204
  Datum url = SPI_getbinval(spi_tupval, spi_tupdesc, 3, &tupIsNull);
308✔
205
  EREPORT_NULL_ATTR(tupIsNull, url);
308✔
206

207
  int32 timeout_milliseconds = DatumGetInt32(SPI_getbinval(spi_tupval, spi_tupdesc, 4, &tupIsNull));
308✔
208
  EREPORT_NULL_ATTR(tupIsNull, timeout_milliseconds);
308✔
209

210
  NullableDatum headersBin = {.value  = SPI_getbinval(spi_tupval, spi_tupdesc, 5, &tupIsNull),
308✔
211
                              .isnull = tupIsNull};
212

213
  NullableDatum bodyBin = {.value  = SPI_getbinval(spi_tupval, spi_tupdesc, 6, &tupIsNull),
308✔
214
                           .isnull = tupIsNull};
215

216
  return (RequestQueueRow){id, method, url, timeout_milliseconds, headersBin, bodyBin};
308✔
217
}
218

219
static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle) {
235✔
220
  struct curl_header *header, *prev = NULL;
235✔
221

222
  JsonbParseState *headers = NULL;
235✔
223
  (void)pushJsonbValue(&headers, WJB_BEGIN_OBJECT, NULL);
235✔
224

225
  while ((header = curl_easy_nextheader(ez_handle, CURLH_HEADER, 0, prev))) {
1,203✔
226
    JsonbValue key   = {.type = jbvString,
968✔
227
                        .val  = {.string = {.val = header->name, .len = strlen(header->name)}}};
968✔
228
    JsonbValue value = {.type = jbvString,
968✔
229
                        .val  = {.string = {.val = header->value, .len = strlen(header->value)}}};
968✔
230
    (void)pushJsonbValue(&headers, WJB_KEY, &key);
968✔
231
    (void)pushJsonbValue(&headers, WJB_VALUE, &value);
968✔
232
    prev = header;
968✔
233
  }
234

235
  Jsonb *jsonb_headers = JsonbValueToJsonb(pushJsonbValue(&headers, WJB_END_OBJECT, NULL));
235✔
236

237
  return jsonb_headers;
235✔
238
}
239

240
void insert_response(CurlHandle *handle, CURLcode curl_return_code) {
308✔
241
  enum { nparams = 7 }; // using an enum because const size_t nparams doesn't compile
308✔
242
  Datum vals[nparams];
308✔
243
  char  nulls[nparams];
308✔
244
  MemSet(nulls, 'n', nparams);
308✔
245

246
  vals[0]  = Int64GetDatum(handle->id);
308✔
247
  nulls[0] = ' ';
308✔
248

249
  if (curl_return_code == CURLE_OK) {
308✔
250
    Jsonb *jsonb_headers        = jsonb_headers_from_curl_handle(handle->ez_handle);
235✔
251
    long   res_http_status_code = 0;
235✔
252

253
    EREPORT_CURL_GETINFO(handle->ez_handle, CURLINFO_RESPONSE_CODE, &res_http_status_code);
235✔
254

255
    vals[1]  = Int32GetDatum(res_http_status_code);
235✔
256
    nulls[1] = ' ';
235✔
257

258
    if (handle->body && handle->body->data[0] != '\0') {
235✔
259
      vals[2]  = CStringGetTextDatum(handle->body->data);
24✔
260
      nulls[2] = ' ';
24✔
261
    }
262

263
    vals[3]  = JsonbPGetDatum(jsonb_headers);
235✔
264
    nulls[3] = ' ';
235✔
265

266
    struct curl_header *hdr;
235✔
267
    if (curl_easy_header(handle->ez_handle, "content-type", 0, CURLH_HEADER, -1, &hdr) ==
235✔
268
        CURLHE_OK) {
269
      vals[4]  = CStringGetTextDatum(hdr->value);
24✔
270
      nulls[4] = ' ';
24✔
271
    }
272

273
    vals[5]  = BoolGetDatum(false);
235✔
274
    nulls[5] = ' ';
235✔
275
  } else {
276
    bool timed_out = curl_return_code == CURLE_OPERATION_TIMEDOUT;
73✔
277

278
    vals[5]  = BoolGetDatum(timed_out);
73✔
279
    nulls[5] = ' ';
73✔
280

281
    if (timed_out) {
73✔
282
      curl_timeout_msg timeout_msg =
52✔
283
          detailed_timeout_strerror(handle->ez_handle, handle->timeout_milliseconds);
52✔
284

285
      vals[6]  = CStringGetTextDatum(timeout_msg.msg);
52✔
286
      nulls[6] = ' ';
52✔
287
    } else {
288
      const char *error_msg = curl_easy_strerror(curl_return_code);
21✔
289

290
      if (error_msg) {
21✔
291
        vals[6]  = CStringGetTextDatum(error_msg);
21✔
292
        nulls[6] = ' ';
21✔
293
      }
294
    }
295
  }
296

297
  if (ins_response_plan == NULL) {
308✔
298
    SPIPlanPtr tmp = SPI_prepare(
30✔
299
        "\
300
        insert into net._http_response(id, status_code, content, headers, content_type, timed_out, error_msg) values ($1, $2, $3, $4, $5, $6, $7)",
301
        nparams, (Oid[nparams]){INT8OID, INT4OID, TEXTOID, JSONBOID, TEXTOID, BOOLOID, TEXTOID});
15✔
302

303
    if (tmp == NULL)
15✔
304
      ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result)));
×
305

306
    ins_response_plan = SPI_saveplan(tmp);
15✔
307
    if (ins_response_plan == NULL) ereport(ERROR, errmsg("SPI_saveplan failed"));
15✔
308

309
    SPI_freeplan(tmp);
15✔
310
  }
311

312
  int ret_code = SPI_execute_plan(ins_response_plan, vals, nulls, false, 0);
308✔
313

314
  if (ret_code != SPI_OK_INSERT) {
308✔
315
    ereport(ERROR, errmsg("Error when inserting response: %s", SPI_result_code_string(ret_code)));
×
316
  }
317
}
308✔
318

319
void pfree_handle(CurlHandle *handle) {
308✔
320
  pfree(handle->url);
308✔
321
  pfree(handle->method);
308✔
322
  if (handle->req_body) pfree(handle->req_body);
308✔
323

324
  if (handle->body) destroyStringInfo(handle->body);
308✔
325

326
  if (handle->request_headers) // curl_slist_free_all already handles the NULL
308✔
327
                               // case, but be explicit about it
328
    curl_slist_free_all(handle->request_headers);
308✔
329
}
308✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc