TAS
TCP Acceleration as an OS Service
conn.c
1 /*
2  * Copyright 2019 University of Washington, Max Planck Institute for
3  * Software Systems, and The University of Texas at Austin
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <assert.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 
30 #include <tas_ll.h>
31 #include <kernel_appif.h>
32 #include "internal.h"
33 
34 static void connection_init(struct flextcp_connection *conn);
35 
36 static inline void conn_mark_bump(struct flextcp_context *ctx,
37  struct flextcp_connection *conn);
38 static inline uint32_t conn_tx_allocbytes(struct flextcp_connection *conn);
39 static inline uint32_t conn_tx_sendbytes(struct flextcp_connection *conn);
40 
42  struct flextcp_listener *lst, uint16_t port, uint32_t backlog,
43  uint32_t flags)
44 {
45  uint32_t pos = ctx->kin_head;
46  struct kernel_appout *kin = ctx->kin_base;
47  uint32_t f = 0;
48 
49  memset(lst, 0, sizeof(*lst));
50 
51  if ((flags & ~(FLEXTCP_LISTEN_REUSEPORT)) != 0) {
52  fprintf(stderr, "flextcp_listen_open: unknown flags (%x)\n", flags);
53  return -1;
54  }
55 
56  if ((flags & FLEXTCP_LISTEN_REUSEPORT) == FLEXTCP_LISTEN_REUSEPORT) {
57  f |= KERNEL_APPOUT_LISTEN_REUSEPORT;
58  }
59 
60  kin += pos;
61 
62  if (kin->type != KERNEL_APPOUT_INVALID) {
63  fprintf(stderr, "flextcp_listen_open: no queue space\n");
64  return -1;
65  }
66 
67  lst->conns = NULL;
68  lst->local_port = port;
69  lst->status = 0;
70 
71  kin->data.listen_open.opaque = OPAQUE(lst);
72  kin->data.listen_open.local_port = port;
73  kin->data.listen_open.backlog = backlog;
74  kin->data.listen_open.flags = f;
75  MEM_BARRIER();
76  kin->type = KERNEL_APPOUT_LISTEN_OPEN;
77  flextcp_kernel_kick();
78 
79  pos = pos + 1;
80  if (pos >= ctx->kin_len) {
81  pos = 0;
82  }
83  ctx->kin_head = pos;
84 
85  return 0;
86 
87 }
88 
90  struct flextcp_listener *lst, struct flextcp_connection *conn)
91 {
92  uint32_t pos = ctx->kin_head;
93  struct kernel_appout *kin = ctx->kin_base;
94 
95  connection_init(conn);
96 
97  kin += pos;
98 
99  if (kin->type != KERNEL_APPOUT_INVALID) {
100  fprintf(stderr, "flextcp_listen_accept: no queue space\n");
101  return -1;
102  }
103 
104  conn->status = CONN_ACCEPT_REQUESTED;
105  conn->local_port = lst->local_port;
106 
107  kin->data.accept_conn.listen_opaque = OPAQUE(lst);
108  kin->data.accept_conn.conn_opaque = OPAQUE(conn);
109  kin->data.accept_conn.local_port = lst->local_port;
110  MEM_BARRIER();
111  kin->type = KERNEL_APPOUT_ACCEPT_CONN;
112  flextcp_kernel_kick();
113 
114  pos = pos + 1;
115  if (pos >= ctx->kin_len) {
116  pos = 0;
117  }
118  ctx->kin_head = pos;
119 
120  return 0;
121 }
122 
124  struct flextcp_connection *conn, uint32_t dst_ip, uint16_t dst_port)
125 {
126  uint32_t pos = ctx->kin_head, f = 0;
127  struct kernel_appout *kin = ctx->kin_base;
128 
129  connection_init(conn);
130 
131  kin += pos;
132 
133  if (kin->type != KERNEL_APPOUT_INVALID) {
134  fprintf(stderr, "flextcp_connection_open: no queue space\n");
135  return -1;
136  }
137 
138  conn->status = CONN_OPEN_REQUESTED;
139  conn->remote_ip = dst_ip;
140  conn->remote_port = dst_port;
141 
142  kin->data.conn_open.opaque = OPAQUE(conn);
143  kin->data.conn_open.remote_ip = dst_ip;
144  kin->data.conn_open.remote_port = dst_port;
145  kin->data.conn_open.flags = f;
146  MEM_BARRIER();
147  kin->type = KERNEL_APPOUT_CONN_OPEN;
148  flextcp_kernel_kick();
149 
150  pos = pos + 1;
151  if (pos >= ctx->kin_len) {
152  pos = 0;
153  }
154  ctx->kin_head = pos;
155 
156  return 0;
157 
158 
159 }
160 
162  struct flextcp_connection *conn)
163 {
164  uint32_t pos = ctx->kin_head, f = 0;
165  struct kernel_appout *kin = ctx->kin_base;
166  struct flextcp_connection *p_c;
167 
168  /* need to remove connection from bump queue */
169  if (conn->bump_pending != 0) {
170  if (conn == ctx->bump_pending_first) {
171  ctx->bump_pending_first = conn->bump_next;
172  } else {
173  for (p_c = ctx->bump_pending_first;
174  p_c != NULL && p_c->bump_next != conn;
175  p_c = p_c->bump_next);
176 
177  if (p_c == NULL) {
178  fprintf(stderr, "connection_close: didn't find connection in "
179  "bump list\n");
180  abort();
181  }
182 
183  p_c->bump_next = conn->bump_next;
184  if (p_c->bump_next == NULL) {
185  ctx->bump_pending_last = p_c;
186  }
187  }
188 
189  conn->bump_pending = 0;
190  }
191 
192  kin += pos;
193 
194  if (kin->type != KERNEL_APPOUT_INVALID) {
195  fprintf(stderr, "connection_close: no queue space\n");
196  return -1;
197  }
198 
199  /*if (reset)
200  f |= KERNEL_APPOUT_CLOSE_RESET;*/
201 
202  conn->status = CONN_CLOSE_REQUESTED;
203 
204  kin->data.conn_close.opaque = (uintptr_t) conn;
205  kin->data.conn_close.remote_ip = conn->remote_ip;
206  kin->data.conn_close.remote_port = conn->remote_port;
207  kin->data.conn_close.local_ip = conn->local_ip;
208  kin->data.conn_close.local_port = conn->local_port;
209  kin->data.conn_close.flags = f;
210  MEM_BARRIER();
211  kin->type = KERNEL_APPOUT_CONN_CLOSE;
212  flextcp_kernel_kick();
213 
214  pos = pos + 1;
215  if (pos >= ctx->kin_len) {
216  pos = 0;
217  }
218  ctx->kin_head = pos;
219 
220  return 0;
221 }
222 
224  struct flextcp_connection *conn, size_t len)
225 {
226  if (conn->rxb_used < len) {
227  return -1;
228  }
229 
230  conn->rxb_used -= len;
231 
232  /* Occasionally update the NIC on what we've already read. Force if buffer was
233  * previously completely full*/
234  conn->rxb_bump += len;
235  if(conn->rxb_bump > conn->rxb_len / 4) {
236  conn_mark_bump(ctx, conn);
237  }
238 
239  return 0;
240 }
241 
242 ssize_t flextcp_connection_tx_alloc(struct flextcp_connection *conn, size_t len,
243  void **buf)
244 {
245  uint32_t avail;
246  uint32_t head;
247 
248  /* if outgoing connection has already been closed, abort */
249  if ((conn->flags & CONN_FLAG_TXEOS) == CONN_FLAG_TXEOS)
250  return -1;
251 
252  /* truncate if necessary */
253  avail = conn_tx_allocbytes(conn);
254  if (avail < len) {
255  len = avail;
256  }
257 
258  /* calculate alloc head */
259  head = conn->txb_head + conn->txb_allocated;
260  if (head >= conn->txb_len) {
261  head -= conn->txb_len;
262  }
263 
264  /* short alloc if we wrap around */
265  if (head + len > conn->txb_len) {
266  len = conn->txb_len - head;
267  }
268 
269  *buf = conn->txb_base + head;
270 
271  /* bump head alloc counter */
272  conn->txb_allocated += len;
273 
274  return len;
275 }
276 
277 ssize_t flextcp_connection_tx_alloc2(struct flextcp_connection *conn, size_t len,
278  void **buf_1, size_t *len_1, void **buf_2)
279 {
280  uint32_t avail, head;
281 
282  /* if outgoing connection has already been closed, abort */
283  if ((conn->flags & CONN_FLAG_TXEOS) == CONN_FLAG_TXEOS)
284  return -1;
285 
286  /* truncate if necessary */
287  avail = conn_tx_allocbytes(conn);
288  if (avail < len) {
289  len = avail;
290  }
291 
292  /* calculate alloc head */
293  head = conn->txb_head + conn->txb_allocated;
294  if (head >= conn->txb_len) {
295  head -= conn->txb_len;
296  }
297 
298  *buf_1 = conn->txb_base + head;
299 
300  /* short alloc if we wrap around */
301  if (head + len > conn->txb_len) {
302  *len_1 = conn->txb_len - head;
303  *buf_2 = conn->txb_base;
304  } else {
305  *len_1 = len;
306  *buf_2 = NULL;
307  }
308 
309  /* bump head alloc counter */
310  conn->txb_allocated += len;
311  return len;
312 }
313 
315  struct flextcp_connection *conn, size_t len)
316 {
317  uint32_t next_head;
318 
319  if (conn_tx_sendbytes(conn) < len) {
320  return -1;
321  }
322 
323  conn->txb_allocated -= len;
324  conn->txb_sent += len;
325 
326  next_head = conn->txb_head + len;
327  if (next_head >= conn->txb_len) {
328  next_head -= conn->txb_len;
329  }
330  conn->txb_head = next_head;
331 
332  conn->txb_bump += len;
333  conn_mark_bump(ctx, conn);
334  return 0;
335 }
336 
338  struct flextcp_connection *conn)
339 {
340  /* if app hasn't sent all data yet, abort */
341  if (conn_tx_sendbytes(conn) > 0) {
342  fprintf(stderr, "flextcp_connection_tx_close: has unsent data\n");
343  return -1;
344  }
345 
346  /* if already closed, abort too */
347  if ((conn->flags & CONN_FLAG_TXEOS) == CONN_FLAG_TXEOS) {
348  fprintf(stderr, "flextcp_connection_tx_close: already closed\n");
349  return -1;
350  }
351 
352  conn->flags |= CONN_FLAG_TXEOS;
353 
354  /* try to push out to fastpath */
355  flextcp_conn_pushtxeos(ctx, conn);
356 
357  return 0;
358 }
359 
360 int flextcp_conn_pushtxeos(struct flextcp_context *ctx,
361  struct flextcp_connection *conn)
362 {
363  uint32_t head;
364  assert(conn_tx_sendbytes(conn) == 0);
365  assert((conn->flags & CONN_FLAG_TXEOS));
366 
367  /* if there is no tx buffer space we'll postpone until the next tx bump */
368  if (conn_tx_allocbytes(conn) == 0) {
369  return -1;
370  }
371 
372  conn->txb_sent++;
373  head = conn->txb_head + 1;
374  if (head >= conn->txb_len) {
375  head -= conn->txb_len;
376  }
377  conn->txb_head = head;
378 
379  conn->flags |= CONN_FLAG_TXEOS_ALLOC;
380 
381  conn->txb_bump++;
382  conn_mark_bump(ctx, conn);
383  return 0;
384 }
385 
/**
 * Placeholder: performs no check and always reports 0.
 *
 * @param ctx  Unused.
 * @param conn Unused.
 *
 * @return Always 0.
 */
int flextcp_connection_tx_possible(struct flextcp_context *ctx,
    struct flextcp_connection *conn)
{
  (void) ctx;
  (void) conn;

  return 0;
}
391 
/**
 * Report how many transmit buffer bytes can still be allocated.
 *
 * @param conn Connection to query.
 *
 * @return The value of conn_tx_allocbytes() for this connection.
 */
uint32_t flextcp_conn_txbuf_available(struct flextcp_connection *conn)
{
  const uint32_t free_bytes = conn_tx_allocbytes(conn);

  return free_bytes;
}
396 
398  struct flextcp_connection *conn)
399 {
400  uint32_t pos = ctx->kin_head;
401  struct kernel_appout *kin = ctx->kin_base;
402 
403  kin += pos;
404 
405  if (kin->type != KERNEL_APPOUT_INVALID) {
406  fprintf(stderr, "flextcp_connection_move: no queue space\n");
407  return -1;
408  }
409 
410  kin->data.conn_move.local_ip = conn->local_ip;
411  kin->data.conn_move.remote_ip = conn->remote_ip;
412  kin->data.conn_move.local_port = conn->local_port;
413  kin->data.conn_move.remote_port = conn->remote_port;
414  kin->data.conn_move.db_id = ctx->db_id;
415  kin->data.conn_move.opaque = OPAQUE(conn);
416  MEM_BARRIER();
417  kin->type = KERNEL_APPOUT_CONN_MOVE;
418  flextcp_kernel_kick();
419 
420  pos = pos + 1;
421  if (pos >= ctx->kin_len) {
422  pos = 0;
423  }
424  ctx->kin_head = pos;
425 
426  return 0;
427 }
428 
429 static void connection_init(struct flextcp_connection *conn)
430 {
431  memset(conn, 0, sizeof(*conn));
432  conn->status = CONN_CLOSED;
433 }
434 
435 static inline void conn_mark_bump(struct flextcp_context *ctx,
436  struct flextcp_connection *conn)
437 {
438  struct flextcp_connection *c_prev;
439 
440  if (conn->bump_pending) {
441  return;
442  }
443 
444  c_prev = ctx->bump_pending_last;
445  conn->bump_next = NULL;
446  conn->bump_prev = c_prev;
447  if (c_prev != NULL) {
448  c_prev->bump_next = conn;
449  } else {
450  ctx->bump_pending_first = conn;
451  }
452  ctx->bump_pending_last = conn;
453 
454  conn->bump_pending = 1;
455 }
456 
458 static inline uint32_t conn_tx_allocbytes(struct flextcp_connection *conn)
459 {
460  return conn->txb_len - conn->txb_sent - conn->txb_allocated;
461 }
462 
464 static inline uint32_t conn_tx_sendbytes(struct flextcp_connection *conn)
465 {
466  return conn->txb_allocated;
467 }
int flextcp_connection_tx_send(struct flextcp_context *ctx, struct flextcp_connection *conn, size_t len)
Definition: conn.c:314
uint32_t rxb_bump
Definition: tas_ll.h:108
int flextcp_listen_open(struct flextcp_context *ctx, struct flextcp_listener *lst, uint16_t port, uint32_t backlog, uint32_t flags)
Definition: conn.c:41
int flextcp_connection_rx_done(struct flextcp_context *ctx, struct flextcp_connection *conn, size_t len)
Definition: conn.c:223
int flextcp_connection_tx_close(struct flextcp_context *ctx, struct flextcp_connection *conn)
Definition: conn.c:337
uint32_t rxb_used
Definition: tas_ll.h:106
int flextcp_connection_close(struct flextcp_context *ctx, struct flextcp_connection *conn)
Definition: conn.c:161
Public low-level application interface for TAS.
int flextcp_connection_move(struct flextcp_context *ctx, struct flextcp_connection *conn)
Definition: conn.c:397
uint32_t txb_sent
Definition: tas_ll.h:116
ssize_t flextcp_connection_tx_alloc2(struct flextcp_connection *conn, size_t len, void **buf_1, size_t *len_1, void **buf_2)
Definition: conn.c:277
int flextcp_connection_tx_possible(struct flextcp_context *ctx, struct flextcp_connection *conn)
Definition: conn.c:386
uint32_t txb_allocated
Definition: tas_ll.h:118
int flextcp_connection_open(struct flextcp_context *ctx, struct flextcp_connection *conn, uint32_t dst_ip, uint16_t dst_port)
Definition: conn.c:123
ssize_t flextcp_connection_tx_alloc(struct flextcp_connection *conn, size_t len, void **buf)
Definition: conn.c:242
int flextcp_listen_accept(struct flextcp_context *ctx, struct flextcp_listener *lst, struct flextcp_connection *conn)
Definition: conn.c:89
uint32_t txb_bump
Definition: tas_ll.h:120
uint32_t txb_head
Definition: tas_ll.h:114