• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 17082809744

19 Aug 2025 09:59PM UTC coverage: 92.365% (+0.2%) from 92.12%
17082809744

Pull #226

github

web-flow
Merge 3ed0801d9 into a7792bfd9
Pull Request #226: fix: timed_out in net._http_response is always NULL

43 of 44 new or added lines in 1 file covered. (97.73%)

5 existing lines in 2 files now uncovered.

496 of 537 relevant lines covered (92.36%)

207.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.65
/src/core.c
1
#include <unistd.h>
2
#include <errno.h>
3
#include <string.h>
4
#include <inttypes.h>
5

6
#include "pg_prelude.h"
7
#include "curl_prelude.h"
8
#include "core.h"
9
#include "event.h"
10
#include "errors.h"
11

12
// Per-request state. A pointer to it is registered on the request's curl easy
// handle both as the write-callback data and as the handle's private pointer.
typedef struct {
13
  int64 id; // request id; stored as net._http_response.id when the response is inserted
14
  StringInfo body; // response body, appended to chunk by chunk in body_cb
15
  struct curl_slist* request_headers; // header list handed to CURLOPT_HTTPHEADER; released in pfree_curl_data
16
  int32 timeout_milliseconds; // value passed to CURLOPT_TIMEOUT_MS and to the timeout error message builder
17
} CurlData;
18

19
static size_t
20
body_cb(void *contents, size_t size, size_t nmemb, void *userp)
24✔
21
{
22
  CurlData *cdata = (CurlData*) userp;
24✔
23
  size_t realsize = size * nmemb;
24✔
24
  appendBinaryStringInfo(cdata->body, (const char*)contents, (int)realsize);
24✔
25
  return realsize;
24✔
26
}
27

28
/*
 * Appends every non-NULL element of a Postgres text array to a curl_slist.
 *
 * array:   array whose elements are converted to C strings and appended,
 *          in iteration order; NULL elements are skipped
 * headers: list to append to (the caller in this file starts from NULL)
 *
 * Returns the list after all appends. The temporary C string made for each
 * element is freed right after it has been appended.
 */
static struct curl_slist *pg_text_array_to_slist(ArrayType *array,
                                          struct curl_slist *headers) {
    Datum         elem;
    bool          elem_is_null;
    ArrayIterator it = array_create_iterator(array, 0, NULL);

    while (array_iterate(it, &elem, &elem_is_null)) {
        if (!elem_is_null) {
            char *hdr = TextDatumGetCString(elem);

            EREPORT_CURL_SLIST_APPEND(headers, hdr);
            pfree(hdr);
        }
    }

    array_free_iterator(it);

    return headers;
}
50

51
// We need a different memory context here, as the parent function will have an SPI memory context, which has a shorter lifetime.
52
static void init_curl_handle(CURLM *curl_mhandle, MemoryContext curl_memctx, int64 id, Datum urlBin, NullableDatum bodyBin, NullableDatum headersBin, Datum methodBin, int32 timeout_milliseconds){
308✔
53
  MemoryContext old_ctx = MemoryContextSwitchTo(curl_memctx);
308✔
54

55
  CurlData *cdata = palloc(sizeof(CurlData));
308✔
56
  cdata->id   = id;
308✔
57
  cdata->body = makeStringInfo();
308✔
58

59
  cdata->timeout_milliseconds = timeout_milliseconds;
308✔
60

61
  if (!headersBin.isnull) {
308✔
62
    ArrayType *pgHeaders = DatumGetArrayTypeP(headersBin.value);
308✔
63
    struct curl_slist *request_headers = NULL;
308✔
64

65
    request_headers = pg_text_array_to_slist(pgHeaders, request_headers);
308✔
66

67
    EREPORT_CURL_SLIST_APPEND(request_headers, "User-Agent: pg_net/" EXTVERSION);
308✔
68

69
    cdata->request_headers = request_headers;
308✔
70
  }
71

72
  char *url = TextDatumGetCString(urlBin);
308✔
73

74
  char *reqBody = !bodyBin.isnull ? TextDatumGetCString(bodyBin.value) : NULL;
308✔
75

76
  char *method = TextDatumGetCString(methodBin);
308✔
77
  if (strcasecmp(method, "GET") != 0 && strcasecmp(method, "POST") != 0 && strcasecmp(method, "DELETE") != 0) {
308✔
78
    ereport(ERROR, errmsg("Unsupported request method %s", method));
×
79
  }
80

81
  CURL *curl_ez_handle = curl_easy_init();
308✔
82
  if(!curl_ez_handle)
308✔
83
    ereport(ERROR, errmsg("curl_easy_init()"));
×
84

85
  if (strcasecmp(method, "GET") == 0) {
308✔
86
    if (reqBody) {
298✔
87
      EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_POSTFIELDS, reqBody);
×
88
      EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_CUSTOMREQUEST, "GET");
×
89
    }
90
  }
91

92
  if (strcasecmp(method, "POST") == 0) {
308✔
93
    if (reqBody) {
4✔
94
      EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_POSTFIELDS, reqBody);
3✔
95
    }
96
    else {
97
      EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_POST, 1L);
1✔
98
      EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_POSTFIELDSIZE, 0L);
1✔
99
    }
100
  }
101

102
  if (strcasecmp(method, "DELETE") == 0) {
308✔
103
    EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_CUSTOMREQUEST, "DELETE");
6✔
104
    if (reqBody) {
6✔
105
      EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_POSTFIELDS, reqBody);
1✔
106
    }
107
  }
108

109
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_WRITEFUNCTION, body_cb);
308✔
110
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_WRITEDATA, cdata);
308✔
111
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_HEADER, 0L);
308✔
112
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_URL, url);
308✔
113
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_HTTPHEADER, cdata->request_headers);
308✔
114
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_TIMEOUT_MS, (long) cdata->timeout_milliseconds);
308✔
115
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_PRIVATE, cdata);
308✔
116
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_FOLLOWLOCATION, (long) true);
308✔
117
  if (log_min_messages <= DEBUG2)
308✔
118
    EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_VERBOSE, 1L);
×
119
#if LIBCURL_VERSION_NUM >= 0x075500 /* libcurl 7.85.0 */
120
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_PROTOCOLS_STR, "http,https");
308✔
121
#else
122
  EREPORT_CURL_SETOPT(curl_ez_handle, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);
123
#endif
124

125
  EREPORT_MULTI(
308✔
126
    curl_multi_add_handle(curl_mhandle, curl_ez_handle)
127
  );
128

129
  MemoryContextSwitchTo(old_ctx);
308✔
130
}
308✔
131

132
// Configures the worker's curl multi handle: registers multi_socket_cb as the
// socket callback and multi_timer_cb as the timer callback, and passes wstate
// as the user data for both.
void set_curl_mhandle(WorkerState *wstate){
19✔
133
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_SOCKETFUNCTION, multi_socket_cb);
19✔
134
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_SOCKETDATA, wstate);
19✔
135
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
19✔
136
  EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_TIMERDATA, wstate);
19✔
137
}
19✔
138

139
uint64 delete_expired_responses(char *ttl, int batch_size){
80✔
140
  SPI_connect();
80✔
141

142
  int ret_code = SPI_execute_with_args("\
240✔
143
    WITH\
144
    rows AS (\
145
      SELECT ctid\
146
      FROM net._http_response\
147
      WHERE created < now() - $1\
148
      ORDER BY created\
149
      LIMIT $2\
150
    )\
151
    DELETE FROM net._http_response r\
152
    USING rows WHERE r.ctid = rows.ctid",
153
    2,
154
    (Oid[]){INTERVALOID, INT4OID},
80✔
155
    (Datum[]){
80✔
156
      DirectFunctionCall3(interval_in, CStringGetDatum(ttl), ObjectIdGetDatum(InvalidOid), Int32GetDatum(-1))
80✔
157
    , Int32GetDatum(batch_size)
80✔
158
    }, NULL, false, 0);
159

160
  uint64 affected_rows = SPI_processed;
80✔
161

162
  if (ret_code != SPI_OK_DELETE)
80✔
163
  {
164
    ereport(ERROR, errmsg("Error expiring response table rows: %s", SPI_result_code_string(ret_code)));
×
165
  }
166

167
  SPI_finish();
80✔
168

169
  return affected_rows;
80✔
170
}
171

172
/*
 * Removes up to batch_size rows from net.http_request_queue (lowest ids
 * first) and creates one curl easy handle per removed row on curl_mhandle.
 *
 * curl_mhandle: multi handle that receives the new easy handles
 * batch_size:   maximum number of queued requests consumed by this call
 * curl_memctx:  memory context handed to init_curl_handle for its allocations
 *
 * Opens and closes its own SPI connection. Returns the number of requests
 * consumed. Raises ERROR if the statement does not finish with
 * SPI_OK_DELETE_RETURNING or if id, timeout_milliseconds, method or url is
 * NULL in a returned row.
 */
uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext curl_memctx){
  SPI_connect();

  int ret_code = SPI_execute_with_args("\
    WITH\
    rows AS (\
      SELECT id\
      FROM net.http_request_queue\
      ORDER BY id\
      LIMIT $1\
    )\
    DELETE FROM net.http_request_queue q\
    USING rows WHERE q.id = rows.id\
    RETURNING q.id, q.method, q.url, timeout_milliseconds, array(select key || ': ' || value from jsonb_each_text(q.headers)), q.body",
    1,
    (Oid[]){INT4OID},
    (Datum[]){Int32GetDatum(batch_size)},
    NULL, false, 0);

  if (ret_code != SPI_OK_DELETE_RETURNING)
    ereport(ERROR, errmsg("Error getting http request queue: %s", SPI_result_code_string(ret_code)));

  uint64 affected_rows = SPI_processed;

  for (size_t j = 0; j < affected_rows; j++) {
    bool tupIsNull = false;

    // Result columns: 1 = id, 2 = method, 3 = url, 4 = timeout_milliseconds,
    // 5 = headers as text[], 6 = body.
    int64 id = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 1, &tupIsNull));
    EREPORT_NULL_ATTR(tupIsNull, id);

    int32 timeout_milliseconds = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 4, &tupIsNull));
    EREPORT_NULL_ATTR(tupIsNull, timeout_milliseconds);

    Datum method = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 2, &tupIsNull);
    EREPORT_NULL_ATTR(tupIsNull, method);

    Datum url = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 3, &tupIsNull);
    EREPORT_NULL_ATTR(tupIsNull, url);

    // The null flag must be read only after SPI_getbinval has written it.
    // Inside a single initializer list the expressions are indeterminately
    // sequenced, so value and isnull are assigned as separate statements.
    NullableDatum headersBin;
    headersBin.value  = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 5, &tupIsNull);
    headersBin.isnull = tupIsNull;

    NullableDatum bodyBin;
    bodyBin.value  = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 6, &tupIsNull);
    bodyBin.isnull = tupIsNull;

    init_curl_handle(curl_mhandle, curl_memctx, id, url, bodyBin, headersBin, method, timeout_milliseconds);
  }

  SPI_finish();

  return affected_rows;
}
228

229
/*
 * Releases what a CurlData owns: the response body buffer (its data and the
 * StringInfo itself) and the request header list.
 *
 * The CurlData struct itself is not freed here and its fields are not reset.
 */
static void pfree_curl_data(CurlData *cdata){
  StringInfo response_body = cdata->body;

  if(response_body != NULL){
    pfree(response_body->data);
    pfree(response_body);
  }

  // curl_slist_free_all already handles the NULL case, but be explicit about it
  struct curl_slist *header_list = cdata->request_headers;

  if(header_list != NULL)
    curl_slist_free_all(header_list);
}
308✔
237

238
/*
 * Builds a jsonb object from the response headers of a finished transfer:
 * one key per header name, with the header value as a string.
 *
 * ez_handle: easy handle whose headers are read
 *
 * Returns the jsonb object (empty when no header is returned by libcurl).
 */
static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){
  struct curl_header *header, *prev = NULL;

  JsonbParseState *headers = NULL;
  (void)pushJsonbValue(&headers, WJB_BEGIN_OBJECT, NULL);

  // Request index -1 selects the last request of the transfer. Redirects are
  // followed (CURLOPT_FOLLOWLOCATION), so index 0 would return the headers of
  // the first (redirect) response instead of the final one. -1 also matches
  // the content-type lookup done in insert_response.
  while((header = curl_easy_nextheader(ez_handle, CURLH_HEADER, -1, prev))) {
    JsonbValue key   = {.type = jbvString, .val = {.string = {.val = header->name,  .len = strlen(header->name)}}};
    JsonbValue value = {.type = jbvString, .val = {.string = {.val = header->value, .len = strlen(header->value)}}};
    (void)pushJsonbValue(&headers, WJB_KEY,   &key);
    (void)pushJsonbValue(&headers, WJB_VALUE, &value);
    prev = header; // libcurl continues the iteration after this entry
  }

  Jsonb *jsonb_headers = JsonbValueToJsonb(pushJsonbValue(&headers, WJB_END_OBJECT, NULL));

  return jsonb_headers;
}
256

257
/*
 * Inserts one row into net._http_response describing a finished transfer.
 *
 * ez_handle:        easy handle of the finished transfer
 * cdata:            per-request data (id, accumulated body, timeout)
 * curl_return_code: result libcurl reported for the transfer
 *
 * On CURLE_OK the row carries status_code, content (NULL when the body is
 * empty), headers, content_type (NULL when the header is absent),
 * timed_out = false and a NULL error_msg. On any other result the response
 * columns are NULL, timed_out tells whether the result was
 * CURLE_OPERATION_TIMEDOUT, and error_msg carries the error text.
 *
 * Uses SPI without connecting; the caller in this file wraps the call in
 * SPI_connect()/SPI_finish(). Raises ERROR if the insert does not finish with
 * SPI_OK_INSERT.
 */
static void insert_response(CURL *ez_handle, CurlData *cdata, CURLcode curl_return_code){
  enum { nparams = 7 }; // using an enum because const size_t nparams doesn't compile
  Datum vals[nparams] = {0};
  char  nulls[nparams];

  // SPI expects ' ' for a non-null parameter and 'n' for a null one. Start
  // with every parameter marked non-null and flag the nulls below.
  // (An initializer of {' '} would only set the first element.)
  memset(nulls, ' ', sizeof(nulls));

  vals[0] = Int64GetDatum(cdata->id);

  if (curl_return_code == CURLE_OK) {
    Jsonb *jsonb_headers = jsonb_headers_from_curl_handle(ez_handle);
    long res_http_status_code = 0;

    EREPORT_CURL_GETINFO(ez_handle, CURLINFO_RESPONSE_CODE, &res_http_status_code);

    vals[1] = Int32GetDatum(res_http_status_code);

    // The body is passed as a C string, so an empty body (or one that starts
    // with a NUL byte) is stored as NULL.
    if (cdata->body && cdata->body->data[0] != '\0')
      vals[2] = CStringGetDatum(cdata->body->data);
    else
      nulls[2] = 'n';

    vals[3] = JsonbPGetDatum(jsonb_headers);

    struct curl_header *hdr;
    if (curl_easy_header(ez_handle, "content-type", 0, CURLH_HEADER, -1, &hdr) == CURLHE_OK)
      vals[4] = CStringGetTextDatum(hdr->value);
    else
      nulls[4] = 'n';

    vals[5] = BoolGetDatum(false);

    nulls[6] = 'n';
  } else {
    bool timed_out = curl_return_code == CURLE_OPERATION_TIMEDOUT;
    char *error_msg = NULL;

    if (timed_out){
      // NOTE(review): if .msg is an array member of the struct returned by
      // value, this pointer refers to a temporary whose lifetime ends with
      // this statement - confirm against the declaration in errors.h.
      error_msg = detailed_timeout_strerror(ez_handle, cdata->timeout_milliseconds).msg;
    } else {
      error_msg = (char *) curl_easy_strerror(curl_return_code);
    }

    nulls[1] = 'n';
    nulls[2] = 'n';
    nulls[3] = 'n';
    nulls[4] = 'n';

    vals[5] = BoolGetDatum(timed_out);

    if (error_msg)
      vals[6] = CStringGetTextDatum(error_msg);
    else
      nulls[6] = 'n'; // never hand SPI an unset datum flagged as non-null
  }

  int ret_code = SPI_execute_with_args("\
      insert into net._http_response(id, status_code, content, headers, content_type, timed_out, error_msg) values ($1, $2, $3, $4, $5, $6, $7)",
      nparams,
      (Oid[nparams]){INT8OID, INT4OID, CSTRINGOID, JSONBOID, TEXTOID, BOOLOID, TEXTOID},
      vals, nulls,
      false, 1);

  if (ret_code != SPI_OK_INSERT)
  {
    ereport(ERROR, errmsg("Error when inserting response: %s", SPI_result_code_string(ret_code)));
  }
}
308✔
321

322
// Switch back to the curl memory context, which has the curl handles stored
323
void insert_curl_responses(WorkerState *wstate, MemoryContext curl_memctx){
857✔
324
  MemoryContext old_ctx = MemoryContextSwitchTo(curl_memctx);
857✔
325
  int msgs_left=0;
857✔
326
  CURLMsg *msg = NULL;
857✔
327
  CURLM *curl_mhandle = wstate->curl_mhandle;
857✔
328

329
  while ((msg = curl_multi_info_read(curl_mhandle, &msgs_left))) {
2,022✔
330
    if (msg->msg == CURLMSG_DONE) {
308✔
331
      CURLcode return_code = msg->data.result;
308✔
332
      CURL *ez_handle= msg->easy_handle;
308✔
333
      CurlData *cdata = NULL;
308✔
334
      EREPORT_CURL_GETINFO(ez_handle, CURLINFO_PRIVATE, &cdata);
308✔
335

336
      SPI_connect();
308✔
337
      insert_response(ez_handle, cdata, return_code);
308✔
338
      SPI_finish();
308✔
339

340
      pfree_curl_data(cdata);
308✔
341

342
      int res = curl_multi_remove_handle(curl_mhandle, ez_handle);
308✔
343
      if(res != CURLM_OK)
308✔
UNCOV
344
        ereport(ERROR, errmsg("curl_multi_remove_handle: %s", curl_multi_strerror(res)));
×
345

346
      curl_easy_cleanup(ez_handle);
308✔
347
    } else {
348
      ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg));
1,165✔
349
    }
350
  }
351

352
  MemoryContextSwitchTo(old_ctx);
857✔
353
}
857✔
354

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc