TAS
TCP Acceleration as an OS Service
fastemu.c
/*
 * Copyright 2019 University of Washington, Max Planck Institute for
 * Software Systems, and The University of Texas at Austin
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include <assert.h>
#include <signal.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <rte_config.h>
#include <rte_malloc.h>
#include <rte_cycles.h>

#include <tas_memif.h>

#include "internal.h"
#include "fastemu.h"

#define DATAPLANE_TSCS

#ifdef DATAPLANE_STATS
# ifdef DATAPLANE_TSCS
#   define STATS_TS(n) uint64_t n = rte_get_tsc_cycles()
#   define STATS_TSADD(c, f, n) __sync_fetch_and_add(&c->stat_##f, n)
# else
#   define STATS_TS(n) do { } while (0)
#   define STATS_TSADD(c, f, n) do { } while (0)
# endif
# define STATS_ADD(c, f, n) __sync_fetch_and_add(&c->stat_##f, n)
#else
# define STATS_TS(n) do { } while (0)
# define STATS_TSADD(c, f, n) do { } while (0)
# define STATS_ADD(c, f, n) do { } while (0)
#endif

static void dataplane_block(struct dataplane_context *ctx, uint32_t ts);
static unsigned poll_rx(struct dataplane_context *ctx, uint32_t ts,
    uint64_t tsc) __attribute__((noinline));
static unsigned poll_queues(struct dataplane_context *ctx, uint32_t ts) __attribute__((noinline));
static unsigned poll_kernel(struct dataplane_context *ctx, uint32_t ts) __attribute__((noinline));
static unsigned poll_qman(struct dataplane_context *ctx, uint32_t ts) __attribute__((noinline));
static unsigned poll_qman_fwd(struct dataplane_context *ctx, uint32_t ts) __attribute__((noinline));
static void poll_scale(struct dataplane_context *ctx);

static inline uint8_t bufcache_prealloc(struct dataplane_context *ctx, uint16_t num,
    struct network_buf_handle ***handles);
static inline void bufcache_alloc(struct dataplane_context *ctx, uint16_t num);
static inline void bufcache_free(struct dataplane_context *ctx,
    struct network_buf_handle *handle);

static inline void tx_flush(struct dataplane_context *ctx);
static inline void tx_send(struct dataplane_context *ctx,
    struct network_buf_handle *nbh, uint16_t off, uint16_t len);

static void arx_cache_flush(struct dataplane_context *ctx, uint64_t tsc) __attribute__((noinline));

int dataplane_init(void)
{
  if (FLEXNIC_INTERNAL_MEM_SIZE < sizeof(struct flextcp_pl_mem)) {
    fprintf(stderr, "dataplane_init: internal flexnic memory size not "
        "sufficient (got %x, need %zx)\n", FLEXNIC_INTERNAL_MEM_SIZE,
        sizeof(struct flextcp_pl_mem));
    return -1;
  }

  if (fp_cores_max > FLEXNIC_PL_APPST_CTX_MCS) {
    fprintf(stderr, "dataplane_init: more cores than FLEXNIC_PL_APPST_CTX_MCS "
        "(%u)\n", FLEXNIC_PL_APPST_CTX_MCS);
    return -1;
  }
  if (FLEXNIC_PL_FLOWST_NUM > FLEXNIC_NUM_QMQUEUES) {
    fprintf(stderr, "dataplane_init: more flow states than queue manager "
        "queues (%u > %u)\n", FLEXNIC_PL_FLOWST_NUM, FLEXNIC_NUM_QMQUEUES);
    return -1;
  }

  return 0;
}

int dataplane_context_init(struct dataplane_context *ctx)
{
  char name[32];

  /* initialize forwarding queue */
  sprintf(name, "qman_fwd_ring_%u", ctx->id);
  if ((ctx->qman_fwd_ring = rte_ring_create(name, 32 * 1024, rte_socket_id(),
          RING_F_SC_DEQ)) == NULL)
  {
    fprintf(stderr, "initializing rte_ring_create failed\n");
    return -1;
  }

  /* initialize queue manager */
  if (qman_thread_init(ctx) != 0) {
    fprintf(stderr, "initializing qman thread failed\n");
    return -1;
  }

  /* initialize network queue */
  if (network_thread_init(ctx) != 0) {
    fprintf(stderr, "initializing rx thread failed\n");
    return -1;
  }

  ctx->poll_next_ctx = ctx->id;

  ctx->evfd = eventfd(0, EFD_NONBLOCK);
  assert(ctx->evfd != -1);
  ctx->ev.epdata.event = EPOLLIN;
  int r = rte_epoll_ctl(RTE_EPOLL_PER_THREAD, EPOLL_CTL_ADD, ctx->evfd, &ctx->ev);
  assert(r == 0);
  fp_state->kctx[ctx->id].evfd = ctx->evfd;

  return 0;
}

void dataplane_context_destroy(struct dataplane_context *ctx)
{
}

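/* Main polling loop for one fast-path core: repeatedly polls NIC RX, the
 * queue manager (and its forwarding ring), the per-application context
 * queues, and the slow-path (kernel) queue, flushing the TX buffer in
 * between. If interrupts are enabled and the core has been idle long enough,
 * it blocks in dataplane_block() until new work arrives. */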
void dataplane_loop(struct dataplane_context *ctx)
{
  struct notify_blockstate nbs;
  uint32_t ts;
  uint64_t cyc, prev_cyc;
  int was_idle = 1;

  notify_canblock_reset(&nbs);
  while (!exited) {
    unsigned n = 0;

    /* count cycles of previous iteration if it was busy */
    prev_cyc = cyc;
    cyc = rte_get_tsc_cycles();
    if (!was_idle)
      ctx->loadmon_cyc_busy += cyc - prev_cyc;

    ts = qman_timestamp(cyc);

    STATS_TS(start);
    n += poll_rx(ctx, ts, cyc);
    STATS_TS(rx);
    tx_flush(ctx);

    n += poll_qman_fwd(ctx, ts);

    STATS_TSADD(ctx, cyc_rx, rx - start);
    n += poll_qman(ctx, ts);
    STATS_TS(qm);
    STATS_TSADD(ctx, cyc_qm, qm - rx);
    n += poll_queues(ctx, ts);
    STATS_TS(qs);
    STATS_TSADD(ctx, cyc_qs, qs - qm);
    n += poll_kernel(ctx, ts);

    /* flush transmit buffer */
    tx_flush(ctx);

    if (ctx->id == 0)
      poll_scale(ctx);

    was_idle = (n == 0);
    if (config.fp_interrupts && notify_canblock(&nbs, !was_idle, cyc)) {
      dataplane_block(ctx, ts);
      notify_canblock_reset(&nbs);
    }
  }
}

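/* Blocks the fast-path core until the next queue-manager timeout expires or
 * an event arrives, either a NIC RX interrupt or a write to the per-core
 * eventfd that the slow path uses to wake this core. RX interrupts are
 * enabled only for the duration of the wait. */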
static void dataplane_block(struct dataplane_context *ctx, uint32_t ts)
{
  uint32_t max_timeout;
  uint64_t val;
  int ret, i;
  struct rte_epoll_event event[2];

  if (network_rx_interrupt_ctl(&ctx->net, 1) != 0) {
    return;
  }

  max_timeout = qman_next_ts(&ctx->qman, ts);

  ret = rte_epoll_wait(RTE_EPOLL_PER_THREAD, event, 2,
      max_timeout == (uint32_t) -1 ? -1 : max_timeout / 1000);
  if (ret < 0) {
    perror("dataplane_block: rte_epoll_wait failed");
    abort();
  }

  for (i = 0; i < ret; i++) {
    if (event[i].fd == ctx->evfd) {
      ret = read(ctx->evfd, &val, sizeof(uint64_t));
      if ((ret > 0 && ret != sizeof(uint64_t)) ||
          (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK))
      {
        perror("dataplane_block: read failed");
        abort();
      }
    }
  }
  network_rx_interrupt_ctl(&ctx->net, 0);
}

#ifdef DATAPLANE_STATS
static inline uint64_t read_stat(uint64_t *p)
{
  return __sync_lock_test_and_set(p, 0);
}

void dataplane_dump_stats(void)
{
  struct dataplane_context *ctx;
  unsigned i;

  for (i = 0; i < fp_cores_max; i++) {
    ctx = ctxs[i];
    fprintf(stderr, "dp stats %u: "
        "qm=(%"PRIu64",%"PRIu64",%"PRIu64") "
        "rx=(%"PRIu64",%"PRIu64",%"PRIu64") "
        "qs=(%"PRIu64",%"PRIu64",%"PRIu64") "
        "cyc=(%"PRIu64",%"PRIu64",%"PRIu64",%"PRIu64")\n", i,
        read_stat(&ctx->stat_qm_poll), read_stat(&ctx->stat_qm_empty),
        read_stat(&ctx->stat_qm_total),
        read_stat(&ctx->stat_rx_poll), read_stat(&ctx->stat_rx_empty),
        read_stat(&ctx->stat_rx_total),
        read_stat(&ctx->stat_qs_poll), read_stat(&ctx->stat_qs_empty),
        read_stat(&ctx->stat_qs_total),
        read_stat(&ctx->stat_cyc_db), read_stat(&ctx->stat_cyc_qm),
        read_stat(&ctx->stat_cyc_rx), read_stat(&ctx->stat_cyc_qs));
  }
}
#endif

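/* Polls the NIC for a batch of received packets, looks up and parses per-flow
 * state for each packet, runs the TCP fast path on flows that have state and
 * hands everything else to the slow path, then flushes the application RX
 * cache and returns buffers that were not retained. */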
static unsigned poll_rx(struct dataplane_context *ctx, uint32_t ts,
    uint64_t tsc)
{
  int ret;
  unsigned i, n;
  uint8_t freebuf[BATCH_SIZE] = { 0 };
  void *fss[BATCH_SIZE];
  struct tcp_opts tcpopts[BATCH_SIZE];
  struct network_buf_handle *bhs[BATCH_SIZE];

  n = BATCH_SIZE;
  if (TXBUF_SIZE - ctx->tx_num < n)
    n = TXBUF_SIZE - ctx->tx_num;

  STATS_ADD(ctx, rx_poll, 1);

  /* receive packets */
  ret = network_poll(&ctx->net, n, bhs);
  if (ret <= 0) {
    STATS_ADD(ctx, rx_empty, 1);
    return 0;
  }
  STATS_ADD(ctx, rx_total, n);
  n = ret;

  /* prefetch packet contents (1st cache line) */
  for (i = 0; i < n; i++) {
    rte_prefetch0(network_buf_bufoff(bhs[i]));
  }

  /* look up flow states */
  fast_flows_packet_fss(ctx, bhs, fss, n);

  /* prefetch packet contents (2nd cache line, TS opt overlaps) */
  for (i = 0; i < n; i++) {
    rte_prefetch0(network_buf_bufoff(bhs[i]) + 64);
  }

  /* parse packets */
  fast_flows_packet_parse(ctx, bhs, fss, tcpopts, n);

  for (i = 0; i < n; i++) {
    /* run fast-path for flows with flow state */
    if (fss[i] != NULL) {
      ret = fast_flows_packet(ctx, bhs[i], fss[i], &tcpopts[i], ts);
    } else {
      ret = -1;
    }

    if (ret > 0) {
      freebuf[i] = 1;
    } else if (ret < 0) {
      fast_kernel_packet(ctx, bhs[i]);
    }
  }

  arx_cache_flush(ctx, tsc);

  /* free received buffers */
  for (i = 0; i < n; i++) {
    if (freebuf[i] == 0)
      bufcache_free(ctx, bhs[i]);
  }

  return n;
}

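/* Polls the application context queues: prefetches queue entries, fetches up
 * to a batch of entries round-robin across contexts, processes ("bumps") each
 * fetched entry with a pre-allocated buffer, and finally probes the
 * application RX queues. Returns the number of entries fetched. */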
static unsigned poll_queues(struct dataplane_context *ctx, uint32_t ts)
{
  struct network_buf_handle **handles;
  void *aqes[BATCH_SIZE];
  unsigned n, i, total = 0;
  uint16_t max, k = 0, num_bufs = 0, j;
  int ret;

  STATS_ADD(ctx, qs_poll, 1);

  max = BATCH_SIZE;
  if (TXBUF_SIZE - ctx->tx_num < max)
    max = TXBUF_SIZE - ctx->tx_num;

  /* allocate buffers contents */
  max = bufcache_prealloc(ctx, max, &handles);

  for (n = 0; n < FLEXNIC_PL_APPCTX_NUM; n++) {
    fast_appctx_poll_pf(ctx, (ctx->poll_next_ctx + n) % FLEXNIC_PL_APPCTX_NUM);
  }

  for (n = 0; n < FLEXNIC_PL_APPCTX_NUM && k < max; n++) {
    for (i = 0; i < BATCH_SIZE && k < max; i++) {
      ret = fast_appctx_poll_fetch(ctx, ctx->poll_next_ctx, &aqes[k]);
      if (ret == 0)
        k++;
      else
        break;

      total++;
    }

    ctx->poll_next_ctx = (ctx->poll_next_ctx + 1) %
      FLEXNIC_PL_APPCTX_NUM;
  }

  for (j = 0; j < k; j++) {
    ret = fast_appctx_poll_bump(ctx, aqes[j], handles[num_bufs], ts);
    if (ret == 0)
      num_bufs++;
  }

  /* apply buffer reservations */
  bufcache_alloc(ctx, num_bufs);

  for (n = 0; n < FLEXNIC_PL_APPCTX_NUM; n++)
    fast_actx_rxq_probe(ctx, n);

  STATS_ADD(ctx, qs_total, total);
  if (total == 0)
    STATS_ADD(ctx, qs_empty, total);

  return total;
}

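/* Polls the slow-path (kernel) queue, handling at most 8 entries per call and
 * handing fast_kernel_poll() a pre-allocated buffer for any packet the slow
 * path wants to send. */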
static unsigned poll_kernel(struct dataplane_context *ctx, uint32_t ts)
{
  struct network_buf_handle **handles;
  unsigned total = 0;
  uint16_t max, k = 0;
  int ret;

  max = BATCH_SIZE;
  if (TXBUF_SIZE - ctx->tx_num < max)
    max = TXBUF_SIZE - ctx->tx_num;

  max = (max > 8 ? 8 : max);
  /* allocate buffers contents */
  max = bufcache_prealloc(ctx, max, &handles);

  for (k = 0; k < max;) {
    ret = fast_kernel_poll(ctx, handles[k], ts);

    if (ret == 0)
      k++;
    else if (ret < 0)
      break;

    total++;
  }

  /* apply buffer reservations */
  bufcache_alloc(ctx, k);

  return total;
}

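/* Asks the queue manager which flows may transmit next and for how many
 * bytes, prefetches the corresponding buffers and flow state, and hands each
 * grant to fast_flows_qman() to build the outgoing segment. */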
static unsigned poll_qman(struct dataplane_context *ctx, uint32_t ts)
{
  unsigned q_ids[BATCH_SIZE];
  uint16_t q_bytes[BATCH_SIZE];
  struct network_buf_handle **handles;
  uint16_t off = 0, max;
  int ret, i, use;

  max = BATCH_SIZE;
  if (TXBUF_SIZE - ctx->tx_num < max)
    max = TXBUF_SIZE - ctx->tx_num;

  STATS_ADD(ctx, qm_poll, 1);

  /* allocate buffers contents */
  max = bufcache_prealloc(ctx, max, &handles);

  /* poll queue manager */
  ret = qman_poll(&ctx->qman, max, q_ids, q_bytes);
  if (ret <= 0) {
    STATS_ADD(ctx, qm_empty, 1);
    return 0;
  }

  STATS_ADD(ctx, qm_total, ret);

  for (i = 0; i < ret; i++) {
    rte_prefetch0(handles[i]);
  }

  for (i = 0; i < ret; i++) {
    rte_prefetch0((uint8_t *) handles[i] + 64);
  }

  /* prefetch packet contents */
  for (i = 0; i < ret; i++) {
    rte_prefetch0(network_buf_buf(handles[i]));
  }

  fast_flows_qman_pf(ctx, q_ids, ret);

  fast_flows_qman_pfbufs(ctx, q_ids, ret);

  for (i = 0; i < ret; i++) {
    use = fast_flows_qman(ctx, q_ids[i], handles[off], ts);

    if (use == 0)
      off++;
  }

  /* apply buffer reservations */
  bufcache_alloc(ctx, off);

  return ret;
}

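/* Drains the per-core qman forwarding ring and hands each dequeued flow state
 * to fast_flows_qman_fwd(). */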
static unsigned poll_qman_fwd(struct dataplane_context *ctx, uint32_t ts)
{
  void *flow_states[4 * BATCH_SIZE];
  int ret, i;

  /* poll queue manager forwarding ring */
  ret = rte_ring_dequeue_burst(ctx->qman_fwd_ring, flow_states, 4 * BATCH_SIZE, NULL);
  for (i = 0; i < ret; i++) {
    fast_flows_qman_fwd(ctx, flow_states[i]);
  }

  return ret;
}

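/* Reserves up to num buffer handles from the per-context buffer cache,
 * refilling the cache from the network buffer pool if necessary. Returns the
 * number of handles available contiguously at *handles; callers commit the
 * handles they actually consumed with bufcache_alloc(). */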
static inline uint8_t bufcache_prealloc(struct dataplane_context *ctx, uint16_t num,
    struct network_buf_handle ***handles)
{
  uint16_t grow, res, head, g, i;
  struct network_buf_handle *nbh;

  /* try refilling buffer cache */
  if (ctx->bufcache_num < num) {
    grow = BUFCACHE_SIZE - ctx->bufcache_num;
    head = (ctx->bufcache_head + ctx->bufcache_num) & (BUFCACHE_SIZE - 1);

    if (head + grow <= BUFCACHE_SIZE) {
      res = network_buf_alloc(&ctx->net, grow, ctx->bufcache_handles + head);
    } else {
      g = BUFCACHE_SIZE - head;
      res = network_buf_alloc(&ctx->net, g, ctx->bufcache_handles + head);
      if (res == g) {
        res += network_buf_alloc(&ctx->net, grow - g, ctx->bufcache_handles);
      }
    }

    for (i = 0; i < res; i++) {
      g = (head + i) & (BUFCACHE_SIZE - 1);
      nbh = ctx->bufcache_handles[g];
      ctx->bufcache_handles[g] = (struct network_buf_handle *)
        ((uintptr_t) nbh);
    }

    ctx->bufcache_num += res;
  }
  num = MIN(num, (ctx->bufcache_head + ctx->bufcache_num <= BUFCACHE_SIZE ?
        ctx->bufcache_num : BUFCACHE_SIZE - ctx->bufcache_head));

  *handles = ctx->bufcache_handles + ctx->bufcache_head;

  return num;
}

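/* Commits num previously reserved buffer handles by advancing the cache head. */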
static inline void bufcache_alloc(struct dataplane_context *ctx, uint16_t num)
{
  assert(num <= ctx->bufcache_num);

  ctx->bufcache_head = (ctx->bufcache_head + num) & (BUFCACHE_SIZE - 1);
  ctx->bufcache_num -= num;
}

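/* Returns a buffer handle to the cache if there is room, otherwise frees it
 * back to the network buffer manager. */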
static inline void bufcache_free(struct dataplane_context *ctx,
    struct network_buf_handle *handle)
{
  uint32_t head, num;

  num = ctx->bufcache_num;
  if (num < BUFCACHE_SIZE) {
    /* free to cache */
    head = (ctx->bufcache_head + num) & (BUFCACHE_SIZE - 1);
    ctx->bufcache_handles[head] = handle;
    ctx->bufcache_num = num + 1;
    network_buf_reset(handle);
  } else {
    /* free to network buffer manager */
    network_free(1, &handle);
  }
}

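/* Pushes queued TX buffers out to the NIC; packets that could not be sent are
 * compacted to the front of the TX buffer for the next attempt. */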
static inline void tx_flush(struct dataplane_context *ctx)
{
  int ret;
  unsigned i;

  if (ctx->tx_num == 0) {
    return;
  }

  /* try to send out packets */
  ret = network_send(&ctx->net, ctx->tx_num, ctx->tx_handles);

  if (ret == ctx->tx_num) {
    /* everything sent */
    ctx->tx_num = 0;
  } else if (ret > 0) {
    /* move unsent packets to front */
    for (i = ret; i < ctx->tx_num; i++) {
      ctx->tx_handles[i - ret] = ctx->tx_handles[i];
    }
    ctx->tx_num -= ret;
  }
}

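/* Run on core 0 only: applies a pending fast-path scaling request by scaling
 * the number of active network cores up or down to fp_scale_to. */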
static void poll_scale(struct dataplane_context *ctx)
{
  unsigned st = fp_scale_to;

  if (st == 0)
    return;

  fprintf(stderr, "Scaling fast path from %u to %u\n", fp_cores_cur, st);
  if (st < fp_cores_cur) {
    if (network_scale_down(fp_cores_cur, st) != 0) {
      fprintf(stderr, "network_scale_down failed\n");
      abort();
    }
  } else if (st > fp_cores_cur) {
    if (network_scale_up(fp_cores_cur, st) != 0) {
      fprintf(stderr, "network_scale_up failed\n");
      abort();
    }
  } else {
    fprintf(stderr, "poll_scale: warning, core number didn't change\n");
  }

  fp_cores_cur = st;
  fp_scale_to = 0;
}

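/* Writes the batched application RX queue entries accumulated in
 * ctx->arx_cache out to the per-application RX queues and notifies the
 * corresponding application contexts. */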
static void arx_cache_flush(struct dataplane_context *ctx, uint64_t tsc)
{
  uint16_t i;
  struct flextcp_pl_appctx *actx;
  struct flextcp_pl_arx *parx[BATCH_SIZE];

  for (i = 0; i < ctx->arx_num; i++) {
    actx = &fp_state->appctx[ctx->id][ctx->arx_ctx[i]];
    if (fast_actx_rxq_alloc(ctx, actx, &parx[i]) != 0) {
      /* TODO: how do we handle this? */
      fprintf(stderr, "arx_cache_flush: no space in app rx queue\n");
      abort();
    }
  }

  for (i = 0; i < ctx->arx_num; i++) {
    rte_prefetch0(parx[i]);
  }

  for (i = 0; i < ctx->arx_num; i++) {
    *parx[i] = ctx->arx_cache[i];
  }

  for (i = 0; i < ctx->arx_num; i++) {
    actx = &fp_state->appctx[ctx->id][ctx->arx_ctx[i]];
    notify_appctx(actx, tsc);
  }

  ctx->arx_num = 0;
}