TAS
TCP Acceleration as an OS Service
epoll.c
1 /*
2  * Copyright 2019 University of Washington, Max Planck Institute for
3  * Software Systems, and The University of Texas at Austin
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
25 #define _GNU_SOURCE
26 #include <assert.h>
27 #include <errno.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <sys/epoll.h>
32 #include <dlfcn.h>
33 #include <utils.h>
34 
35 #include <tas_sockets.h>
36 #include "internal.h"
37 
/* Not referenced by the epoll code shown here.
 * NOTE(review): confirm nothing else relies on it before removing. */
#define LINUX_POLL_DELAY 10

/* Debug tracing for the epoll emulation, compiled out by default.
 * To enable it, replace the definition with:
 *   #define EPOLL_DEBUG(...) fprintf(stderr, __VA_ARGS__)
 */
#define EPOLL_DEBUG(...) do { } while (0)

struct epoll_socket;

/* list maintenance helpers, defined further down in this file */
static inline void es_add_inactive(struct epoll_socket *es);
static inline void es_activate(struct epoll_socket *es);
static inline void es_deactivate(struct epoll_socket *es);
static inline void es_active_pushback(struct epoll_socket *es);
static inline void es_remove_ep(struct epoll_socket *es);
static inline void es_remove_sock(struct epoll_socket *es);
49 
/* epoll_create(2) replacement.
 *
 * The size hint carries no meaning here beyond validation: a non-positive
 * size is rejected with EINVAL, anything else is forwarded to
 * tas_epoll_create1() without flags.
 */
int tas_epoll_create(int size)
{
  if (size > 0)
    return tas_epoll_create1(0);

  errno = EINVAL;
  return -1;
}
58 
59 int tas_epoll_create1(int flags)
60 {
61  int fd;
62  struct epoll *ep;
63 
64  if ((fd = tas_libc_epoll_create1(flags)) == -1) {
65  return -1;
66  }
67 
68  if (flextcp_fd_ealloc(&ep, fd) < 0) {
69  tas_libc_close(fd);
70  return -1;
71  }
72 
73  ep->inactive = NULL;
74  ep->active_first = NULL;
75  ep->active_last = NULL;
76  ep->num_linux = 0;
77  ep->num_tas = 0;
78  ep->linux_next = 0;
79 
80  flextcp_fd_erelease(fd, ep);
81  return fd;
82 }
83 
84 int tas_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
85 {
86  struct epoll *ep;
87  struct socket *s;
88  struct epoll_socket *es;
89  int ret = 0;
90  uint32_t em;
91  int linux_fd = 0;
92 
93  EPOLL_DEBUG("flextcp_epoll_ctl(%d, %d, %d, {events=%x})\n", epfd, op,
94  fd, (event != NULL ? event->events : -1));
95 
96  if (flextcp_fd_elookup(epfd, &ep) != 0) {
97  errno = EBADF;
98  return -1;
99  }
100 
101  /* handle linux fds */
102  if (flextcp_fd_slookup(fd, &s) != 0) {
103  linux_fd = 1;
104  /* this is a linux fd */
105  if ((ret = tas_libc_epoll_ctl(epfd, op, fd, event)) != 0) {
106  goto out;
107  }
108 
109  /* make sure num_linux is accurate */
110  if (op == EPOLL_CTL_ADD) {
111  ep->num_linux++;
112  } else if (op == EPOLL_CTL_DEL) {
113  assert(ep->num_linux > 0);
114  ep->num_linux--;
115  }
116  goto out;
117  }
118 
119  /* look up socket on epoll */
120  for (es = s->eps; es != NULL && es->ep != ep; es = es->so_next);
121 
122  /* validate events */
123  if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD) {
124  em = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
125  event->events &= (~EPOLLET); // XXX: Mask edge-triggered
126  if ((event->events & (~em)) != 0) {
127  fprintf(stderr, "flextcp epoll_ctl: unsupported events: %x\n",
128  (event->events & (~em)));
129  errno = EINVAL;
130  ret = -1;
131  goto out_sock;
132  }
133  }
134 
135  /* execute operation */
136  if (op == EPOLL_CTL_ADD) {
137  ep->num_tas++;
138 
139  /* add fd to epoll */
140  if (es != NULL) {
141  /* socket not on this epoll */
142  errno = EEXIST;
143  ret = -1;
144  goto out_sock;
145  }
146 
147  /* allocate epoll_socket */
148  if ((es = calloc(1, sizeof(*es))) == NULL) {
149  errno = ENOMEM;
150  ret = -1;
151  goto out_sock;
152  }
153 
154  es->ep = ep;
155  es->s = s;
156  es->data = event->data;
157  es->mask = event->events | EPOLLERR;
158  es->active = 0;
159 
160  /* add to list on socket */
161  es->so_prev = NULL;
162  if (s->eps == NULL) {
163  es->so_next = NULL;
164  s->eps = es;
165  } else {
166  es->so_next = s->eps;
167  s->eps->so_prev = es;
168  s->eps = es;
169  }
170 
171  /* add to inactive queue */
172  es_add_inactive(es);
173 
174  /* check if this epoll is already active */
175  if ((s->ep_events & es->mask) != 0) {
176  es_activate(es);
177  }
178  } else if (op == EPOLL_CTL_MOD) {
179  /* modify fd in epoll */
180 
181  if (es == NULL) {
182  /* socket not on this epoll */
183  errno = ENOENT;
184  ret = -1;
185  goto out_sock;
186  }
187 
188  es->mask = event->events | EPOLLERR;
189  if ((s->ep_events & es->mask) != 0) {
190  es_activate(es);
191  }
192  } else if (op == EPOLL_CTL_DEL) {
193  /* remove fd from epoll */
194  ep->num_tas--;
195 
196  if (es == NULL) {
197  /* socket not on this epoll */
198  errno = ENOENT;
199  ret = -1;
200  goto out_sock;
201  }
202 
203  es_remove_sock(es);
204  es_remove_ep(es);
205  free(es);
206  } else {
207  /* unknown operation */
208  errno = EINVAL;
209  ret = -1;
210  goto out_sock;
211  }
212 
213 out_sock:
214  flextcp_fd_srelease(fd, s);
215 out:
216  flextcp_fd_erelease(epfd, ep);
217 
218  if (!linux_fd)
219  tas_move_conn(fd);
220 
221  return ret;
222 }
223 
224 static unsigned ep_poll_tas(struct flextcp_context *ctx, int epfd,
225  struct epoll *ep, struct epoll_event *events, int maxevents)
226 {
227  struct epoll_socket *es;
228  struct socket *s;
229  uint32_t i, num_active;
230  unsigned n = 0;
231 
232  /* make sure to poll for some events even if there is already enough on the
233  * epoll */
234  epoll_unlock(ep);
235  flextcp_sockctx_poll_n(ctx, maxevents);
236  epoll_lock(ep);
237 
238  num_active = ep->num_active;
239  for (i = 0; i < num_active && n < maxevents; i++) {
240  es = ep->active_first;
241  if (es == NULL) {
242  /* no more active fds */
243  break;
244  }
245 
246  util_prefetch0(es->ep_next);
247 
248  /* check whether fd is actually active */
249  s = es->s;
250  socket_lock(s);
251  if ((s->ep_events & es->mask) != 0) {
252  events[n].events = s->ep_events & es->mask;
253  events[n].data = es->data;
254  n++;
255  es_active_pushback(es);
256  } else {
257  es_deactivate(es);
258  }
259  socket_unlock(s);
260  }
261 
262  return n;
263 }
264 
265 int tas_epoll_wait(int epfd, struct epoll_event *events, int maxevents,
266  int timeout)
267 {
268  struct flextcp_context *ctx;
269  struct epoll *ep;
270  int ret = 0, n = 0;
271  uint64_t mtimeout = 0;
272  struct pollfd pfds[2];
273 
274  EPOLL_DEBUG("flextcp_epoll_wait(%d, %d, %d)\n", epfd, maxevents, timeout);
275 
276  if (maxevents <= 0) {
277  errno = EINVAL;
278  return -1;
279  }
280 
281  if (flextcp_fd_elookup(epfd, &ep) != 0) {
282  errno = EBADF;
283  return -1;
284  }
285 
286  if(ep->num_tas == 0) {
287  /* no TAS fds on the epoll, go straight to linux */
288  flextcp_fd_erelease(epfd, ep);
289  return tas_libc_epoll_wait(epfd, events, maxevents, timeout);
290  }
291 
292  util_prefetch0(ep->active_first);
293 
294  /* calculate timeout */
295  if (timeout > 0) {
296  mtimeout = get_msecs() + timeout;
297  }
298 
299  ctx = flextcp_sockctx_get();
300 
301  do {
302 again:
303  if (ep->num_linux == 0) {
304  /* only tas FDs, we can block as we want */
305  n = ep_poll_tas(ctx, epfd, ep, events, maxevents);
306  } else if (ep->linux_next) {
307  /* start by polling linux and then TAS if there is space */
308  epoll_unlock(ep);
309  n = tas_libc_epoll_wait(epfd, events, maxevents, 0);
310  epoll_lock(ep);
311 
312  if (n >= 0 && n < maxevents) {
313  n += ep_poll_tas(ctx, epfd, ep, events + n, maxevents - n);
314  }
315  ep->linux_next = 0;
316  } else {
317  /* poll tas first */
318  n = ep_poll_tas(ctx, epfd, ep, events, maxevents);
319  if (n < maxevents) {
320  epoll_unlock(ep);
321  ret = tas_libc_epoll_wait(epfd, events + n, maxevents - n, 0);
322  epoll_lock(ep);
323  if (ret >= 0)
324  n += ret;
325  }
326  ep->linux_next = 1;
327  }
328 
329  /* block thread if nothing received for a while */
330  if (n == 0 && timeout != 0) {
331  uint64_t cur_ms = get_msecs();
332  if ((timeout == -1 || cur_ms < mtimeout) &&
333  flextcp_context_canwait(ctx) == 0)
334  {
335  epoll_unlock(ep);
336 
337  /* we wait for events on the linux epoll fd and the tas event fd */
338  pfds[0].fd = epfd;
339  pfds[1].fd = flextcp_context_waitfd(ctx);
340  pfds[0].events = pfds[1].events = POLLIN;
341  pfds[0].revents = pfds[1].revents = 0;
342  ret = tas_libc_poll(pfds, 2, mtimeout - cur_ms);
343  if (ret < 0) {
344  perror("tas_epoll_wait: poll failed");
345  return -1;
346  }
347 
349  epoll_lock(ep);
350  goto again;
351  }
352  }
353  } while (n == 0 && timeout != 0 && (timeout == -1 || get_msecs() < mtimeout));
354 
355  ret = n;
356  flextcp_fd_erelease(epfd, ep);
357  EPOLL_DEBUG(" = %d\n", ret);
358  return ret;
359 }
360 
/* epoll_pwait(2) replacement.
 *
 * With a NULL sigmask, epoll_pwait() is specified to behave exactly like
 * epoll_wait(), so that case is delegated to tas_epoll_wait(). Atomically
 * installing a temporary signal mask around the wait is not implemented:
 * a non-NULL sigmask fails with EINVAL.
 */
int tas_epoll_pwait(int epfd, struct epoll_event *events, int maxevents,
    int timeout, const sigset_t *sigmask)
{
  if (sigmask == NULL) {
    return tas_epoll_wait(epfd, events, maxevents, timeout);
  }

  fprintf(stderr, "flextcp epoll_pwait: TODO\n");
  errno = EINVAL;
  return -1;
}
368 
369 void flextcp_epoll_sockinit(struct socket *s)
370 {
371  s->ep_events = 0;
372  s->eps = NULL;
373 #if 0
374  s->eps_exc_first = NULL;
375  s->eps_exc_last = NULL;
376 #endif
377 }
378 
379 void flextcp_epoll_set(struct socket *s, uint32_t evts)
380 {
381  uint32_t newevs;
382  struct epoll_socket *es;
383 
384  newevs = (~s->ep_events) & evts;
385 
386  EPOLL_DEBUG("flextcp_epoll_set(%p, %x) ne=%x\n", s, evts, newevs);
387  if (newevs == 0) {
388  /* no new events */
389  return;
390  }
391  s->ep_events |= evts;
392 
393  for (es = s->eps; es != NULL; es = es->so_next) {
394  if ((newevs & es->mask) == 0) {
395  continue;
396  }
397 
398  es_activate(es);
399  }
400 }
401 
402 void flextcp_epoll_clear(struct socket *s, uint32_t evts)
403 {
404  EPOLL_DEBUG("flextcp_epoll_clear(%p, %x)\n", s, evts);
405  s->ep_events &= ~evts;
406 }
407 
408 void flextcp_epoll_sockclose(struct socket *s)
409 {
410  struct epoll_socket *es;
411 
412  while ((es = s->eps) != NULL) {
413  es_remove_sock(es);
414  es_remove_ep(es);
415  free(es);
416  }
417 }
418 
419 /* destroy epoll after the underlying file descriptor is already closed */
420 void flextcp_epoll_destroy(struct epoll *ep)
421 {
422  struct epoll_socket *es;
423 
424  assert(ep->refcnt == 0);
425 
426  /* remove inactive epoll socket bindings */
427  while((es = ep->active_first) != NULL){
428  es = ep->active_first;
429  es_remove_sock(es);
430  es_remove_ep(es);
431  free(es);
432  }
433 
434  /* remove active epoll socket bindings */
435  while((es = ep->inactive) != NULL){
436  es_remove_sock(es);
437  es_remove_ep(es);
438  free(es);
439  }
440 
441  free(ep);
442 }
443 
444 /* remove es from epoll's inactive list */
445 static inline void es_remove_inactive(struct epoll_socket *es)
446 {
447  struct epoll *ep = es->ep;
448 
449  assert(es->active == 0);
450 
451  /* update predecessor's next pointer */
452  if (es->ep_prev != NULL) {
453  es->ep_prev->ep_next = es->ep_next;
454  } else {
455  ep->inactive = es->ep_next;
456  }
457 
458  /* update successor's prev pointer */
459  if (es->ep_next != NULL) {
460  es->ep_next->ep_prev = es->ep_prev;
461  }
462 }
463 
464 static inline void es_add_inactive(struct epoll_socket *es)
465 {
466  struct epoll *ep = es->ep;
467 
468  es->ep_prev = NULL;
469  if (ep->inactive == NULL) {
470  es->ep_next = NULL;
471  } else {
472  es->ep_next = ep->inactive;
473  ep->inactive->ep_prev = es;
474  }
475  ep->inactive = es;
476 }
477 
478 /* remove es from epoll's active list */
479 static inline void es_remove_active(struct epoll_socket *es)
480 {
481  struct epoll *ep = es->ep;
482  assert(es->active == 1);
483  assert(ep->num_active > 0);
484 
485  /* update predecessor's next pointer */
486  if (es->ep_prev != NULL) {
487  es->ep_prev->ep_next = es->ep_next;
488  } else {
489  ep->active_first = es->ep_next;
490  }
491 
492  /* update successor's prev pointer */
493  if (es->ep_next != NULL) {
494  es->ep_next->ep_prev = es->ep_prev;
495  } else {
496  ep->active_last = es->ep_prev;
497  }
498 
499  ep->num_active--;
500 }
501 
502 static inline void es_add_active(struct epoll_socket *es)
503 {
504  struct epoll *ep = es->ep;
505 
506  if (ep->active_last == NULL) {
507  es->ep_prev = NULL;
508  es->ep_next = NULL;
509  ep->active_first = es;
510  ep->active_last = es;
511  } else {
512  es->ep_next = NULL;
513  es->ep_prev = ep->active_last;
514  ep->active_last->ep_next = es;
515  ep->active_last = es;
516  }
517 
518  ep->num_active++;
519 }
520 
521 /* ensure es is active */
522 static inline void es_activate(struct epoll_socket *es)
523 {
524  if (es->active != 0) {
525  return;
526  }
527 
528  /* remove from inactive list */
529  es_remove_inactive(es);
530 
531  es->active = 1;
532 
533  /* add to end of active list */
534  es_add_active(es);
535 }
536 
537 static inline void es_deactivate(struct epoll_socket *es)
538 {
539  if (es->active == 0) {
540  return;
541  }
542 
543  /* remove from active list */
544  es_remove_active(es);
545 
546  es->active = 0;
547 
548  /* add to end of inactive list */
549  es_add_inactive(es);
550 }
551 
552 static inline void es_active_pushback(struct epoll_socket *es)
553 {
554  assert(es->active != 0);
555  es_remove_active(es);
556  es_add_active(es);
557 }
558 
559 /* remove es from epoll lists */
560 static inline void es_remove_ep(struct epoll_socket *es)
561 {
562  if (es->active != 0) {
563  es_remove_active(es);
564  } else {
565  es_remove_inactive(es);
566  }
567 }
568 
569 /* remove es from socket lists */
570 static inline void es_remove_sock(struct epoll_socket *es)
571 {
572  struct socket *s = es->s;
573 
574  /* update predecessor's next pointer on socket list */
575  if (es->so_prev == NULL) {
576  s->eps = es->so_next;
577  } else {
578  es->so_prev->so_next = es->so_next;
579  }
580 
581  /* update successor's prev pointer on socket list */
582  if (es->so_next != NULL) {
583  es->so_next->so_prev = es->so_prev;
584  }
585 }
int flextcp_context_waitfd(struct flextcp_context *ctx)
Definition: init.c:925
struct epoll_socket * inactive
Definition: internal.h:126
int flextcp_context_canwait(struct flextcp_context *ctx)
Definition: init.c:930
struct epoll_socket * active_first
Definition: internal.h:128
struct epoll_socket * eps
Definition: internal.h:115
void flextcp_context_waitclear(struct flextcp_context *ctx)
Definition: init.c:979
TAS sockets emulation.
uint32_t ep_events
Definition: internal.h:113
static int epfd
Definition: appif.c:80