31 #include <sys/epoll.h> 38 #define LINUX_POLL_DELAY 10 40 #define EPOLL_DEBUG(x...) do {} while (0) 43 static inline void es_add_inactive(
struct epoll_socket *es);
45 static inline void es_deactivate(
struct epoll_socket *es);
46 static inline void es_active_pushback(
struct epoll_socket *es);
47 static inline void es_remove_ep(
struct epoll_socket *es);
48 static inline void es_remove_sock(
struct epoll_socket *es);
50 int tas_epoll_create(
int size)
56 return tas_epoll_create1(0);
59 int tas_epoll_create1(
int flags)
64 if ((fd = tas_libc_epoll_create1(flags)) == -1) {
68 if (flextcp_fd_ealloc(&ep, fd) < 0) {
75 ep->active_last = NULL;
80 flextcp_fd_erelease(fd, ep);
84 int tas_epoll_ctl(
int epfd,
int op,
int fd,
struct epoll_event *event)
93 EPOLL_DEBUG(
"flextcp_epoll_ctl(%d, %d, %d, {events=%x})\n", epfd, op,
94 fd, (event != NULL ? event->events : -1));
96 if (flextcp_fd_elookup(epfd, &ep) != 0) {
102 if (flextcp_fd_slookup(fd, &s) != 0) {
105 if ((ret = tas_libc_epoll_ctl(epfd, op, fd, event)) != 0) {
110 if (op == EPOLL_CTL_ADD) {
112 }
else if (op == EPOLL_CTL_DEL) {
113 assert(ep->num_linux > 0);
120 for (es = s->
eps; es != NULL && es->ep != ep; es = es->so_next);
123 if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD) {
124 em = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
125 event->events &= (~EPOLLET);
126 if ((event->events & (~em)) != 0) {
127 fprintf(stderr,
"flextcp epoll_ctl: unsupported events: %x\n",
128 (event->events & (~em)));
136 if (op == EPOLL_CTL_ADD) {
148 if ((es = calloc(1,
sizeof(*es))) == NULL) {
156 es->data =
event->data;
157 es->mask =
event->events | EPOLLERR;
162 if (s->
eps == NULL) {
166 es->so_next = s->
eps;
167 s->
eps->so_prev = es;
178 }
else if (op == EPOLL_CTL_MOD) {
188 es->mask =
event->events | EPOLLERR;
192 }
else if (op == EPOLL_CTL_DEL) {
214 flextcp_fd_srelease(fd, s);
216 flextcp_fd_erelease(epfd, ep);
225 struct epoll *ep,
struct epoll_event *events,
int maxevents)
229 uint32_t i, num_active;
235 flextcp_sockctx_poll_n(ctx, maxevents);
238 num_active = ep->num_active;
239 for (i = 0; i < num_active && n < maxevents; i++) {
246 util_prefetch0(es->ep_next);
252 events[n].events = s->
ep_events & es->mask;
253 events[n].data = es->data;
255 es_active_pushback(es);
265 int tas_epoll_wait(
int epfd,
struct epoll_event *events,
int maxevents,
271 uint64_t mtimeout = 0;
272 struct pollfd pfds[2];
274 EPOLL_DEBUG(
"flextcp_epoll_wait(%d, %d, %d)\n", epfd, maxevents, timeout);
276 if (maxevents <= 0) {
281 if (flextcp_fd_elookup(epfd, &ep) != 0) {
286 if(ep->num_tas == 0) {
288 flextcp_fd_erelease(epfd, ep);
289 return tas_libc_epoll_wait(epfd, events, maxevents, timeout);
296 mtimeout = get_msecs() + timeout;
299 ctx = flextcp_sockctx_get();
303 if (ep->num_linux == 0) {
305 n = ep_poll_tas(ctx, epfd, ep, events, maxevents);
306 }
else if (ep->linux_next) {
309 n = tas_libc_epoll_wait(epfd, events, maxevents, 0);
312 if (n >= 0 && n < maxevents) {
313 n += ep_poll_tas(ctx, epfd, ep, events + n, maxevents - n);
318 n = ep_poll_tas(ctx, epfd, ep, events, maxevents);
321 ret = tas_libc_epoll_wait(epfd, events + n, maxevents - n, 0);
330 if (n == 0 && timeout != 0) {
331 uint64_t cur_ms = get_msecs();
332 if ((timeout == -1 || cur_ms < mtimeout) &&
340 pfds[0].events = pfds[1].events = POLLIN;
341 pfds[0].revents = pfds[1].revents = 0;
342 ret = tas_libc_poll(pfds, 2, mtimeout - cur_ms);
344 perror(
"tas_epoll_wait: poll failed");
353 }
while (n == 0 && timeout != 0 && (timeout == -1 || get_msecs() < mtimeout));
356 flextcp_fd_erelease(epfd, ep);
357 EPOLL_DEBUG(
" = %d\n", ret);
361 int tas_epoll_pwait(
int epfd,
struct epoll_event *events,
int maxevents,
362 int timeout,
const sigset_t *sigmask)
364 fprintf(stderr,
"flextcp epoll_pwait: TODO\n");
369 void flextcp_epoll_sockinit(
struct socket *s)
374 s->eps_exc_first = NULL;
375 s->eps_exc_last = NULL;
379 void flextcp_epoll_set(
struct socket *s, uint32_t evts)
386 EPOLL_DEBUG(
"flextcp_epoll_set(%p, %x) ne=%x\n", s, evts, newevs);
393 for (es = s->
eps; es != NULL; es = es->so_next) {
394 if ((newevs & es->mask) == 0) {
402 void flextcp_epoll_clear(
struct socket *s, uint32_t evts)
404 EPOLL_DEBUG(
"flextcp_epoll_clear(%p, %x)\n", s, evts);
408 void flextcp_epoll_sockclose(
struct socket *s)
412 while ((es = s->
eps) != NULL) {
420 void flextcp_epoll_destroy(
struct epoll *ep)
424 assert(ep->refcnt == 0);
445 static inline void es_remove_inactive(
struct epoll_socket *es)
447 struct epoll *ep = es->ep;
449 assert(es->active == 0);
452 if (es->ep_prev != NULL) {
453 es->ep_prev->ep_next = es->ep_next;
459 if (es->ep_next != NULL) {
460 es->ep_next->ep_prev = es->ep_prev;
464 static inline void es_add_inactive(
struct epoll_socket *es)
466 struct epoll *ep = es->ep;
479 static inline void es_remove_active(
struct epoll_socket *es)
481 struct epoll *ep = es->ep;
482 assert(es->active == 1);
483 assert(ep->num_active > 0);
486 if (es->ep_prev != NULL) {
487 es->ep_prev->ep_next = es->ep_next;
493 if (es->ep_next != NULL) {
494 es->ep_next->ep_prev = es->ep_prev;
496 ep->active_last = es->ep_prev;
502 static inline void es_add_active(
struct epoll_socket *es)
504 struct epoll *ep = es->ep;
506 if (ep->active_last == NULL) {
510 ep->active_last = es;
513 es->ep_prev = ep->active_last;
514 ep->active_last->ep_next = es;
515 ep->active_last = es;
524 if (es->active != 0) {
529 es_remove_inactive(es);
537 static inline void es_deactivate(
struct epoll_socket *es)
539 if (es->active == 0) {
544 es_remove_active(es);
552 static inline void es_active_pushback(
struct epoll_socket *es)
554 assert(es->active != 0);
555 es_remove_active(es);
560 static inline void es_remove_ep(
struct epoll_socket *es)
562 if (es->active != 0) {
563 es_remove_active(es);
565 es_remove_inactive(es);
570 static inline void es_remove_sock(
struct epoll_socket *es)
575 if (es->so_prev == NULL) {
576 s->
eps = es->so_next;
578 es->so_prev->so_next = es->so_next;
582 if (es->so_next != NULL) {
583 es->so_next->so_prev = es->so_prev;
int flextcp_context_waitfd(struct flextcp_context *ctx)
struct epoll_socket * inactive
int flextcp_context_canwait(struct flextcp_context *ctx)
struct epoll_socket * active_first
struct epoll_socket * eps
void flextcp_context_waitclear(struct flextcp_context *ctx)