TAS
TCP Acceleration as an OS Service
tcp.c
1 /*
2  * Copyright 2019 University of Washington, Max Planck Institute for
3  * Software Systems, and The University of Texas at Austin
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <assert.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31 #include <rte_config.h>
32 #include <rte_ip.h>
33 #include <rte_hash_crc.h>
34 
35 #include <tas.h>
36 #include <packet_defs.h>
37 #include <utils.h>
38 #include <utils_rng.h>
39 #include "internal.h"
40 
/* MSS value passed as the MSS option when sending SYN and SYN-ACK segments. */
#define TCP_MSS 1460
/* Number of buckets in the connection hash table (tcp_hashtable). */
#define TCP_HTSIZE 4096

/* Largest port number; the ports[] table has PORT_MAX + 1 entries. */
#define PORT_MAX ((1u << 16) - 1)
/* First port handed out by port_alloc() when it wraps around. */
#define PORT_FIRST_EPH 8192

/*
 * Type tag stored in the two low bits of every ports[] entry; the remaining
 * bits hold a pointer to the listener, listen_multi or connection.
 */
#define PORT_TYPE_UNUSED 0x0ULL
#define PORT_TYPE_LISTEN 0x1ULL
#define PORT_TYPE_LMULTI 0x2ULL
#define PORT_TYPE_CONN 0x3ULL
#define PORT_TYPE_MASK 0x3ULL

/* maximum number of listening sockets per port */
#define LISTEN_MULTI_MAX 32

/*
 * Per-connection debug output, compiled out by default. To enable it, replace
 * the two empty definitions below with:
 *   #define CONN_DEBUG(c, f, x...) fprintf(stderr, "conn(%p): " f, c, x)
 *   #define CONN_DEBUG0(c, f, x...) fprintf(stderr, "conn(%p): " f, c)
 */
#define CONN_DEBUG(c, f, x...) do { } while (0)
#define CONN_DEBUG0(c, f) do { } while (0)
60 
61 struct listen_multi {
62  size_t num;
63  struct listener *ls[LISTEN_MULTI_MAX];
64 };
65 
/* One buffered SYN packet waiting in a listener's backlog queue. */
struct backlog_slot {
  /* raw packet bytes, starting at the ethernet header */
  uint8_t buf[126];
  /* number of valid bytes in buf */
  uint16_t len;
};
70 
/*
 * Result of parse_options(): pointers into the packet for the options that
 * were found, NULL for options that were absent.
 */
struct tcp_opts {
  /* MSS option, or NULL */
  struct tcp_mss_opt *mss;
  /* timestamp option, or NULL */
  struct tcp_timestamp_opt *ts;
};
75 
/* connection handling */
static int conn_arp_done(struct connection *conn);
static void conn_packet(struct connection *c, const struct pkt_tcp *p,
    const struct tcp_opts *opts, uint32_t fn_core, uint16_t flow_group);
static inline struct connection *conn_alloc(void);
static inline void conn_free(struct connection *conn);
static void conn_register(struct connection *conn);
static void conn_unregister(struct connection *conn);
static struct connection *conn_lookup(const struct pkt_tcp *p);
static int conn_syn_sent_packet(struct connection *c, const struct pkt_tcp *p,
    const struct tcp_opts *opts);
static int conn_reg_synack(struct connection *c);
static void conn_failed(struct connection *c, int status);
static void conn_timeout_arm(struct connection *c, int type);
static void conn_timeout_disarm(struct connection *c);
static void conn_close_timeout(struct connection *c);

/* listener handling */
static struct listener *listener_lookup(const struct pkt_tcp *p);
static void listener_packet(struct listener *l, const struct pkt_tcp *p,
    const struct tcp_opts *opts, uint32_t fn_core, uint16_t flow_group);
static void listener_accept(struct listener *l);

/* port allocation, control packet transmission and option parsing */
static inline uint16_t port_alloc(void);
static inline int send_control(const struct connection *conn, uint16_t flags,
    int ts_opt, uint32_t ts_echo, uint16_t mss_opt);
static inline int send_reset(const struct pkt_tcp *p,
    const struct tcp_opts *opts);
static inline int parse_options(const struct pkt_tcp *p, uint16_t len,
    struct tcp_opts *opts);
104 
105 static uintptr_t ports[PORT_MAX + 1];
106 static uint16_t port_eph_hint = PORT_FIRST_EPH;
107 static struct nbqueue conn_async_q;
108 struct connection **tcp_hashtable = NULL;
109 static struct utils_rng rng;
110 
111 int tcp_init(void)
112 {
113  nbqueue_init(&conn_async_q);
114  utils_rng_init(&rng, util_timeout_time_us());
115 
116  port_eph_hint = utils_rng_gen32(&rng) % ((1 << 16) - 1 - PORT_FIRST_EPH);
117  if ((tcp_hashtable = calloc(TCP_HTSIZE, sizeof(*tcp_hashtable))) == NULL) {
118  return -1;
119  }
120  return 0;
121 }
122 
123 void tcp_poll(void)
124 {
125  struct connection *conn;
126  uint8_t *p;
127  int ret;
128 
129  while ((p = nbqueue_deq(&conn_async_q)) != NULL) {
130  conn = (struct connection *) (p - offsetof(struct connection, comp.el));
131  if (conn->status == CONN_ARP_PENDING) {
132  if ((ret = conn->comp.status) != 0 || (ret = conn_arp_done(conn)) != 0) {
133  conn_failed(conn, ret);
134  }
135  } else if (conn->status == CONN_REG_SYNACK) {
136  if ((ret = conn->comp.status) != 0 ||
137  (ret = conn_reg_synack(conn)) != 0)
138  {
139  conn_failed(conn, ret);
140  }
141  } else {
142  fprintf(stderr, "tcp_poll: unexpected conn state %u\n", conn->status);
143  }
144  }
145 }
146 
147 int tcp_open(struct app_context *ctx, uint64_t opaque, uint32_t remote_ip,
148  uint16_t remote_port, uint32_t db_id, struct connection **pconn)
149 {
150  int ret;
151  struct connection *conn;
152  uint16_t local_port;
153 
154  /* allocate connection struct */
155  if ((conn = conn_alloc()) == NULL) {
156  fprintf(stderr, "tcp_open: malloc failed\n");
157  return -1;
158  }
159 
160  CONN_DEBUG(conn, "opening connection (ctx=%p, op=%"PRIx64", rip=%x, rp=%u, "
161  "db=%u)\n", ctx, opaque, remote_ip, remote_port, db_id);
162 
163  /* allocate local port */
164  if ((local_port = port_alloc()) == 0) {
165  fprintf(stderr, "tcp_open: port_alloc failed\n");
166  conn_free(conn);
167  return -1;
168  }
169 
170  conn->ctx = ctx;
171  conn->opaque = opaque;
172  conn->status = CONN_ARP_PENDING;
173  conn->remote_ip = remote_ip;
174  conn->local_ip = config.ip;
175  conn->remote_port = remote_port;
176  conn->local_port = local_port;
177  conn->local_seq = 0; /* TODO: assign random */
178  conn->remote_seq = 0;
179  conn->cnt_tx_pending = 0;
180  conn->db_id = db_id;
181  conn->flags = 0;
182 
183  conn->comp.q = &conn_async_q;
184  conn->comp.notify_fd = -1;
185  conn->comp.status = 0;
186 
187 
188  /* resolve IP to mac */
189  ret = routing_resolve(&conn->comp, remote_ip, &conn->remote_mac);
190  if (ret < 0) {
191  fprintf(stderr, "tcp_open: nicif_arp failed\n");
192  conn_free(conn);
193  return -1;
194  } else if (ret == 0) {
195  CONN_DEBUG0(conn, "routing_resolve succeeded immediately\n");
196  conn_register(conn);
197 
198  ret = conn_arp_done(conn);
199  } else {
200  CONN_DEBUG0(conn, "routing_resolve pending\n");
201  conn_register(conn);
202  ret = 0;
203  }
204 
205  ports[local_port] = (uintptr_t) conn | PORT_TYPE_CONN;
206 
207  *pconn = conn;
208  return ret;
209 }
210 
211 int tcp_listen(struct app_context *ctx, uint64_t opaque, uint16_t local_port,
212  uint32_t backlog, int reuseport, struct listener **listen)
213 {
214  struct listener *lst;
215  uint32_t i;
216  struct backlog_slot *bls;
217  struct listen_multi *lm = NULL, *lm_new = NULL;
218  uint8_t type;
219 
220  /* make sure port is unused */
221  type = ports[local_port] & PORT_TYPE_MASK;
222  if (type != PORT_TYPE_UNUSED && reuseport == 0) {
223  fprintf(stderr, "tcp_listen: port not unused\n");
224  return -1;
225  } else if (reuseport != 0 && type != PORT_TYPE_UNUSED &&
226  type != PORT_TYPE_LMULTI)
227  {
228  fprintf(stderr, "tcp_listen: port not unused or multi listener\n");
229  return -1;
230  }
231 
232  /* allocate listen_multi if required */
233  if (reuseport != 0 && type == PORT_TYPE_UNUSED) {
234  if ((lm_new = calloc(1, sizeof(*lm_new))) == NULL) {
235  fprintf(stderr, "tcp_listen: calloc listen_multi failed\n");
236  return -1;
237  }
238  lm = lm_new;
239  } else if (reuseport != 0) {
240  lm = (struct listen_multi *) (ports[local_port] & ~PORT_TYPE_MASK);
241  if (lm->num == LISTEN_MULTI_MAX) {
242  fprintf(stderr, "tcp_listen: no more additional listeners supported\n");
243  return -1;
244  }
245  }
246 
247  /* allocate listener struct */
248  if ((lst = calloc(1, sizeof(*lst))) == NULL) {
249  fprintf(stderr, "tcp_listen: malloc failed\n");
250  free(lm_new);
251  return -1;
252  }
253 
254  /* allocate backlog queue */
255  if ((lst->backlog_ptrs = calloc(backlog, sizeof(void *))) == NULL) {
256  fprintf(stderr, "tcp_listen: malloc backlog_ptrss failed\n");
257  free(lst);
258  free(lm_new);
259  return -1;
260  }
261  if ((lst->backlog_cores = calloc(backlog, sizeof(*lst->backlog_cores))) ==
262  NULL)
263  {
264  fprintf(stderr, "tcp_listen: malloc backlog_cores failed\n");
265  free(lst->backlog_ptrs);
266  free(lst);
267  free(lm_new);
268  return -1;
269  }
270  if ((lst->backlog_fgs = calloc(backlog, sizeof(*lst->backlog_fgs))) ==
271  NULL)
272  {
273  fprintf(stderr, "tcp_listen: malloc backlog_fgs failed\n");
274  free(lst->backlog_cores);
275  free(lst->backlog_ptrs);
276  free(lst);
277  free(lm_new);
278  return -1;
279  }
280 
281  /* allocate backlog buffers */
282  if ((bls = malloc(sizeof(*bls) * backlog)) == NULL) {
283  fprintf(stderr, "tcp_listen: malloc backlog bufs failed\n");
284  free(lst->backlog_fgs);
285  free(lst->backlog_cores);
286  free(lst->backlog_ptrs);
287  free(lst);
288  free(lm_new);
289  return -1;
290  }
291  for (i = 0; i < backlog; i++) {
292  lst->backlog_ptrs[i] = &bls[i];
293  }
294 
295  /* initialize listener */
296  lst->ctx = ctx;
297  lst->opaque = opaque;
298  lst->port = local_port;
299  lst->wait_conns = NULL;
300  lst->backlog_len = backlog;
301  lst->backlog_pos = 0;
302  lst->backlog_used = 0;
303  lst->flags = 0;
304 
305  /* add to port tables */
306  if (reuseport == 0) {
307  ports[local_port] = (uintptr_t) lst | PORT_TYPE_LISTEN;
308  } else {
309  lm->ls[lm->num] = lst;
310  lm->num++;
311  if (lm_new != NULL) {
312  lm = lm_new;
313  ports[local_port] = (uintptr_t) lm | PORT_TYPE_LMULTI;
314  }
315  }
316 
317  *listen = lst;
318 
319  return 0;
320 }
321 
322 int tcp_accept(struct app_context *ctx, uint64_t opaque,
323  struct listener *listen, uint32_t db_id)
324 {
325  struct connection *conn;
326 
327  /* allocate listener struct */
328  if ((conn = conn_alloc()) == NULL) {
329  fprintf(stderr, "tcp_accept: conn_alloc failed\n");
330  return -1;
331  }
332 
333  conn->ctx = ctx;
334  conn->opaque = opaque;
335  conn->status = CONN_SYN_WAIT;
336  conn->local_port = listen->port;
337  conn->db_id = db_id;
338  conn->flags = listen->flags;
339  conn->cnt_tx_pending = 0;
340 
341  conn->ht_next = listen->wait_conns;
342  listen->wait_conns = conn;
343 
344  if (listen->backlog_used > 0) {
345  listener_accept(listen);
346  }
347  return 0;
348 }
349 
350 int tcp_packet(const void *pkt, uint16_t len, uint32_t fn_core,
351  uint16_t flow_group)
352 {
353  struct connection *c;
354  struct listener *l;
355  const struct pkt_tcp *p = pkt;
356  struct tcp_opts opts;
357  int ret = 0;
358 
359  if (len < sizeof(*p)) {
360  fprintf(stderr, "tcp_packet: incomplete TCP receive (%u received, "
361  "%u expected)\n", len, (unsigned) sizeof(*p));
362  return -1;
363  }
364 
365  if (f_beui32(p->ip.dest) != config.ip) {
366  fprintf(stderr, "tcp_packet: unexpected destination IP (%x received, "
367  "%x expected)\n", f_beui32(p->ip.dest), config.ip);
368  return -1;
369  }
370 
371  if (parse_options(p, len, &opts) != 0) {
372  fprintf(stderr, "tcp_packet: parsing TCP options failed\n");
373  return -1;
374  }
375 
376  if ((c = conn_lookup(p)) != NULL) {
377  conn_packet(c, p, &opts, fn_core, flow_group);
378  } else if ((l = listener_lookup(p)) != NULL) {
379  listener_packet(l, p, &opts, fn_core, flow_group);
380  } else {
381  ret = -1;
382 
383  /* send reset if the packet received wasn't a reset */
384  if (!(TCPH_FLAGS(&p->tcp) & TCP_RST) &&
385  config.kni_name == NULL)
386  send_reset(p, &opts);
387  }
388 
389  return ret;
390 }
391 
392 int tcp_close(struct connection *conn)
393 {
394  uint32_t tx_seq, rx_seq;
395  int tx_c, rx_c;
396 
397  if (conn->status != CONN_OPEN) {
398  fprintf(stderr, "tcp_close: currently no support for non-opened conns.\n");
399  return -1;
400  }
401 
402  /* disable connection on fastpath */
403  if (nicif_connection_disable(conn->flow_id, &tx_seq, &rx_seq, &tx_c, &rx_c)
404  != 0)
405  {
406  fprintf(stderr, "tcp_close: nicif_connection_disable failed unexpected\n");
407  return -1;
408  }
409 
410  conn->remote_seq = rx_seq;
411  conn->local_seq = tx_seq;
412 
413  if (!tx_c || !rx_c) {
414  send_control(conn, TCP_RST, 0, 0, 0);
415  }
416 
417  cc_conn_remove(conn);
418 
419  conn->status = CONN_CLOSED;
420 
421  /* set timer to free connection state */
422  assert(conn->to_armed == 0);
423  util_timeout_arm(&timeout_mgr, &conn->to, 10000, TO_TCP_CLOSED);
424  conn->to_armed = 1;
425  return 0;
426 }
427 
428 void tcp_destroy(struct connection *conn)
429 {
430  assert(conn->status == CONN_FAILED);
431  conn_free(conn);
432 }
433 
434 void tcp_timeout(struct timeout *to, enum timeout_type type)
435 {
436  struct connection *c = (struct connection *)
437  ((uintptr_t) to - offsetof(struct connection, to));
438 
439  assert(c->to_armed);
440  c->to_armed = 0;
441 
442  /* validate type and connection state */
443  if (type == TO_TCP_CLOSED) {
444  conn_close_timeout(c);
445  return;
446  } else if (type != TO_TCP_HANDSHAKE) {
447  fprintf(stderr, "tcp_timeout: unexpected timeout type (%u)\n", type);
448  abort();
449  }
450  if (c->status != CONN_SYN_SENT) {
451  fprintf(stderr, "tcp_timeout: unexpected connection state (%u)\n", c->status);
452  abort();
453  }
454 
455  /* close connection if too many retries */
456  if (++c->to_attempts > config.tcp_handshake_retries) {
457  fprintf(stderr, "tcp_timeout: giving up because of too many retries\n");
458  conn_failed(c, -1);
459  return;
460  }
461 
462  /* re-arm timeout */
463  c->timeout *= 2;
464  conn_timeout_arm(c, TO_TCP_HANDSHAKE);
465 
466  /* re-send SYN packet */
467  send_control(c, TCP_SYN | TCP_ECE | TCP_CWR, 1, 0, TCP_MSS);
468 }
469 
470 static void conn_packet(struct connection *c, const struct pkt_tcp *p,
471  const struct tcp_opts *opts, uint32_t fn_core, uint16_t flow_group)
472 {
473  int ret;
474  uint32_t ecn_flags = 0;
475 
476  if (c->status == CONN_SYN_SENT) {
477  /* hopefully a SYN-ACK received */
478  c->fn_core = fn_core;
479  c->flow_group = flow_group;
480  if ((ret = conn_syn_sent_packet(c, p, opts)) != 0) {
481  conn_failed(c, ret);
482  }
483  } else if (c->status == CONN_OPEN &&
484  (TCPH_FLAGS(&p->tcp) & ~ecn_flags) == TCP_SYN)
485  {
486  /* handle re-transmitted SYN for dropped SYN-ACK */
487  /* TODO: should only do this if we're still waiting for initial ACK,
488  * otherwise we should send a challenge ACK */
489  if (opts->ts == NULL) {
490  fprintf(stderr, "conn_packet: re-transmitted SYN does not have TS "
491  "option\n");
492  conn_failed(c, -1);
493  return;
494  }
495 
496  /* send ECN accepting SYN-ACK */
497  if ((c->flags & NICIF_CONN_ECN) == NICIF_CONN_ECN) {
498  ecn_flags = TCP_ECE;
499  }
500 
501  send_control(c, TCP_SYN | TCP_ACK | ecn_flags, 1,
502  f_beui32(opts->ts->ts_val), TCP_MSS);
503  } else if (c->status == CONN_OPEN &&
504  (TCPH_FLAGS(&p->tcp) & TCP_SYN) == TCP_SYN)
505  {
506  /* silently ignore a re-transmited SYN_ACK */
507  } else if (c->status == CONN_CLOSED &&
508  (TCPH_FLAGS(&p->tcp) & TCP_FIN) == TCP_FIN)
509  {
510  /* silently ignore a FIN for an already closed connection: TODO figure out
511  * why necessary*/
512  send_control(c, TCP_ACK, 1, 0, 0);
513  } else {
514  fprintf(stderr, "tcp_packet: unexpected connection state %u\n", c->status);
515  }
516 }
517 
518 static int conn_arp_done(struct connection *conn)
519 {
520  CONN_DEBUG0(conn, "arp resolution done\n");
521 
522  conn->status = CONN_SYN_SENT;
523 
524  /* arm timeout */
525  conn->to_attempts = 0;
526  conn->timeout = config.tcp_handshake_to;
527  conn_timeout_arm(conn, TO_TCP_HANDSHAKE);
528 
529  /* send SYN */
530  send_control(conn, TCP_SYN | TCP_ECE | TCP_CWR, 1, 0, TCP_MSS);
531 
532  CONN_DEBUG0(conn, "SYN SENT\n");
533  return 0;
534 }
535 
536 static int conn_syn_sent_packet(struct connection *c, const struct pkt_tcp *p,
537  const struct tcp_opts *opts)
538 {
539  uint32_t ecn_flags = TCPH_FLAGS(&p->tcp) & (TCP_ECE | TCP_CWR);
540 
541  /* dis-arm timeout */
542  conn_timeout_disarm(c);
543 
544  if ((TCPH_FLAGS(&p->tcp) & (TCP_SYN | TCP_ACK)) != (TCP_SYN | TCP_ACK)) {
545  fprintf(stderr, "conn_syn_sent_packet: unexpected flags %x\n",
546  TCPH_FLAGS(&p->tcp));
547  return -1;
548  }
549  if (opts->ts == NULL) {
550  fprintf(stderr, "conn_syn_sent_packet: no timestamp option received\n");
551  return -1;
552  }
553 
554  CONN_DEBUG0(c, "conn_syn_sent_packet: syn-ack received\n");
555 
556  c->remote_seq = f_beui32(p->tcp.seqno) + 1;
557  c->local_seq = f_beui32(p->tcp.ackno);
558  c->syn_ts = f_beui32(opts->ts->ts_val);
559 
560  /* enable ECN if SYN-ACK confirms */
561  if (ecn_flags == TCP_ECE) {
562  c->flags |= NICIF_CONN_ECN;
563  }
564 
565  cc_conn_init(c);
566 
567  c->comp.q = &conn_async_q;
568  c->comp.notify_fd = -1;
569  c->comp.status = 0;
570 
572  c->remote_ip, c->remote_port, c->rx_buf - (uint8_t *) tas_shm,
573  c->rx_len, c->tx_buf - (uint8_t *) tas_shm, c->tx_len,
574  c->remote_seq, c->local_seq, c->opaque, c->flags, c->cc_rate,
575  c->fn_core, c->flow_group, &c->flow_id)
576  != 0)
577  {
578  fprintf(stderr, "conn_syn_sent_packet: nicif_connection_add failed\n");
579  return -1;
580  }
581 
582  CONN_DEBUG0(c, "conn_syn_sent_packet: connection registered\n");
583 
584  c->status = CONN_OPEN;
585 
586  /* send ACK */
587  send_control(c, TCP_ACK, 1, c->syn_ts, 0);
588 
589  CONN_DEBUG0(c, "conn_syn_sent_packet: ACK sent\n");
590 
591  appif_conn_opened(c, 0);
592 
593  return 0;
594 }
595 
596 static int conn_reg_synack(struct connection *c)
597 {
598  uint32_t ecn_flags = 0;
599 
600  c->status = CONN_OPEN;
601 
602  if ((c->flags & NICIF_CONN_ECN) == NICIF_CONN_ECN) {
603  ecn_flags = TCP_ECE;
604  }
605 
606  /* send ACK */
607  send_control(c, TCP_SYN | TCP_ACK | ecn_flags, 1, c->syn_ts, TCP_MSS);
608 
609  appif_accept_conn(c, 0);
610 
611  return 0;
612 }
613 
614 static inline uint16_t port_alloc(void)
615 {
616  uint16_t p, p_start, p_next;
617 
618  p = p_start = port_eph_hint;
619  do {
620  p_next = (((uint16_t) (p + 1)) < (uint16_t) PORT_FIRST_EPH ?
621  PORT_FIRST_EPH : p + 1);
622 
623  if ((ports[p] & PORT_TYPE_MASK) == PORT_TYPE_UNUSED) {
624  port_eph_hint = p_next;
625  return p;
626  }
627 
628  p = p_next;
629  } while (p != p_start);
630 
631  return 0;
632 }
633 
634 static inline struct connection *conn_alloc(void)
635 {
636  struct connection *conn;
637  uintptr_t off_rx, off_tx;
638 
639  if ((conn = malloc(sizeof(*conn))) == NULL) {
640  fprintf(stderr, "conn_alloc: malloc failed\n");
641  return NULL;
642  }
643 
644  if (packetmem_alloc(config.tcp_rxbuf_len, &off_rx, &conn->rx_handle) != 0) {
645  fprintf(stderr, "conn_alloc: packetmem_alloc rx failed\n");
646  free(conn);
647  return NULL;
648  }
649 
650  if (packetmem_alloc(config.tcp_txbuf_len, &off_tx, &conn->tx_handle) != 0) {
651  fprintf(stderr, "conn_alloc: packetmem_alloc tx failed\n");
652  packetmem_free(conn->rx_handle);
653  free(conn);
654  return NULL;
655  }
656 
657  conn->rx_buf = (uint8_t *) tas_shm + off_rx;
658  conn->rx_len = config.tcp_rxbuf_len;
659  conn->tx_buf = (uint8_t *) tas_shm + off_tx;
660  conn->tx_len = config.tcp_txbuf_len;
661  conn->to_armed = 0;
662 
663  return conn;
664 }
665 
666 static inline void conn_free(struct connection *conn)
667 {
668  packetmem_free(conn->tx_handle);
669  packetmem_free(conn->rx_handle);
670  free(conn);
671 }
672 
/*
 * Hash a connection 4-tuple with CRC32C: the two IP addresses are hashed
 * first, and the result seeds the hash over the two ports.
 */
static inline uint32_t conn_hash(uint32_t l_ip, uint32_t r_ip, uint16_t l_po,
    uint16_t r_po)
{
  uint64_t ip_pair = l_ip | (((uint64_t) r_ip) << 32);
  uint32_t port_pair = l_po | (((uint32_t) r_po) << 16);
  uint32_t ip_crc = crc32c_sse42_u64(ip_pair, 0);

  return crc32c_sse42_u32(port_pair, ip_crc);
}
679 
680 static void conn_register(struct connection *conn)
681 {
682  uint32_t h;
683 
684  h = conn_hash(conn->local_ip, conn->remote_ip, conn->local_port,
685  conn->remote_port) % TCP_HTSIZE;
686 
687  conn->ht_next = tcp_hashtable[h];
688  tcp_hashtable[h] = conn;
689 }
690 
691 static void conn_unregister(struct connection *conn)
692 {
693  struct connection *cp = NULL;
694  uint32_t h;
695 
696  h = conn_hash(conn->local_ip, conn->remote_ip, conn->local_port,
697  conn->remote_port) % TCP_HTSIZE;
698  if (tcp_hashtable[h] == conn) {
699  tcp_hashtable[h] = conn->ht_next;
700  } else {
701  for (cp = tcp_hashtable[h]; cp != NULL && cp->ht_next != conn;
702  cp = cp->ht_next);
703  if (cp == NULL) {
704  fprintf(stderr, "conn_unregister: connection not found in ht\n");
705  abort();
706  }
707 
708  cp->ht_next = conn->ht_next;
709  }
710 }
711 
712 static struct connection *conn_lookup(const struct pkt_tcp *p)
713 {
714  uint32_t h;
715  struct connection *c;
716 
717  h = conn_hash(f_beui32(p->ip.dest), f_beui32(p->ip.src),
718  f_beui16(p->tcp.dest), f_beui16(p->tcp.src)) % TCP_HTSIZE;
719 
720  for (c = tcp_hashtable[h]; c != NULL; c = c->ht_next) {
721  if (f_beui32(p->ip.src) == c->remote_ip &&
722  f_beui16(p->tcp.dest) == c->local_port &&
723  f_beui16(p->tcp.src) == c->remote_port)
724  {
725  return c;
726  }
727  }
728  return NULL;
729 }
730 
731 static void conn_failed(struct connection *c, int status)
732 {
733  conn_unregister(c);
734  if (c->to_armed) {
735  conn_timeout_disarm(c);
736  }
737 
738  c->status = CONN_FAILED;
739 
740  appif_conn_opened(c, status);
741 }
742 
743 static void conn_timeout_arm(struct connection *c, int type)
744 {
745  uint32_t to;
746 
747  assert(!c->to_armed);
748  c->to_armed = 1;
749 
750  /* randomize timeout +/- 50% to avoid thundering herds */
751  to = c->timeout / 2 + (utils_rng_gen32(&rng) % c->timeout);
752  util_timeout_arm(&timeout_mgr, &c->to, to, TO_TCP_HANDSHAKE);
753 }
754 
755 static void conn_timeout_disarm(struct connection *c)
756 {
757  assert(c->to_armed);
758  c->to_armed = 0;
759 
760  util_timeout_disarm(&timeout_mgr, &c->to);
761 }
762 
763 static void conn_close_timeout(struct connection *c)
764 {
765  /* free port */
766  if ((ports[c->local_port] & PORT_TYPE_MASK) == PORT_TYPE_CONN) {
767  ports[c->local_port] = 0;
768  }
769 
770  /* remove from global connection list */
771  conn_unregister(c);
772 
773  /* free connection data buffers */
776 
777  /* free connection id */
779 
780  /* free ephemeral port */
781  if ((ports[c->local_port] & PORT_TYPE_MASK) == PORT_TYPE_CONN) {
782  ports[c->local_port] = PORT_TYPE_UNUSED;
783  }
784 
785  /* notify application */
786  appif_conn_closed(c, 0);
787 
788  free(c);
789 }
790 
/*
 * Mix a 64-bit key down to 32 bits with a fixed sequence of shift, add,
 * multiply and xor steps; the low 32 bits of the result are returned.
 */
static inline uint32_t hash_64_to_32(uint64_t key)
{
  uint64_t k = key;

  k = ~k + (k << 18);
  k ^= k >> 31;
  k *= 21;
  k ^= k >> 11;
  k += k << 6;
  k ^= k >> 22;

  return (uint32_t) k;
}
802 
803 static struct listener *listener_lookup(const struct pkt_tcp *p)
804 {
805  uint16_t local_port = f_beui16(p->tcp.dest);
806  uint32_t hash;
807  uint8_t type;
808  struct listen_multi *lm;
809 
810  type = ports[local_port] & PORT_TYPE_MASK;
811  if (type == PORT_TYPE_LISTEN) {
812  /* single listener socket */
813  return (struct listener *) (ports[local_port] & ~PORT_TYPE_MASK);
814  } else if (type == PORT_TYPE_LMULTI) {
815  /* multiple listener sockets, calculate hash */
816  lm = (struct listen_multi *) (ports[local_port] & ~PORT_TYPE_MASK);
817  hash = hash_64_to_32(((uint64_t) f_beui32(p->ip.src) << 32) |
818  ((uint32_t) f_beui16(p->tcp.src) << 16) | local_port);
819  return lm->ls[hash % lm->num];
820  } else {
821  return NULL;
822  }
823 
824  return (struct listener *) (ports[local_port] & ~PORT_TYPE_MASK);
825 }
826 
827 static void listener_packet(struct listener *l, const struct pkt_tcp *p,
828  const struct tcp_opts *opts, uint32_t fn_core, uint16_t flow_group)
829 {
830  struct backlog_slot *bls;
831  uint16_t len;
832  uint32_t bp, n;
833  struct pkt_tcp *bl_p;
834 
835  if ((TCPH_FLAGS(&p->tcp) & ~(TCP_ECE | TCP_CWR)) != TCP_SYN) {
836  fprintf(stderr, "listener_packet: Not a SYN (flags %x)\n",
837  TCPH_FLAGS(&p->tcp));
838  send_reset(p, opts);
839  return;
840  }
841 
842  /* make sure packet is not too long */
843  len = sizeof(p->eth) + f_beui16(p->ip.len);
844  if (len > sizeof(bls->buf)) {
845  fprintf(stderr, "listener_packet: SYN larger than backlog buffer, "
846  "dropping\n");
847  return;
848  }
849 
850  /* make sure we don't already have this 4-tuple */
851  for (n = 0, bp = l->backlog_pos; n < l->backlog_used;
852  n++, bp = (bp + 1) % l->backlog_len)
853  {
854  bls = l->backlog_ptrs[bp];
855  bl_p = (struct pkt_tcp *) bls->buf;
856  if (f_beui32(p->ip.src) == f_beui32(bl_p->ip.src) &&
857  f_beui32(p->ip.dest) == f_beui32(bl_p->ip.dest) &&
858  f_beui16(p->tcp.src) == f_beui16(bl_p->tcp.src) &&
859  f_beui16(p->tcp.dest) == f_beui16(bl_p->tcp.dest))
860  {
861  return;
862  }
863  }
864 
865  if (l->backlog_len == l->backlog_used) {
866  fprintf(stderr, "listener_packet: backlog queue full\n");
867  return;
868  }
869 
870 
871  bp = l->backlog_pos + l->backlog_used;
872  if (bp >= l->backlog_len) {
873  bp -= l->backlog_len;
874  }
875 
876  /* copy packet into backlog buffer */
877  l->backlog_cores[bp] = fn_core;
878  l->backlog_fgs[bp] = flow_group;
879  bls = l->backlog_ptrs[bp];
880  memcpy(bls->buf, p, len);
881  bls->len = len;
882 
883  l->backlog_used++;
884 
885  appif_listen_newconn(l, f_beui32(p->ip.src), f_beui16(p->tcp.src));
886 
887  /* check if there are pending accepts */
888  if (l->wait_conns != NULL) {
889  listener_accept(l);
890  }
891 }
892 
893 static void listener_accept(struct listener *l)
894 {
895  struct connection *c = l->wait_conns;
896  struct backlog_slot *bls;
897  const struct pkt_tcp *p;
898  struct tcp_opts opts;
899  uint32_t ecn_flags, fn_core;
900  uint16_t flow_group;
901  int ret = 0;
902 
903  assert(c != NULL);
904  assert(l->backlog_used > 0);
905 
906  bls = l->backlog_ptrs[l->backlog_pos];
907  fn_core = l->backlog_cores[l->backlog_pos];
908  flow_group = l->backlog_fgs[l->backlog_pos];
909  p = (const struct pkt_tcp *) bls->buf;
910  ret = parse_options(p, bls->len, &opts);
911  if (ret != 0 || opts.ts == NULL) {
912  fprintf(stderr, "listener_packet: parsing options failed or no timestamp "
913  "option\n");
914  goto out;
915  }
916 
917  c->fn_core = fn_core;
918  c->flow_group = flow_group;
919  c->remote_mac = 0;
920  memcpy(&c->remote_mac, &p->eth.src, ETH_ADDR_LEN);
921  c->remote_ip = f_beui32(p->ip.src);
922  c->local_ip = config.ip;
923  c->remote_port = f_beui16(p->tcp.src);
924  c->local_port = l->port;
925 
926  c->remote_seq = f_beui32(p->tcp.seqno) + 1;
927  c->local_seq = 1; /* TODO: generate random */
928  c->syn_ts = f_beui32(opts.ts->ts_val);
929 
930  /* check if ECN is offered */
931  ecn_flags = TCPH_FLAGS(&p->tcp) & (TCP_ECE | TCP_CWR);
932  if (ecn_flags == (TCP_ECE | TCP_CWR)) {
933  c->flags |= NICIF_CONN_ECN;
934  }
935 
936  cc_conn_init(c);
937 
938  c->status = CONN_REG_SYNACK;
939 
940  c->comp.q = &conn_async_q;
941  c->comp.notify_fd = -1;
942  c->comp.status = 0;
943 
945  c->remote_ip, c->remote_port, c->rx_buf - (uint8_t *) tas_shm,
946  c->rx_len, c->tx_buf - (uint8_t *) tas_shm, c->tx_len,
947  c->remote_seq, c->local_seq + 1, c->opaque, c->flags, c->cc_rate,
948  c->fn_core, c->flow_group, &c->flow_id)
949  != 0)
950  {
951  fprintf(stderr, "listener_packet: nicif_connection_add failed\n");
952  goto out;
953  }
954 
955  l->wait_conns = c->ht_next;
956  conn_register(c);
957  nbqueue_enq(&conn_async_q, &c->comp.el);
958 
959 out:
960  l->backlog_used--;
961  l->backlog_pos++;
962  if (l->backlog_pos >= l->backlog_len) {
963  l->backlog_pos -= l->backlog_len;
964  }
965 }
966 
967 static inline int send_control_raw(uint64_t remote_mac, uint32_t remote_ip,
968  uint16_t remote_port, uint16_t local_port, uint32_t local_seq,
969  uint32_t remote_seq, uint16_t flags, int ts_opt, uint32_t ts_echo,
970  uint16_t mss_opt)
971 {
972  uint32_t new_tail;
973  struct pkt_tcp *p;
974  struct tcp_mss_opt *opt_mss;
975  struct tcp_timestamp_opt *opt_ts;
976  uint8_t optlen;
977  uint16_t len, off_ts, off_mss;
978 
979  /* calculate header length depending on options */
980  optlen = 0;
981  off_mss = optlen;
982  optlen += (mss_opt ? sizeof(*opt_mss) : 0);
983  off_ts = optlen;
984  optlen += (ts_opt ? sizeof(*opt_ts) : 0);
985  optlen = (optlen + 3) & ~3;
986  len = sizeof(*p) + optlen;
987 
989  if (nicif_tx_alloc(len, (void **) &p, &new_tail) != 0) {
990  fprintf(stderr, "send_control failed\n");
991  return -1;
992  }
993 
994  /* fill ethernet header */
995  memcpy(&p->eth.dest, &remote_mac, ETH_ADDR_LEN);
996  memcpy(&p->eth.src, &eth_addr, ETH_ADDR_LEN);
997  p->eth.type = t_beui16(ETH_TYPE_IP);
998 
999  /* fill ipv4 header */
1000  IPH_VHL_SET(&p->ip, 4, 5);
1001  p->ip._tos = 0;
1002  p->ip.len = t_beui16(len - offsetof(struct pkt_tcp, ip));
1003  p->ip.id = t_beui16(3); /* TODO: not sure why we have 3 here */
1004  p->ip.offset = t_beui16(0);
1005  p->ip.ttl = 0xff;
1006  p->ip.proto = IP_PROTO_TCP;
1007  p->ip.chksum = 0;
1008  p->ip.src = t_beui32(config.ip);
1009  p->ip.dest = t_beui32(remote_ip);
1010 
1011  /* fill tcp header */
1012  p->tcp.src = t_beui16(local_port);
1013  p->tcp.dest = t_beui16(remote_port);
1014  p->tcp.seqno = t_beui32(local_seq);
1015  p->tcp.ackno = t_beui32(remote_seq);
1016  TCPH_HDRLEN_FLAGS_SET(&p->tcp, 5 + optlen / 4, flags);
1017  p->tcp.wnd = t_beui16(11680); /* TODO */
1018  p->tcp.chksum = 0;
1019  p->tcp.urgp = t_beui16(0);
1020 
1021  /* if requested: add mss option */
1022  if (mss_opt) {
1023  opt_mss = (struct tcp_mss_opt *) ((uint8_t *) (p + 1) + off_mss);
1024  opt_mss->kind = TCP_OPT_MSS;
1025  opt_mss->length = sizeof(*opt_mss);
1026  opt_mss->mss = t_beui16(mss_opt);
1027  }
1028 
1029  /* if requested: add timestamp option */
1030  if (ts_opt) {
1031  opt_ts = (struct tcp_timestamp_opt *) ((uint8_t *) (p + 1) + off_ts);
1032  memset(opt_ts, 0, optlen);
1033  opt_ts->kind = TCP_OPT_TIMESTAMP;
1034  opt_ts->length = sizeof(*opt_ts);
1035  opt_ts->ts_val = t_beui32(0);
1036  opt_ts->ts_ecr = t_beui32(ts_echo);
1037  }
1038 
1039  /* calculate header checksums */
1040  p->ip.chksum = rte_ipv4_cksum((void *) &p->ip);
1041  p->tcp.chksum = rte_ipv4_udptcp_cksum((void *) &p->ip, (void *) &p->tcp);
1042 
1043  /* send packet */
1044  nicif_tx_send(new_tail, 0);
1045  return 0;
1046 }
1047 
1048 static inline int send_control(const struct connection *conn, uint16_t flags,
1049  int ts_opt, uint32_t ts_echo, uint16_t mss_opt)
1050 {
1051  return send_control_raw(conn->remote_mac, conn->remote_ip, conn->remote_port,
1052  conn->local_port, conn->local_seq, conn->remote_seq, flags, ts_opt,
1053  ts_echo, mss_opt);
1054 }
1055 
1056 static inline int send_reset(const struct pkt_tcp *p,
1057  const struct tcp_opts *opts)
1058 {
1059  int ts_opt = 0;
1060  uint32_t ts_val;
1061  uint64_t remote_mac = 0;
1062 
1063  if (opts->ts != NULL) {
1064  ts_opt = 1;
1065  ts_val = f_beui32(opts->ts->ts_val);
1066  }
1067 
1068  memcpy(&remote_mac, &p->eth.src, ETH_ADDR_LEN);
1069  return send_control_raw(remote_mac, f_beui32(p->ip.src), f_beui16(p->tcp.src),
1070  f_beui16(p->tcp.dest), f_beui32(p->tcp.ackno), f_beui32(p->tcp.seqno) + 1,
1071  TCP_RST | TCP_ACK, ts_opt, ts_val, 0);
1072 }
1073 
1074 static inline int parse_options(const struct pkt_tcp *p, uint16_t len,
1075  struct tcp_opts *opts)
1076 {
1077  uint8_t *opt = (uint8_t *) (p + 1);
1078  uint16_t opts_len = TCPH_HDRLEN(&p->tcp) * 4 - 20;
1079  uint16_t off = 0;
1080  uint8_t opt_kind, opt_len, opt_avail;
1081 
1082  opts->ts = NULL;
1083  opts->mss = NULL;
1084 
1085  /* whole header not in buf */
1086  if (TCPH_HDRLEN(&p->tcp) < 5 || opts_len > (len - sizeof(*p))) {
1087  fprintf(stderr, "hdrlen=%u opts_len=%u len=%u so=%zu\n", TCPH_HDRLEN(&p->tcp), opts_len, len, sizeof(*p));
1088  return -1;
1089  }
1090 
1091  while (off < opts_len) {
1092  opt_kind = opt[off];
1093  opt_avail = opts_len - off;
1094  if (opt_kind == TCP_OPT_END_OF_OPTIONS) {
1095  break;
1096  } else if (opt_kind == TCP_OPT_NO_OP) {
1097  opt_len = 1;
1098  } else {
1099  if (opt_avail < 2) {
1100  fprintf(stderr, "parse_options: opt_avail=%u kind=%u off=%u\n", opt_avail, opt_kind, off);
1101  return -1;
1102  }
1103 
1104  opt_len = opt[off + 1];
1105  if (opt_kind == TCP_OPT_MSS) {
1106  if (opt_len != sizeof(struct tcp_mss_opt)) {
1107  fprintf(stderr, "parse_options: mss option size wrong (expect %zu "
1108  "got %u)\n", sizeof(struct tcp_mss_opt), opt_len);
1109  return -1;
1110  }
1111 
1112  opts->mss = (struct tcp_mss_opt *) (opt + off);
1113  } else if (opt_kind == TCP_OPT_TIMESTAMP) {
1114  if (opt_len != sizeof(struct tcp_timestamp_opt)) {
1115  fprintf(stderr, "parse_options: opt_len=%u so=%zu\n", opt_len, sizeof(struct tcp_timestamp_opt));
1116  return -1;
1117  }
1118 
1119  opts->ts = (struct tcp_timestamp_opt *) (opt + off);
1120  }
1121  }
1122  off += opt_len;
1123  }
1124 
1125  return 0;
1126 }
uint32_t flags
Definition: internal.h:542
void ** backlog_ptrs
Definition: internal.h:574
int tcp_init(void)
Definition: tcp.c:111
void tcp_destroy(struct connection *conn)
Definition: tcp.c:428
int to_armed
Definition: internal.h:492
struct packetmem_handle * rx_handle
Definition: internal.h:438
uint32_t db_id
Definition: internal.h:430
void appif_conn_closed(struct connection *c, int status)
Definition: appif_ctx.c:100
int to_attempts
Definition: internal.h:490
uint64_t remote_mac
Definition: internal.h:456
uint32_t tcp_handshake_to
Definition: config.h:63
uint16_t local_port
Definition: internal.h:464
struct nicif_completion comp
Definition: internal.h:536
uint32_t fn_core
Definition: internal.h:540
void tcp_poll(void)
Definition: tcp.c:123
int nicif_connection_add(uint32_t db, uint64_t mac_remote, uint32_t ip_local, uint16_t port_local, uint32_t ip_remote, uint16_t port_remote, uint64_t rx_base, uint32_t rx_len, uint64_t tx_base, uint32_t tx_len, uint32_t remote_seq, uint32_t local_seq, uint64_t app_opaque, uint32_t flags, uint32_t rate, uint32_t fn_core, uint16_t flow_group, uint32_t *pf_id)
Definition: nicif.c:175
uint16_t remote_port
Definition: internal.h:462
void nicif_connection_free(uint32_t f_id)
Definition: nicif.c:272
int tcp_packet(const void *pkt, uint16_t len, uint32_t fn_core, uint16_t flow_group)
Definition: tcp.c:350
uint32_t local_ip
Definition: internal.h:460
uint32_t flags
Definition: internal.h:586
struct app_context * ctx
Definition: internal.h:424
uint32_t flow_id
Definition: internal.h:538
void tcp_timeout(struct timeout *to, enum timeout_type type)
Definition: tcp.c:434
int tcp_listen(struct app_context *ctx, uint64_t opaque, uint16_t local_port, uint32_t backlog, int reuseport, struct listener **listen)
Definition: tcp.c:211
uint32_t util_timeout_time_us(void)
Definition: timeout.c:72
enum connection_status status
Definition: internal.h:472
uint32_t remote_ip
Definition: internal.h:458
struct tcp_timestamp_opt * ts
Definition: tcp_common.h:230
void appif_listen_newconn(struct listener *l, uint32_t remote_ip, uint16_t remote_port)
Definition: appif_ctx.c:143
uint32_t local_seq
Definition: internal.h:476
uint8_t * tx_buf
Definition: internal.h:444
int tcp_accept(struct app_context *ctx, uint64_t opaque, struct listener *listen, uint32_t db_id)
Definition: tcp.c:322
uint32_t rx_len
Definition: internal.h:446
int packetmem_alloc(size_t length, uintptr_t *off, struct packetmem_handle **handle)
Definition: packetmem.c:61
uint32_t backlog_pos
Definition: internal.h:570
uint16_t flow_group
Definition: internal.h:544
struct app_context * ctx
Definition: internal.h:556
struct connection * ht_next
Definition: internal.h:534
struct timeout to
Definition: internal.h:488
int tcp_open(struct app_context *ctx, uint64_t opaque, uint32_t remote_ip, uint16_t remote_port, uint32_t db_id, struct connection **pconn)
Definition: tcp.c:147
char * kni_name
Definition: config.h:131
uint64_t opaque
Definition: internal.h:422
void util_timeout_disarm(struct timeout_manager *mgr, struct timeout *to)
Definition: timeout.c:155
uint32_t backlog_len
Definition: internal.h:568
uint32_t backlog_used
Definition: internal.h:572
uint64_t opaque
Definition: internal.h:554
void packetmem_free(struct packetmem_handle *handle)
Definition: packetmem.c:113
uint16_t port
Definition: internal.h:584
uint32_t cnt_tx_pending
Definition: internal.h:526
int tcp_close(struct connection *conn)
Definition: tcp.c:392
void cc_conn_init(struct connection *conn)
Definition: cc.c:175
int nicif_tx_alloc(uint16_t len, void **buf, uint32_t *opaque)
Definition: nicif.c:352
void appif_conn_opened(struct connection *c, int status)
Definition: appif_ctx.c:57
int nicif_connection_disable(uint32_t f_id, uint32_t *tx_seq, uint32_t *rx_seq, int *tx_closed, int *rx_closed)
Definition: nicif.c:250
uint32_t timeout
Definition: internal.h:486
uint32_t tx_len
Definition: internal.h:448
struct packetmem_handle * tx_handle
Definition: internal.h:440
void appif_accept_conn(struct connection *c, int status)
Definition: appif_ctx.c:173
void cc_conn_remove(struct connection *conn)
Definition: cc.c:209
uint32_t ip
Definition: config.h:67
uint32_t remote_seq
Definition: internal.h:474
uint32_t syn_ts
Definition: internal.h:478
uint64_t tcp_txbuf_len
Definition: config.h:57
uint32_t tcp_handshake_retries
Definition: config.h:65
struct connection * wait_conns
Definition: internal.h:582
uint32_t * backlog_cores
Definition: internal.h:576
uint32_t cc_rate
Definition: internal.h:513
void nicif_tx_send(uint32_t opaque, int no_ts)
Definition: nicif.c:368
uint64_t tcp_rxbuf_len
Definition: config.h:55
uint8_t * rx_buf
Definition: internal.h:442
void util_timeout_arm(struct timeout_manager *mgr, struct timeout *to, uint32_t us, uint8_t type)
Definition: timeout.c:110
uint16_t * backlog_fgs
Definition: internal.h:578
int routing_resolve(struct nicif_completion *comp, uint32_t ip, uint64_t *mac)
Definition: routing.c:89