TAS
TCP Acceleration as an OS Service
transfer.c
1 /*
2  * Copyright 2019 University of Washington, Max Planck Institute for
3  * Software Systems, and The University of Texas at Austin
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <assert.h>
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <stddef.h>
29 #include <errno.h>
30 #include <string.h>
31 
32 #include <utils.h>
33 #include <utils_circ.h>
34 #include <tas_sockets.h>
35 #include <tas_ll.h>
36 
37 #include "internal.h"
38 #include "../tas/internal.h"
39 
40 ssize_t tas_recvmsg(int sockfd, struct msghdr *msg, int flags)
41 {
42  struct socket *s;
43  struct flextcp_context *ctx;
44  ssize_t ret = 0;
45  size_t len, i, off;
46  struct iovec *iov;
47  int block;
48 
49  if (flextcp_fd_slookup(sockfd, &s) != 0) {
50  errno = EBADF;
51  return -1;
52  }
53 
54  tas_sock_move(s);
55 
56  /* not a connection, or not connected */
57  if (s->type != SOCK_CONNECTION ||
58  s->data.connection.status != SOC_CONNECTED)
59  {
60  errno = ENOTCONN;
61  ret = -1;
62  goto out;
63  }
64 
65  /* return 0 if 0 length */
66  len = 0;
67  iov = msg->msg_iov;
68  for (i = 0; i < msg->msg_iovlen; i++) {
69  len += iov[i].iov_len;
70  }
71  if (len == 0) {
72  goto out;
73  }
74 
75  ctx = flextcp_sockctx_get();
76 
77  /* wait for data if necessary, or abort after polling once if non-blocking */
78  block = 0;
79  while (s->data.connection.rx_len_1 == 0 &&
80  !(s->data.connection.st_flags & CSTF_RXCLOSED))
81  {
82  flextcp_epoll_clear(s, EPOLLIN);
83 
84  /* even if non-blocking we have to poll the context at least once to handle
85  * busy polling loops of recvmsg */
86  socket_unlock(s);
87  if (block)
88  flextcp_context_wait(ctx, -1);
89  block = 1;
90  flextcp_sockctx_poll(ctx);
91  socket_lock(s);
92 
93  /* if non-blocking and nothing then we abort now */
94  if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
95  s->data.connection.rx_len_1 == 0 &&
96  !(s->data.connection.st_flags & CSTF_RXCLOSED))
97  {
98  errno = EAGAIN;
99  ret = -1;
100  goto out;
101  }
102  }
103 
104  /* copy data into buffer vector */
105  for (i = 0; i < msg->msg_iovlen && s->data.connection.rx_len_1 > 0; i++) {
106  off = 0;
107  if (s->data.connection.rx_len_1 <= iov[i].iov_len) {
108  off = s->data.connection.rx_len_1;
109  memcpy(iov[i].iov_base, s->data.connection.rx_buf_1, off);
110  ret += off;
111 
112  s->data.connection.rx_buf_1 = s->data.connection.rx_buf_2;
113  s->data.connection.rx_len_1 = s->data.connection.rx_len_2;
114  s->data.connection.rx_buf_2 = NULL;
115  s->data.connection.rx_len_2 = 0;
116  }
117 
118  len = MIN(iov[i].iov_len - off, s->data.connection.rx_len_1);
119  memcpy((uint8_t *) iov[i].iov_base + off, s->data.connection.rx_buf_1, len);
120  ret += len;
121 
122  s->data.connection.rx_buf_1 = (uint8_t *) s->data.connection.rx_buf_1 + len;
123  s->data.connection.rx_len_1 -= len;
124  }
125 
126  if (ret > 0) {
127  if (s->data.connection.rx_len_1 == 0 &&
128  !(s->data.connection.st_flags & CSTF_RXCLOSED))
129  {
130  flextcp_epoll_clear(s, EPOLLIN);
131  }
132  flextcp_connection_rx_done(ctx, &s->data.connection.c, ret);
133  }
134 out:
135  flextcp_fd_srelease(sockfd, s);
136  return ret;
137 }
138 
139 static inline ssize_t recv_simple(int sockfd, void *buf, size_t len, int flags)
140 {
141  struct socket *s;
142  struct flextcp_context *ctx;
143  ssize_t ret = 0;
144  size_t off, len_2;
145  int block;
146 
147  if (flextcp_fd_slookup(sockfd, &s) != 0) {
148  errno = EBADF;
149  return -1;
150  }
151 
152  tas_sock_move(s);
153 
154  /* not a connection, or not connected */
155  if (s->type != SOCK_CONNECTION ||
156  s->data.connection.status != SOC_CONNECTED)
157  {
158  errno = ENOTCONN;
159  ret = -1;
160  goto out;
161  }
162 
163  /* return 0 if 0 length */
164  if (len == 0) {
165  goto out;
166  }
167 
168  ctx = flextcp_sockctx_get();
169 
170  /* wait for data if necessary, or abort after polling once if non-blocking */
171  block = 0;
172  while (s->data.connection.rx_len_1 == 0 &&
173  !(s->data.connection.st_flags & CSTF_RXCLOSED))
174  {
175  flextcp_epoll_clear(s, EPOLLIN);
176 
177  /* even if non-blocking we have to poll the context at least once to handle
178  * busy polling loops of recvmsg */
179  socket_unlock(s);
180  if (block)
181  flextcp_context_wait(ctx, -1);
182  block = 1;
183  flextcp_sockctx_poll(ctx);
184  socket_lock(s);
185 
186  /* if non-blocking and nothing then we abort now */
187  if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
188  s->data.connection.rx_len_1 == 0 &&
189  !(s->data.connection.st_flags & CSTF_RXCLOSED))
190  {
191  errno = EAGAIN;
192  ret = -1;
193  goto out;
194  }
195  }
196 
197  /* copy to provided buffer */
198  off = 0;
199  if (s->data.connection.rx_len_1 <= len) {
200  memcpy(buf, s->data.connection.rx_buf_1, s->data.connection.rx_len_1);
201  ret = off = s->data.connection.rx_len_1;
202 
203  s->data.connection.rx_buf_1 = s->data.connection.rx_buf_2;
204  s->data.connection.rx_len_1 = s->data.connection.rx_len_2;
205  s->data.connection.rx_buf_2 = NULL;
206  s->data.connection.rx_len_2 = 0;
207  }
208  len_2 = MIN(s->data.connection.rx_len_1, len - off);
209  memcpy((uint8_t *) buf + ret, s->data.connection.rx_buf_1, len_2);
210  ret += len_2;
211  s->data.connection.rx_buf_1 += len_2;
212  s->data.connection.rx_len_1 -= len_2;
213 
214  if (ret > 0) {
215  if (s->data.connection.rx_len_1 == 0 &&
216  !(s->data.connection.st_flags & CSTF_RXCLOSED))
217  {
218  flextcp_epoll_clear(s, EPOLLIN);
219  }
220  flextcp_connection_rx_done(ctx, &s->data.connection.c, ret);
221  }
222 out:
223  flextcp_fd_srelease(sockfd, s);
224  return ret;
225 }
226 
227 #include <unistd.h>
228 
229 ssize_t tas_sendmsg(int sockfd, const struct msghdr *msg, int flags)
230 {
231 
232  struct socket *s;
233  struct flextcp_context *ctx;
234  ssize_t ret = 0;
235  size_t len, i, l, len_1, len_2, off;
236  struct iovec *iov;
237  void *dst_1, *dst_2;
238  int block;
239 
240  if (flextcp_fd_slookup(sockfd, &s) != 0) {
241  errno = EBADF;
242  return -1;
243  }
244 
245  tas_sock_move(s);
246 
247  /* not a connection, or not connected */
248  if (s->type != SOCK_CONNECTION ||
249  s->data.connection.status != SOC_CONNECTED ||
250  (s->data.connection.st_flags & CSTF_TXCLOSED) == CSTF_TXCLOSED)
251  {
252  errno = ENOTCONN;
253  ret = -1;
254  goto out;
255  }
256 
257  /* return 0 if 0 length */
258  len = 0;
259  iov = msg->msg_iov;
260  for (i = 0; i < msg->msg_iovlen; i++) {
261  len += iov[i].iov_len;
262  }
263  if (len == 0) {
264  goto out;
265  }
266 
267  ctx = flextcp_sockctx_get();
268 
269  /* make sure there is space in the transmit queue if the socket is
270  * non-blocking */
271  if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
272  flextcp_connection_tx_possible(ctx, &s->data.connection.c) != 0)
273  {
274  errno = EAGAIN;
275  ret = -1;
276  goto out;
277  }
278 
279  /* allocate transmit buffer */
280  ret = flextcp_connection_tx_alloc2(&s->data.connection.c, len, &dst_1, &len_1,
281  &dst_2);
282  if (ret < 0) {
283  fprintf(stderr, "sendmsg: flextcp_connection_tx_alloc failed\n");
284  abort();
285  }
286 
287  /* if tx buffer allocation failed, either block or poll context at least once
288  * to handle busy loops of send on non-blocking sockets. */
289  block = 0;
290  while (ret == 0) {
291  socket_unlock(s);
292  if (block)
293  flextcp_context_wait(ctx, -1);
294  block = 1;
295 
296  flextcp_sockctx_poll(ctx);
297  socket_lock(s);
298 
299  ret = flextcp_connection_tx_alloc2(&s->data.connection.c, len, &dst_1,
300  &len_1, &dst_2);
301  if (ret < 0) {
302  fprintf(stderr, "sendmsg: flextcp_connection_tx_alloc failed\n");
303  abort();
304  } else if (ret == 0 && (s->flags & SOF_NONBLOCK) == SOF_NONBLOCK) {
305  errno = EAGAIN;
306  ret = -1;
307  goto out;
308  }
309  }
310  len_2 = ret - len_1;
311 
312  /* copy into TX buffer */
313  len = ret;
314  iov = msg->msg_iov;
315  off = 0;
316  for (i = 0; i < msg->msg_iovlen && len > 0; i++) {
317  l = MIN(len, iov[i].iov_len);
318  split_write(iov[i].iov_base, l, dst_1, len_1, dst_2, len_2, off);
319 
320  len -= l;
321  off += l;
322  }
323 
324  /* send out */
325  /* TODO: this should not block for non-blocking sockets */
326  block = 0;
327  while (flextcp_connection_tx_send(ctx, &s->data.connection.c, ret) != 0) {
328  socket_unlock(s);
329  if (block)
330  flextcp_context_wait(ctx, -1);
331  block = 1;
332 
333  flextcp_sockctx_poll(ctx);
334  socket_lock(s);
335  }
336 
337 out:
338  flextcp_fd_srelease(sockfd, s);
339  return ret;
340 }
341 
342 static inline ssize_t send_simple(int sockfd, const void *buf, size_t len,
343  int flags)
344 {
345  struct socket *s;
346  struct flextcp_context *ctx;
347  ssize_t ret = 0;
348  size_t len_1, len_2;
349  void *dst_1, *dst_2;
350  int block;
351 
352  if (flextcp_fd_slookup(sockfd, &s) != 0) {
353  errno = EBADF;
354  return -1;
355  }
356 
357  tas_sock_move(s);
358 
359  /* not a connection, or not connected */
360  if (s->type != SOCK_CONNECTION ||
361  s->data.connection.status != SOC_CONNECTED ||
362  (s->data.connection.st_flags & CSTF_TXCLOSED) == CSTF_TXCLOSED)
363  {
364  errno = ENOTCONN;
365  ret = -1;
366  goto out;
367  }
368 
369  /* return 0 if 0 length */
370  if (len == 0) {
371  goto out;
372  }
373 
374  ctx = flextcp_sockctx_get();
375 
376  /* make sure there is space in the transmit queue if the socket is
377  * non-blocking */
378  if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
379  flextcp_connection_tx_possible(ctx, &s->data.connection.c) != 0)
380  {
381  errno = EAGAIN;
382  ret = -1;
383  goto out;
384  }
385 
386  /* allocate transmit buffer */
387  ret = flextcp_connection_tx_alloc2(&s->data.connection.c, len, &dst_1, &len_1,
388  &dst_2);
389  if (ret < 0) {
390  fprintf(stderr, "sendmsg: flextcp_connection_tx_alloc failed\n");
391  abort();
392  }
393 
394  /* if tx buffer allocation failed, either block or poll context at least once
395  * to handle busy loops of send on non-blocking sockets. */
396  block = 0;
397  while (ret == 0) {
398  socket_unlock(s);
399  if (block)
400  flextcp_context_wait(ctx, -1);
401  block = 1;
402 
403  flextcp_sockctx_poll(ctx);
404  socket_lock(s);
405 
406  ret = flextcp_connection_tx_alloc2(&s->data.connection.c, len, &dst_1,
407  &len_1, &dst_2);
408  if (ret < 0) {
409  fprintf(stderr, "sendmsg: flextcp_connection_tx_alloc failed\n");
410  abort();
411  } else if (ret == 0 && (s->flags & SOF_NONBLOCK) == SOF_NONBLOCK) {
412  errno = EAGAIN;
413  ret = -1;
414  goto out;
415  }
416  }
417  len_2 = ret - len_1;
418 
419  /* copy into TX buffer */
420  memcpy(dst_1, buf, len_1);
421  memcpy(dst_2, (const uint8_t *) buf + len_1, len_2);
422 
423  /* send out */
424  /* TODO: this should not block for non-blocking sockets */
425  block = 0;
426  while (flextcp_connection_tx_send(ctx, &s->data.connection.c, ret) != 0) {
427  socket_unlock(s);
428  if (block)
429  flextcp_context_wait(ctx, -1);
430  block = 1;
431 
432  flextcp_sockctx_poll(ctx);
433  socket_lock(s);
434  }
435 
436 out:
437  flextcp_fd_srelease(sockfd, s);
438  return ret;
439 }
440 
441 /******************************************************************************/
/* map:
 * - read, pread, recv, recvfrom --> recv_simple
 * - readv --> recvmsg
 * - write, pwrite, send, sendto --> send_simple
 * - writev --> sendmsg
 */
446 
/* read(2) on a TAS socket: equivalent to recv() with no flags. */
ssize_t tas_read(int sockfd, void *buf, size_t len)
{
  const int no_flags = 0;

  return recv_simple(sockfd, buf, len, no_flags);
}
451 
/* recv(2) on a TAS socket; flags are forwarded unchanged to recv_simple(). */
ssize_t tas_recv(int sockfd, void *buf, size_t len, int flags)
{
  ssize_t received = recv_simple(sockfd, buf, len, flags);

  return received;
}
456 
457 ssize_t tas_recvfrom(int sockfd, void *buf, size_t len, int flags,
458  struct sockaddr *src_addr, socklen_t *addrlen)
459 {
460  ssize_t ret;
461 
462  ret = recv_simple(sockfd, buf, len, flags);
463 
464  if (src_addr != NULL) {
465  *addrlen = *addrlen;
466  }
467  return ret;
468 }
469 
/**
 * readv(2) on a TAS socket, implemented on top of tas_recvmsg().
 *
 * @return Bytes read, or -1 with errno set. errno is EINVAL if iovlen is
 *         negative; otherwise it is whatever tas_recvmsg() reports.
 */
ssize_t tas_readv(int sockfd, const struct iovec *iov, int iovlen)
{
  struct msghdr msg;

  /* a negative count would be converted to a huge msg_iovlen and make
   * tas_recvmsg walk far past the end of the caller's array */
  if (iovlen < 0) {
    errno = EINVAL;
    return -1;
  }

  msg.msg_name = NULL;
  msg.msg_namelen = 0;
  /* const is cast away only to fit struct msghdr; the iovec array itself is
   * not modified, only the buffers it points to are written */
  msg.msg_iov = (struct iovec *) iov;
  msg.msg_iovlen = iovlen;
  msg.msg_control = NULL;
  msg.msg_controllen = 0;
  msg.msg_flags = 0;

  return tas_recvmsg(sockfd, &msg, 0);
}
483 
/* pread(2) on a TAS socket. The offset is ignored (the zero check for it is
 * skipped), so this behaves exactly like read(). */
ssize_t tas_pread(int sockfd, void *buf, size_t len, off_t offset)
{
  (void) offset;

  return recv_simple(sockfd, buf, len, 0);
}
489 
490 
/* write(2) on a TAS socket: equivalent to send() with no flags. */
ssize_t tas_write(int sockfd, const void *buf, size_t len)
{
  const int no_flags = 0;

  return send_simple(sockfd, buf, len, no_flags);
}
495 
/* send(2) on a TAS socket; flags are forwarded unchanged to send_simple(). */
ssize_t tas_send(int sockfd, const void *buf, size_t len, int flags)
{
  ssize_t sent = send_simple(sockfd, buf, len, flags);

  return sent;
}
500 
/* sendto(2) on a TAS socket. send_simple() only accepts connected connection
 * sockets, so the destination address is not consulted and this is the same
 * as send(). */
ssize_t tas_sendto(int sockfd, const void *buf, size_t len, int flags,
    const struct sockaddr *dest_addr, socklen_t addrlen)
{
  (void) dest_addr;
  (void) addrlen;

  return send_simple(sockfd, buf, len, flags);
}
506 
/**
 * writev(2) on a TAS socket, implemented on top of tas_sendmsg().
 *
 * @return Bytes queued for transmission, or -1 with errno set. errno is
 *         EINVAL if iovlen is negative; otherwise it is whatever
 *         tas_sendmsg() reports.
 */
ssize_t tas_writev(int sockfd, const struct iovec *iov, int iovlen)
{
  struct msghdr msg;

  /* a negative count would be converted to a huge msg_iovlen and make
   * tas_sendmsg walk far past the end of the caller's array */
  if (iovlen < 0) {
    errno = EINVAL;
    return -1;
  }

  msg.msg_name = NULL;
  msg.msg_namelen = 0;
  /* const is cast away only to fit struct msghdr; nothing is written */
  msg.msg_iov = (struct iovec *) iov;
  msg.msg_iovlen = iovlen;
  msg.msg_control = NULL;
  msg.msg_controllen = 0;
  msg.msg_flags = 0;

  return tas_sendmsg(sockfd, &msg, 0);
}
520 
/* pwrite(2) on a TAS socket. The offset is ignored (the zero check for it is
 * skipped), so this behaves exactly like write(). */
ssize_t tas_pwrite(int sockfd, const void *buf, size_t len, off_t offset)
{
  (void) offset;

  return send_simple(sockfd, buf, len, 0);
}
526 
/* sendfile(2) is not implemented for TAS sockets. Builds with assertions
 * enabled stop on the assert; with NDEBUG the call fails with ENOTSUP. */
ssize_t tas_sendfile(int sockfd, int in_fd, off_t *offset, size_t len)
{
  (void) sockfd;
  (void) in_fd;
  (void) offset;
  (void) len;

  assert(!"NYI");
  errno = ENOTSUP;
  return -1;
}
int flextcp_connection_tx_send(struct flextcp_context *ctx, struct flextcp_connection *conn, size_t len)
Definition: conn.c:314
int flextcp_context_wait(struct flextcp_context *ctx, int timeout_ms)
Definition: init.c:995
int flextcp_connection_rx_done(struct flextcp_context *ctx, struct flextcp_connection *conn, size_t len)
Definition: conn.c:223
Public low-level application interface for TAS.
ssize_t flextcp_connection_tx_alloc2(struct flextcp_connection *conn, size_t len, void **buf_1, size_t *len_1, void **buf_2)
Definition: conn.c:277
int flextcp_connection_tx_possible(struct flextcp_context *ctx, struct flextcp_connection *conn)
Definition: conn.c:386
TAS sockets emulation.