TAS
TCP Acceleration as an OS Service
appif.c
Go to the documentation of this file.
1 /*
2  * Copyright 2019 University of Washington, Max Planck Institute for
3  * Software Systems, and The University of Texas at Austin
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
45 #include <stdlib.h>
46 #include <stdio.h>
47 #include <stdbool.h>
48 #include <errno.h>
49 #include <unistd.h>
50 #include <pthread.h>
51 #include <sys/un.h>
52 #include <sys/socket.h>
53 #include <sys/epoll.h>
54 #include <sys/eventfd.h>
55 
56 #include <tas.h>
57 #include "internal.h"
58 #include "appif.h"
59 #include <kernel_appif.h>
60 #include <utils_nbqueue.h>
61 #include <tas_memif.h>
62 #include <fastpath.h>
63 
/** epoll user-data value that tags events on the listening unix socket. */
65 #define EP_LISTEN (NULL)
66 
/** epoll user-data value that tags events on the notification eventfd. */
67 #define EP_NOTIFY ((void *) (-1))
68 
/* Helpers private to this file; all of them are defined further down. */
69 static int uxsocket_init(void);
70 static void *uxsocket_thread(void *arg);
71 static void uxsocket_accept(void);
72 static void uxsocket_notify(void);
73 static void uxsocket_error(struct application *app);
74 static void uxsocket_receive(struct application *app);
75 static void uxsocket_notify_app(struct application *app);
76 
/** Listening unix socket; set by uxsocket_init(), accepted on in
 *  uxsocket_accept(). */
78 static int uxfd = -1;
/** epoll instance that uxsocket_thread() waits on. */
80 static int epfd = -1;
/** eventfd written by appif_poll() and drained by uxsocket_notify(). */
82 static int notifyfd = -1;
/** Completions enqueued by appif_poll() and dequeued by uxsocket_notify(). */
84 static struct nbqueue poll_to_ux;
/** New applications enqueued by uxsocket_accept() and dequeued by
 *  appif_poll(). */
86 static struct nbqueue ux_to_poll;
87 
/** Handle of the thread created in appif_init() to run uxsocket_thread(). */
89 static pthread_t pt_ux;
90 
/** Singly linked freelist of doorbells; filled in appif_init(), one entry is
 *  taken per context in uxsocket_receive(). */
92 static struct app_doorbell *free_doorbells = NULL;
/** Id handed to the next accepted application (post-incremented on use). */
94 static uint16_t app_id_next = 0;
95 
/** Head of the list of applications walked by appif_poll(). */
97 static struct application *applications = NULL;
98 
99 
100 int appif_init(void)
101 {
102  struct app_doorbell *adb;
103  uint32_t i;
104 
105  if (uxsocket_init()) {
106  return -1;
107  }
108 
109  /* create freelist of doorbells (0 is used by kernel) */
110  for (i = FLEXNIC_PL_APPST_CTX_NUM; i > 0; i--) {
111  if ((adb = malloc(sizeof(*adb))) == NULL) {
112  perror("appif_init: malloc doorbell failed");
113  return -1;
114  }
115  adb->id = i;
116  adb->next = free_doorbells;
117  free_doorbells = adb;
118  }
119 
120  nbqueue_init(&ux_to_poll);
121  nbqueue_init(&poll_to_ux);
122 
123  if (pthread_create(&pt_ux, NULL, uxsocket_thread, NULL) != 0) {
124  return -1;
125  }
126 
127  return 0;
128 }
129 
130 unsigned appif_poll(void)
131 {
132  uint8_t *p;
133  struct application *app;
134  struct app_context *ctx;
135  ssize_t ret;
136  uint16_t i;
137  uint64_t rxq_offs[tas_info->cores_num], txq_offs[tas_info->cores_num];
138  uint64_t cnt = 1;
139  unsigned n = 0;
140 
141  /* add new applications to list */
142  while ((p = nbqueue_deq(&ux_to_poll)) != NULL) {
143  app = (struct application *) (p - offsetof(struct application, nqe));
144  app->next = applications;
145  applications = app;
146  }
147 
148  for (app = applications; app != NULL; app = app->next) {
149  /* register context with NIC */
150  if (app->need_reg_ctx != NULL) {
151  ctx = app->need_reg_ctx;
152  app->need_reg_ctx = NULL;
153 
154  for (i = 0; i < tas_info->cores_num; i++) {
155  rxq_offs[i] = app->resp->flexnic_qs[i].rxq_off;
156  txq_offs[i] = app->resp->flexnic_qs[i].txq_off;
157  }
158 
159  if (nicif_appctx_add(app->id, ctx->doorbell->id, rxq_offs,
160  app->req.rxq_len, txq_offs, app->req.txq_len, ctx->evfd) != 0)
161  {
162  fprintf(stderr, "appif_poll: registering context failed\n");
163  uxsocket_error(app);
164  continue;
165  }
166 
167  nbqueue_enq(&poll_to_ux, &app->comp.el);
168  ret = write(notifyfd, &cnt, sizeof(cnt));
169  if (ret <= 0) {
170  perror("appif_poll: error writing to notify fd");
171  }
172  }
173 
174  for (ctx = app->contexts; ctx != NULL; ctx = ctx->next) {
175  if (ctx->ready == 0) {
176  continue;
177  }
178  n += appif_ctx_poll(app, ctx);
179  }
180  }
181 
182  return n;
183 }
184 
185 
186 static int uxsocket_init(void)
187 {
188  int fd, efd, nfd;
189  struct sockaddr_un saun;
190  struct epoll_event ev;
191 
192  if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
193  perror("uxsocket_init: socket failed");
194  goto error_exit;
195  }
196 
197  memset(&saun, 0, sizeof(saun));
198  saun.sun_family = AF_UNIX;
199  memcpy(saun.sun_path, KERNEL_SOCKET_PATH, sizeof(KERNEL_SOCKET_PATH));
200  if (bind(fd, (struct sockaddr *) &saun, sizeof(saun))) {
201  perror("uxsocket_init: bind failed");
202  goto error_close;
203  }
204 
205  if (listen(fd, 5)) {
206  perror("uxsocket_init: listen failed");
207  goto error_close;
208  }
209 
210  if ((nfd = eventfd(0, EFD_NONBLOCK)) == -1) {
211  perror("uxsocket_init: eventfd failed");
212  }
213 
214  if ((efd = epoll_create1(0)) == -1) {
215  perror("uxsocket_init: epoll_create1 failed");
216  goto error_close;
217  }
218 
219  ev.events = EPOLLIN;
220  ev.data.ptr = EP_LISTEN;
221  if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev) != 0) {
222  perror("uxsocket_init: epoll_ctl listen failed");
223  goto error_close_ep;
224  }
225 
226  ev.events = EPOLLIN;
227  ev.data.ptr = EP_NOTIFY;
228  if (epoll_ctl(efd, EPOLL_CTL_ADD, nfd, &ev) != 0) {
229  perror("uxsocket_init: epoll_ctl notify failed");
230  goto error_close_ep;
231  }
232 
233  uxfd = fd;
234  epfd = efd;
235  notifyfd = nfd;
236  return 0;
237 
238 error_close_ep:
239  close(efd);
240 error_close:
241  close(fd);
242 error_exit:
243  return -1;
244 }
245 
246 static void *uxsocket_thread(void *arg)
247 {
248  int n, i;
249  struct epoll_event evs[32];
250  struct application *app;
251 
252  while (1) {
253  again:
254  n = epoll_wait(epfd, evs, 32, -1);
255  if (n < 0) {
256  if(errno == EINTR) {
257  // XXX: To support attaching GDB
258  goto again;
259  }
260  perror("uxsocket_thread: epoll_wait");
261  abort();
262  }
263 
264  for (i = 0; i < n; i++) {
265  app = evs[i].data.ptr;
266  if (app == EP_LISTEN) {
267  uxsocket_accept();
268  } else if (app == EP_NOTIFY) {
269  uxsocket_notify();
270  } else {
271  if ((evs[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) != 0) {
272  uxsocket_error(app);
273  } else if ((evs[i].events & EPOLLIN) != 0) {
274  uxsocket_receive(app);
275  }
276  }
277  }
278 
279  /* signal main slowpath thread */
280  notify_slowpath_core();
281  }
282 
283  return NULL;
284 }
285 
286 static void uxsocket_accept(void)
287 {
288  int cfd, *pfd;
289  struct application *app;
290  struct epoll_event ev;
291  size_t sz;
292  ssize_t tx;
293  uint32_t off, j, n;
294  uint8_t b = 0;
295 
296  /* new connection on unix socket */
297  if ((cfd = accept(uxfd, NULL, NULL)) < 0) {
298  fprintf(stderr, "uxsocket_accept: accept failed\n");
299  return;
300  }
301 
302  struct iovec iov = {
303  .iov_base = &tas_info->cores_num,
304  .iov_len = sizeof(uint32_t),
305  };
306  union {
307  char buf[CMSG_SPACE(sizeof(int) * 4)];
308  struct cmsghdr align;
309  } u;
310  struct msghdr msg = {
311  .msg_name = NULL,
312  .msg_namelen = 0,
313  .msg_iov = &iov,
314  .msg_iovlen = 1,
315  .msg_control = u.buf,
316  .msg_controllen = sizeof(u.buf),
317  .msg_flags = 0,
318  };
319 
320  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
321  cmsg->cmsg_level = SOL_SOCKET;
322  cmsg->cmsg_type = SCM_RIGHTS;
323  cmsg->cmsg_len = CMSG_LEN(sizeof(int));
324 
325  pfd = (int *) CMSG_DATA(cmsg);
326  *pfd = kernel_notifyfd;
327 
328  /* send out kernel notify fd */
329  if((tx = sendmsg(cfd, &msg, 0)) != sizeof(uint32_t)) {
330  fprintf(stderr, "tx == %zd\n", tx);
331  if(tx == -1) {
332  fprintf(stderr, "errno == %d\n", errno);
333  }
334  }
335 
336  /* send out fast path fds */
337  off = 0;
338  for (; off < tas_info->cores_num;) {
339  iov.iov_base = &b;
340  iov.iov_len = 1;
341 
342  memset(&msg, 0, sizeof(msg));
343  msg.msg_iov = &iov;
344  msg.msg_iovlen = 1;
345  msg.msg_control = u.buf;
346  msg.msg_controllen = sizeof(u.buf);
347 
348  n = (tas_info->cores_num - off >= 4 ? 4 : tas_info->cores_num - off);
349 
350  cmsg->cmsg_level = SOL_SOCKET;
351  cmsg->cmsg_type = SCM_RIGHTS;
352  cmsg->cmsg_len = CMSG_LEN(sizeof(int) * n);
353 
354  pfd = (int *) CMSG_DATA(cmsg);
355  for (j = 0; j < n; j++) {
356  pfd[j] = ctxs[off++]->evfd;
357  }
358 
359  /* send out kernel notify fd */
360  if((tx = sendmsg(cfd, &msg, 0)) != 1) {
361  fprintf(stderr, "tx fd == %zd\n", tx);
362  if(tx == -1) {
363  fprintf(stderr, "errno fd == %d\n", errno);
364  }
365  abort();
366  }
367  }
368 
369  /* allocate application struct */
370  if ((app = malloc(sizeof(*app))) == NULL) {
371  fprintf(stderr, "uxsocket_accept: malloc of app struct failed\n");
372  close(cfd);
373  return;
374  }
375 
376  sz = sizeof(*app->resp) +
377  tas_info->cores_num * sizeof(app->resp->flexnic_qs[0]);
378  app->resp_sz = sz;
379  if ((app->resp = malloc(sz)) == NULL) {
380  fprintf(stderr, "uxsocket_accept: malloc of app resp struct failed\n");
381  free(app);
382  close(cfd);
383  return;
384  }
385 
386  /* add to epoll */
387  ev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
388  ev.data.ptr = app;
389  if (epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev) != 0) {
390  perror("uxsocket_accept: epoll_ctl failed");
391  free(app->resp);
392  free(app);
393  close(cfd);
394  return;
395  }
396 
397  app->fd = cfd;
398  app->contexts = NULL;
399  app->need_reg_ctx = NULL;
400  app->closed = false;
401  app->conns = NULL;
402  app->listeners = NULL;
403  app->id = app_id_next++;
404  nbqueue_enq(&ux_to_poll, &app->nqe);
405 }
406 
407 static void uxsocket_notify(void)
408 {
409  uint8_t *p;
410  struct application *app;
411  uint64_t x;
412  ssize_t ret;
413 
414  ret = read(notifyfd, &x, sizeof(x));
415  if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
416  perror("uxsocket_notify: read on notifyfd failed");
417  abort();
418  }
419 
420  while ((p = nbqueue_deq(&poll_to_ux)) != NULL) {
421  app = (struct application *) (p - offsetof(struct application, comp.el));
422  uxsocket_notify_app(app);
423  }
424 }
425 
426 static void uxsocket_error(struct application *app)
427 {
428  close(app->fd);
429  app->closed = true;
430 }
431 
432 static void uxsocket_receive(struct application *app)
433 {
434  ssize_t rx;
435  struct app_context *ctx;
436  struct packetmem_handle *pm_in, *pm_out;
437  uintptr_t off_in, off_out, off_rxq, off_txq;
438  size_t kin_qsize, kout_qsize, ctx_sz;
439  struct epoll_event ev;
440  uint16_t i;
441  int evfd = 0;
442 
443  /* receive data to hopefully complete request */
444  struct iovec iov = {
445  .iov_base = &app->req,
446  .iov_len = sizeof(app->req) - app->req_rx,
447  };
448  union {
449  char buf[CMSG_SPACE(sizeof(int))];
450  struct cmsghdr align;
451  } u;
452  struct msghdr msg = {
453  .msg_name = NULL,
454  .msg_namelen = 0,
455  .msg_iov = &iov,
456  .msg_iovlen = 1,
457  .msg_control = u.buf,
458  .msg_controllen = sizeof(u.buf),
459  .msg_flags = 0,
460  };
461  rx = recvmsg(app->fd, &msg, 0);
462 
463  if(msg.msg_controllen > 0) {
464  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
465  assert(cmsg->cmsg_len == CMSG_LEN(sizeof(int)));
466  int* data = (int*) CMSG_DATA(cmsg);
467  evfd = *data;
468  }
469 
470  if (rx < 0) {
471  perror("uxsocket_receive: recv failed");
472  goto error_abort_app;
473  } else if (rx + app->req_rx < sizeof(app->req)) {
474  /* request not complete yet */
475  app->req_rx += rx;
476  return;
477  }
478 
479  /* request complete */
480  app->req_rx = 0;
481 
482  /* allocate context struct */
483  ctx_sz = sizeof(*ctx) + tas_info->cores_num * sizeof(ctx->handles[0]);
484  if ((ctx = malloc(ctx_sz)) == NULL) {
485  perror("uxsocket_receive: ctx malloc failed");
486  goto error_ctxmalloc;
487  }
488 
489  /* queue sizes */
490  kin_qsize = config.app_kin_len;
491  kout_qsize = config.app_kout_len;
492 
493  /* allocate packet memory for kernel queues */
494  if (packetmem_alloc(kin_qsize, &off_in, &pm_in) != 0) {
495  fprintf(stderr, "uxsocket_receive: packetmem_alloc in failed\n");
496  goto error_pktmem_in;
497  }
498  if (packetmem_alloc(kout_qsize, &off_out, &pm_out) != 0) {
499  fprintf(stderr, "uxsocket_receive: packetmem_alloc out failed\n");
500  goto error_pktmem_out;
501  }
502 
503  /* allocate packet memory for flexnic queues */
504  for (i = 0; i < tas_info->cores_num; i++) {
505  if (packetmem_alloc(app->req.rxq_len, &off_rxq, &ctx->handles[i].rxq)
506  != 0)
507  {
508  fprintf(stderr, "uxsocket_receive: packetmem_alloc rxq failed\n");
509  goto error_pktmem;
510  }
511  if (packetmem_alloc(app->req.txq_len, &off_txq, &ctx->handles[i].txq)
512  != 0)
513  {
514  fprintf(stderr, "uxsocket_receive: packetmem_alloc txq failed\n");
515  packetmem_free(ctx->handles[i].rxq);
516  goto error_pktmem;
517  }
518  memset((uint8_t *) tas_shm + off_rxq, 0, app->req.rxq_len);
519  memset((uint8_t *) tas_shm + off_txq, 0, app->req.txq_len);
520  app->resp->flexnic_qs[i].rxq_off = off_rxq;
521  app->resp->flexnic_qs[i].txq_off = off_txq;
522  }
523 
524  /* allocate doorbell */
525  if ((ctx->doorbell = free_doorbells) == NULL) {
526  fprintf(stderr, "uxsocket_receive: allocating doorbell failed\n");
527  goto error_dballoc;
528  }
529  free_doorbells = ctx->doorbell->next;
530 
531 
532  /* initialize queuepair struct and queues */
533  ctx->app = app;
534 
535  ctx->kin_handle = pm_in;
536  ctx->kin_base = (uint8_t *) tas_shm + off_in;
537  ctx->kin_len = kin_qsize / sizeof(struct kernel_appout);
538  ctx->kin_pos = 0;
539  memset(ctx->kin_base, 0, kin_qsize);
540 
541  ctx->kout_handle = pm_out;
542  ctx->kout_base = (uint8_t *) tas_shm + off_out;
543  ctx->kout_len = kout_qsize / sizeof(struct kernel_appin);
544  ctx->kout_pos = 0;
545  memset(ctx->kout_base, 0, kout_qsize);
546 
547  ctx->ready = 0;
548  assert(evfd != 0); // XXX: Will be 0 if request was broken up
549  ctx->evfd = evfd;
550 
551  ctx->next = app->contexts;
552  MEM_BARRIER();
553  app->contexts = ctx;
554 
555  /* initialize response */
556  app->resp->app_out_off = off_in;
557  app->resp->app_out_len = kin_qsize;
558  app->resp->app_in_off = off_out;
559  app->resp->app_in_len = kout_qsize;
560  app->resp->flexnic_db_id = ctx->doorbell->id;
561  app->resp->flexnic_qs_num = tas_info->cores_num;
562  app->resp->status = 0;
563 
564  /* no longer wait on epoll in for this socket until we get the completion */
565  ev.events = EPOLLRDHUP | EPOLLERR;
566  ev.data.ptr = app;
567  if (epoll_ctl(epfd, EPOLL_CTL_MOD, app->fd, &ev) != 0) {
568  /* not sure how to handle this */
569  perror("uxsocket_receive: epoll_ctl failed");
570  abort();
571  }
572 
573  app->need_reg_ctx_done = ctx;
574  MEM_BARRIER();
575  app->need_reg_ctx = ctx;
576 #if 0
577  /* send out response */
578  tx = send(app->fd, &resp, sizeof(resp), 0);
579  if (tx < 0) {
580  perror("uxsocket_receive: send failed");
581  goto error_abort_app;
582  } else if (tx < sizeof(resp)) {
583  /* FIXME */
584  fprintf(stderr, "uxsocket_receive: short send for response (TODO)\n");
585  goto error_abort_app;
586  }
587 #endif
588 
589  return;
590 
591 
592 error_dballoc:
593  /* TODO: for () packetmem_free(ctx->txq_handle) */
594 error_pktmem:
595  packetmem_free(pm_out);
596 error_pktmem_out:
597  packetmem_free(pm_in);
598 error_pktmem_in:
599  free(ctx);
600 error_ctxmalloc:
601 error_abort_app:
602  uxsocket_error(app);
603  return;
604 
605 }
606 
607 static void uxsocket_notify_app(struct application *app)
608 {
609  ssize_t tx;
610  struct epoll_event ev;
611  struct app_context *ctx;
612 
613  if (app->comp.status != 0) {
614  /* TODO: cleanup and return error */
615  fprintf(stderr, "uxsocket_notify_app: status = %d, terminating app\n",
616  app->comp.status);
617  goto error_status;
618  return;
619  }
620 
621  ctx = app->need_reg_ctx_done;
622  ctx->ready = 1;
623 
624  /* send out response */
625  tx = send(app->fd, app->resp, app->resp_sz, 0);
626  if (tx < 0) {
627  perror("uxsocket_notify_app: send failed");
628  goto error_send;
629  } else if (tx < app->resp_sz) {
630  /* FIXME */
631  fprintf(stderr, "uxsocket_notify_app: short send for response (TODO)\n");
632  goto error_send;
633  }
634 
635  /* wait for epoll in again */
636  ev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
637  ev.data.ptr = app;
638  if (epoll_ctl(epfd, EPOLL_CTL_MOD, app->fd, &ev) != 0) {
639  /* not sure how to handle this */
640  perror("uxsocket_notify_app: epoll_ctl failed");
641  abort();
642  }
643 
644  return;
645 
646 error_status:
647 error_send:
648  uxsocket_error(app);
649 }
static struct application * applications
Definition: appif.c:97
uint32_t cores_num
Definition: tas_memif.h:72
static uint16_t app_id_next
Definition: appif.c:94
static struct nbqueue poll_to_ux
Definition: appif.c:84
int packetmem_alloc(size_t length, uintptr_t *off, struct packetmem_handle **handle)
Definition: packetmem.c:61
static int notifyfd
Definition: appif.c:82
static struct nbqueue ux_to_poll
Definition: appif.c:86
int appif_init(void)
Definition: appif.c:100
void packetmem_free(struct packetmem_handle *handle)
Definition: packetmem.c:113
uint64_t app_kout_len
Definition: config.h:53
uint64_t app_kin_len
Definition: config.h:51
int nicif_appctx_add(uint16_t appid, uint32_t db, uint64_t *rxq_base, uint32_t rxq_len, uint64_t *txq_base, uint32_t txq_len, int evfd)
Definition: nicif.c:130
static pthread_t pt_ux
Definition: appif.c:89
#define EP_LISTEN
Definition: appif.c:65
#define EP_NOTIFY
Definition: appif.c:67
static struct app_doorbell * free_doorbells
Definition: appif.c:92
unsigned appif_poll(void)
Definition: appif.c:130
static int uxfd
Definition: appif.c:78
unsigned appif_ctx_poll(struct application *app, struct app_context *ctx)
Definition: appif_ctx.c:222
static int epfd
Definition: appif.c:80