TAS
TCP Acceleration as an OS Service
init.c
1 /*
2  * Copyright 2019 University of Washington, Max Planck Institute for
3  * Software Systems, and The University of Texas at Austin
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <stddef.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <stdlib.h>
29 #include <poll.h>
30 #include <sys/eventfd.h>
31 #include <unistd.h>
32 #include <signal.h>
33 #include <errno.h>
34 #include <assert.h>
35 #include <tas_ll_connect.h>
36 #include <kernel_appif.h>
37 #include <tas_ll.h>
38 #include <tas_memif.h>
39 #include <utils_timeout.h>
40 #include "internal.h"
41 
42 static inline int event_kappin_conn_opened(
43  struct kernel_appin_conn_opened *inev, struct flextcp_event *outev,
44  unsigned avail);
45 static inline void event_kappin_listen_newconn(
46  struct kernel_appin_listen_newconn *inev, struct flextcp_event *outev);
47 static inline int event_kappin_accept_conn(
48  struct kernel_appin_accept_conn *inev, struct flextcp_event *outev,
49  unsigned avail);
50 static inline void event_kappin_st_conn_move(
51  struct kernel_appin_status *inev, struct flextcp_event *outev);
52 static inline void event_kappin_st_listen_open(
53  struct kernel_appin_status *inev, struct flextcp_event *outev);
54 static inline void event_kappin_st_conn_closed(
55  struct kernel_appin_status *inev, struct flextcp_event *outev);
56 
57 static inline int event_arx_connupdate(struct flextcp_context *ctx,
58  volatile struct flextcp_pl_arx_connupdate *inev,
59  struct flextcp_event *outevs, int outn, uint16_t fn_core);
60 
61 static int kernel_poll(struct flextcp_context *ctx, int num,
62  struct flextcp_event *events, int *used) __attribute__((noinline));
63 static int fastpath_poll(struct flextcp_context *ctx, int num,
64  struct flextcp_event *events, int *used)
65  __attribute__((used,noinline));
66 static int fastpath_poll_vec(struct flextcp_context *ctx, int num,
67  struct flextcp_event *events, int *used) __attribute__((used,noinline));
68 static void conns_bump(struct flextcp_context *ctx) __attribute__((noinline));
69 static void txq_probe(struct flextcp_context *ctx, unsigned n) __attribute__((noinline));
70 
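/* Global state shared with TAS: flexnic_info and flexnic_mem are the info
 * block and dataplane memory region mapped via flexnic_driver_connect() in
 * flextcp_init(); flexnic_evfd holds one eventfd per fast-path core, which
 * flextcp_flexnic_kick() writes to wake a core that may be blocked. */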
71 void *flexnic_mem = NULL;
72 struct flexnic_info *flexnic_info = NULL;
73 int flexnic_evfd[FLEXTCP_MAX_FTCPCORES];
74 
75 int flextcp_init(void)
76 {
77  if (flextcp_kernel_connect() != 0) {
78  fprintf(stderr, "flextcp_init: connecting to kernel failed\n");
79  return -1;
80  }
81 
82  if (flexnic_driver_connect(&flexnic_info, &flexnic_mem) != 0) {
83  fprintf(stderr, "flextcp_init: connecting to flexnic failed\n");
84  return -1;
85  }
86 
87  return 0;
88 }
89 
90 int flextcp_context_create(struct flextcp_context *ctx)
91 {
92  static uint16_t ctx_id = 0;
93 
94  memset(ctx, 0, sizeof(*ctx));
95 
96  ctx->ctx_id = __sync_fetch_and_add(&ctx_id, 1);
97  if (ctx->ctx_id >= FLEXTCP_MAX_CONTEXTS) {
98  fprintf(stderr, "flextcp_context_create: maximum number of contexts "
99  "exceeded\n");
100  return -1;
101  }
102 
103  ctx->evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
104  if (ctx->evfd < 0) {
105  perror("flextcp_context_create: eventfd for waiting fd failed");
106  return -1;
107  }
108 
109  return flextcp_kernel_newctx(ctx);
110 }
111 
112 #include <pthread.h>
113 
114 int debug_flextcp_on = 0;
115 
116 static int kernel_poll(struct flextcp_context *ctx, int num,
117  struct flextcp_event *events, int *used)
118 {
119  int i, j = 0;
120  uint32_t pos;
121  struct kernel_appin *kout;
122  uint8_t type;
123 
124  /* poll kernel queues */
125  pos = ctx->kout_head;
126  for (i = 0; i < num;) {
127  kout = (struct kernel_appin *) ctx->kout_base + pos;
128  j = 1;
129 
130  type = kout->type;
131  MEM_BARRIER();
132 
133  if (type == KERNEL_APPIN_INVALID) {
134  break;
135  } else if (type == KERNEL_APPIN_CONN_OPENED) {
136  j = event_kappin_conn_opened(&kout->data.conn_opened, &events[i],
137  num - i);
138  } else if (type == KERNEL_APPIN_LISTEN_NEWCONN) {
139  event_kappin_listen_newconn(&kout->data.listen_newconn, &events[i]);
140  } else if (type == KERNEL_APPIN_ACCEPTED_CONN) {
141  j = event_kappin_accept_conn(&kout->data.accept_connection, &events[i],
142  num - i);
143  } else if (type == KERNEL_APPIN_STATUS_LISTEN_OPEN) {
144  event_kappin_st_listen_open(&kout->data.status, &events[i]);
145  } else if (type == KERNEL_APPIN_STATUS_CONN_MOVE) {
146  event_kappin_st_conn_move(&kout->data.status, &events[i]);
147  } else if (type == KERNEL_APPIN_STATUS_CONN_CLOSE) {
148  event_kappin_st_conn_closed(&kout->data.status, &events[i]);
149  } else {
150  fprintf(stderr, "flextcp_context_poll: unexpected kout type=%u pos=%u len=%u\n",
151  type, pos, ctx->kout_len);
152  abort();
153  }
154  ctx->flags |= CTX_FLAG_POLL_EVENTS;
155 
156  if (j == -1) {
157  break;
158  }
159 
160  i += j;
161 
162  MEM_BARRIER();
163  kout->type = KERNEL_APPIN_INVALID;
164 
165  pos = pos + 1;
166  if (pos >= ctx->kout_len) {
167  pos = 0;
168  }
169  }
170  ctx->kout_head = pos;
171 
172  *used = i;
173  return (j == -1 ? -1 : 0);
174 }
175 
176 static int fastpath_poll(struct flextcp_context *ctx, int num,
177  struct flextcp_event *events, int *used)
178 {
179  int i, j, ran_out;
180  volatile struct flextcp_pl_arx *arx_q, *arx;
181  uint32_t head;
182  uint16_t k;
183 
184  i = 0;
185  for (k = 0; k < ctx->num_queues && i < num; k++) {
186  ran_out = 0;
187 
188  arx_q = (volatile struct flextcp_pl_arx *)
189  ctx->queues[ctx->next_queue].rxq_base;
190  head = ctx->queues[ctx->next_queue].rxq_head;
191  for (; i < num;) {
192  j = 0;
193  arx = &arx_q[head / sizeof(*arx)];
194  if (arx->type == FLEXTCP_PL_ARX_INVALID) {
195  break;
196  } else if (arx->type == FLEXTCP_PL_ARX_CONNUPDATE) {
197  j = event_arx_connupdate(ctx, &arx->msg.connupdate, events + i, num - i, ctx->next_queue);
198  } else {
199  fprintf(stderr, "flextcp_context_poll: unexpected arx type=%u head=%x\n", arx->type, head);
200  }
201  ctx->flags |= CTX_FLAG_POLL_EVENTS;
202 
203  if (j == -1) {
204  ran_out = 1;
205  break;
206  }
207  i += j;
208 
209  arx->type = 0;
210 
211  /* next entry */
212  head += sizeof(*arx);
213  if (head >= ctx->rxq_len) {
214  head -= ctx->rxq_len;
215  }
216  }
217 
218  ctx->queues[ctx->next_queue].rxq_head = head;
219  if (ran_out) {
220  *used = i;
221  return -1;
222  }
223 
224  ctx->next_queue = ctx->next_queue + 1;
225  if (ctx->next_queue >= ctx->num_queues)
226  ctx->next_queue -= ctx->num_queues;
227  }
228 
229  *used = i;
230  return 0;
231 }
232 
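/* fetch_8ts()/fetch_4ts(): for 8 (respectively 4) consecutive receive
 * queues, load the type byte of the arx entry at each queue's current head
 * (the byte at offset 31 of the entry) into ts[], and prefetch the cache
 * line just past it, so that fastpath_poll_vec() can check many queues for
 * new entries without stalling on each individual access. */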
233 static inline void fetch_8ts(struct flextcp_context *ctx, uint32_t *heads,
234  uint16_t q, uint8_t *ts)
235 {
236  struct flextcp_pl_arx *p0, *p1, *p2, *p3, *p4, *p5, *p6, *p7;
237 
238  p0 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
239  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
240  p1 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
241  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
242  p2 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
243  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
244  p3 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
245  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
246  p4 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
247  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
248  p5 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
249  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
250  p6 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
251  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
252  p7 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
253  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
254 
255  asm volatile(
256  "prefetcht0 32(%0);"
257  "prefetcht0 32(%1);"
258  "prefetcht0 32(%2);"
259  "prefetcht0 32(%3);"
260  "prefetcht0 32(%4);"
261  "prefetcht0 32(%5);"
262  "prefetcht0 32(%6);"
263  "prefetcht0 32(%7);"
264  "movb 31(%0), %b0;"
265  "movb 31(%1), %b1;"
266  "movb 31(%2), %b2;"
267  "movb 31(%3), %b3;"
268  "movb 31(%4), %b4;"
269  "movb 31(%5), %b5;"
270  "movb 31(%6), %b6;"
271  "movb 31(%7), %b7;"
272 
273  "movb %b0, 0(%8);"
274  "movb %b1, 1(%8);"
275  "movb %b2, 2(%8);"
276  "movb %b3, 3(%8);"
277  "movb %b4, 4(%8);"
278  "movb %b5, 5(%8);"
279  "movb %b6, 6(%8);"
280  "movb %b7, 7(%8);"
281  :
282  : "r" (p0), "r" (p1), "r" (p2), "r" (p3),
283  "r" (p4), "r" (p5), "r" (p6), "r" (p7), "r" (ts)
284  : "memory");
285 
286 }
287 
288 static inline void fetch_4ts(struct flextcp_context *ctx, uint32_t *heads,
289  uint16_t q, uint8_t *ts)
290 {
291  struct flextcp_pl_arx *p0, *p1, *p2, *p3;
292 
293  p0 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
294  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
295  p1 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
296  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
297  p2 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
298  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
299  p3 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
300  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
301 
302  asm volatile(
303  "prefetcht0 32(%0);"
304  "prefetcht0 32(%1);"
305  "prefetcht0 32(%2);"
306  "prefetcht0 32(%3);"
307  "movb 31(%0), %b0;"
308  "movb 31(%1), %b1;"
309  "movb 31(%2), %b2;"
310  "movb 31(%3), %b3;"
311  "movb %b0, 0(%4);"
312  "movb %b1, 1(%4);"
313  "movb %b2, 2(%4);"
314  "movb %b3, 3(%4);"
315  :
316  : "r" (p0), "r" (p1), "r" (p2), "r" (p3), "r" (ts)
317  : "memory");
318 }
319 
320 
321 static int fastpath_poll_vec(struct flextcp_context *ctx, int num,
322  struct flextcp_event *events, int *used)
323 {
324  int i, j, ran_out, found, found_inner;
325  volatile struct flextcp_pl_arx *arx;
326  uint32_t head;
327  uint16_t l, k, q;
328  uint8_t t;
329  uint8_t types[ctx->num_queues];
330  uint32_t qheads[ctx->num_queues];
331 
332  volatile struct flextcp_pl_arx *arxs[num];
333  uint8_t arx_qs[num];
334 
335  for (q = 0; q < ctx->num_queues; q++) {
336  qheads[q] = ctx->queues[q].rxq_head;
337  }
338 
339  ran_out = found = 0;
340  i = 0;
341  q = ctx->next_queue;
342  while (i < num && !ran_out) {
343  l = 0;
344  for (found_inner = 1; found_inner && i + l < num; ) {
345  found_inner = 0;
346 
347  /* fetch types from all n queues */
348  uint16_t qs = ctx->num_queues;
349  q = ctx->next_queue;
350  k = 0;
351  while (qs > 8) {
352  fetch_8ts(ctx, qheads, q, types + k);
353 
354  q = (q + 8 < ctx->num_queues ? q + 8 : q + 8 - ctx->num_queues);
355  k += 8;
356  qs -= 8;
357  }
358  while (qs > 4) {
359  fetch_4ts(ctx, qheads, q, types + k);
360 
361  q = (q + 4 < ctx->num_queues ? q + 4 : q + 4 - ctx->num_queues);
362  k += 4;
363  qs -= 4;
364  }
365  while (qs > 0) {
366  arx = (volatile struct flextcp_pl_arx *)
367  (ctx->queues[q].rxq_base + qheads[q]);
368  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
369  types[k] = arx->type;
370  k++;
371  qs--;
372  }
373 
374  /* prefetch connection state for all entries */
375  for (k = 0, q = ctx->next_queue; k < ctx->num_queues && i + l < num; k++) {
376  if (types[k] == FLEXTCP_PL_ARX_CONNUPDATE) {
377  arx = (volatile struct flextcp_pl_arx *)
378  (ctx->queues[q].rxq_base + qheads[q]);
379  util_prefetch0(OPAQUE_PTR(arx->msg.connupdate.opaque) + 64);
380  util_prefetch0(OPAQUE_PTR(arx->msg.connupdate.opaque));
381 
382  arxs[l] = arx;
383  arx_qs[l] = q;
384  l++;
385  found_inner = 1;
386 
387  qheads[q] = qheads[q] + sizeof(*arx);
388  if (qheads[q] >= ctx->rxq_len) {
389  qheads[q] -= ctx->rxq_len;
390  }
391  }
392  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
393  }
394  }
395 
396  if (l == 0)
397  break;
398 
399  for (k = 0; k < l && i < num; k++) {
400  arx = arxs[k];
401  q = arx_qs[k];
402  head = ctx->queues[q].rxq_head;
403 
404  t = arx->type;
405  assert(t != FLEXTCP_PL_ARX_INVALID);
406 
407  if (t == FLEXTCP_PL_ARX_CONNUPDATE) {
408  j = event_arx_connupdate(ctx, &arx->msg.connupdate, events + i,
409  num - i, q);
410  } else {
411  j = 0;
412  fprintf(stderr, "flextcp_context_poll: unexpected arx type=%u head=%x\n",
413  arx->type, head);
414  }
415 
416  found = 1;
417 
418  if (j == -1) {
419  ran_out = 1;
420  break;
421  }
422  i += j;
423 
424  arx->type = 0;
425 
426  /* next entry */
427  head += sizeof(*arx);
428  if (head >= ctx->rxq_len) {
429  head -= ctx->rxq_len;
430  }
431  ctx->queues[q].rxq_head = head;
432  }
433  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
434  }
435 
436  ctx->next_queue = q;
437 
438  if (found) {
439  for (k = 0, q = ctx->next_queue; k < ctx->num_queues; k++) {
440  arx = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base +
441  ctx->queues[q].rxq_head);
442  util_prefetch0(arx);
443  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
444  }
445 
446  ctx->flags |= CTX_FLAG_POLL_EVENTS;
447  }
448 
449  *used = i;
450  return 0;
451 }
452 
453 
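/* Poll a context for application events: prefetch the head of each
 * fast-path receive queue, drain the kernel-to-application queue, then the
 * fast-path queues, reclaim completed entries from the TX queues, and
 * flush pending connection bumps. Returns the number of entries written to
 * events[]. */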
454 int flextcp_context_poll(struct flextcp_context *ctx, int num,
455  struct flextcp_event *events)
456 {
457  int i, j;
458 
459  i = 0;
460 
461  ctx->flags |= CTX_FLAG_POLL_CALLED;
462 
463  /* prefetch queues */
464  uint32_t k, q;
465  for (k = 0, q = ctx->next_queue; k < ctx->num_queues; k++) {
466  util_prefetch0((struct flextcp_pl_arx *) (ctx->queues[q].rxq_base +
467  ctx->queues[q].rxq_head));
468  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
469  }
470 
471  /* poll kernel */
472  if (kernel_poll(ctx, num, events, &i) == -1) {
473  /* not enough event space, abort */
474  return i;
475  }
476 
477  /* poll NIC queues */
478  fastpath_poll_vec(ctx, num - i, events + i, &j);
479 
480  txq_probe(ctx, num);
481  conns_bump(ctx);
482 
483  return i + j;
484 }
485 
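/* TX queue protocol: flextcp_context_tx_alloc() hands out a pointer to the
 * atx entry at the current tail of the chosen core's queue (failing if the
 * queue is full). The caller fills in the entry, sets its type last (after
 * a MEM_BARRIER()), and then calls flextcp_context_tx_done() to advance
 * the tail, shrink the available space, and kick the fast-path core in
 * case it is sleeping. conns_bump() below shows this pattern. */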
486 int flextcp_context_tx_alloc(struct flextcp_context *ctx,
487  struct flextcp_pl_atx **patx, uint16_t core)
488 {
489  /* if queue is full, abort */
490  if (ctx->queues[core].txq_avail == 0) {
491  return -1;
492  }
493 
494  *patx = (struct flextcp_pl_atx *)
495  (ctx->queues[core].txq_base + ctx->queues[core].txq_tail);
496  return 0;
497 }
498 
499 static void flextcp_flexnic_kick(struct flextcp_context *ctx, int core)
500 {
501  uint64_t now = util_rdtsc();
502 
503  if (flexnic_info->poll_cycle_tas == UINT64_MAX) {
504  /* blocking for TAS disabled */
505  return;
506  }
507 
508  if(now - ctx->queues[core].last_ts > flexnic_info->poll_cycle_tas) {
509  // Kick
510  uint64_t val = 1;
511  int r = write(flexnic_evfd[core], &val, sizeof(uint64_t));
512  assert(r == sizeof(uint64_t));
513  }
514 
515  ctx->queues[core].last_ts = now;
516 }
517 
518 void flextcp_context_tx_done(struct flextcp_context *ctx, uint16_t core)
519 {
520  ctx->queues[core].txq_tail += sizeof(struct flextcp_pl_atx);
521  if (ctx->queues[core].txq_tail >= ctx->txq_len) {
522  ctx->queues[core].txq_tail -= ctx->txq_len;
523  }
524 
525  ctx->queues[core].txq_avail -= sizeof(struct flextcp_pl_atx);
526 
527  flextcp_flexnic_kick(ctx, core);
528 }
529 
530 static inline int event_kappin_conn_opened(
531  struct kernel_appin_conn_opened *inev, struct flextcp_event *outev,
532  unsigned avail)
533 {
534  struct flextcp_connection *conn;
535  int j = 1;
536 
537  conn = OPAQUE_PTR(inev->opaque);
538 
539  outev->event_type = FLEXTCP_EV_CONN_OPEN;
540  outev->ev.conn_open.status = inev->status;
541  outev->ev.conn_open.conn = conn;
542 
543  if (inev->status != 0) {
544  conn->status = CONN_CLOSED;
545  return 1;
546  } else if (conn->rxb_used > 0 && conn->rx_closed && avail < 3) {
547  /* if we've already received updates, we'll need to inject them */
548  return -1;
549  } else if ((conn->rxb_used > 0 || conn->rx_closed) && avail < 2) {
550  /* if we've already received updates, we'll need to inject them */
551  return -1;
552  }
553 
554  conn->status = CONN_OPEN;
555  conn->local_ip = inev->local_ip;
556  conn->local_port = inev->local_port;
557  conn->seq_rx = inev->seq_rx;
558  conn->seq_tx = inev->seq_tx;
559  conn->flow_id = inev->flow_id;
560  conn->fn_core = inev->fn_core;
561 
562  conn->rxb_base = (uint8_t *) flexnic_mem + inev->rx_off;
563  conn->rxb_len = inev->rx_len;
564 
565  conn->txb_base = (uint8_t *) flexnic_mem + inev->tx_off;
566  conn->txb_len = inev->tx_len;
567 
568  /* inject bump if necessary */
569  if (conn->rxb_used > 0) {
570  conn->seq_rx += conn->rxb_used;
571 
572  outev[j].event_type = FLEXTCP_EV_CONN_RECEIVED;
573  outev[j].ev.conn_received.conn = conn;
574  outev[j].ev.conn_received.buf = conn->rxb_base;
575  outev[j].ev.conn_received.len = conn->rxb_used;
576  j++;
577  }
578 
579  /* add end of stream notification if necessary */
580  if (conn->rx_closed) {
581  outev[j].event_type = FLEXTCP_EV_CONN_RXCLOSED;
582  outev[j].ev.conn_rxclosed.conn = conn;
583  j++;
584  }
585 
586  return j;
587 }
588 
589 static inline void event_kappin_listen_newconn(
590  struct kernel_appin_listen_newconn *inev, struct flextcp_event *outev)
591 {
592  struct flextcp_listener *listener;
593 
594  listener = OPAQUE_PTR(inev->opaque);
595 
596  outev->event_type = FLEXTCP_EV_LISTEN_NEWCONN;
597  outev->ev.listen_newconn.remote_ip = inev->remote_ip;
598  outev->ev.listen_newconn.remote_port = inev->remote_port;
599  outev->ev.listen_open.listener = listener;
600 }
601 
602 static inline int event_kappin_accept_conn(
603  struct kernel_appin_accept_conn *inev, struct flextcp_event *outev,
604  unsigned avail)
605 {
606  struct flextcp_connection *conn;
607  int j = 1;
608 
609  conn = OPAQUE_PTR(inev->opaque);
610 
611  outev->event_type = FLEXTCP_EV_LISTEN_ACCEPT;
612  outev->ev.listen_accept.status = inev->status;
613  outev->ev.listen_accept.conn = conn;
614 
615  if (inev->status != 0) {
616  conn->status = CONN_CLOSED;
617  return 1;
618  } else if (conn->rxb_used > 0 && conn->rx_closed && avail < 3) {
619  /* if we've already received updates, we'll need to inject them */
620  return -1;
621  } else if ((conn->rxb_used > 0 || conn->rx_closed) && avail < 2) {
622  /* if we've already received updates, we'll need to inject them */
623  return -1;
624  }
625 
626  conn->status = CONN_OPEN;
627  conn->local_ip = inev->local_ip;
628  conn->remote_ip = inev->remote_ip;
629  conn->remote_port = inev->remote_port;
630  conn->seq_rx = inev->seq_rx;
631  conn->seq_tx = inev->seq_tx;
632  conn->flow_id = inev->flow_id;
633  conn->fn_core = inev->fn_core;
634 
635  conn->rxb_base = (uint8_t *) flexnic_mem + inev->rx_off;
636  conn->rxb_len = inev->rx_len;
637 
638  conn->txb_base = (uint8_t *) flexnic_mem + inev->tx_off;
639  conn->txb_len = inev->tx_len;
640 
641  /* inject bump if necessary */
642  if (conn->rxb_used > 0) {
643  conn->seq_rx += conn->rxb_used;
644 
645  outev[j].event_type = FLEXTCP_EV_CONN_RECEIVED;
646  outev[j].ev.conn_received.conn = conn;
647  outev[j].ev.conn_received.buf = conn->rxb_base;
648  outev[j].ev.conn_received.len = conn->rxb_used;
649  j++;
650  }
651 
652  /* add end of stream notification if necessary */
653  if (conn->rx_closed) {
654  outev[j].event_type = FLEXTCP_EV_CONN_RXCLOSED;
655  outev[j].ev.conn_rxclosed.conn = conn;
656  j++;
657  }
658 
659  return j;
660 }
661 
662 static inline void event_kappin_st_conn_move(
663  struct kernel_appin_status *inev, struct flextcp_event *outev)
664 {
665  struct flextcp_connection *conn;
666 
667  conn = OPAQUE_PTR(inev->opaque);
668 
669  outev->event_type = FLEXTCP_EV_CONN_MOVED;
670  outev->ev.conn_moved.status = inev->status;
671  outev->ev.conn_moved.conn = conn;
672 }
673 
674 static inline void event_kappin_st_listen_open(
675  struct kernel_appin_status *inev, struct flextcp_event *outev)
676 {
677  struct flextcp_listener *listener;
678 
679  listener = OPAQUE_PTR(inev->opaque);
680 
681  outev->event_type = FLEXTCP_EV_LISTEN_OPEN;
682  outev->ev.listen_open.status = inev->status;
683  outev->ev.listen_open.listener = listener;
684 }
685 
686 static inline void event_kappin_st_conn_closed(
687  struct kernel_appin_status *inev, struct flextcp_event *outev)
688 {
689  struct flextcp_connection *conn;
690 
691  conn = OPAQUE_PTR(inev->opaque);
692 
693  outev->event_type = FLEXTCP_EV_CONN_CLOSED;
694  outev->ev.conn_closed.status = inev->status;
695  outev->ev.conn_closed.conn = conn;
696 
697  conn->status = CONN_CLOSED;
698 }
699 
700 static inline int event_arx_connupdate(struct flextcp_context *ctx,
701  volatile struct flextcp_pl_arx_connupdate *inev,
702  struct flextcp_event *outevs, int outn, uint16_t fn_core)
703 {
704  struct flextcp_connection *conn;
705  uint32_t rx_bump, rx_len, tx_bump, tx_sent;
706  int i = 0, evs_needed, tx_avail_ev, eos;
707 
708  conn = OPAQUE_PTR(inev->opaque);
709 
710  conn->fn_core = fn_core;
711 
712  rx_bump = inev->rx_bump;
713  tx_bump = inev->tx_bump;
714  eos = ((inev->flags & FLEXTCP_PL_ARX_FLRXDONE) == FLEXTCP_PL_ARX_FLRXDONE);
715 
716  if (conn->status == CONN_OPEN_REQUESTED ||
717  conn->status == CONN_ACCEPT_REQUESTED)
718  {
719  /* due to a race we might see connection updates before we see the
720  * connection confirmation from the kernel */
721  assert(tx_bump == 0);
722  conn->rx_closed = !!eos;
723  conn->rxb_head += rx_bump;
724  conn->rxb_used += rx_bump;
725  /* TODO: should probably handle eos here as well */
726  return 0;
727  } else if (conn->status == CONN_CLOSED ||
728  conn->status == CONN_CLOSE_REQUESTED)
729  {
730  /* just drop bumps for closed connections */
731  return 0;
732  }
733 
734  assert(conn->status == CONN_OPEN);
735 
736  /* figure out how many events for rx */
737  evs_needed = 0;
738  if (rx_bump > 0) {
739  evs_needed++;
740  if (conn->rxb_head + rx_bump > conn->rxb_len) {
741  evs_needed++;
742  }
743  }
744 
745  /* if tx buffer was depleted, we'll generate a tx avail event */
746  tx_avail_ev = (tx_bump > 0 && flextcp_conn_txbuf_available(conn) == 0);
747  if (tx_avail_ev) {
748  evs_needed++;
749  }
750 
751  tx_sent = conn->txb_sent - tx_bump;
752 
753  /* if tx close was acked, also add that event */
754  if ((conn->flags & CONN_FLAG_TXEOS_ALLOC) == CONN_FLAG_TXEOS_ALLOC &&
755  !tx_sent)
756  {
757  evs_needed++;
758  }
759 
760  /* if receive stream closed need additional event */
761  if (eos) {
762  evs_needed++;
763  }
764 
765  /* if we can't fit all events, try again later */
766  if (evs_needed > outn) {
767  return -1;
768  }
769 
770  /* generate rx events */
771  if (rx_bump > 0) {
772  outevs[i].event_type = FLEXTCP_EV_CONN_RECEIVED;
773  outevs[i].ev.conn_received.conn = conn;
774  outevs[i].ev.conn_received.buf = conn->rxb_base + conn->rxb_head;
775  util_prefetch0(conn->rxb_base + conn->rxb_head);
776  if (conn->rxb_head + rx_bump > conn->rxb_len) {
777  /* wrap around in rx buffer */
778  rx_len = conn->rxb_len - conn->rxb_head;
779  outevs[i].ev.conn_received.len = rx_len;
780 
781  i++;
782  outevs[i].event_type = FLEXTCP_EV_CONN_RECEIVED;
783  outevs[i].ev.conn_received.conn = conn;
784  outevs[i].ev.conn_received.buf = conn->rxb_base;
785  outevs[i].ev.conn_received.len = rx_bump - rx_len;
786  } else {
787  outevs[i].ev.conn_received.len = rx_bump;
788  }
789  i++;
790 
791  /* update rx buffer */
792  conn->seq_rx += rx_bump;
793  conn->rxb_head += rx_bump;
794  if (conn->rxb_head >= conn->rxb_len) {
795  conn->rxb_head -= conn->rxb_len;
796  }
797  conn->rxb_used += rx_bump;
798  }
799 
800  /* bump tx */
801  if (tx_bump > 0) {
802  conn->txb_sent -= tx_bump;
803 
804  if (tx_avail_ev) {
805  outevs[i].event_type = FLEXTCP_EV_CONN_SENDBUF;
806  outevs[i].ev.conn_sendbuf.conn = conn;
807  i++;
808  }
809 
810  /* if we were previously unable to push out TX EOS, do so now. */
811  if ((conn->flags & CONN_FLAG_TXEOS) == CONN_FLAG_TXEOS &&
812  !(conn->flags & CONN_FLAG_TXEOS_ALLOC))
813  {
814  if (flextcp_conn_pushtxeos(ctx, conn) != 0) {
815  /* should never happen */
816  fprintf(stderr, "event_arx_connupdate: flextcp_conn_pushtxeos "
817  "failed\n");
818  abort();
819  }
820  } else if ((conn->flags & CONN_FLAG_TXEOS_ALLOC) == CONN_FLAG_TXEOS_ALLOC) {
821  /* There should be no data after we push out the EOS */
822  assert(!(conn->flags & CONN_FLAG_TXEOS_ACK));
823 
824  /* if this was the last bump, mark TX EOS as acked */
825  if (conn->txb_sent == 0) {
826  conn->flags |= CONN_FLAG_TXEOS_ACK;
827 
828  outevs[i].event_type = FLEXTCP_EV_CONN_TXCLOSED;
829  outevs[i].ev.conn_txclosed.conn = conn;
830  i++;
831  }
832  }
833  }
834 
835  /* add end of stream notification */
836  if (eos) {
837  outevs[i].event_type = FLEXTCP_EV_CONN_RXCLOSED;
838  outevs[i].ev.conn_rxclosed.conn = conn;
839  conn->rx_closed = 1;
840  i++;
841  }
842 
843  return i;
844 }
845 
846 
847 static void txq_probe(struct flextcp_context *ctx, unsigned n)
848 {
849  struct flextcp_pl_atx *atx;
850  uint32_t pos, i, q, tail, avail, len;
851 
852  len = ctx->txq_len;
853  for (q = 0; q < ctx->num_queues; q++) {
854  avail = ctx->queues[q].txq_avail;
855 
856  if (avail > len / 2)
857  continue;
858 
859  tail = ctx->queues[q].txq_tail;
860 
861  pos = tail + avail;
862  if (pos >= len)
863  pos -= len;
864 
865  i = 0;
866  while (avail < len && i < 2 * n) {
867  atx = (struct flextcp_pl_atx *) (ctx->queues[q].txq_base + pos);
868 
869  if (atx->type != 0) {
870  break;
871  }
872 
873  avail += sizeof(*atx);
874  pos += sizeof(*atx);
875  if (pos >= len)
876  pos -= len;
877  i++;
878 
879  MEM_BARRIER();
880  }
881 
882  ctx->queues[q].txq_avail = avail;
883  }
884 }
885 
886 static void conns_bump(struct flextcp_context *ctx)
887 {
888  struct flextcp_connection *c;
889  struct flextcp_pl_atx *atx;
890  uint8_t flags;
891 
892  while ((c = ctx->bump_pending_first) != NULL) {
893  assert(c->status == CONN_OPEN);
894 
895  if (flextcp_context_tx_alloc(ctx, &atx, c->fn_core) != 0) {
896  break;
897  }
898 
899  flags = 0;
900 
901  if ((c->flags & CONN_FLAG_TXEOS_ALLOC) == CONN_FLAG_TXEOS_ALLOC) {
902  flags |= FLEXTCP_PL_ATX_FLTXDONE;
903  }
904 
905  atx->msg.connupdate.rx_bump = c->rxb_bump;
906  atx->msg.connupdate.tx_bump = c->txb_bump;
907  atx->msg.connupdate.flow_id = c->flow_id;
908  atx->msg.connupdate.bump_seq = c->bump_seq++;
909  atx->msg.connupdate.flags = flags;
910  MEM_BARRIER();
911  atx->type = FLEXTCP_PL_ATX_CONNUPDATE;
912 
913  flextcp_context_tx_done(ctx, c->fn_core);
914 
915  c->rxb_bump = c->txb_bump = 0;
916  c->bump_pending = 0;
917 
918  if (c->bump_next == NULL) {
919  ctx->bump_pending_last = NULL;
920  }
921  ctx->bump_pending_first = c->bump_next;
922  }
923 }
924 
925 int flextcp_context_waitfd(struct flextcp_context *ctx)
926 {
927  return ctx->evfd;
928 }
929 
930 int flextcp_context_canwait(struct flextcp_context *ctx)
931 {
932  /* At a high level this code implements a state machine that ensures that at
933  * least POLL_CYCLE time has elapsed between two unsuccessful poll calls.
934  * This is a bit messier because we don't want to move any of the timestamp
935  * code into the poll call and make it more expensive for apps that don't
936  * block. Instead we use the timing of calls to canwait along with flags that
937  * the poll call sets when it's called and when it finds events.
938  */
939 
940  /* if blocking is disabled, we can never wait */
941  if (flexnic_info->poll_cycle_app == UINT64_MAX) {
942  return -1;
943  }
944 
945  /* if there were events found in the last poll, it's back to square one. */
946  if ((ctx->flags & CTX_FLAG_POLL_EVENTS) != 0) {
947  ctx->flags &= ~(CTX_FLAG_POLL_EVENTS | CTX_FLAG_WANTWAIT |
948  CTX_FLAG_LASTWAIT);
949 
950  return -1;
951  }
952 
953  /* from here on we know that there are no events */
954 
955  if ((ctx->flags & CTX_FLAG_WANTWAIT) != 0) {
956  /* in want wait state: just wait for grace period to be over */
957  if ((util_rdtsc() - ctx->last_inev_ts) > flexnic_info->poll_cycle_app) {
958  /* past grace period, move on to lastwait. clear polled flag, to make sure
959  * it gets polled again before we clear lastwait. */
960  ctx->flags &= ~(CTX_FLAG_POLL_CALLED | CTX_FLAG_WANTWAIT);
961  ctx->flags |= CTX_FLAG_LASTWAIT;
962  }
963  } else if ((ctx->flags & CTX_FLAG_LASTWAIT) != 0) {
964  /* in last wait state */
965  if ((ctx->flags & CTX_FLAG_POLL_CALLED) != 0) {
966  /* if we have polled once more after the grace period, we're good to go to
967  * sleep */
968  return 0;
969  }
970  } else if ((ctx->flags & CTX_FLAG_POLL_CALLED) != 0) {
971  /* not currently getting ready to wait, so start */
972  ctx->last_inev_ts = util_rdtsc();
973  ctx->flags |= CTX_FLAG_WANTWAIT;
974  }
975 
976  return -1;
977 }
978 
979 void flextcp_context_waitclear(struct flextcp_context *ctx)
980 {
981  ssize_t ret;
982  uint64_t val;
983 
984  ret = read(ctx->evfd, &val, sizeof(uint64_t));
985  if ((ret >= 0 && ret != sizeof(uint64_t)) ||
986  (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK))
987  {
988  perror("flextcp_context_waitclear: read failed");
989  abort();
990  }
991 
992  ctx->flags &= ~(CTX_FLAG_WANTWAIT | CTX_FLAG_LASTWAIT | CTX_FLAG_POLL_CALLED);
993 }
994 
995 int flextcp_context_wait(struct flextcp_context *ctx, int timeout_ms)
996 {
997  struct pollfd pfd;
998  int ret;
999 
1000  if (flextcp_context_canwait(ctx) != 0) {
1001  return -1;
1002  }
1003 
1004  pfd.fd = ctx->evfd;
1005  pfd.events = POLLIN;
1006  pfd.revents = 0;
1007  ret = poll(&pfd, 1, timeout_ms);
1008  if (ret < 0) {
1009  perror("flextcp_context_wait: poll returned error");
1010  return -1;
1011  }
1012 
1013  flextcp_context_waitclear(ctx);
1014  return 0;
1015 }
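
A minimal sketch of an application event loop driving the API above. It is
illustrative only: it uses just the functions, event types, and fields that
appear in this listing, while process_data() and the connection-setup step
are hypothetical placeholders.

#include <stddef.h>
#include <stdio.h>

#include <tas_ll.h>

/* hypothetical application handler, not part of TAS */
static void process_data(const void *buf, size_t len)
{
  (void) buf;
  fprintf(stderr, "received %zu bytes\n", len);
}

int main(void)
{
  struct flextcp_context ctx;
  struct flextcp_event evs[32];
  int n, i;

  if (flextcp_init() != 0 || flextcp_context_create(&ctx) != 0) {
    fprintf(stderr, "TAS initialization failed\n");
    return 1;
  }

  /* ... open or accept connections here (connection API not shown) ... */

  for (;;) {
    n = flextcp_context_poll(&ctx, 32, evs);

    for (i = 0; i < n; i++) {
      switch (evs[i].event_type) {
      case FLEXTCP_EV_CONN_RECEIVED:
        process_data(evs[i].ev.conn_received.buf,
            evs[i].ev.conn_received.len);
        break;
      case FLEXTCP_EV_CONN_RXCLOSED:
        /* peer closed its transmit direction */
        break;
      default:
        break;
      }
    }

    if (n == 0) {
      /* Block on the context eventfd. flextcp_context_wait() only sleeps
       * once flextcp_context_canwait() signals that the grace period of
       * unsuccessful polls has passed; otherwise it returns -1 and we
       * simply poll again. */
      flextcp_context_wait(&ctx, 100);
    }
  }
}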