• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_net / 19381500762

15 Nov 2025 12:35AM UTC coverage: 93.371%. Remained the same
19381500762

Pull #246

github

web-flow
Merge c75123a3c into e0e6d08c6
Pull Request #246: chore: upgrade to nixpkgs 2025-11-13

493 of 528 relevant lines covered (93.37%)

195.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.47
/src/event.c
1
#include <errno.h>
2
#include <stddef.h>
3
#include <unistd.h>
4

5
#include "pg_prelude.h"
6

7
#include "event.h"
8

9
#ifdef WAIT_USE_EPOLL
10

11
static int  timerfd       = 0;
12
static bool timer_created = false;
13

14
typedef struct epoll_event epoll_event;
15
typedef struct itimerspec  itimerspec;
16

17
inline int wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds) {
171✔
18
  return epoll_wait(fd, events, maxevents, timeout_milliseconds);
171✔
19
}
20

21
inline int event_monitor() {
19✔
22
  return epoll_create1(0);
19✔
23
}
24

25
void ev_monitor_close(WorkerState *wstate) {
19✔
26
  close(wstate->epfd);
19✔
27
  close(timerfd);
19✔
28
}
19✔
29

30
int multi_timer_cb(__attribute__((unused)) CURLM *multi, long timeout_ms, WorkerState *wstate) {
649✔
31
  elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms);
649✔
32

33
  if (!timer_created) {
649✔
34
    timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
15✔
35
    if (timerfd < 0) {
15✔
36
      ereport(ERROR, errmsg("Failed to create timerfd"));
×
37
    }
38
    timerfd_settime(timerfd, 0, &(itimerspec){}, NULL);
15✔
39
    epoll_ctl(wstate->epfd, EPOLL_CTL_ADD, timerfd,
15✔
40
              &(epoll_event){.events = EPOLLIN, .data.fd = timerfd});
15✔
41

42
    timer_created = true;
15✔
43
  }
44

45
  // disable clang-format as it only hurts readability here
46
  // clang-format off
47
  itimerspec its =
363✔
48
    timeout_ms > 0 ?
49
    // assign the timeout normally
50
    (itimerspec){
51
      .it_value.tv_sec = timeout_ms / 1000,
286✔
52
      .it_value.tv_nsec = (timeout_ms % 1000) * 1000 * 1000,
286✔
53
    }:
649✔
54
    timeout_ms == 0 ?
55
    /* libcurl wants us to timeout now, however setting both fields of
56
     * new_value.it_value to zero disarms the timer. The closest we can
57
     * do is to schedule the timer to fire in 1 ns. */
58
    (itimerspec){
59
      .it_value.tv_sec = 0,
60
      .it_value.tv_nsec = 1,
61
    }:
363✔
62
     // libcurl passes a -1 to indicate the timer should be deleted
63
    (itimerspec){};
64
  // clang-format on
65

66
  int no_flags = 0;
649✔
67
  if (timerfd_settime(timerfd, no_flags, &its, NULL) < 0) {
649✔
68
    ereport(ERROR, errmsg("timerfd_settime failed"));
×
69
  }
70

71
  return 0;
649✔
72
}
73

74
int multi_socket_cb(__attribute__((unused)) CURL *easy, curl_socket_t sockfd, int what,
1,429✔
75
                    WorkerState *wstate, void *socketp) {
76
  static char *whatstrs[] = {"NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT",
1,429✔
77
                             "CURL_POLL_REMOVE"};
78
  elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]);
1,429✔
79

80
  int epoll_op;
1,429✔
81
  if (!socketp) {
1,429✔
82
    epoll_op           = EPOLL_CTL_ADD;
582✔
83
    bool socket_exists = true;
582✔
84
    curl_multi_assign(wstate->curl_mhandle, sockfd, &socket_exists);
582✔
85
  } else if (what == CURL_POLL_REMOVE) {
847✔
86
    epoll_op           = EPOLL_CTL_DEL;
582✔
87
    bool socket_exists = false;
582✔
88
    curl_multi_assign(wstate->curl_mhandle, sockfd, &socket_exists);
582✔
89
  } else {
90
    epoll_op = EPOLL_CTL_MOD;
91
  }
92

93
  epoll_event ev = {
2,858✔
94
    .data.fd = sockfd,
95
    .events  = (what & CURL_POLL_IN)    ? EPOLLIN
1,429✔
96
               : (what & CURL_POLL_OUT) ? EPOLLOUT
1,429✔
97
                                        : 0, // no event is assigned since here we get
1,131✔
98
                                             // CURL_POLL_REMOVE and the sockfd will be removed
99
  };
100

101
  // epoll_ctl will copy ev, so there's no need to do palloc for the epoll_event
102
  // https://github.com/torvalds/linux/blob/e32cde8d2bd7d251a8f9b434143977ddf13dcec6/fs/eventpoll.c#L2408-L2418
103
  if (epoll_ctl(wstate->epfd, epoll_op, sockfd, &ev) < 0) {
1,429✔
104
    int          e        = errno;
×
105
    static char *opstrs[] = {"NONE", "EPOLL_CTL_ADD", "EPOLL_CTL_DEL", "EPOLL_CTL_MOD"};
×
106
    ereport(ERROR, errmsg("epoll_ctl with %s failed when receiving %s for sockfd %d: %s",
×
107
                          whatstrs[what], opstrs[epoll_op], sockfd, strerror(e)));
108
  }
109

110
  return 0;
1,429✔
111
}
112

113
bool is_timer(event ev) {
860✔
114
  return ev.data.fd == timerfd;
860✔
115
}
116

117
int get_curl_event(event ev) {
795✔
118
  int ev_bitmask = ev.events & EPOLLIN    ? CURL_CSELECT_IN
×
119
                   : ev.events & EPOLLOUT ? CURL_CSELECT_OUT
795✔
120
                                          : CURL_CSELECT_ERR;
548✔
121
  return ev_bitmask;
795✔
122
}
123

124
int get_socket_fd(event ev) {
795✔
125
  return ev.data.fd;
795✔
126
}
127

128
#else
129

130
typedef struct {
131
  curl_socket_t sockfd;
132
  int           action;
133
} SocketInfo;
134

135
int inline wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds) {
136
  return kevent(fd, NULL, 0, events, maxevents,
137
                &(struct timespec){.tv_sec = timeout_milliseconds / 1000});
138
}
139

140
int inline event_monitor() {
141
  return kqueue();
142
}
143

144
void ev_monitor_close(WorkerState *wstate) {
145
  close(wstate->epfd);
146
}
147

148
int multi_timer_cb(__attribute__((unused)) CURLM *multi, long timeout_ms, WorkerState *wstate) {
149
  elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms);
150
  event timer_event;
151
  int   id = 1;
152

153
  if (timeout_ms > 0) {
154
    EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, 0, timeout_ms,
155
           NULL); // 0 means milliseconds (the default)
156
  } else if (timeout_ms == 0) {
157
    /* libcurl wants us to timeout now, however setting both fields of
158
     * new_value.it_value to zero disarms the timer. The closest we can
159
     * do is to schedule the timer to fire in 1 ns. */
160
    EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, NOTE_NSECONDS, 1, NULL);
161
  } else {
162
    // libcurl passes a -1 to indicate the timer should be deleted
163
    EV_SET(&timer_event, id, EVFILT_TIMER, EV_DELETE, 0, 0, NULL);
164
  }
165

166
  if (kevent(wstate->epfd, &timer_event, 1, NULL, 0, NULL) < 0) {
167
    int save_errno = errno;
168
    ereport(ERROR, errmsg("kevent with EVFILT_TIMER failed: %s", strerror(save_errno)));
169
  }
170

171
  return 0;
172
}
173

174
int multi_socket_cb(__attribute__((unused)) CURL *easy, curl_socket_t sockfd, int what,
175
                    WorkerState *wstate, void *socketp) {
176
  static char *whatstrs[] = {"NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT",
177
                             "CURL_POLL_REMOVE"};
178
  elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]);
179

180
  SocketInfo   *sock_info = (SocketInfo *)socketp;
181
  struct kevent ev[2];
182
  int           count = 0;
183

184
  if (what == CURL_POLL_REMOVE) {
185
    if (sock_info->action & CURL_POLL_IN)
186
      EV_SET(&ev[count++], sockfd, EVFILT_READ, EV_DELETE, 0, 0, sock_info);
187

188
    if (sock_info->action & CURL_POLL_OUT)
189
      EV_SET(&ev[count++], sockfd, EVFILT_WRITE, EV_DELETE, 0, 0, sock_info);
190

191
    curl_multi_assign(wstate->curl_mhandle, sockfd, NULL);
192
    pfree(sock_info);
193
  } else {
194
    if (!sock_info) {
195
      sock_info         = palloc(sizeof(SocketInfo));
196
      sock_info->sockfd = sockfd;
197
      sock_info->action = what;
198
      curl_multi_assign(wstate->curl_mhandle, sockfd, sock_info);
199
    }
200

201
    if (what & CURL_POLL_IN) EV_SET(&ev[count++], sockfd, EVFILT_READ, EV_ADD, 0, 0, sock_info);
202

203
    if (what & CURL_POLL_OUT) EV_SET(&ev[count++], sockfd, EVFILT_WRITE, EV_ADD, 0, 0, sock_info);
204
  }
205

206
  Assert(count <= 2);
207

208
  if (kevent(wstate->epfd, &ev[0], count, NULL, 0, NULL) < 0) {
209
    int save_errno = errno;
210
    ereport(ERROR, errmsg("kevent with %s failed for sockfd %d: %s", whatstrs[what], sockfd,
211
                          strerror(save_errno)));
212
  }
213

214
  return 0;
215
}
216

217
bool is_timer(event ev) {
218
  return ev.filter == EVFILT_TIMER;
219
}
220

221
int get_curl_event(event ev) {
222
  int ev_bitmask = 0;
223
  if (ev.filter == EVFILT_READ)
224
    ev_bitmask |= CURL_CSELECT_IN;
225
  else if (ev.filter == EVFILT_WRITE)
226
    ev_bitmask |= CURL_CSELECT_OUT;
227
  else
228
    ev_bitmask = CURL_CSELECT_ERR;
229

230
  return ev_bitmask;
231
}
232

233
int get_socket_fd(event ev) {
234
  SocketInfo *sock_info = (SocketInfo *)ev.udata;
235

236
  return sock_info->sockfd;
237
}
238

239
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc