TAS
TCP Acceleration as an OS Service
poll.c
/*
 * Copyright 2020 University of Washington, Max Planck Institute for
 * Software Systems, and The University of Texas at Austin
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <poll.h>

#include <tas_sockets.h>
#include "internal.h"

#define SELECT_POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
#define SELECT_POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
#define SELECT_POLLEX_SET (POLLPRI)

static inline uint32_t events_epoll2poll(uint32_t epoll_event);
static int pollfd_cache_alloc(struct sockets_context *ctx, size_t n);
static int pollfd_cache_prepare(struct sockets_context *ctx, struct pollfd *fds,
    nfds_t nfds, nfds_t num_linuxfds);
static int selectfd_cache_alloc(struct sockets_context *ctx, size_t n);

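/* Emulate select() on top of tas_poll(): translate the fd_sets into a pollfd
 * array (ctx->selectfds_cache), poll it, and write the results back into the
 * fd_sets. */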
int tas_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
    struct timeval *timeout)
{
  struct sockets_context *ctx;
  struct pollfd *p;
  int fd, fd_r, fd_w, fd_e, ret, t;
  nfds_t i, n = 0;

  ctx = flextcp_sockctx_getfull();

  if (selectfd_cache_alloc(ctx, nfds) != 0) {
    errno = ENOMEM;
    return -1;
  }

  for (fd = 0; fd < nfds; fd++) {
    p = &ctx->selectfds_cache[n];

    fd_r = readfds != NULL && FD_ISSET(fd, readfds);
    fd_w = writefds != NULL && FD_ISSET(fd, writefds);
    fd_e = exceptfds != NULL && FD_ISSET(fd, exceptfds);

    if (fd_r || fd_w || fd_e) {
      p->fd = fd;
      p->revents = 0;
      p->events = 0;
      n++;
    }

    if (fd_r)
      p->events |= SELECT_POLLIN_SET;
    if (fd_w)
      p->events |= SELECT_POLLOUT_SET;
    if (fd_e)
      p->events |= SELECT_POLLEX_SET;
  }

  if (timeout == NULL) {
    t = -1;
  } else {
    t = timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
  }

  ret = tas_poll(ctx->selectfds_cache, n, t);

  if (ret < 0)
    return ret;

  for (i = 0; i < n; i++) {
    p = &ctx->selectfds_cache[i];
    fd = p->fd;

    if (readfds != NULL && FD_ISSET(fd, readfds)) {
      if ((p->revents & SELECT_POLLIN_SET) != 0)
        FD_SET(fd, readfds);
      else
        FD_CLR(fd, readfds);
    }

    if (writefds != NULL && FD_ISSET(fd, writefds)) {
      if ((p->revents & SELECT_POLLOUT_SET) != 0)
        FD_SET(fd, writefds);
      else
        FD_CLR(fd, writefds);
    }

    if (exceptfds != NULL && FD_ISSET(fd, exceptfds)) {
      if ((p->revents & SELECT_POLLEX_SET) != 0)
        FD_SET(fd, exceptfds);
      else
        FD_CLR(fd, exceptfds);
    }
  }

  return ret;
}

int tas_pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
    const struct timespec *timeout, const sigset_t *sigmask)
{
  assert(!"NYI");
  errno = ENOTSUP;
  return -1;
}

int tas_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
  struct sockets_context *ctx;
  struct socket *s;
  struct pollfd *p;
  nfds_t nfds_linux, nfds_tas, i;
  uint32_t s_events;
  int ret, active_fds, mixed_events = 0, j, block_ms;
  uint64_t mtimeout = 0, cur_ts;
  int first = 1;

  ctx = flextcp_sockctx_getfull();

  if (timeout != 0 && timeout != -1)
    mtimeout = get_msecs() + timeout;

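  /* Poll loop: each iteration polls the TAS context, checks TAS fds directly
   * from their cached epoll events, and checks linux fds via the kernel. Only
   * block when nothing is ready: on TAS fds via flextcp_context_wait(), or,
   * for a mix of TAS and linux fds, by adding the TAS context's wait fd to
   * the kernel poll set. */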
  do {
again:
    flextcp_sockctx_poll_n(&ctx->ctx, nfds);

    nfds_linux = nfds_tas = 0;
    active_fds = 0;
    /* first process any tas fds */
    for (i = 0; i < nfds; i++) {
      p = &fds[i];

      if (flextcp_fd_slookup(p->fd, &s) != 0) {
        /* this is a linux fd */
        p->revents = 0;
        nfds_linux++;
        continue;
      }
      nfds_tas++;

      s_events = s->ep_events;
      flextcp_fd_srelease(p->fd, s);

      if ((p->events & ~(POLLIN | POLLPRI | POLLOUT | POLLRDHUP | POLLERR |
              POLLHUP | POLLRDNORM | POLLWRNORM | POLLNVAL | POLLRDBAND |
              POLLWRBAND)) != 0) {
        errno = EINVAL;
        fprintf(stderr, "tas_poll: unsupported fd flags (%x)\n", p->events);
        return -1;
      }

      p->revents = p->events & events_epoll2poll(s_events);
      if (p->revents != 0)
        active_fds++;
    }

    /* if we have TAS fds, calculate how long we can block for */
    block_ms = 0;
    if (nfds_tas > 0 && active_fds == 0 && mixed_events == 0 && timeout != 0) {
      if (timeout == -1) {
        block_ms = -1;
      } else {
        cur_ts = get_msecs();
        if (cur_ts < mtimeout) {
          block_ms = mtimeout - cur_ts;
        }
      }
    }

    /* now look at the linux fds */
    if (nfds_linux > 0 && nfds_tas == 0) {
      /* only linux fds, this is the easy case -> just call into linux */
      return tas_libc_poll(fds, nfds, timeout);
    } else if (nfds_linux > 0 && mixed_events == 0) {
      /* mixed tas and linux fds, this is annoying as we need to separate out
       * the linux fds */

      /* on the first iteration we need to copy only the linux fds into the
       * pollfd_cache */
      if (first) {
        if (pollfd_cache_prepare(ctx, fds, nfds, nfds_linux) != 0) {
          errno = ENOMEM;
          return -1;
        }
        first = 0;
      }

      if (block_ms == 0 || flextcp_context_canwait(&ctx->ctx) != 0) {
        /* we're not blocking */
        ret = tas_libc_poll(ctx->pollfds_cache, nfds_linux, 0);
        if (ret < 0) {
          perror("tas_poll: mixed poll, linux poll failed");
          return -1;
        }

        mixed_events = ret;
      } else {
        /* we're blocking */
        ret = tas_libc_poll(ctx->pollfds_cache, nfds_linux + 1, block_ms);
        if (ret < 0) {
          perror("tas_poll: mixed poll, linux poll failed");
          return -1;
        }

        /* check if the TAS ctx fd is active */
        if (ret > 0 && ctx->pollfds_cache[nfds_linux].revents != 0) {
          /* ctx fd is active */
          flextcp_context_waitclear(&ctx->ctx);
          mixed_events = ret - 1;

          /* and here we go poll the TAS fds again, regardless of whether we
           * have linux events or not */
          goto again;
        } else {
          mixed_events = ret;
        }
      }
    } else if (nfds_tas > 0 && nfds_linux == 0 && block_ms != 0) {
      /* just tas fds, can block if needed */
      flextcp_context_wait(&ctx->ctx, block_ms);
    }
  } while (active_fds == 0 && mixed_events == 0 && timeout != 0 &&
      (timeout == -1 || get_msecs() < mtimeout));

  /* copy events from linux fds over */
  if (mixed_events > 0) {
    for (j = 0, i = 0; i < nfds; i++) {
      p = &fds[i];

      if (flextcp_fd_slookup(p->fd, &s) == 0) {
        /* this is a tas fd */
        flextcp_fd_srelease(p->fd, s);
        continue;
      }

      if (p->fd != ctx->pollfds_cache[j].fd) {
        fprintf(stderr, "tas_poll: fds in poll cache changed? something is "
            "wrong\n");
        abort();
      }

      p->revents = ctx->pollfds_cache[j].revents;
      j++;
    }
  }

  return active_fds + mixed_events;
}

int tas_ppoll(struct pollfd *fds, nfds_t nfds, const struct timespec *tmo_p,
    const sigset_t *sigmask)
{
  assert(!"NYI");
  errno = ENOTSUP;
  return -1;
}

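/* translate TAS's internal epoll-style event bits (socket::ep_events) into
 * poll(2) revents bits */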
static inline uint32_t events_epoll2poll(uint32_t epoll_event)
{
  uint32_t poll_ev = 0;

  if ((epoll_event & EPOLLIN) != 0)
    poll_ev |= POLLIN | POLLRDNORM;
  if ((epoll_event & EPOLLOUT) != 0)
    poll_ev |= POLLOUT | POLLWRNORM;
  if ((epoll_event & EPOLLRDHUP) != 0)
    poll_ev |= POLLRDHUP;
  if ((epoll_event & EPOLLPRI) != 0)
    poll_ev |= POLLPRI;
  if ((epoll_event & EPOLLERR) != 0)
    poll_ev |= POLLERR;
  if ((epoll_event & EPOLLHUP) != 0)
    poll_ev |= POLLHUP;

  return poll_ev;
}

/* make sure that ctx->pollfds_cache can hold at least n entries */
static int pollfd_cache_alloc(struct sockets_context *ctx, size_t n)
{
  void *ptr;
  size_t size, cnt = ctx->pollfds_cache_size;

  if (cnt >= n) {
    return 0;
  }

  /* set initial size to 16 */
  if (cnt == 0)
    cnt = 16;

  /* double the size till we have enough, to keep it a power of 2 */
  while (cnt < n)
    cnt *= 2;

  size = cnt * sizeof(struct pollfd);
  if ((ptr = realloc(ctx->pollfds_cache, size)) == NULL) {
    perror("pollfd_cache_alloc: alloc failed");
    return -1;
  }

  ctx->pollfds_cache = ptr;
  ctx->pollfds_cache_size = cnt;
  return 0;
}

/* copy the subset of linux fds from fds into the pollfd cache, checking that
 * their number matches num_linuxfds_confirm. also adds the TAS wait fd as the
 * last entry. */
static int pollfd_cache_prepare(struct sockets_context *ctx, struct pollfd *fds,
    nfds_t nfds, nfds_t num_linuxfds_confirm)
{
  struct socket *s;
  struct pollfd *p;
  nfds_t i, num_linux = 0;

  if (pollfd_cache_alloc(ctx, num_linuxfds_confirm + 1) != 0)
    return -1;

  /* copy linux fds to pollfds_cache */
  for (i = 0; i < nfds; i++) {
    p = &fds[i];

    if (flextcp_fd_slookup(p->fd, &s) == 0) {
      /* this is a tas fd */
      flextcp_fd_srelease(p->fd, s);
      continue;
    }

    if (num_linux >= num_linuxfds_confirm) {
      fprintf(stderr, "pollfd_cache_prepare: number of linux fds changed?\n");
      return -1;
    }

    ctx->pollfds_cache[num_linux] = *p;
    num_linux++;
  }

  ctx->pollfds_cache[num_linux].fd = flextcp_context_waitfd(&ctx->ctx);
  ctx->pollfds_cache[num_linux].events = POLLIN;
  ctx->pollfds_cache[num_linux].revents = 0;

  return 0;
}

/* make sure that ctx->selectfds_cache can hold at least n entries */
static int selectfd_cache_alloc(struct sockets_context *ctx, size_t n)
{
  void *ptr;
  size_t size, cnt = ctx->selectfds_cache_size;

  if (cnt >= n) {
    return 0;
  }

  /* set initial size to 16 */
  if (cnt == 0)
    cnt = 16;

  /* double the size till we have enough, to keep it a power of 2 */
  while (cnt < n)
    cnt *= 2;

  size = cnt * sizeof(struct pollfd);
  if ((ptr = realloc(ctx->selectfds_cache, size)) == NULL) {
    perror("selectfd_cache_alloc: alloc failed");
    return -1;
  }

  ctx->selectfds_cache = ptr;
  ctx->selectfds_cache_size = cnt;
  return 0;
}
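
Usage sketch (illustrative, not part of poll.c): assuming the application is
linked against or preloaded with the TAS sockets layer so that its poll()
calls are served by tas_poll(), a standard poll loop over a mix of TAS-backed
TCP sockets and regular Linux fds works unchanged. The fd names and setup
below are hypothetical.

#include <poll.h>
#include <stdio.h>

/* wait up to one second for either fd to become readable; under TAS, the TCP
 * socket is checked from the TAS context while the pipe goes through the
 * kernel (the "mixed" path in tas_poll above) */
static int wait_readable(int tcp_fd, int pipe_fd)
{
  struct pollfd pfds[2] = {
    { .fd = tcp_fd,  .events = POLLIN },   /* TAS-backed TCP socket */
    { .fd = pipe_fd, .events = POLLIN },   /* regular Linux fd */
  };

  int n = poll(pfds, 2, 1000);
  if (n < 0) {
    perror("poll");
    return -1;
  }

  if (n > 0 && (pfds[0].revents & POLLIN))
    printf("tcp socket readable\n");
  if (n > 0 && (pfds[1].revents & POLLIN))
    printf("pipe readable\n");
  return n;
}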