• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 19383742840

15 Nov 2025 03:28AM UTC coverage: 93.421% (+0.05%) from 93.371%
19383742840

push

github

steve-chavez
bump to 0.20.2

497 of 532 relevant lines covered (93.42%)

201.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.53
/src/event.c
1
#include <errno.h>
2
#include <stddef.h>
3
#include <unistd.h>
4

5
#include "pg_prelude.h"
6

7
#include "event.h"
8

9
#ifdef WAIT_USE_EPOLL
10

11
/* File descriptor of the timerfd that carries libcurl's timeout, or -1 while
 * no timer exists. Starting at -1 (rather than 0) keeps the "not created yet"
 * state from aliasing descriptor 0, so closing it or comparing an event's fd
 * against it before creation is harmless. */
static int  timerfd       = -1;
/* Set by multi_timer_cb once the timerfd exists and is in the epoll set. */
static bool timer_created = false;

/* Short aliases so the compound literals and declarations in this file stay
 * readable. */
typedef struct epoll_event epoll_event;
typedef struct itimerspec  itimerspec;
16

17
/*
 * Thin wrapper over epoll_wait: forwards every argument unchanged and hands
 * back epoll_wait's return value.
 */
inline int wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds) {
  int nready = epoll_wait(fd, events, maxevents, timeout_milliseconds);
  return nready;
}
20

21
/* Creates a new epoll instance (no flags) and returns epoll_create1's result. */
inline int event_monitor() {
  int epfd = epoll_create1(0);
  return epfd;
}
24

25
/*
 * Releases the event-monitoring descriptors: the epoll instance stored in
 * wstate->epfd and, if multi_timer_cb created one, the timerfd.
 */
void ev_monitor_close(WorkerState *wstate) {
  close(wstate->epfd);

  // Only close the timer when it really exists. Before multi_timer_cb creates
  // it, timerfd does not name a descriptor we own, and closing it blindly
  // would close whatever unrelated descriptor happens to share that number.
  if (timer_created) {
    close(timerfd);

    // Forget the closed descriptor so that a later multi_timer_cb call
    // creates a fresh timer instead of arming a stale fd.
    timerfd       = -1;
    timer_created = false;
  }
}
19✔
29

30
/*
 * Timer callback handed to libcurl: arms, re-arms or disarms the timerfd that
 * wakes up the epoll loop when libcurl's timeout expires. The timerfd is
 * created lazily on the first call and added to wstate->epfd.
 *
 * multi      - unused.
 * timeout_ms - > 0: fire after that many milliseconds;
 *              0: fire as soon as possible;
 *              negative (libcurl passes -1): disarm the timer.
 * userp      - the WorkerState whose epfd the timer is registered on.
 *
 * Returns 0. Failures are raised with ereport(ERROR).
 */
int multi_timer_cb(__attribute__((unused)) CURLM *multi, long timeout_ms, void *userp) {
  WorkerState *wstate = (WorkerState *)userp;
  elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms", timeout_ms);

  if (!timer_created) {
    int new_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (new_fd < 0) {
      ereport(ERROR, errmsg("Failed to create timerfd"));
    }

    // A timer that is not in the epoll set would never wake the worker, so a
    // failed registration must not be recorded as "timer created".
    if (epoll_ctl(wstate->epfd, EPOLL_CTL_ADD, new_fd,
                  &(epoll_event){.events = EPOLLIN, .data.fd = new_fd}) < 0) {
      int e = errno;
      close(new_fd);
      ereport(ERROR, errmsg("epoll_ctl with EPOLL_CTL_ADD failed for timerfd: %s", strerror(e)));
    }

    // Publish the descriptor only once it is fully set up. It does not need
    // an explicit initial disarm: it is always (re)armed right below.
    timerfd       = new_fd;
    timer_created = true;
  }

  // An all-zero it_value disarms the timer; this is what a negative
  // timeout_ms (libcurl passes -1 to delete the timer) ends up using.
  itimerspec its = {0};

  if (timeout_ms > 0) {
    // assign the timeout normally
    its.it_value.tv_sec  = timeout_ms / 1000;
    its.it_value.tv_nsec = (timeout_ms % 1000) * 1000 * 1000;
  } else if (timeout_ms == 0) {
    /* libcurl wants us to timeout now, however setting both fields of
     * it_value to zero disarms the timer. The closest we can do is to
     * schedule the timer to fire in 1 ns. */
    its.it_value.tv_nsec = 1;
  }

  int no_flags = 0;
  if (timerfd_settime(timerfd, no_flags, &its, NULL) < 0) {
    ereport(ERROR, errmsg("timerfd_settime failed"));
  }

  return 0;
}
74

75
/*
 * Socket callback handed to libcurl: mirrors libcurl's interest in `sockfd`
 * into the epoll set stored in wstate->epfd.
 *
 * easy    - unused.
 * sockfd  - the socket libcurl is talking about.
 * what    - one of CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT or
 *           CURL_POLL_REMOVE.
 * userp   - the WorkerState holding epfd and curl_mhandle.
 * socketp - the per-socket pointer previously set with curl_multi_assign;
 *           NULL means this socket has not been added to the epoll set yet.
 *
 * Returns 0. A failing epoll_ctl is raised with ereport(ERROR).
 */
int multi_socket_cb(__attribute__((unused)) CURL *easy, curl_socket_t sockfd, int what, void *userp,
                    void *socketp) {
  WorkerState *wstate = (WorkerState *)userp;
  static const char *const whatstrs[] = {"NONE", "CURL_POLL_IN", "CURL_POLL_OUT",
                                         "CURL_POLL_INOUT", "CURL_POLL_REMOVE"};
  elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]);

  // Non-NULL marker stored in libcurl for sockets already in the epoll set.
  // It is never dereferenced here, but it has static storage so the pointer
  // libcurl keeps does not dangle once this call returns.
  static bool socket_registered = true;

  int epoll_op;
  if (!socketp) {
    epoll_op = EPOLL_CTL_ADD;
    curl_multi_assign(wstate->curl_mhandle, sockfd, &socket_registered);
  } else if (what == CURL_POLL_REMOVE) {
    epoll_op = EPOLL_CTL_DEL;
    // the socket leaves the epoll set, so drop the marker as well
    curl_multi_assign(wstate->curl_mhandle, sockfd, NULL);
  } else {
    epoll_op = EPOLL_CTL_MOD;
  }

  // CURL_POLL_INOUT has both the IN and the OUT bit set, so the two
  // directions are translated independently and OR-ed together.
  // CURL_POLL_REMOVE has neither bit, which leaves events at 0; that is fine
  // because the sockfd is being removed.
  epoll_event ev = {
    .data.fd = sockfd,
    .events  = ((what & CURL_POLL_IN) ? EPOLLIN : 0) | ((what & CURL_POLL_OUT) ? EPOLLOUT : 0),
  };

  // epoll_ctl will copy ev, so there's no need to do palloc for the epoll_event
  // https://github.com/torvalds/linux/blob/e32cde8d2bd7d251a8f9b434143977ddf13dcec6/fs/eventpoll.c#L2408-L2418
  if (epoll_ctl(wstate->epfd, epoll_op, sockfd, &ev) < 0) {
    int e = errno; // save it before anything else can overwrite it
    // indexed by the EPOLL_CTL_* value
    static const char *const opstrs[] = {"NONE", "EPOLL_CTL_ADD", "EPOLL_CTL_DEL",
                                         "EPOLL_CTL_MOD"};
    // argument order follows the format string: epoll operation first, then
    // the libcurl action that triggered it
    ereport(ERROR, errmsg("epoll_ctl with %s failed when receiving %s for sockfd %d: %s",
                          opstrs[epoll_op], whatstrs[what], sockfd, strerror(e)));
  }

  return 0;
}
114

115
/*
 * Tells whether `ev` was produced by the timerfd rather than by a socket.
 * Until multi_timer_cb has created the timer, timerfd does not name a real
 * descriptor, so no event can belong to it.
 */
bool is_timer(event ev) {
  return timer_created && ev.data.fd == timerfd;
}
118

119
/*
 * Translates the epoll readiness flags of `ev` into libcurl's CURL_CSELECT_*
 * bitmask. Readable and writable are reported together when both are set;
 * an event with neither flag is reported as CURL_CSELECT_ERR.
 */
int get_curl_event(event ev) {
  int ev_bitmask = 0;

  if (ev.events & EPOLLIN) {
    ev_bitmask |= CURL_CSELECT_IN;
  }
  if (ev.events & EPOLLOUT) {
    ev_bitmask |= CURL_CSELECT_OUT;
  }
  if (ev_bitmask == 0) {
    ev_bitmask = CURL_CSELECT_ERR;
  }

  return ev_bitmask;
}
125

126
/* Returns the file descriptor carried in the event's data field. */
int get_socket_fd(event ev) {
  int sockfd = ev.data.fd;
  return sockfd;
}
129

130
#else
131

132
typedef struct {
133
  curl_socket_t sockfd;
134
  int           action;
135
} SocketInfo;
136

137
/*
 * Waits on the kqueue `fd` for up to `timeout_milliseconds`, storing at most
 * `maxevents` events in `events`, and returns kevent's result.
 *
 * A negative timeout blocks until an event arrives, matching what the
 * epoll_wait-based variant of this function does with the same argument.
 */
int inline wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds) {
  // kevent expresses "no timeout" as a NULL timespec
  if (timeout_milliseconds < 0) {
    return kevent(fd, NULL, 0, events, maxevents, NULL);
  }

  // Keep the sub-second part too: with tv_sec alone every timeout below one
  // second would collapse to zero and turn the wait into a busy poll.
  struct timespec timeout = {
    .tv_sec  = timeout_milliseconds / 1000,
    .tv_nsec = (long)(timeout_milliseconds % 1000) * 1000 * 1000,
  };

  return kevent(fd, NULL, 0, events, maxevents, &timeout);
}
141

142
/* Creates a new kernel event queue and returns kqueue's result. */
int inline event_monitor() {
  int kq = kqueue();
  return kq;
}
145

146
/* Closes the kqueue descriptor stored in wstate->epfd. */
void ev_monitor_close(WorkerState *wstate) {
  int kq = wstate->epfd;
  close(kq);
}
149

150
/*
 * Timer callback handed to libcurl: adds, updates or deletes the EVFILT_TIMER
 * event (ident 1) on the kqueue stored in wstate->epfd.
 *
 * multi      - unused.
 * timeout_ms - > 0: fire after that many milliseconds;
 *              0: fire as soon as possible;
 *              negative (libcurl passes -1): delete the timer.
 * userp      - the WorkerState holding the kqueue descriptor.
 *
 * Returns 0. A failing kevent is raised with ereport(ERROR), except when
 * deleting a timer that is not registered.
 */
int multi_timer_cb(__attribute__((unused)) CURLM *multi, long timeout_ms, void *userp) {
  WorkerState *wstate = (WorkerState *)userp;
  elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms", timeout_ms);
  event timer_event;
  int   id       = 1;
  bool  deleting = timeout_ms < 0;

  if (timeout_ms > 0) {
    EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, 0, timeout_ms,
           NULL); // 0 means milliseconds (the default)
  } else if (timeout_ms == 0) {
    /* libcurl wants us to timeout now. The closest we can do is to schedule
     * the timer to fire in 1 ns. */
    EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, NOTE_NSECONDS, 1, NULL);
  } else {
    // libcurl passes a -1 to indicate the timer should be deleted
    EV_SET(&timer_event, id, EVFILT_TIMER, EV_DELETE, 0, 0, NULL);
  }

  if (kevent(wstate->epfd, &timer_event, 1, NULL, 0, NULL) < 0) {
    int save_errno = errno;

    // ENOENT on EV_DELETE means there was no timer to delete, which is the
    // state libcurl asked for, so it is not an error.
    if (!(deleting && save_errno == ENOENT)) {
      ereport(ERROR, errmsg("kevent with EVFILT_TIMER failed: %s", strerror(save_errno)));
    }
  }

  return 0;
}
176

177
/*
 * Socket callback handed to libcurl: mirrors libcurl's interest in `sockfd`
 * into the kqueue stored in wstate->epfd.
 *
 * easy    - unused.
 * sockfd  - the socket libcurl is talking about.
 * what    - one of CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT or
 *           CURL_POLL_REMOVE.
 * userp   - the WorkerState holding the kqueue descriptor and curl_mhandle.
 * socketp - the SocketInfo previously set with curl_multi_assign, or NULL
 *           when this socket has none yet.
 *
 * The SocketInfo remembers which filters are registered (its action field).
 * Every call registers the filters libcurl wants now and deletes the ones it
 * no longer wants, so no stale filter survives a change of direction or a
 * removal. The SocketInfo is freed on CURL_POLL_REMOVE.
 *
 * Returns 0. A failing kevent is raised with ereport(ERROR).
 */
int multi_socket_cb(__attribute__((unused)) CURL *easy, curl_socket_t sockfd, int what, void *userp,
                    void *socketp) {
  WorkerState *wstate     = (WorkerState *)userp;
  static char *whatstrs[] = {"NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT",
                             "CURL_POLL_REMOVE"};
  elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]);

  SocketInfo   *sock_info = (SocketInfo *)socketp;
  struct kevent ev[2];
  int           count    = 0;
  bool          removing = (what == CURL_POLL_REMOVE);

  if (!sock_info && !removing) {
    sock_info         = palloc(sizeof(SocketInfo));
    sock_info->sockfd = sockfd;
    sock_info->action = CURL_POLL_NONE; // nothing registered yet
    curl_multi_assign(wstate->curl_mhandle, sockfd, sock_info);
  }

  // A removal without a SocketInfo has nothing registered and nothing to free.
  int registered = sock_info ? sock_info->action : CURL_POLL_NONE;
  int wanted     = removing ? CURL_POLL_NONE : what;

  const int poll_bits[] = {CURL_POLL_IN, CURL_POLL_OUT};
  const int filters[]   = {EVFILT_READ, EVFILT_WRITE};

  // Each filter yields at most one change (add or delete), so two slots are
  // always enough.
  for (int i = 0; i < 2; i++) {
    if (wanted & poll_bits[i]) {
      // EV_ADD on an already registered filter simply updates it
      EV_SET(&ev[count], sockfd, filters[i], EV_ADD, 0, 0, sock_info);
      count++;
    } else if (registered & poll_bits[i]) {
      EV_SET(&ev[count], sockfd, filters[i], EV_DELETE, 0, 0, sock_info);
      count++;
    }
  }

  Assert(count <= 2);

  int rc         = 0;
  int save_errno = 0;
  if (count > 0) {
    rc         = kevent(wstate->epfd, &ev[0], count, NULL, 0, NULL);
    save_errno = errno;
  }

  // Update the bookkeeping before reporting any failure, so libcurl is never
  // left holding a pointer to a SocketInfo that should be gone.
  if (removing) {
    curl_multi_assign(wstate->curl_mhandle, sockfd, NULL);
    if (sock_info) {
      pfree(sock_info);
    }
  } else {
    sock_info->action = what;
  }

  if (rc < 0) {
    ereport(ERROR, errmsg("kevent with %s failed for sockfd %d: %s", whatstrs[what], sockfd,
                          strerror(save_errno)));
  }

  return 0;
}
220

221
/* Tells whether `ev` comes from the EVFILT_TIMER filter. */
bool is_timer(event ev) {
  bool from_timer = (ev.filter == EVFILT_TIMER);
  return from_timer;
}
224

225
/*
 * Maps the kqueue filter of `ev` to libcurl's CURL_CSELECT_* value:
 * EVFILT_READ -> CURL_CSELECT_IN, EVFILT_WRITE -> CURL_CSELECT_OUT,
 * anything else -> CURL_CSELECT_ERR.
 */
int get_curl_event(event ev) {
  switch (ev.filter) {
  case EVFILT_READ:
    return CURL_CSELECT_IN;
  case EVFILT_WRITE:
    return CURL_CSELECT_OUT;
  default:
    return CURL_CSELECT_ERR;
  }
}
236

237
/* Returns the sockfd stored in the SocketInfo that the event carries as udata. */
int get_socket_fd(event ev) {
  return ((SocketInfo *)ev.udata)->sockfd;
}
242

243
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc