33 #include <utils_circ.h> 38 #include "../tas/internal.h" 40 ssize_t tas_recvmsg(
int sockfd,
struct msghdr *msg,
int flags)
49 if (flextcp_fd_slookup(sockfd, &s) != 0) {
57 if (s->type != SOCK_CONNECTION ||
58 s->data.connection.status != SOC_CONNECTED)
68 for (i = 0; i < msg->msg_iovlen; i++) {
69 len += iov[i].iov_len;
75 ctx = flextcp_sockctx_get();
79 while (s->data.connection.rx_len_1 == 0 &&
80 !(s->data.connection.st_flags & CSTF_RXCLOSED))
82 flextcp_epoll_clear(s, EPOLLIN);
90 flextcp_sockctx_poll(ctx);
94 if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
95 s->data.connection.rx_len_1 == 0 &&
96 !(s->data.connection.st_flags & CSTF_RXCLOSED))
105 for (i = 0; i < msg->msg_iovlen && s->data.connection.rx_len_1 > 0; i++) {
107 if (s->data.connection.rx_len_1 <= iov[i].iov_len) {
108 off = s->data.connection.rx_len_1;
109 memcpy(iov[i].iov_base, s->data.connection.rx_buf_1, off);
112 s->data.connection.rx_buf_1 = s->data.connection.rx_buf_2;
113 s->data.connection.rx_len_1 = s->data.connection.rx_len_2;
114 s->data.connection.rx_buf_2 = NULL;
115 s->data.connection.rx_len_2 = 0;
118 len = MIN(iov[i].iov_len - off, s->data.connection.rx_len_1);
119 memcpy((uint8_t *) iov[i].iov_base + off, s->data.connection.rx_buf_1, len);
122 s->data.connection.rx_buf_1 = (uint8_t *) s->data.connection.rx_buf_1 + len;
123 s->data.connection.rx_len_1 -= len;
127 if (s->data.connection.rx_len_1 == 0 &&
128 !(s->data.connection.st_flags & CSTF_RXCLOSED))
130 flextcp_epoll_clear(s, EPOLLIN);
135 flextcp_fd_srelease(sockfd, s);
139 static inline ssize_t recv_simple(
int sockfd,
void *buf,
size_t len,
int flags)
147 if (flextcp_fd_slookup(sockfd, &s) != 0) {
155 if (s->type != SOCK_CONNECTION ||
156 s->data.connection.status != SOC_CONNECTED)
168 ctx = flextcp_sockctx_get();
172 while (s->data.connection.rx_len_1 == 0 &&
173 !(s->data.connection.st_flags & CSTF_RXCLOSED))
175 flextcp_epoll_clear(s, EPOLLIN);
183 flextcp_sockctx_poll(ctx);
187 if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
188 s->data.connection.rx_len_1 == 0 &&
189 !(s->data.connection.st_flags & CSTF_RXCLOSED))
199 if (s->data.connection.rx_len_1 <= len) {
200 memcpy(buf, s->data.connection.rx_buf_1, s->data.connection.rx_len_1);
201 ret = off = s->data.connection.rx_len_1;
203 s->data.connection.rx_buf_1 = s->data.connection.rx_buf_2;
204 s->data.connection.rx_len_1 = s->data.connection.rx_len_2;
205 s->data.connection.rx_buf_2 = NULL;
206 s->data.connection.rx_len_2 = 0;
208 len_2 = MIN(s->data.connection.rx_len_1, len - off);
209 memcpy((uint8_t *) buf + ret, s->data.connection.rx_buf_1, len_2);
211 s->data.connection.rx_buf_1 += len_2;
212 s->data.connection.rx_len_1 -= len_2;
215 if (s->data.connection.rx_len_1 == 0 &&
216 !(s->data.connection.st_flags & CSTF_RXCLOSED))
218 flextcp_epoll_clear(s, EPOLLIN);
223 flextcp_fd_srelease(sockfd, s);
229 ssize_t tas_sendmsg(
int sockfd,
const struct msghdr *msg,
int flags)
235 size_t len, i, l, len_1, len_2, off;
240 if (flextcp_fd_slookup(sockfd, &s) != 0) {
248 if (s->type != SOCK_CONNECTION ||
249 s->data.connection.status != SOC_CONNECTED ||
250 (s->data.connection.st_flags & CSTF_TXCLOSED) == CSTF_TXCLOSED)
260 for (i = 0; i < msg->msg_iovlen; i++) {
261 len += iov[i].iov_len;
267 ctx = flextcp_sockctx_get();
271 if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
283 fprintf(stderr,
"sendmsg: flextcp_connection_tx_alloc failed\n");
296 flextcp_sockctx_poll(ctx);
302 fprintf(stderr,
"sendmsg: flextcp_connection_tx_alloc failed\n");
304 }
else if (ret == 0 && (s->flags & SOF_NONBLOCK) == SOF_NONBLOCK) {
316 for (i = 0; i < msg->msg_iovlen && len > 0; i++) {
317 l = MIN(len, iov[i].iov_len);
318 split_write(iov[i].iov_base, l, dst_1, len_1, dst_2, len_2, off);
333 flextcp_sockctx_poll(ctx);
338 flextcp_fd_srelease(sockfd, s);
342 static inline ssize_t send_simple(
int sockfd,
const void *buf,
size_t len,
352 if (flextcp_fd_slookup(sockfd, &s) != 0) {
360 if (s->type != SOCK_CONNECTION ||
361 s->data.connection.status != SOC_CONNECTED ||
362 (s->data.connection.st_flags & CSTF_TXCLOSED) == CSTF_TXCLOSED)
374 ctx = flextcp_sockctx_get();
378 if ((s->flags & SOF_NONBLOCK) == SOF_NONBLOCK &&
390 fprintf(stderr,
"sendmsg: flextcp_connection_tx_alloc failed\n");
403 flextcp_sockctx_poll(ctx);
409 fprintf(stderr,
"sendmsg: flextcp_connection_tx_alloc failed\n");
411 }
else if (ret == 0 && (s->flags & SOF_NONBLOCK) == SOF_NONBLOCK) {
420 memcpy(dst_1, buf, len_1);
421 memcpy(dst_2, (
const uint8_t *) buf + len_1, len_2);
432 flextcp_sockctx_poll(ctx);
437 flextcp_fd_srelease(sockfd, s);
447 ssize_t tas_read(
int sockfd,
void *buf,
size_t len)
449 return recv_simple(sockfd, buf, len, 0);
452 ssize_t tas_recv(
int sockfd,
void *buf,
size_t len,
int flags)
454 return recv_simple(sockfd, buf, len, flags);
457 ssize_t tas_recvfrom(
int sockfd,
void *buf,
size_t len,
int flags,
458 struct sockaddr *src_addr, socklen_t *addrlen)
462 ret = recv_simple(sockfd, buf, len, flags);
464 if (src_addr != NULL) {
470 ssize_t tas_readv(
int sockfd,
const struct iovec *iov,
int iovlen)
475 msg.msg_iov = (
struct iovec *) iov;
476 msg.msg_iovlen = iovlen;
477 msg.msg_control = NULL;
478 msg.msg_controllen = 0;
481 return tas_recvmsg(sockfd, &msg, 0);
484 ssize_t tas_pread(
int sockfd,
void *buf,
size_t len, off_t offset)
487 return recv_simple(sockfd, buf, len, 0);
491 ssize_t tas_write(
int sockfd,
const void *buf,
size_t len)
493 return send_simple(sockfd, buf, len, 0);
496 ssize_t tas_send(
int sockfd,
const void *buf,
size_t len,
int flags)
498 return send_simple(sockfd, buf, len, flags);
501 ssize_t tas_sendto(
int sockfd,
const void *buf,
size_t len,
int flags,
502 const struct sockaddr *dest_addr, socklen_t addrlen)
504 return send_simple(sockfd, buf, len, flags);
507 ssize_t tas_writev(
int sockfd,
const struct iovec *iov,
int iovlen)
512 msg.msg_iov = (
struct iovec *) iov;
513 msg.msg_iovlen = iovlen;
514 msg.msg_control = NULL;
515 msg.msg_controllen = 0;
518 return tas_sendmsg(sockfd, &msg, 0);
521 ssize_t tas_pwrite(
int sockfd,
const void *buf,
size_t len, off_t offset)
524 return send_simple(sockfd, buf, len, 0);
527 ssize_t tas_sendfile(
int sockfd,
int in_fd, off_t *offset,
size_t len)
int flextcp_connection_tx_send(struct flextcp_context *ctx, struct flextcp_connection *conn, size_t len)
int flextcp_context_wait(struct flextcp_context *ctx, int timeout_ms)
int flextcp_connection_rx_done(struct flextcp_context *ctx, struct flextcp_connection *conn, size_t len)
Public low-level application interface for TAS.
ssize_t flextcp_connection_tx_alloc2(struct flextcp_connection *conn, size_t len, void **buf_1, size_t *len_1, void **buf_2)
int flextcp_connection_tx_possible(struct flextcp_context *ctx, struct flextcp_connection *conn)