TAS
TCP Acceleration as an OS Service
context.c
1 /*
2  * Copyright 2019 University of Washington, Max Planck Institute for
3  * Software Systems, and The University of Texas at Austin
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <assert.h>
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <stddef.h>
29 #include <string.h>
30 #include <pthread.h>
31 
32 #include <utils.h>
33 #include <tas_sockets.h>
34 #include <tas_ll.h>
35 
36 #include "internal.h"
37 #include "../tas/internal.h"
38 
/*
 * Per-event handlers, defined further down in this file. Each one takes the
 * context that was polled and the event to process.
 */

/* listener events */
static inline void ev_listen_open(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_listen_newconn(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_listen_accept(struct flextcp_context *ctx,
    struct flextcp_event *ev);

/* connection events */
static inline void ev_conn_open(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_conn_received(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_conn_sendbuf(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_conn_moved(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_conn_rxclosed(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_conn_txclosed(struct flextcp_context *ctx,
    struct flextcp_event *ev);
static inline void ev_conn_closed(struct flextcp_context *ctx,
    struct flextcp_event *ev);
59 
/* Sockets context of the calling thread; stays NULL until the thread first
 * calls flextcp_sockctx_getfull(). */
static __thread struct sockets_context *local_context = NULL;

/* Held around flextcp_context_create() so that context creation is
 * serialized across threads. */
static pthread_mutex_t context_init_mutex = PTHREAD_MUTEX_INITIALIZER;
62 
63 struct sockets_context *flextcp_sockctx_getfull(void)
64 {
65  struct sockets_context *ctx = local_context;
66  int ret;
67 
68  if (ctx == NULL) {
69  if ((ctx = calloc(1, sizeof(*ctx))) == NULL) {
70  fprintf(stderr, "flextcp socket flextcp_sockctx_get: calloc failed\n");
71  abort();
72  }
73 
74  pthread_mutex_lock(&context_init_mutex);
75  ret = flextcp_context_create(&ctx->ctx);
76  pthread_mutex_unlock(&context_init_mutex);
77  if (ret != 0) {
78  fprintf(stderr, "flextcp socket flextcp_sockctx_get: flextcp_context_create "
79  "failed\n");
80  abort();
81  }
82 
83  local_context = ctx;
84  }
85 
86  return ctx;
87 
88 }
89 
90 struct flextcp_context *flextcp_sockctx_get(void)
91 {
92  return &flextcp_sockctx_getfull()->ctx;
93 }
94 
95 int flextcp_sockctx_poll(struct flextcp_context *ctx)
96 {
97  struct flextcp_event evs[16];
98  int i, num;
99 
100  if ((num = flextcp_context_poll(ctx, sizeof(evs) / sizeof(evs[0]), evs)) < 0) {
101  fprintf(stderr, "sockets poll_ctx: flextcp_context_poll failed\n");
102  abort();
103  }
104 
105  for (i = 0; i < num; i++) {
106  switch (evs[i].event_type) {
108  ev_listen_open(ctx, &evs[i]);
109  break;
110 
112  ev_listen_newconn(ctx, &evs[i]);
113  break;
114 
116  ev_listen_accept(ctx, &evs[i]);
117  break;
118 
120  ev_conn_open(ctx, &evs[i]);
121  break;
122 
124  ev_conn_received(ctx, &evs[i]);
125  break;
126 
128  ev_conn_sendbuf(ctx, &evs[i]);
129  break;
130 
132  ev_conn_moved(ctx, &evs[i]);
133  break;
134 
136  ev_conn_rxclosed(ctx, &evs[i]);
137  break;
138 
140  ev_conn_txclosed(ctx, &evs[i]);
141  break;
142 
144  ev_conn_closed(ctx, &evs[i]);
145  break;
146 
147  default:
148  fprintf(stderr, "sockets poll_ctx: unexpected event: %u\n",
149  evs[i].event_type);
150  break;
151  }
152  }
153 
154  return num;
155 }
156 
/**
 * Poll a context repeatedly until a budget of n events has been used up.
 *
 * Each call to flextcp_sockctx_poll() is charged as 16 events against the
 * budget, regardless of how many events it actually returned. The context is
 * always polled at least once, even when n is 0.
 *
 * @param ctx context to poll.
 * @param n   event budget.
 * @return sum of the values returned by flextcp_sockctx_poll().
 */
int flextcp_sockctx_poll_n(struct flextcp_context *ctx, unsigned n)
{
  int total = 0;

  for (;;) {
    total += flextcp_sockctx_poll(ctx);

    /* budget exhausted by this round */
    if (n <= 16) {
      break;
    }
    n -= 16;
  }

  return total;
}
168 
169 static inline void ev_listen_open(struct flextcp_context *ctx,
170  struct flextcp_event *ev)
171 {
172  struct flextcp_listener *l;
173  struct socket *s;
174 
175  l = ev->ev.listen_open.listener;
176  s = (struct socket *)
177  ((uint8_t *) l - offsetof(struct socket, data.listener.l));
178 
179  socket_lock(s);
180 
181  assert(s->type == SOCK_LISTENER);
182  assert(s->data.listener.status == SOL_OPENING);
183  if (ev->ev.listen_open.status == 0) {
184  s->data.listener.status = SOL_OPEN;
185  } else {
186  s->data.listener.status = SOL_FAILED;
187  }
188 
189  socket_unlock(s);
190 }
191 
192 static inline void ev_listen_newconn(struct flextcp_context *ctx,
193  struct flextcp_event *ev)
194 {
195  struct flextcp_listener *l;
196  struct socket *s;
197 
198  l = ev->ev.listen_newconn.listener;
199  s = (struct socket *)
200  ((uint8_t *) l - offsetof(struct socket, data.listener.l));
201 
202  socket_lock(s);
203 
204  assert(s->type == SOCK_LISTENER);
205 
206  flextcp_epoll_set(s, EPOLLIN);
207 
208  socket_unlock(s);
209 }
210 
211 static inline void ev_listen_accept(struct flextcp_context *ctx,
212  struct flextcp_event *ev)
213 {
214  struct flextcp_connection *c;
215  struct socket *s, *sl;
216 
217  c = ev->ev.listen_accept.conn;
218  s = (struct socket *)
219  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
220 
221  socket_lock(s);
222 
223  assert(s->type == SOCK_CONNECTION);
224  assert(s->data.connection.status == SOC_CONNECTING);
225  sl = s->data.connection.listener;
226  assert(sl != NULL);
227 
228  flextcp_epoll_set(sl, EPOLLIN);
229 
230  if (ev->ev.listen_accept.status == 0) {
231  s->data.connection.status = SOC_CONNECTED;
232  flextcp_epoll_set(s, EPOLLOUT);
233  } else {
234  s->data.connection.status = SOC_FAILED;
235  flextcp_epoll_set(s, EPOLLERR);
236  }
237 
238  socket_unlock(s);
239 }
240 
241 static inline void ev_conn_open(struct flextcp_context *ctx,
242  struct flextcp_event *ev)
243 {
244  struct flextcp_connection *c;
245  struct socket *s;
246 
247  c = ev->ev.conn_open.conn;
248  s = (struct socket *)
249  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
250 
251  socket_lock(s);
252 
253  assert(s->type == SOCK_CONNECTION);
254  assert(s->data.connection.status == SOC_CONNECTING);
255 
256  if (ev->ev.conn_open.status == 0) {
257  s->data.connection.status = SOC_CONNECTED;
258  flextcp_epoll_set(s, EPOLLOUT);
259  } else {
260  s->data.connection.status = SOC_FAILED;
261  flextcp_epoll_set(s, EPOLLERR);
262  }
263 
264  socket_unlock(s);
265 }
266 
267 static inline void ev_conn_received(struct flextcp_context *ctx,
268  struct flextcp_event *ev)
269 {
270  struct flextcp_connection *c;
271  struct socket *s;
272  void *buf;
273  size_t len;
274 
275  c = ev->ev.conn_received.conn;
276  s = (struct socket *)
277  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
278 
279  socket_lock(s);
280 
281  if (s->data.connection.status == SOC_CLOSED) {
282  /* ignore data on socket we have already closed */
283  goto out;
284  }
285 
286  assert(s->type == SOCK_CONNECTION);
287  assert(s->data.connection.status == SOC_CONNECTED);
288 
289  buf = ev->ev.conn_received.buf;
290  len = ev->ev.conn_received.len;
291 
292  if (s->data.connection.rx_len_1 == 0) {
293  /* no data currently ready on socket */
294  s->data.connection.rx_len_1 = len;
295  s->data.connection.rx_buf_1 = buf;
296  s->data.connection.rx_len_2 = 0;
297  } else if (s->data.connection.rx_len_2 == 0 && buf ==
298  (uint8_t *) s->data.connection.rx_buf_1 + s->data.connection.rx_len_1)
299  {
300  /* append to previous location 1 */
301  s->data.connection.rx_len_1 += len;
302  } else if (buf ==
303  (uint8_t *) s->data.connection.rx_buf_2 + s->data.connection.rx_len_2)
304  {
305  /* append to previous location 2 */
306  s->data.connection.rx_len_2 += len;
307  } else {
308  /* because we know that the underlying buffer is circular, we know that
309  * there can't be more than 2 positions */
310  if (s->data.connection.rx_len_2 != 0) {
311  fprintf(stderr, "ev_conn_received: More than two non-contiguous "
312  "buffer pieces, this should not happen\n");
313  abort();
314  }
315 
316  s->data.connection.rx_len_2 = len;
317  s->data.connection.rx_buf_2 = buf;
318  }
319 
320  flextcp_epoll_set(s, EPOLLIN);
321 
322 out:
323  socket_unlock(s);
324 }
325 
326 static inline void ev_conn_sendbuf(struct flextcp_context *ctx,
327  struct flextcp_event *ev)
328 {
329  struct flextcp_connection *c;
330  struct socket *s;
331 
332  c = ev->ev.conn_sendbuf.conn;
333  s = (struct socket *)
334  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
335 
336  socket_lock(s);
337 
338  assert(s->type == SOCK_CONNECTION);
339  assert(s->data.connection.status == SOC_CONNECTED);
340 
341  flextcp_epoll_set(s, EPOLLOUT);
342 
343  socket_unlock(s);
344 }
345 
346 static inline void ev_conn_moved(struct flextcp_context *ctx,
347  struct flextcp_event *ev)
348 {
349  struct flextcp_connection *c;
350  struct socket *s;
351 
352  c = ev->ev.conn_moved.conn;
353  s = (struct socket *)
354  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
355 
356  socket_lock(s);
357 
358  assert(s->type == SOCK_CONNECTION);
359  assert(s->data.connection.status == SOC_CONNECTED);
360 
361  s->data.connection.move_status = ev->ev.conn_moved.status;
362 
363  socket_unlock(s);
364 }
365 
366 static inline void ev_conn_rxclosed(struct flextcp_context *ctx,
367  struct flextcp_event *ev)
368 {
369  struct flextcp_connection *c;
370  struct socket *s;
371 
372  c = ev->ev.conn_rxclosed.conn;
373  s = (struct socket *)
374  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
375 
376  socket_lock(s);
377 
378  assert(s->type == SOCK_CONNECTION);
379  assert(s->data.connection.status == SOC_CONNECTED ||
380  s->data.connection.status == SOC_CLOSED);
381 
382  s->data.connection.st_flags |= CSTF_RXCLOSED;
383  flextcp_epoll_set(s, EPOLLIN | EPOLLRDHUP);
384 
385  if (s->data.connection.status == SOC_CLOSED &&
386  (s->data.connection.st_flags & CSTF_TXCLOSED_ACK))
387  {
388  /* if socket is being closed and both rx and tx are closed now, finish
389  * close. */
390  flextcp_sockclose_finish(ctx, s);
391  }
392 
393  socket_unlock(s);
394 }
395 
396 static inline void ev_conn_txclosed(struct flextcp_context *ctx,
397  struct flextcp_event *ev)
398 {
399  struct flextcp_connection *c;
400  struct socket *s;
401 
402  c = ev->ev.conn_txclosed.conn;
403  s = (struct socket *)
404  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
405 
406  socket_lock(s);
407 
408  assert(s->type == SOCK_CONNECTION);
409  assert(s->data.connection.status == SOC_CONNECTED ||
410  s->data.connection.status == SOC_CLOSED);
411 
412  s->data.connection.st_flags |= CSTF_TXCLOSED_ACK;
413 
414  if (s->data.connection.status == SOC_CLOSED &&
415  (s->data.connection.st_flags & CSTF_RXCLOSED))
416  {
417  /* if socket is being closed and both rx and tx are closed now, finish
418  * close. */
419  flextcp_sockclose_finish(ctx, s);
420  }
421 
422  socket_unlock(s);
423 }
424 
425 static inline void ev_conn_closed(struct flextcp_context *ctx,
426  struct flextcp_event *ev)
427 {
428  struct flextcp_connection *c;
429  struct socket *s;
430 
431  c = ev->ev.conn_closed.conn;
432  s = (struct socket *)
433  ((uint8_t *) c - offsetof(struct socket, data.connection.c));
434 
435  socket_lock(s);
436 
437  assert(s->type == SOCK_CONNECTION);
438  assert(s->data.connection.status == SOC_CLOSED);
439 
440  free(s);
441 }
struct flextcp_event::@5::@12 conn_rxclosed
int flextcp_context_poll(struct flextcp_context *ctx, int num, struct flextcp_event *events)
Definition: init.c:454
int flextcp_context_create(struct flextcp_context *ctx)
Definition: init.c:90
struct flextcp_event::@5::@8 listen_accept
struct flextcp_event::@5::@9 conn_open
struct flextcp_event::@5::@15 conn_closed
struct flextcp_event::@5::@10 conn_received
struct flextcp_event::@5::@6 listen_open
Public low-level application interface for TAS.
struct flextcp_event::@5::@13 conn_txclosed
struct flextcp_event::@5::@7 listen_newconn
struct flextcp_event::@5::@14 conn_moved
struct flextcp_event::@5::@11 conn_sendbuf
TAS sockets emulation.