/* Public low-level application interface for TAS. */

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/eventfd.h>

#include <tas_ll.h>
#include <tas_ll_connect.h>
#include <kernel_appif.h>
#include <utils.h>
#include <tas_memif.h>
#include <utils_timeout.h>

static inline int event_kappin_conn_opened(
    const struct kernel_appin_conn_opened *inev, struct flextcp_event *outev,
    unsigned avail);
static inline void event_kappin_listen_newconn(
    const struct kernel_appin_listen_newconn *inev,
    struct flextcp_event *outev);
static inline int event_kappin_accept_conn(
    const struct kernel_appin_accept_conn *inev, struct flextcp_event *outev,
    unsigned avail);
static inline void event_kappin_st_conn_move(
    const struct kernel_appin_status *inev, struct flextcp_event *outev);
static inline void event_kappin_st_listen_open(
    const struct kernel_appin_status *inev, struct flextcp_event *outev);
static inline void event_kappin_st_conn_closed(
    const struct kernel_appin_status *inev, struct flextcp_event *outev);
static inline int event_arx_connupdate(struct flextcp_context *ctx,
    const struct flextcp_pl_arx_connupdate *inev,
    struct flextcp_event *outevs, int outn, uint16_t fn_core);

static int kernel_poll(struct flextcp_context *ctx, int num,
    struct flextcp_event *events, int *used) __attribute__((noinline));
static void fastpath_poll(struct flextcp_context *ctx, int num,
    struct flextcp_event *events, int *used) __attribute__((used,noinline));
static void fastpath_poll_vec(struct flextcp_context *ctx, int num,
    struct flextcp_event *events, int *used) __attribute__((used,noinline));
static void conns_bump(struct flextcp_context *ctx)
    __attribute__((noinline));
static void txq_probe(struct flextcp_context *ctx, unsigned n)
    __attribute__((noinline));

void *flexnic_mem = NULL;
struct flexnic_info *flexnic_info = NULL;
int flexnic_evfd[FLEXTCP_MAX_FTCPCORES];
int flextcp_init(void)
{
  if (flextcp_kernel_connect() != 0) {
    fprintf(stderr, "flextcp_init: connecting to kernel failed\n");
    return -1;
  }

  if (flexnic_driver_connect(&flexnic_info, &flexnic_mem) != 0) {
    fprintf(stderr, "flextcp_init: connecting to flexnic failed\n");
    return -1;
  }

  return 0;
}
int flextcp_context_create(struct flextcp_context *ctx)
{
  static uint16_t ctx_id = 0;

  memset(ctx, 0, sizeof(*ctx));

  ctx->ctx_id = __sync_fetch_and_add(&ctx_id, 1);
  if (ctx->ctx_id >= FLEXTCP_MAX_CONTEXTS) {
    fprintf(stderr, "flextcp_context_create: maximum number of contexts "
        "exceeded\n");
    return -1;
  }

  ctx->evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (ctx->evfd < 0) {
    perror("flextcp_context_create: eventfd for waiting fd failed");
    return -1;
  }

  return flextcp_kernel_newctx(ctx);
}
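/* Usage sketch (illustrative only, not part of this file; the names and
 * sizes here, ctx/evs/32, are arbitrary): an application initializes the
 * library once, creates a per-thread context, and then polls it for
 * events. The exact return conventions are defined in tas_ll.h.
 *
 *   struct flextcp_context ctx;
 *   struct flextcp_event evs[32];
 *   int n;
 *
 *   if (flextcp_init() != 0 || flextcp_context_create(&ctx) != 0)
 *     abort();
 *   for (;;) {
 *     n = flextcp_context_poll(&ctx, 32, evs);
 *     // ... dispatch on evs[0..n-1].event_type ...
 *   }
 */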
int debug_flextcp_on = 0;
static int kernel_poll(struct flextcp_context *ctx, int num,
    struct flextcp_event *events, int *used)
{
  int i, j = 0;
  uint32_t pos;
  uint8_t type;
  struct kernel_appin *kout;

  /* poll the kernel-to-app queue */
  pos = ctx->kout_head;
  for (i = 0; i < num;) {
    kout = (struct kernel_appin *) ctx->kout_base + pos;
    j = 1;

    type = kout->type;
    if (type == KERNEL_APPIN_INVALID) {
      break;
    } else if (type == KERNEL_APPIN_CONN_OPENED) {
      j = event_kappin_conn_opened(&kout->data.conn_opened, &events[i],
          num - i);
    } else if (type == KERNEL_APPIN_LISTEN_NEWCONN) {
      event_kappin_listen_newconn(&kout->data.listen_newconn, &events[i]);
    } else if (type == KERNEL_APPIN_ACCEPTED_CONN) {
      j = event_kappin_accept_conn(&kout->data.accept_connection, &events[i],
          num - i);
    } else if (type == KERNEL_APPIN_STATUS_LISTEN_OPEN) {
      event_kappin_st_listen_open(&kout->data.status, &events[i]);
    } else if (type == KERNEL_APPIN_STATUS_CONN_MOVE) {
      event_kappin_st_conn_move(&kout->data.status, &events[i]);
    } else if (type == KERNEL_APPIN_STATUS_CONN_CLOSE) {
      event_kappin_st_conn_closed(&kout->data.status, &events[i]);
    } else {
      fprintf(stderr, "flextcp_context_poll: unexpected kout type=%u "
          "pos=%u len=%u\n", type, pos, ctx->kout_len);
      abort();
    }
    ctx->flags |= CTX_FLAG_POLL_EVENTS;

    /* if there was not enough room for all events, stop here */
    if (j == -1) {
      break;
    }
    i += j;

    /* hand the entry back to the kernel and advance the ring position */
    kout->type = KERNEL_APPIN_INVALID;
    pos = pos + 1;
    if (pos >= ctx->kout_len) {
      pos = 0;
    }
  }
  ctx->kout_head = pos;

  *used = i;
  return (j == -1 ? -1 : 0);
}
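/* Ring-position arithmetic above, worked through: with kout_len == 4 and
 * kout_head == 3, consuming one entry gives pos = 3 + 1 = 4, which the
 * bound check wraps back to 0. Entries are handed back to the kernel by
 * resetting their type to KERNEL_APPIN_INVALID before the head moves on. */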
static void fastpath_poll(struct flextcp_context *ctx, int num,
    struct flextcp_event *events, int *used)
{
  struct flextcp_pl_arx *arx_q, *arx;
  uint32_t head;
  int i, j, k, found;

  i = 0;
  for (k = 0; k < ctx->num_queues && i < num; k++) {
    found = 1;
    j = 0;

    arx_q = (struct flextcp_pl_arx *)
        ctx->queues[ctx->next_queue].rxq_base;
    head = ctx->queues[ctx->next_queue].rxq_head;

    arx = &arx_q[head / sizeof(*arx)];
    if (arx->type == FLEXTCP_PL_ARX_INVALID) {
      /* queue is empty, move on to the next one */
      found = 0;
    } else if (arx->type == FLEXTCP_PL_ARX_CONNUPDATE) {
      j = event_arx_connupdate(ctx, &arx->msg.connupdate, events + i,
          num - i, ctx->next_queue);
    } else {
      fprintf(stderr, "flextcp_context_poll: kout type=%u head=%x\n",
          arx->type, head);
    }
    ctx->flags |= CTX_FLAG_POLL_EVENTS;

    if (found) {
      if (j == -1) {
        break;
      }
      i += j;

      /* hand the entry back to the fast path and advance the head */
      arx->type = FLEXTCP_PL_ARX_INVALID;
      head += sizeof(*arx);
      if (head >= ctx->rxq_len) {
        head -= ctx->rxq_len;
      }
      ctx->queues[ctx->next_queue].rxq_head = head;
    }

    ctx->next_queue = ctx->next_queue + 1;
    if (ctx->next_queue >= ctx->num_queues)
      ctx->next_queue -= ctx->num_queues;
  }

  *used = i;
}
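/* The next_queue cursor persists across calls, so polling resumes on the
 * queue after the last one served; with num_queues == 3 the scan order is
 * 0, 1, 2, 0, 1, ... which keeps one busy queue from starving the rest. */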
static inline void fetch_8ts(struct flextcp_context *ctx, uint32_t *heads,
    uint16_t q, uint8_t *ts)
{
  struct flextcp_pl_arx *p0, *p1, *p2, *p3, *p4, *p5, *p6, *p7;

  p0 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p1 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p2 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p3 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p4 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p5 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p6 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p7 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);

  /* inline asm loads the type byte of each entry into ts[0..7] */
  asm volatile ("" /* asm template elided */
      :
      : "r" (p0), "r" (p1), "r" (p2), "r" (p3),
        "r" (p4), "r" (p5), "r" (p6), "r" (p7),
        "r" (ts));
}
static inline void fetch_4ts(struct flextcp_context *ctx, uint32_t *heads,
    uint16_t q, uint8_t *ts)
{
  struct flextcp_pl_arx *p0, *p1, *p2, *p3;

  p0 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p1 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p2 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  p3 = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + heads[q]);
  q = (q + 1 < ctx->num_queues ? q + 1 : 0);

  /* inline asm loads the type byte of each entry into ts[0..3] */
  asm volatile ("" /* asm template elided */
      :
      : "r" (p0), "r" (p1), "r" (p2), "r" (p3),
        "r" (ts));
}
static void fastpath_poll_vec(struct flextcp_context *ctx, int num,
    struct flextcp_event *events, int *used)
{
  int i, j, ran_out, found, found_inner;
  uint16_t k, l, q;
  uint32_t head;
  uint8_t t;
  struct flextcp_pl_arx *arx;
  uint8_t types[ctx->num_queues];
  uint32_t qheads[ctx->num_queues];

  /* take a local snapshot of all queue heads */
  for (q = 0; q < ctx->num_queues; q++) {
    qheads[q] = ctx->queues[q].rxq_head;
  }

  i = 0;
  ran_out = 0;
  found = 0;
  while (i < num && !ran_out) {
    l = 0;

    /* step 1: gather the type byte of the next entry on each queue, in
     * batches of 8 or 4 queues where possible, then prefetch connection
     * state for valid entries and advance the snapshot heads */
    for (found_inner = 1; found_inner && i + l < num; ) {
      found_inner = 0;
      k = 0;
      q = ctx->next_queue;
      uint16_t qs = ctx->num_queues;
      while (qs - k >= 8) {
        fetch_8ts(ctx, qheads, q, types + k);
        k += 8;
        q = (q + 8 < ctx->num_queues ? q + 8 : q + 8 - ctx->num_queues);
      }
      while (qs - k >= 4) {
        fetch_4ts(ctx, qheads, q, types + k);
        k += 4;
        q = (q + 4 < ctx->num_queues ? q + 4 : q + 4 - ctx->num_queues);
      }
      for (; k < qs; k++) {
        arx = (struct flextcp_pl_arx *)
            (ctx->queues[q].rxq_base + qheads[q]);
        q = (q + 1 < ctx->num_queues ? q + 1 : 0);
        types[k] = arx->type;
      }

      for (k = 0, q = ctx->next_queue; k < ctx->num_queues && i + l < num;
          k++) {
        if (types[k] == FLEXTCP_PL_ARX_CONNUPDATE) {
          arx = (struct flextcp_pl_arx *)
              (ctx->queues[q].rxq_base + qheads[q]);
          util_prefetch0(OPAQUE_PTR(arx->msg.connupdate.opaque) + 64);
          util_prefetch0(OPAQUE_PTR(arx->msg.connupdate.opaque));
          found_inner = 1;
          l++;
          qheads[q] = qheads[q] + sizeof(*arx);
          if (qheads[q] >= ctx->rxq_len) {
            qheads[q] -= ctx->rxq_len;
          }
        }
        q = (q + 1 < ctx->num_queues ? q + 1 : 0);
      }
    }

    if (l == 0) {
      break;
    }

    /* step 2: process the gathered entries and emit events */
    q = ctx->next_queue;
    for (k = 0; k < l && i < num; k++) {
      head = ctx->queues[q].rxq_head;
      arx = (struct flextcp_pl_arx *) (ctx->queues[q].rxq_base + head);
      t = arx->type;
      assert(t != FLEXTCP_PL_ARX_INVALID);

      j = 0;
      if (t == FLEXTCP_PL_ARX_CONNUPDATE) {
        j = event_arx_connupdate(ctx, &arx->msg.connupdate, events + i,
            num - i, q);
      } else {
        fprintf(stderr, "flextcp_context_poll: kout type=%u head=%x\n",
            t, head);
      }
      if (j == -1) {
        ran_out = 1;
        break;
      }
      i += j;
      found = 1;

      arx->type = FLEXTCP_PL_ARX_INVALID;
      head += sizeof(*arx);
      if (head >= ctx->rxq_len) {
        head -= ctx->rxq_len;
      }
      ctx->queues[q].rxq_head = head;
      q = (q + 1 < ctx->num_queues ? q + 1 : 0);
    }

    /* prefetch the next entry on every queue for the following round */
    for (k = 0, q = ctx->next_queue; k < ctx->num_queues; k++) {
      util_prefetch0((struct flextcp_pl_arx *) (ctx->queues[q].rxq_base +
            ctx->queues[q].rxq_head));
      q = (q + 1 < ctx->num_queues ? q + 1 : 0);
    }
  }

  if (found) {
    ctx->flags |= CTX_FLAG_POLL_EVENTS;
  }
  *used = i;
}
int flextcp_context_poll(struct flextcp_context *ctx, int num,
    struct flextcp_event *events)
{
  int i, j;
  uint16_t k, q;

  ctx->flags |= CTX_FLAG_POLL_CALLED;

  /* prefetch the first entry on each fast-path queue */
  for (k = 0, q = ctx->next_queue; k < ctx->num_queues; k++) {
    util_prefetch0((struct flextcp_pl_arx *) (ctx->queues[q].rxq_base +
          ctx->queues[q].rxq_head));
    q = (q + 1 < ctx->num_queues ? q + 1 : 0);
  }

  /* poll kernel-to-app queue */
  i = 0;
  if (kernel_poll(ctx, num, events, &i) == -1) {
    /* not enough event space to make progress, return what we have */
    return i;
  }

  /* poll fast-path queues */
  j = 0;
  fastpath_poll_vec(ctx, num - i, events + i, &j);

  txq_probe(ctx, num);
  conns_bump(ctx);

  return i + j;
}
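/* Note: kernel_poll runs before fastpath_poll_vec, so slow-path control
 * events (connection opened/accepted, listener status) are delivered ahead
 * of the fast-path data events gathered in the same poll call. */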
int flextcp_context_tx_alloc(struct flextcp_context *ctx,
    struct flextcp_pl_atx **patx, uint16_t core)
{
  /* if the queue is full, bail out */
  if (ctx->queues[core].txq_avail == 0) {
    return -1;
  }

  *patx = (struct flextcp_pl_atx *)
      (ctx->queues[core].txq_base + ctx->queues[core].txq_tail);
  return 0;
}
static void flextcp_flexnic_kick(struct flextcp_context *ctx, int core)
{
  uint64_t now = util_rdtsc();

  /* the fast-path core may have gone to sleep: wake it up through its
   * eventfd if it has not been kicked within its polling interval */
  if (now - ctx->queues[core].last_ts > flexnic_info->poll_cycle_tas) {
    uint64_t val = 1;
    int r = write(flexnic_evfd[core], &val, sizeof(uint64_t));
    assert(r == sizeof(uint64_t));
  }

  ctx->queues[core].last_ts = now;
}
void flextcp_context_tx_done(struct flextcp_context *ctx, uint16_t core)
{
  ctx->queues[core].txq_tail += sizeof(struct flextcp_pl_atx);
  if (ctx->queues[core].txq_tail >= ctx->txq_len) {
    ctx->queues[core].txq_tail -= ctx->txq_len;
  }
  ctx->queues[core].txq_avail -= sizeof(struct flextcp_pl_atx);

  flextcp_flexnic_kick(ctx, core);
}
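/* Usage sketch (illustrative; core/flow_id are placeholders): tx queue
 * entries are sent by reserving a slot, filling the payload, publishing
 * the type last, and bumping the tail, mirroring what conns_bump below
 * does:
 *
 *   struct flextcp_pl_atx *atx;
 *   if (flextcp_context_tx_alloc(ctx, &atx, core) == 0) {
 *     atx->msg.connupdate.flow_id = flow_id;  // payload first
 *     atx->type = FLEXTCP_PL_ATX_CONNUPDATE;  // publish entry
 *     flextcp_context_tx_done(ctx, core);     // advance tail, kick core
 *   }
 */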
static inline int event_kappin_conn_opened(
    const struct kernel_appin_conn_opened *inev, struct flextcp_event *outev,
    unsigned avail)
{
  struct flextcp_connection *conn;
  int j = 1;

  conn = OPAQUE_PTR(inev->opaque);

  outev->event_type = FLEXTCP_EV_CONN_OPEN;
  outev->ev.conn_open.status = inev->status;
  outev->ev.conn_open.conn = conn;

  if (inev->status != 0) {
    conn->status = CONN_CLOSED;
    return 1;
  } else if (conn->rxb_used > 0 && conn->rx_closed && avail < 3) {
    /* will also need a receive event and an rx-close event */
    return -1;
  } else if ((conn->rxb_used > 0 || conn->rx_closed) && avail < 2) {
    return -1;
  }

  conn->status = CONN_OPEN;
  conn->local_ip = inev->local_ip;
  conn->local_port = inev->local_port;
  conn->seq_rx = inev->seq_rx;
  conn->seq_tx = inev->seq_tx;
  conn->flow_id = inev->flow_id;
  conn->fn_core = inev->fn_core;

  conn->rxb_base = (uint8_t *) flexnic_mem + inev->rx_off;
  conn->rxb_len = inev->rx_len;

  conn->txb_base = (uint8_t *) flexnic_mem + inev->tx_off;
  conn->txb_len = inev->tx_len;

  /* ... emit additional events for data received before the open
   * completed ... */

  if (conn->rx_closed) {
    /* ... emit rx-close event ... */
  }

  return j;
}
static inline void event_kappin_listen_newconn(
    const struct kernel_appin_listen_newconn *inev,
    struct flextcp_event *outev)
{
  struct flextcp_listener *listener;

  listener = OPAQUE_PTR(inev->opaque);

  outev->event_type = FLEXTCP_EV_LISTEN_NEWCONN;
  outev->ev.listen_newconn.listener = listener;
  /* ... */
}
static inline int event_kappin_accept_conn(
    const struct kernel_appin_accept_conn *inev, struct flextcp_event *outev,
    unsigned avail)
{
  struct flextcp_connection *conn;
  int j = 1;

  conn = OPAQUE_PTR(inev->opaque);

  outev->event_type = FLEXTCP_EV_LISTEN_ACCEPT;
  outev->ev.listen_accept.status = inev->status;
  outev->ev.listen_accept.conn = conn;

  if (inev->status != 0) {
    conn->status = CONN_CLOSED;
    return 1;
  } else if (conn->rxb_used > 0 && conn->rx_closed && avail < 3) {
    /* will also need a receive event and an rx-close event */
    return -1;
  } else if ((conn->rxb_used > 0 || conn->rx_closed) && avail < 2) {
    return -1;
  }

  conn->status = CONN_OPEN;
  conn->local_ip = inev->local_ip;
  conn->remote_ip = inev->remote_ip;
  conn->remote_port = inev->remote_port;
  conn->seq_rx = inev->seq_rx;
  conn->seq_tx = inev->seq_tx;
  conn->flow_id = inev->flow_id;
  conn->fn_core = inev->fn_core;

  conn->rxb_base = (uint8_t *) flexnic_mem + inev->rx_off;
  conn->rxb_len = inev->rx_len;

  conn->txb_base = (uint8_t *) flexnic_mem + inev->tx_off;
  conn->txb_len = inev->tx_len;

  /* ... emit additional events for data received before the accept
   * completed ... */

  if (conn->rx_closed) {
    /* ... emit rx-close event ... */
  }

  return j;
}
static inline void event_kappin_st_conn_move(
    const struct kernel_appin_status *inev, struct flextcp_event *outev)
{
  struct flextcp_connection *conn;

  conn = OPAQUE_PTR(inev->opaque);

  outev->event_type = FLEXTCP_EV_CONN_MOVED;
  outev->ev.conn_moved.status = inev->status;
  outev->ev.conn_moved.conn = conn;
}

static inline void event_kappin_st_listen_open(
    const struct kernel_appin_status *inev, struct flextcp_event *outev)
{
  struct flextcp_listener *listener;

  listener = OPAQUE_PTR(inev->opaque);

  outev->event_type = FLEXTCP_EV_LISTEN_OPEN;
  outev->ev.listen_open.status = inev->status;
  outev->ev.listen_open.listener = listener;
}

static inline void event_kappin_st_conn_closed(
    const struct kernel_appin_status *inev, struct flextcp_event *outev)
{
  struct flextcp_connection *conn;

  conn = OPAQUE_PTR(inev->opaque);

  outev->event_type = FLEXTCP_EV_CONN_CLOSED;
  outev->ev.conn_closed.status = inev->status;
  outev->ev.conn_closed.conn = conn;
}
static inline int event_arx_connupdate(struct flextcp_context *ctx,
    const struct flextcp_pl_arx_connupdate *inev,
    struct flextcp_event *outevs, int outn, uint16_t fn_core)
{
  struct flextcp_connection *conn;
  uint32_t rx_bump, rx_len, tx_bump, tx_sent;
  int i = 0, evs_needed, tx_avail_ev, eos;

  conn = OPAQUE_PTR(inev->opaque);

  conn->fn_core = fn_core;

  rx_bump = inev->rx_bump;
  tx_bump = inev->tx_bump;
  eos = ((inev->flags & FLEXTCP_PL_ARX_FLRXDONE) == FLEXTCP_PL_ARX_FLRXDONE);

  if (conn->status == CONN_OPEN_REQUESTED ||
      conn->status == CONN_ACCEPT_REQUESTED)
  {
    /* connection not fully open yet: just record the state for later */
    assert(tx_bump == 0);
    conn->rx_closed = !!eos;
    /* ... */
    return 0;
  } else if (conn->status == CONN_CLOSED ||
      conn->status == CONN_CLOSE_REQUESTED)
  {
    /* ... connection closed or closing: drop the update ... */
    return 0;
  }

  /* count the events this update will generate */
  evs_needed = 0;
  if (rx_bump > 0) {
    evs_needed++;
    if (conn->rxb_head + rx_bump > conn->rxb_len) {
      /* a wrapping receive buffer needs a second receive event */
      evs_needed++;
    }
  }
  tx_avail_ev = (tx_bump > 0 && flextcp_conn_txbuf_available(conn) == 0);
  if (tx_avail_ev) {
    evs_needed++;
  }
  if ((conn->flags & CONN_FLAG_TXEOS_ALLOC) == CONN_FLAG_TXEOS_ALLOC
      /* ... */) {
    evs_needed++;
  }

  /* if there is not enough event space, try again later */
  if (evs_needed > outn) {
    return -1;
  }

  /* receive buffer: one event, or two if the bump wraps around */
  if (rx_bump > 0) {
    util_prefetch0(conn->rxb_base + conn->rxb_head);
    if (conn->rxb_head + rx_bump > conn->rxb_len) {
      rx_len = conn->rxb_len - conn->rxb_head;
      /* ... first event covers the range up to the buffer end ... */
    } else {
      /* ... single receive event ... */
    }
    conn->seq_rx += rx_bump;
    /* ... */
    if (conn->rxb_head >= conn->rxb_len) {
      conn->rxb_head -= conn->rxb_len;
    }
  }

  /* ... tx bump and send buffer events ... */

  /* tx end-of-stream handling */
  if ((conn->flags & CONN_FLAG_TXEOS) == CONN_FLAG_TXEOS &&
      !(conn->flags & CONN_FLAG_TXEOS_ALLOC))
  {
    if (flextcp_conn_pushtxeos(ctx, conn) != 0) {
      fprintf(stderr, "event_arx_connupdate: flextcp_conn_pushtxeos "
          "failed\n");
      abort();
    }
  } else if ((conn->flags & CONN_FLAG_TXEOS_ALLOC) == CONN_FLAG_TXEOS_ALLOC) {
    assert(!(conn->flags & CONN_FLAG_TXEOS_ACK));
    /* ... */
    conn->flags |= CONN_FLAG_TXEOS_ACK;
    /* ... */
  }

  return i;
}
static void txq_probe(struct flextcp_context *ctx, unsigned n)
{
  struct flextcp_pl_atx *atx;
  uint32_t pos, i, q, tail, avail, len;

  for (q = 0; q < ctx->num_queues; q++) {
    avail = ctx->queues[q].txq_avail;
    /* ... skip the queue if enough space is already known ... */
    tail = ctx->queues[q].txq_tail;

    pos = tail + avail;
    if (pos >= ctx->txq_len)
      pos -= ctx->txq_len;

    i = 0;
    len = ctx->txq_len;
    while (avail < len && i < 2 * n) {
      atx = (struct flextcp_pl_atx *) (ctx->queues[q].txq_base + pos);

      /* stop at the first entry the fast path has not consumed yet */
      if (atx->type != 0) {
        break;
      }

      avail += sizeof(*atx);
      pos += sizeof(*atx);
      if (pos >= ctx->txq_len)
        pos -= ctx->txq_len;
      i++;
    }

    ctx->queues[q].txq_avail = avail;
  }
}
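/* txq_probe reclaims tx-queue space lazily: rather than tracking
 * consumption eagerly, it scans forward from the already-known available
 * region and counts entries whose type the fast path has reset to 0. */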
static void conns_bump(struct flextcp_context *ctx)
{
  struct flextcp_connection *c;
  struct flextcp_pl_atx *atx;
  uint8_t flags;

  while ((c = ctx->bump_pending_first) != NULL) {
    /* stop if no tx queue entry is available on the connection's core */
    if (flextcp_context_tx_alloc(ctx, &atx, c->fn_core) != 0) {
      break;
    }

    flags = 0;
    if ((c->flags & CONN_FLAG_TXEOS_ALLOC) == CONN_FLAG_TXEOS_ALLOC) {
      flags |= FLEXTCP_PL_ATX_FLTXDONE;
    }

    atx->msg.connupdate.rx_bump = c->rxb_bump;
    atx->msg.connupdate.tx_bump = c->txb_bump;
    atx->msg.connupdate.flow_id = c->flow_id;
    atx->msg.connupdate.bump_seq = c->bump_seq++;
    atx->msg.connupdate.flags = flags;
    /* publish the entry by writing the type last */
    atx->type = FLEXTCP_PL_ATX_CONNUPDATE;

    flextcp_context_tx_done(ctx, c->fn_core);

    c->rxb_bump = 0;
    c->txb_bump = 0;
    /* ... */

    /* unlink the connection from the bump-pending list */
    if (c->bump_next == NULL) {
      ctx->bump_pending_last = NULL;
    }
    ctx->bump_pending_first = c->bump_next;
  }
}
int flextcp_context_canwait(struct flextcp_context *ctx)
{
  /* if the last poll found events, start over: no blocking yet */
  if ((ctx->flags & CTX_FLAG_POLL_EVENTS) != 0) {
    ctx->flags &= ~(CTX_FLAG_POLL_EVENTS | CTX_FLAG_WANTWAIT |
        CTX_FLAG_LASTWAIT);
    return -1;
  }

  if ((ctx->flags & CTX_FLAG_WANTWAIT) != 0) {
    /* want-wait: move to last-wait once the spin interval has passed */
    if ((util_rdtsc() - ctx->last_inev_ts) > flexnic_info->poll_cycle_app) {
      ctx->flags &= ~(CTX_FLAG_POLL_CALLED | CTX_FLAG_WANTWAIT);
      ctx->flags |= CTX_FLAG_LASTWAIT;
    }
  } else if ((ctx->flags & CTX_FLAG_LASTWAIT) != 0) {
    /* last-wait: one more empty poll round means blocking is safe */
    if ((ctx->flags & CTX_FLAG_POLL_CALLED) != 0) {
      return 0;
    }
  } else if ((ctx->flags & CTX_FLAG_POLL_CALLED) != 0) {
    /* polled at least once without events: enter want-wait */
    ctx->last_inev_ts = util_rdtsc();
    ctx->flags |= CTX_FLAG_WANTWAIT;
  }

  return -1;
}
void flextcp_context_waitclear(struct flextcp_context *ctx)
{
  ssize_t ret;
  uint64_t val;

  ret = read(ctx->evfd, &val, sizeof(uint64_t));
  if ((ret >= 0 && ret != sizeof(uint64_t)) ||
      (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK))
  {
    perror("flextcp_context_waitclear: read failed");
    abort();
  }

  ctx->flags &= ~(CTX_FLAG_WANTWAIT | CTX_FLAG_LASTWAIT |
      CTX_FLAG_POLL_CALLED);
}
int flextcp_context_wait(struct flextcp_context *ctx, int timeout_ms)
{
  struct pollfd pfd;
  int ret;

  if (flextcp_context_canwait(ctx) != 0) {
    return -1;
  }

  pfd.fd = ctx->evfd;
  pfd.events = POLLIN;
  pfd.revents = 0;
  ret = poll(&pfd, 1, timeout_ms);
  if (ret < 0) {
    perror("flextcp_context_wait: poll returned error");
    return -1;
  }

  flextcp_context_waitclear(ctx);
  return 0;
}