Lines Matching defs:sock
16 #include "spdk/sock.h"
22 #include "spdk_internal/sock.h"
62 struct spdk_uring_sock *sock;
156 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
235 struct spdk_uring_sock *sock = __uring_sock(_sock);
237 assert(sock != NULL);
238 return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport);
244 struct spdk_uring_sock *sock = __uring_sock(_sock);
248 rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL);
253 rc = spdk_net_get_interface_name(saddr, sock->interface_name,
254 sizeof(sock->interface_name));
259 return sock->interface_name;
263 uring_sock_get_numa_id(struct spdk_sock *sock)
269 interface_name = uring_sock_get_interface_name(sock);
289 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
299 if (sock->recv_buf_sz == sz) {
305 spdk_pipe_destroy(sock->recv_pipe);
306 free(sock->recv_buf);
307 sock->recv_pipe = NULL;
308 sock->recv_buf = NULL;
330 if (sock->recv_pipe != NULL) {
332 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
346 spdk_pipe_destroy(sock->recv_pipe);
347 free(sock->recv_buf);
350 sock->recv_buf_sz = sz;
351 sock->recv_buf = new_buf;
352 sock->recv_pipe = new_pipe;
360 struct spdk_uring_sock *sock = __uring_sock(_sock);
364 assert(sock != NULL);
367 rc = uring_sock_alloc_pipe(sock, sz);
369 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
382 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
395 struct spdk_uring_sock *sock = __uring_sock(_sock);
399 assert(sock != NULL);
409 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
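The SO_RCVBUF/SO_SNDBUF calls at lines 382 and 409 are plain setsockopt(2) usage. A minimal standalone sketch of that pattern follows; the helper name and parameters are illustrative only and not part of the SPDK code.

#include <sys/socket.h>
#include <stdio.h>

/* Sketch of the kernel buffer sizing done around lines 382/409.
 * "fd" is an open TCP socket; "sz" is the requested size in bytes.
 * The kernel may clamp or round the value (Linux doubles it). */
static int set_sock_buf_sizes(int fd, int sz)
{
	if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) != 0) {
		perror("setsockopt(SO_RCVBUF)");
		return -1;
	}
	if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) != 0) {
		perror("setsockopt(SO_SNDBUF)");
		return -1;
	}
	return 0;
}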
422 struct spdk_uring_sock *sock;
428 sock = calloc(1, sizeof(*sock));
429 if (sock == NULL) {
430 SPDK_ERRLOG("sock allocation failed\n");
434 sock->fd = fd;
435 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
437 STAILQ_INIT(&sock->recv_stream);
442 if (sock->base.impl_opts.enable_quickack) {
443 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
449 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
450 &sock->placement_id);
456 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
458 sock->zcopy = true;
459 sock->zcopy_send_flags = MSG_ZEROCOPY;
465 return sock;
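Lines 442-459 show the per-socket option setup done while allocating a uring sock: TCP_QUICKACK and an opt-in to SO_ZEROCOPY, whose success decides whether MSG_ZEROCOPY sends are used. A hedged sketch of the same pattern, assuming a connected TCP socket and kernel headers that define SO_ZEROCOPY (Linux >= 4.14); the helper name is illustrative.

#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdbool.h>

/* Sketch of the option setup in uring_sock_alloc() (lines 442-459).
 * Returns true if zero-copy sends can be used on this socket. */
static bool enable_tcp_options(int fd)
{
	int flag = 1;
	bool zcopy = false;

	/* Suppress delayed ACKs for the next ACKs on this socket. */
	(void)setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));

	/* Opt in to MSG_ZEROCOPY; completions arrive on the error queue. */
	if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)) == 0) {
		zcopy = true;
	}

	return zcopy;
}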
473 struct spdk_uring_sock *sock;
690 sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
691 if (sock == NULL) {
692 SPDK_ERRLOG("sock allocation failed\n");
697 return &sock->base;
704 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.");
715 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.");
725 struct spdk_uring_sock *sock = __uring_sock(_sock);
735 assert(sock != NULL);
737 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
754 if (sock->base.opts.priority) {
755 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
763 new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
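The accept path (lines 737-763) accepts the pending connection, optionally propagates the listener's priority via SO_PRIORITY, and wraps the new fd with uring_sock_alloc(). A standalone sketch of just the accept/priority part, with illustrative names ("listen_fd", "priority") standing in for sock->fd and base.opts.priority:

#include <sys/socket.h>

/* Sketch of the accept path (lines 737-763). Returns the new fd or -1. */
static int accept_with_priority(int listen_fd, int priority)
{
	struct sockaddr_storage sa;
	socklen_t salen = sizeof(sa);
	int fd = accept(listen_fd, (struct sockaddr *)&sa, &salen);

	if (fd < 0) {
		return -1;	/* EAGAIN when no connection is pending */
	}
	if (priority != 0) {
		/* Best effort: the real code logs and keeps the connection
		 * even if this setsockopt fails. */
		(void)setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &priority,
				 sizeof(priority));
	}
	return fd;
}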
775 struct spdk_uring_sock *sock = __uring_sock(_sock);
778 assert(sock->group == NULL);
781 * leak the fd but continue to free the rest of the sock
783 close(sock->fd);
785 spdk_pipe_destroy(sock->recv_pipe);
786 free(sock->recv_buf);
787 free(sock);
793 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
800 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
817 spdk_pipe_reader_advance(sock->recv_pipe, bytes);
820 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
821 group = __uring_group_impl(sock->base.group_impl);
822 TAILQ_REMOVE(&group->pending_recv, sock, link);
823 sock->pending_recv = false;
841 uring_sock_read(struct spdk_uring_sock *sock)
847 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
850 bytes = sock_readv(sock->fd, iov, 2);
852 spdk_pipe_writer_advance(sock->recv_pipe, bytes);
853 if (sock->base.group_impl && !sock->pending_recv) {
854 group = __uring_group_impl(sock->base.group_impl);
855 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
856 sock->pending_recv = true;
867 struct spdk_uring_sock *sock = __uring_sock(_sock);
871 if (sock->connection_status < 0) {
872 errno = -sock->connection_status;
876 if (sock->recv_pipe != NULL) {
883 tr = STAILQ_FIRST(&sock->recv_stream);
885 if (sock->group->buf_ring_count > 0) {
895 assert(sock->pending_recv == true);
898 *_buf = tr->buf + sock->recv_offset;
901 STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
904 if (STAILQ_EMPTY(&sock->recv_stream)) {
905 sock->pending_recv = false;
906 TAILQ_REMOVE(&group->pending_recv, sock, link);
909 return tr->len - sock->recv_offset;
915 struct spdk_uring_sock *sock = __uring_sock(_sock);
921 if (sock->connection_status < 0) {
922 errno = -sock->connection_status;
928 return sock_readv(sock->fd, iovs, iovcnt);
931 if (STAILQ_EMPTY(&sock->recv_stream)) {
932 if (sock->group->buf_ring_count == 0) {
936 if (sock->pending_recv) {
937 sock->pending_recv = false;
938 TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link);
941 return sock_readv(sock->fd, iovs, iovcnt);
953 tr = STAILQ_FIRST(&sock->recv_stream);
955 len = spdk_min(iov.iov_len, tr->len - sock->recv_offset);
956 memcpy(iov.iov_base, tr->buf + sock->recv_offset, len);
959 sock->recv_offset += len;
963 if (sock->recv_offset == tr->len) {
964 sock->recv_offset = 0;
965 STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
966 STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link);
967 spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx);
968 tr = STAILQ_FIRST(&sock->recv_stream);
977 if (STAILQ_EMPTY(&sock->recv_stream)) {
981 sock->pending_recv = false;
982 TAILQ_REMOVE(&group->pending_recv, sock, link);
992 struct spdk_uring_sock *sock = __uring_sock(_sock);
996 if (sock->connection_status < 0) {
997 errno = -sock->connection_status;
1001 if (sock->recv_pipe == NULL) {
1010 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
1014 return sock_readv(sock->fd, iov, iovcnt);
1018 rc = uring_sock_read(sock);
1024 return uring_sock_recv_from_pipe(sock, iov, iovcnt);
1028 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
1035 return uring_sock_readv(sock, iov, 1);
1041 struct spdk_uring_sock *sock = __uring_sock(_sock);
1047 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1052 return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
1089 struct spdk_uring_sock *sock = __uring_sock(_sock);
1096 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
1097 sock->sendmsg_idx = 1;
1099 sock->sendmsg_idx++;
1127 req->internal.offset = sock->sendmsg_idx - 1;
1144 struct spdk_uring_sock *sock = __uring_sock(_sock);
1152 assert(sock->zcopy == true);
1163 cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
1210 struct spdk_uring_sock *sock = __uring_sock(_sock);
1211 struct spdk_uring_task *task = &sock->errqueue_task;
1218 if (sock->pending_group_remove) {
1222 assert(sock->group != NULL);
1223 sock->group->io_queued++;
1225 sqe = io_uring_get_sqe(&sock->group->uring);
1226 io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
1236 struct spdk_uring_sock *sock = __uring_sock(_sock);
1237 struct spdk_uring_task *task = &sock->write_task;
1247 if (sock->zcopy) {
1248 flags = MSG_DONTWAIT | sock->zcopy_send_flags;
1255 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
1261 assert(sock->group != NULL);
1267 sock->group->io_queued++;
1269 sqe = io_uring_get_sqe(&sock->group->uring);
1270 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
1278 struct spdk_uring_sock *sock = __uring_sock(_sock);
1279 struct spdk_uring_task *task = &sock->read_task;
1287 if (sock->pending_group_remove) {
1291 assert(sock->group != NULL);
1292 sock->group->io_queued++;
1294 sqe = io_uring_get_sqe(&sock->group->uring);
1295 io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0);
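Lines 1225-1295 show the common shape of the submission helpers: take an SQE from the group's ring, prep a recvmsg/sendmsg/recv against the socket fd, and account for it as queued I/O. A minimal liburing sketch of that shape, assuming an initialized ring; unlike _sock_prep_read() (which passes a NULL buffer and relies on provided-buffer rings) this sketch uses an explicit buffer and submits immediately, whereas the real group batches submissions in its poll loop. Names are illustrative.

#include <liburing.h>
#include <stddef.h>

/* Sketch of the SQE prep pattern behind _sock_prep_read()/_sock_prep_write().
 * "task" is attached as user data so the completion can be matched later. */
static int queue_recv(struct io_uring *ring, int fd, void *task,
		      void *buf, size_t len)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	if (sqe == NULL) {
		return -1;	/* submission queue is full */
	}
	io_uring_prep_recv(sqe, fd, buf, len, 0);
	io_uring_sqe_set_data(sqe, task);
	return io_uring_submit(ring);
}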
1305 struct spdk_uring_sock *sock = __uring_sock(_sock);
1306 struct spdk_uring_task *task = &sock->cancel_task;
1313 assert(sock->group != NULL);
1314 sock->group->io_queued++;
1316 sqe = io_uring_get_sqe(&sock->group->uring);
1323 uring_sock_fail(struct spdk_uring_sock *sock, int status)
1325 struct spdk_uring_sock_group_impl *group = sock->group;
1328 sock->connection_status = status;
1329 rc = spdk_sock_abort_requests(&sock->base);
1332 if (rc == 0 && sock->base.cb_fn != NULL &&
1333 sock->pending_recv == false) {
1334 sock->pending_recv = true;
1335 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1345 struct spdk_uring_sock *sock, *tmp;
1362 sock = task->sock;
1363 assert(sock != NULL);
1364 assert(sock->group != NULL);
1365 assert(sock->group == group);
1366 sock->group->io_inflight--;
1367 sock->group->io_avail++;
1379 _sock_prep_read(&sock->base);
1385 if (sock->base.cb_fn != NULL &&
1386 sock->pending_recv == false) {
1387 sock->pending_recv = true;
1388 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1391 _sock_prep_read(&sock->base);
1393 uring_sock_fail(sock, status < 0 ? status : -ECONNRESET);
1407 STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
1411 if (sock->base.cb_fn != NULL &&
1412 sock->pending_recv == false) {
1413 sock->pending_recv = true;
1414 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1417 _sock_prep_read(&sock->base);
1422 (status == -ENOBUFS && sock->zcopy) ||
1426 uring_sock_fail(sock, status);
1432 sock_complete_write_reqs(&sock->base, status, is_zcopy);
1439 _sock_prep_errqueue(&sock->base);
1443 uring_sock_fail(sock, status);
1445 _sock_check_zcopy(&sock->base, status);
1446 _sock_prep_errqueue(&sock->base);
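The errqueue task (lines 1143-1163, 1422-1446) exists because MSG_ZEROCOPY completions are delivered as control messages on the socket's error queue. A hedged standalone sketch of that mechanism, following the Linux MSG_ZEROCOPY documentation; the helper name is illustrative and the real code keys the [ee_info, ee_data] range to its sendmsg_idx counters.

#include <sys/socket.h>
#include <linux/errqueue.h>

/* Sketch of draining zero-copy completions from the error queue.
 * "fd" is a socket with SO_ZEROCOPY enabled. */
static int drain_zcopy_completions(int fd)
{
	char control[128];
	struct msghdr msg = { .msg_control = control,
			      .msg_controllen = sizeof(control) };
	struct cmsghdr *cm;
	const struct sock_extended_err *serr;

	if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) {
		return -1;		/* nothing pending, or a real error */
	}

	for (cm = CMSG_FIRSTHDR(&msg); cm != NULL; cm = CMSG_NXTHDR(&msg, cm)) {
		serr = (const struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno == 0 &&
		    serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
			/* sends with index in [ee_info, ee_data] completed */
		}
	}
	return 0;
}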
1462 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
1468 if (spdk_unlikely(sock->base.cb_fn == NULL)) {
1469 assert(sock->pending_recv == true);
1470 sock->pending_recv = false;
1471 TAILQ_REMOVE(&group->pending_recv, sock, link);
1475 socks[count++] = &sock->base;
1489 if (sock != NULL) {
1493 ud = sock;
1534 struct spdk_uring_sock *sock = __uring_sock(_sock);
1537 if (spdk_unlikely(sock->connection_status)) {
1538 req->cb_fn(req->cb_arg, sock->connection_status);
1544 if (!sock->group) {
1557 struct spdk_uring_sock *sock = __uring_sock(_sock);
1561 assert(sock != NULL);
1564 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1574 struct spdk_uring_sock *sock = __uring_sock(_sock);
1579 assert(sock != NULL);
1583 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1595 struct spdk_uring_sock *sock = __uring_sock(_sock);
1600 assert(sock != NULL);
1604 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1616 struct spdk_uring_sock *sock = __uring_sock(_sock);
1620 rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
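The one-byte MSG_PEEK read at line 1620 is the standard non-destructive liveness probe used by uring_sock_is_connected(): EAGAIN means connected but idle, 0 means the peer performed an orderly shutdown. A small sketch of the same check; the helper name is illustrative.

#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdbool.h>

/* Sketch of the liveness probe behind uring_sock_is_connected() (line 1620). */
static bool sock_is_connected(int fd)
{
	char byte;
	ssize_t rc = recv(fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);

	if (rc == 0) {
		return false;		/* orderly shutdown by the peer */
	}
	if (rc < 0) {
		return errno == EAGAIN || errno == EWOULDBLOCK;
	}
	return true;			/* data is waiting; socket is alive */
}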
1639 struct spdk_uring_sock *sock = __uring_sock(_sock);
1642 if (sock->placement_id != -1) {
1643 spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
1735 "uring sock implementation is likely not supported on this kernel.\n");
1752 struct spdk_uring_sock *sock = __uring_sock(_sock);
1756 sock->group = group;
1757 sock->write_task.sock = sock;
1758 sock->write_task.type = URING_TASK_WRITE;
1760 sock->read_task.sock = sock;
1761 sock->read_task.type = URING_TASK_READ;
1763 sock->errqueue_task.sock = sock;
1764 sock->errqueue_task.type = URING_TASK_ERRQUEUE;
1765 sock->errqueue_task.msg.msg_control = sock->buf;
1766 sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
1768 sock->cancel_task.sock = sock;
1769 sock->cancel_task.type = URING_TASK_CANCEL;
1772 if (spdk_unlikely(sock->recv_pipe != NULL &&
1773 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1774 assert(sock->pending_recv == false);
1775 sock->pending_recv = true;
1776 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1779 if (sock->placement_id != -1) {
1780 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1782 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc);
1788 _sock_prep_read(&sock->base);
1790 if (sock->zcopy) {
1842 struct spdk_uring_sock *sock;
1846 sock = __uring_sock(_sock);
1847 if (spdk_unlikely(sock->connection_status)) {
1885 struct spdk_uring_sock *sock = __uring_sock(_sock);
1888 sock->pending_group_remove = true;
1890 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1891 _sock_prep_cancel_task(_sock, &sock->write_task);
1894 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1895 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1900 if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1901 _sock_prep_cancel_task(_sock, &sock->read_task);
1904 while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1905 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1910 if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1911 _sock_prep_cancel_task(_sock, &sock->errqueue_task);
1914 while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1915 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1921 assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1922 assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1923 assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1925 if (sock->pending_recv) {
1926 TAILQ_REMOVE(&group->pending_recv, sock, link);
1927 sock->pending_recv = false;
1929 assert(sock->pending_recv == false);
1934 assert(STAILQ_EMPTY(&sock->recv_stream));
1936 if (sock->placement_id != -1) {
1937 spdk_sock_map_release(&g_map, sock->placement_id);
1940 sock->pending_group_remove = false;
1941 sock->group = NULL;
1972 struct spdk_uring_sock *sock = __uring_sock(_sock);
1977 int flags = sock->zcopy_send_flags;
1980 struct spdk_uring_task *task = &sock->errqueue_task;
1989 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
2004 rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
2006 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
2024 if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
2025 retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
2042 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.");
2090 /* Check if we can create a uring sock group before we register