112198SEiji.Ota@Sun.COM /*
212198SEiji.Ota@Sun.COM * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
312198SEiji.Ota@Sun.COM */
412198SEiji.Ota@Sun.COM
512198SEiji.Ota@Sun.COM /*
612763SGiri.Adari@Sun.COM * This file contains code imported from the OFED rds source file send.c
712763SGiri.Adari@Sun.COM * Oracle elects to have and use the contents of send.c under and governed
812763SGiri.Adari@Sun.COM * by the OpenIB.org BSD license (see below for full license text). However,
912763SGiri.Adari@Sun.COM * the following notice accompanied the original version of this file:
1012763SGiri.Adari@Sun.COM */
1112763SGiri.Adari@Sun.COM
1212763SGiri.Adari@Sun.COM /*
1312198SEiji.Ota@Sun.COM * Copyright (c) 2006 Oracle. All rights reserved.
1412198SEiji.Ota@Sun.COM *
1512198SEiji.Ota@Sun.COM * This software is available to you under a choice of one of two
1612198SEiji.Ota@Sun.COM * licenses. You may choose to be licensed under the terms of the GNU
1712198SEiji.Ota@Sun.COM * General Public License (GPL) Version 2, available from the file
1812198SEiji.Ota@Sun.COM * COPYING in the main directory of this source tree, or the
1912198SEiji.Ota@Sun.COM * OpenIB.org BSD license below:
2012198SEiji.Ota@Sun.COM *
2112198SEiji.Ota@Sun.COM * Redistribution and use in source and binary forms, with or
2212198SEiji.Ota@Sun.COM * without modification, are permitted provided that the following
2312198SEiji.Ota@Sun.COM * conditions are met:
2412198SEiji.Ota@Sun.COM *
2512198SEiji.Ota@Sun.COM * - Redistributions of source code must retain the above
2612198SEiji.Ota@Sun.COM * copyright notice, this list of conditions and the following
2712198SEiji.Ota@Sun.COM * disclaimer.
2812198SEiji.Ota@Sun.COM *
2912198SEiji.Ota@Sun.COM * - Redistributions in binary form must reproduce the above
3012198SEiji.Ota@Sun.COM * copyright notice, this list of conditions and the following
3112198SEiji.Ota@Sun.COM * disclaimer in the documentation and/or other materials
3212198SEiji.Ota@Sun.COM * provided with the distribution.
3312198SEiji.Ota@Sun.COM *
3412198SEiji.Ota@Sun.COM * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
3512198SEiji.Ota@Sun.COM * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
3612198SEiji.Ota@Sun.COM * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
3712198SEiji.Ota@Sun.COM * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
3812198SEiji.Ota@Sun.COM * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
3912198SEiji.Ota@Sun.COM * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
4012198SEiji.Ota@Sun.COM * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
4112198SEiji.Ota@Sun.COM * SOFTWARE.
4212198SEiji.Ota@Sun.COM *
4312198SEiji.Ota@Sun.COM */
4412198SEiji.Ota@Sun.COM #include <sys/stropts.h>
4512198SEiji.Ota@Sun.COM #include <sys/systm.h>
4612198SEiji.Ota@Sun.COM
4712198SEiji.Ota@Sun.COM #include <sys/rds.h>
4812198SEiji.Ota@Sun.COM #include <sys/socket.h>
4912198SEiji.Ota@Sun.COM #include <sys/socketvar.h>
5012198SEiji.Ota@Sun.COM
5112198SEiji.Ota@Sun.COM #include <sys/ib/clients/rdsv3/rdsv3.h>
5212198SEiji.Ota@Sun.COM #include <sys/ib/clients/rdsv3/rdma.h>
5312198SEiji.Ota@Sun.COM #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
5412198SEiji.Ota@Sun.COM
/*
 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;

/* IB transport hook: unmap the memory backing a mapped RDMA op. */
extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
/*
 * Reset the send state. Caller must hold c_send_lock when calling here.
 *
 * Drops the in-flight message (unmapping its RDMA op from the transport
 * first, if mapped), rewinds all the per-connection transmit cursors, and
 * moves everything on the send queue onto the retransmit queue so it will
 * be resent once the connection comes back up.
 */
void
rdsv3_send_reset(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_rdma_op *ro;

	RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);

	ASSERT(MUTEX_HELD(&conn->c_send_lock));

	if (conn->c_xmit_rm) {
		rm = conn->c_xmit_rm;
		ro = rm->m_rdma_op;
		if (ro && ro->r_mapped) {
			RDSV3_DPRINTF2("rdsv3_send_reset",
			    "rm %p mflg 0x%x map %d mihdl %p sgl %p",
			    rm, rm->m_flags, ro->r_mapped,
			    ro->r_rdma_sg[0].mihdl,
			    ro->r_rdma_sg[0].swr.wr_sgl);
			rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
		}
		/*
		 * Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory
		 */
		rdsv3_message_unmapped(conn->c_xmit_rm);
		/* drop the c_xmit_rm reference taken in rdsv3_send_xmit */
		rdsv3_message_put(conn->c_xmit_rm);
		conn->c_xmit_rm = NULL;
	}

	/* Rewind the transmit cursors so the next send starts from byte 0. */
	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_map_queued = 0;

	/* Restart the ACK-request accounting from a full budget. */
	conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	mutex_enter(&conn->c_lock);
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
		if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
			RDSV3_DPRINTF4("_send_reset",
			    "RT rm %p mflg 0x%x sgl %p",
			    rm, rm->m_flags,
			    rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
		}
	}
	list_move_tail(&conn->c_send_queue, &conn->c_retrans);
	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
}
13012198SEiji.Ota@Sun.COM
13112198SEiji.Ota@Sun.COM /*
13212198SEiji.Ota@Sun.COM  * We're making the conscious trade-off here to only send one message
13312198SEiji.Ota@Sun.COM * down the connection at a time.
13412198SEiji.Ota@Sun.COM * Pro:
13512198SEiji.Ota@Sun.COM * - tx queueing is a simple fifo list
13612198SEiji.Ota@Sun.COM * - reassembly is optional and easily done by transports per conn
13712198SEiji.Ota@Sun.COM * - no per flow rx lookup at all, straight to the socket
13812198SEiji.Ota@Sun.COM * - less per-frag memory and wire overhead
13912198SEiji.Ota@Sun.COM * Con:
14012198SEiji.Ota@Sun.COM * - queued acks can be delayed behind large messages
14112198SEiji.Ota@Sun.COM * Depends:
14212198SEiji.Ota@Sun.COM * - small message latency is higher behind queued large messages
14312198SEiji.Ota@Sun.COM * - large message latency isn't starved by intervening small sends
14412198SEiji.Ota@Sun.COM */
int
rdsv3_send_xmit(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm;
	unsigned int tmp;
	unsigned int send_quota = send_batch_count;
	struct rdsv3_scatterlist *sg;
	int ret = 0;
	int was_empty = 0;
	list_t to_be_dropped;	/* RDMA messages we refuse to retransmit */

	/*
	 * NOTE(review): nothing in this function jumps back to "restart";
	 * presumably a "goto restart" exists (or existed) in a variant of
	 * this code -- confirm before removing the label.
	 */
restart:
	if (!rdsv3_conn_up(conn))
		goto out;

	RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);

	list_create(&to_be_dropped, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue. We only have one task feeding the connection at a time. If
	 * another thread is already feeding the queue then we back off. This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!mutex_tryenter(&conn->c_send_lock)) {
		RDSV3_DPRINTF4("rdsv3_send_xmit",
		    "Another thread running(conn: %p)", conn);
		rdsv3_stats_inc(s_send_sem_contention);
		/* -ENOMEM tells the caller to reschedule the send worker */
		ret = -ENOMEM;
		goto out;
	}
	atomic_add_32(&conn->c_senders, 1);

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (--send_quota) {
		/*
		 * See if need to send a congestion map update if we're
		 * between sending messages. The send_sem protects our sole
		 * use of c_map_offset and _bytes.
		 * Note this is used only by transports that define a special
		 * xmit_cong_map function. For all others, we allocate
		 * a cong_map message and treat it just like any other send.
		 */
		if (conn->c_map_bytes) {
			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
			    conn->c_map_offset);
			if (ret <= 0)
				break;

			conn->c_map_offset += ret;
			conn->c_map_bytes -= ret;
			if (conn->c_map_bytes)
				continue;
		}

		/*
		 * If we're done sending the current message, clear the
		 * offset and S/G temporaries.
		 */
		rm = conn->c_xmit_rm;
		if (rm != NULL &&
		    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
		    conn->c_xmit_sg == rm->m_nents) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;

			/* Release the reference to the previous message. */
			rdsv3_message_put(rm);
			rm = NULL;
		}

		/* If we're asked to send a cong map update, do so. */
		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
			if (conn->c_trans->xmit_cong_map != NULL) {
				/* transport sends the map directly */
				conn->c_map_offset = 0;
				conn->c_map_bytes =
				    sizeof (struct rdsv3_header) +
				    RDSV3_CONG_MAP_BYTES;
				continue;
			}

			/* otherwise wrap the map in an ordinary message */
			rm = rdsv3_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Grab the next message from the send queue, if there is one.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection. We can use this ref while holding the
		 * send_sem.. rdsv3_send_reset() is serialized with it.
		 */
		if (rm == NULL) {
			unsigned int len;

			mutex_enter(&conn->c_lock);

			if (!list_is_empty(&conn->c_send_queue)) {
				rm = list_remove_head(&conn->c_send_queue);
				rdsv3_message_addref(rm);

				/*
				 * Move the message from the send queue to
				 * the retransmit
				 * list right away.
				 */
				list_insert_tail(&conn->c_retrans, rm);
			}

			mutex_exit(&conn->c_lock);

			if (rm == NULL) {
				was_empty = 1;
				break;
			}

			/*
			 * Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with
			 * RDMA ops.
			 */
			if (rm->m_rdma_op &&
			    test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
				mutex_enter(&conn->c_lock);
				if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
				    &rm->m_flags))
					list_remove_node(&rm->m_conn_item);
				list_insert_tail(&to_be_dropped, rm);
				mutex_exit(&conn->c_lock);
				/* drop the ref taken when dequeued above */
				rdsv3_message_put(rm);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets =
				    rdsv3_sysctl_max_unacked_packets;
				conn->c_unacked_bytes =
				    rdsv3_sysctl_max_unacked_bytes;
				rdsv3_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Try and send an rdma message. Let's see if we can
		 * keep this simple and require that the transport either
		 * send the whole rdma or none of it.
		 */
		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
			if (ret)
				break;
			conn->c_xmit_rdma_sent = 1;
			/*
			 * The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
		}

		/* Push header and payload bytes; ret is bytes accepted. */
		if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
		    conn->c_xmit_sg < rm->m_nents) {
			ret = conn->c_trans->xmit(conn, rm,
			    conn->c_xmit_hdr_off,
			    conn->c_xmit_sg,
			    conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			/* Header bytes are consumed first ... */
			if (conn->c_xmit_hdr_off <
			    sizeof (struct rdsv3_header)) {
				tmp = min(ret,
				    sizeof (struct rdsv3_header) -
				    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			/* ... then the remainder advances the S/G cursor. */
			sg = &rm->m_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min(ret, rdsv3_sg_len(sg) -
				    conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					/* must not run past the last entry */
					ASSERT(!(ret != 0 &&
					    conn->c_xmit_sg == rm->m_nents));
				}
			}
		}
	}

	/* Nuke any messages we decided not to retransmit. */
	if (!list_is_empty(&to_be_dropped))
		rdsv3_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);

	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);

	/*
	 * We might be racing with another sender who queued a message but
	 * backed off on noticing that we held the c_send_lock. If we check
	 * for queued messages after dropping the sem then either we'll
	 * see the queued message or the queuer will get the sem. If we
	 * notice the queued message then we trigger an immediate retry.
	 *
	 * We need to be careful only to do this when we stopped processing
	 * the send queue because it was empty. It's the only way we
	 * stop processing the loop when the transport hasn't taken
	 * responsibility for forward progress.
	 */
	mutex_exit(&conn->c_send_lock);

	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
		/*
		 * We exhausted the send quota, but there's work left to
		 * do. Return and (re-)schedule the send worker.
		 */
		ret = -EAGAIN;
	}

	atomic_dec_32(&conn->c_senders);

	if (ret == 0 && was_empty) {
		/*
		 * A simple bit test would be way faster than taking the
		 * spin lock
		 */
		mutex_enter(&conn->c_lock);
		if (!list_is_empty(&conn->c_send_queue)) {
			rdsv3_stats_inc(s_send_sem_queue_raced);
			ret = -EAGAIN;
		}
		mutex_exit(&conn->c_lock);
	}

out:
	RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
	    conn, ret);
	return (ret);
}
41912198SEiji.Ota@Sun.COM
42012198SEiji.Ota@Sun.COM static void
rdsv3_send_sndbuf_remove(struct rdsv3_sock * rs,struct rdsv3_message * rm)42112198SEiji.Ota@Sun.COM rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
42212198SEiji.Ota@Sun.COM {
42312198SEiji.Ota@Sun.COM uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);
42412198SEiji.Ota@Sun.COM
42512198SEiji.Ota@Sun.COM ASSERT(mutex_owned(&rs->rs_lock));
42612198SEiji.Ota@Sun.COM
42712198SEiji.Ota@Sun.COM ASSERT(rs->rs_snd_bytes >= len);
42812198SEiji.Ota@Sun.COM rs->rs_snd_bytes -= len;
42912198SEiji.Ota@Sun.COM
43012198SEiji.Ota@Sun.COM if (rs->rs_snd_bytes == 0)
43112198SEiji.Ota@Sun.COM rdsv3_stats_inc(s_send_queue_empty);
43212198SEiji.Ota@Sun.COM }
43312198SEiji.Ota@Sun.COM
43412198SEiji.Ota@Sun.COM static inline int
rdsv3_send_is_acked(struct rdsv3_message * rm,uint64_t ack,is_acked_func is_acked)43512198SEiji.Ota@Sun.COM rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
43612198SEiji.Ota@Sun.COM is_acked_func is_acked)
43712198SEiji.Ota@Sun.COM {
43812198SEiji.Ota@Sun.COM if (is_acked)
43912198SEiji.Ota@Sun.COM return (is_acked(rm, ack));
44012198SEiji.Ota@Sun.COM return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
44112198SEiji.Ota@Sun.COM }
44212198SEiji.Ota@Sun.COM
44312198SEiji.Ota@Sun.COM /*
44412198SEiji.Ota@Sun.COM * Returns true if there are no messages on the send and retransmit queues
44512198SEiji.Ota@Sun.COM * which have a sequence number greater than or equal to the given sequence
44612198SEiji.Ota@Sun.COM * number.
44712198SEiji.Ota@Sun.COM */
44812198SEiji.Ota@Sun.COM int
rdsv3_send_acked_before(struct rdsv3_connection * conn,uint64_t seq)44912198SEiji.Ota@Sun.COM rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
45012198SEiji.Ota@Sun.COM {
45112198SEiji.Ota@Sun.COM struct rdsv3_message *rm;
45212198SEiji.Ota@Sun.COM int ret = 1;
45312198SEiji.Ota@Sun.COM
45412198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);
45512198SEiji.Ota@Sun.COM
45612198SEiji.Ota@Sun.COM mutex_enter(&conn->c_lock);
45712198SEiji.Ota@Sun.COM
45812198SEiji.Ota@Sun.COM /* XXX - original code spits out warning */
45912198SEiji.Ota@Sun.COM rm = list_head(&conn->c_retrans);
46012198SEiji.Ota@Sun.COM if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
46112198SEiji.Ota@Sun.COM ret = 0;
46212198SEiji.Ota@Sun.COM
46312198SEiji.Ota@Sun.COM /* XXX - original code spits out warning */
46412198SEiji.Ota@Sun.COM rm = list_head(&conn->c_send_queue);
46512198SEiji.Ota@Sun.COM if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
46612198SEiji.Ota@Sun.COM ret = 0;
46712198SEiji.Ota@Sun.COM
46812198SEiji.Ota@Sun.COM mutex_exit(&conn->c_lock);
46912198SEiji.Ota@Sun.COM
47012198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);
47112198SEiji.Ota@Sun.COM
47212198SEiji.Ota@Sun.COM return (ret);
47312198SEiji.Ota@Sun.COM }
47412198SEiji.Ota@Sun.COM
47512198SEiji.Ota@Sun.COM /*
47612198SEiji.Ota@Sun.COM * This is pretty similar to what happens below in the ACK
47712198SEiji.Ota@Sun.COM * handling code - except that we call here as soon as we get
47812198SEiji.Ota@Sun.COM * the IB send completion on the RDMA op and the accompanying
47912198SEiji.Ota@Sun.COM * message.
48012198SEiji.Ota@Sun.COM */
48112198SEiji.Ota@Sun.COM void
rdsv3_rdma_send_complete(struct rdsv3_message * rm,int status)48212198SEiji.Ota@Sun.COM rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
48312198SEiji.Ota@Sun.COM {
48412198SEiji.Ota@Sun.COM struct rdsv3_sock *rs = NULL;
48512198SEiji.Ota@Sun.COM struct rdsv3_rdma_op *ro;
48612198SEiji.Ota@Sun.COM struct rdsv3_notifier *notifier;
48712198SEiji.Ota@Sun.COM
48812198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);
48912198SEiji.Ota@Sun.COM
49012198SEiji.Ota@Sun.COM mutex_enter(&rm->m_rs_lock);
49112198SEiji.Ota@Sun.COM
49212198SEiji.Ota@Sun.COM ro = rm->m_rdma_op;
49312198SEiji.Ota@Sun.COM if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
49412414SEiji.Ota@Sun.COM ro && ro->r_notify && ro->r_notifier) {
49512414SEiji.Ota@Sun.COM notifier = ro->r_notifier;
49612198SEiji.Ota@Sun.COM rs = rm->m_rs;
49712198SEiji.Ota@Sun.COM rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
49812198SEiji.Ota@Sun.COM
49912198SEiji.Ota@Sun.COM notifier->n_status = status;
50012198SEiji.Ota@Sun.COM mutex_enter(&rs->rs_lock);
50112198SEiji.Ota@Sun.COM list_insert_tail(&rs->rs_notify_queue, notifier);
50212198SEiji.Ota@Sun.COM mutex_exit(&rs->rs_lock);
50312414SEiji.Ota@Sun.COM ro->r_notifier = NULL;
50412198SEiji.Ota@Sun.COM }
50512198SEiji.Ota@Sun.COM
50612198SEiji.Ota@Sun.COM mutex_exit(&rm->m_rs_lock);
50712198SEiji.Ota@Sun.COM
50812198SEiji.Ota@Sun.COM if (rs) {
50912794SGiri.Adari@Sun.COM struct rsock *sk = rdsv3_rs_to_sk(rs);
51012794SGiri.Adari@Sun.COM int error;
51112794SGiri.Adari@Sun.COM
51212198SEiji.Ota@Sun.COM rdsv3_wake_sk_sleep(rs);
51312794SGiri.Adari@Sun.COM
51412794SGiri.Adari@Sun.COM /* wake up anyone waiting in poll */
51512794SGiri.Adari@Sun.COM sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
51612794SGiri.Adari@Sun.COM 0, 0, &error, NULL);
51712794SGiri.Adari@Sun.COM if (error != 0) {
51812794SGiri.Adari@Sun.COM RDSV3_DPRINTF2("rdsv3_recv_incoming",
51912794SGiri.Adari@Sun.COM "su_recv returned: %d", error);
52012794SGiri.Adari@Sun.COM }
52112794SGiri.Adari@Sun.COM
52212198SEiji.Ota@Sun.COM rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
52312198SEiji.Ota@Sun.COM }
52412198SEiji.Ota@Sun.COM
52512198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
52612198SEiji.Ota@Sun.COM }
52712198SEiji.Ota@Sun.COM
52812198SEiji.Ota@Sun.COM /*
52912198SEiji.Ota@Sun.COM * This is the same as rdsv3_rdma_send_complete except we
53012198SEiji.Ota@Sun.COM * don't do any locking - we have all the ingredients (message,
53112198SEiji.Ota@Sun.COM * socket, socket lock) and can just move the notifier.
53212198SEiji.Ota@Sun.COM */
53312198SEiji.Ota@Sun.COM static inline void
__rdsv3_rdma_send_complete(struct rdsv3_sock * rs,struct rdsv3_message * rm,int status)53412198SEiji.Ota@Sun.COM __rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
53512198SEiji.Ota@Sun.COM int status)
53612198SEiji.Ota@Sun.COM {
53712198SEiji.Ota@Sun.COM struct rdsv3_rdma_op *ro;
53812198SEiji.Ota@Sun.COM void *ic;
53912198SEiji.Ota@Sun.COM
54012198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
54112198SEiji.Ota@Sun.COM "Enter(rs: %p, rm: %p)", rs, rm);
54212198SEiji.Ota@Sun.COM
54312198SEiji.Ota@Sun.COM ro = rm->m_rdma_op;
54412198SEiji.Ota@Sun.COM if (ro && ro->r_notify && ro->r_notifier) {
54512198SEiji.Ota@Sun.COM ro->r_notifier->n_status = status;
54612198SEiji.Ota@Sun.COM list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
54712198SEiji.Ota@Sun.COM ro->r_notifier = NULL;
54812198SEiji.Ota@Sun.COM }
54912198SEiji.Ota@Sun.COM
55012198SEiji.Ota@Sun.COM /* No need to wake the app - caller does this */
55112198SEiji.Ota@Sun.COM }
55212198SEiji.Ota@Sun.COM
/*
 * This is called from the IB send completion when we detect
 * a RDMA operation that failed with remote access error.
 * So speed is not an issue here.
 *
 * Returns the message owning "op" with an extra reference held (caller
 * must drop it), or NULL if no queued message references the op.
 */
struct rdsv3_message *
rdsv3_send_get_message(struct rdsv3_connection *conn,
    struct rdsv3_rdma_op *op)
{
	struct rdsv3_message *rm, *tmp, *found = NULL;

	RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);

	mutex_enter(&conn->c_lock);

	/* Search the retransmit queue first ... */
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (rm->m_rdma_op == op) {
			/* reference for the caller, taken under c_lock */
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			goto out;
		}
	}

	/* ... then the not-yet-sent queue. */
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
	    m_conn_item) {
		if (rm->m_rdma_op == op) {
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			break;
		}
	}

out:
	mutex_exit(&conn->c_lock);

	return (found);
}
59012198SEiji.Ota@Sun.COM
59112198SEiji.Ota@Sun.COM /*
59212198SEiji.Ota@Sun.COM * This removes messages from the socket's list if they're on it. The list
59312198SEiji.Ota@Sun.COM * argument must be private to the caller, we must be able to modify it
59412198SEiji.Ota@Sun.COM * without locks. The messages must have a reference held for their
59512198SEiji.Ota@Sun.COM * position on the list. This function will drop that reference after
59612198SEiji.Ota@Sun.COM * removing the messages from the 'messages' list regardless of if it found
59712198SEiji.Ota@Sun.COM * the messages on the socket list or not.
59812198SEiji.Ota@Sun.COM */
void
rdsv3_send_remove_from_sock(struct list *messages, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_message *rm;

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");

	while (!list_is_empty(messages)) {
		int was_on_sock = 0;
		rm = list_remove_head(messages);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock. If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		mutex_enter(&rm->m_rs_lock);
		if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		/*
		 * Consecutive messages usually belong to the same socket;
		 * hold one sock reference across the run and only
		 * wake/release the previous socket when it changes.
		 */
		if (rs != rm->m_rs) {
			if (rs) {
				rdsv3_wake_sk_sleep(rs);
				rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
		}

		/* rs_lock protects rs_send_queue, sndbuf and notify queue */
		mutex_enter(&rs->rs_lock);
		if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
			struct rdsv3_rdma_op *ro = rm->m_rdma_op;
			struct rdsv3_notifier *notifier;

			list_remove_node(&rm->m_sock_item);
			rdsv3_send_sndbuf_remove(rs, rm);
			/*
			 * Queue an RDMA completion notification when the op
			 * failed (status != 0) or the app asked for one.
			 */
			if (ro && ro->r_notifier &&
			    (status || ro->r_notify)) {
				notifier = ro->r_notifier;
				list_insert_tail(&rs->rs_notify_queue,
				    notifier);
				/* keep the first recorded status, if any */
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->m_rdma_op->r_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		mutex_exit(&rs->rs_lock);

unlock_and_drop:
		mutex_exit(&rm->m_rs_lock);
		/* drop the caller's list reference ... */
		rdsv3_message_put(rm);
		/* ... and the reference the sock list itself held */
		if (was_on_sock)
			rdsv3_message_put(rm);
	}

	/* wake and release the last socket we were holding */
	if (rs) {
		rdsv3_wake_sk_sleep(rs);
		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
	}

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
}
66912198SEiji.Ota@Sun.COM
67012198SEiji.Ota@Sun.COM /*
67112198SEiji.Ota@Sun.COM * Transports call here when they've determined that the receiver queued
67212198SEiji.Ota@Sun.COM * messages up to, and including, the given sequence number. Messages are
67312198SEiji.Ota@Sun.COM * moved to the retrans queue when rdsv3_send_xmit picks them off the send
67412198SEiji.Ota@Sun.COM * queue. This means that in the TCP case, the message may not have been
67512198SEiji.Ota@Sun.COM * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
67612198SEiji.Ota@Sun.COM * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
67712198SEiji.Ota@Sun.COM *
67812198SEiji.Ota@Sun.COM * XXX It's not clear to me how this is safely serialized with socket
67912198SEiji.Ota@Sun.COM * destruction. Maybe it should bail if it sees SOCK_DEAD.
68012198SEiji.Ota@Sun.COM */
68112198SEiji.Ota@Sun.COM void
rdsv3_send_drop_acked(struct rdsv3_connection * conn,uint64_t ack,is_acked_func is_acked)68212198SEiji.Ota@Sun.COM rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
68312198SEiji.Ota@Sun.COM is_acked_func is_acked)
68412198SEiji.Ota@Sun.COM {
68512198SEiji.Ota@Sun.COM struct rdsv3_message *rm, *tmp;
68612198SEiji.Ota@Sun.COM list_t list;
68712198SEiji.Ota@Sun.COM
68812198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);
68912198SEiji.Ota@Sun.COM
69012198SEiji.Ota@Sun.COM list_create(&list, sizeof (struct rdsv3_message),
69112198SEiji.Ota@Sun.COM offsetof(struct rdsv3_message, m_conn_item));
69212198SEiji.Ota@Sun.COM
69312198SEiji.Ota@Sun.COM mutex_enter(&conn->c_lock);
69412198SEiji.Ota@Sun.COM
69512198SEiji.Ota@Sun.COM RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
69612198SEiji.Ota@Sun.COM if (!rdsv3_send_is_acked(rm, ack, is_acked))
69712198SEiji.Ota@Sun.COM break;
69812198SEiji.Ota@Sun.COM
69912198SEiji.Ota@Sun.COM list_remove_node(&rm->m_conn_item);
70012198SEiji.Ota@Sun.COM list_insert_tail(&list, rm);
70112198SEiji.Ota@Sun.COM clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
70212198SEiji.Ota@Sun.COM }
70312198SEiji.Ota@Sun.COM
70412198SEiji.Ota@Sun.COM #if 0
70512198SEiji.Ota@Sun.COM XXX
70612198SEiji.Ota@Sun.COM /* order flag updates with spin locks */
70712198SEiji.Ota@Sun.COM if (!list_is_empty(&list))
70812198SEiji.Ota@Sun.COM smp_mb__after_clear_bit();
70912198SEiji.Ota@Sun.COM #endif
71012198SEiji.Ota@Sun.COM
71112198SEiji.Ota@Sun.COM mutex_exit(&conn->c_lock);
71212198SEiji.Ota@Sun.COM
71312198SEiji.Ota@Sun.COM /* now remove the messages from the sock list as needed */
71412863SEiji.Ota@Sun.COM rdsv3_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
71512198SEiji.Ota@Sun.COM
71612198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
71712198SEiji.Ota@Sun.COM }
71812198SEiji.Ota@Sun.COM
/*
 * Drop every queued message on socket 'rs' destined for 'dest' (or all
 * messages when dest is NULL), cancelling any pending RDMA ops and
 * waiting for in-flight messages to quiesce before dropping the last ref.
 */
void
rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_connection *conn;
	list_t list;
	int wake = 0;

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_sock_item));

	/* get all the messages we're dropping under the rs lock */
	mutex_enter(&rs->rs_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
	    m_sock_item) {
		/* skip messages for other destinations when filtering */
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;
		wake = 1;
		list_remove(&rs->rs_send_queue, rm);
		list_insert_tail(&list, rm);
		rdsv3_send_sndbuf_remove(rs, rm);
		clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
	}

	mutex_exit(&rs->rs_lock);

	conn = NULL;

	/* now remove the messages from the conn list as needed */
	RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
		/*
		 * We do this here rather than in the loop above, so that
		 * we don't have to nest m_rs_lock under rs->rs_lock
		 */
		mutex_enter(&rm->m_rs_lock);
		/* If this is a RDMA operation, notify the app. */
		__rdsv3_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
		rm->m_rs = NULL;
		mutex_exit(&rm->m_rs_lock);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the conn. If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 */
		if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
			continue;

		/*
		 * Messages may span connections; hold the current conn's
		 * c_lock across a run of same-conn messages.
		 */
		if (conn != rm->m_inc.i_conn) {
			if (conn)
				mutex_exit(&conn->c_lock);
			conn = rm->m_inc.i_conn;
			mutex_enter(&conn->c_lock);
		}

		/* drop the conn list's reference if we win the race */
		if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
			list_remove_node(&rm->m_conn_item);
			rdsv3_message_put(rm);
		}
	}

	if (conn)
		mutex_exit(&conn->c_lock);

	if (wake)
		rdsv3_wake_sk_sleep(rs);

	/* wait for each message to quiesce, then drop the private-list ref */
	while (!list_is_empty(&list)) {
		rm = list_remove_head(&list);

		rdsv3_message_wait(rm);
		rdsv3_message_put(rm);
	}

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
}
80012198SEiji.Ota@Sun.COM
80112198SEiji.Ota@Sun.COM /*
80212198SEiji.Ota@Sun.COM * we only want this to fire once so we use the callers 'queued'. It's
80312198SEiji.Ota@Sun.COM * possible that another thread can race with us and remove the
80412198SEiji.Ota@Sun.COM * message from the flow with RDSV3_CANCEL_SENT_TO.
80512198SEiji.Ota@Sun.COM */
static int
rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
    struct rdsv3_message *rm, uint16_be_t sport,
    uint16_be_t dport, int *queued)
{
	uint32_t len;

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm);

	/* already queued by a previous call/iteration; nothing to do */
	if (*queued)
		goto out;

	len = ntohl(rm->m_inc.i_hdr.h_len);

	/*
	 * this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock
	 */
	mutex_enter(&rs->rs_lock);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/*
		 * let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
			set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

		list_insert_tail(&rs->rs_send_queue, rm);
		set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);

		/* one ref for the socket's send queue */
		rdsv3_message_addref(rm);
		rm->m_rs = rs;

		/*
		 * The code ordering is a little weird, but we're
		 * trying to minimize the time we hold c_lock
		 */
		rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
		    dport, 0);
		rm->m_inc.i_conn = conn;
		/* second ref for the connection's send queue */
		rdsv3_message_addref(rm); /* XXX - called twice */

		mutex_enter(&conn->c_lock);
		rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
		list_insert_tail(&conn->c_send_queue, rm);
		set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
		mutex_exit(&conn->c_lock);

		RDSV3_DPRINTF5("rdsv3_send_queue_rm",
		    "queued msg %p len %d, rs %p bytes %d seq %llu",
		    rm, len, rs, rs->rs_snd_bytes,
		    (unsigned long long)ntohll(
		    rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	mutex_exit(&rs->rs_lock);

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
out:
	return (*queued);
}
88212198SEiji.Ota@Sun.COM
88312198SEiji.Ota@Sun.COM static int
rdsv3_cmsg_send(struct rdsv3_sock * rs,struct rdsv3_message * rm,struct msghdr * msg,int * allocated_mr)88412198SEiji.Ota@Sun.COM rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
88512198SEiji.Ota@Sun.COM struct msghdr *msg, int *allocated_mr)
88612198SEiji.Ota@Sun.COM {
88712198SEiji.Ota@Sun.COM struct cmsghdr *cmsg;
88812198SEiji.Ota@Sun.COM int ret = 0;
88912198SEiji.Ota@Sun.COM
89012198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);
89112198SEiji.Ota@Sun.COM
89212198SEiji.Ota@Sun.COM for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
89312198SEiji.Ota@Sun.COM
89412198SEiji.Ota@Sun.COM if (cmsg->cmsg_level != SOL_RDS)
89512198SEiji.Ota@Sun.COM continue;
89612198SEiji.Ota@Sun.COM
89712198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
89812198SEiji.Ota@Sun.COM cmsg, rm, cmsg->cmsg_type);
89912198SEiji.Ota@Sun.COM /*
90012198SEiji.Ota@Sun.COM * As a side effect, RDMA_DEST and RDMA_MAP will set
90112198SEiji.Ota@Sun.COM * rm->m_rdma_cookie and rm->m_rdma_mr.
90212198SEiji.Ota@Sun.COM */
90312198SEiji.Ota@Sun.COM switch (cmsg->cmsg_type) {
90412863SEiji.Ota@Sun.COM case RDS_CMSG_RDMA_ARGS:
90512198SEiji.Ota@Sun.COM ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
90612198SEiji.Ota@Sun.COM break;
90712198SEiji.Ota@Sun.COM
90812863SEiji.Ota@Sun.COM case RDS_CMSG_RDMA_DEST:
90912198SEiji.Ota@Sun.COM ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
91012198SEiji.Ota@Sun.COM break;
91112198SEiji.Ota@Sun.COM
91212863SEiji.Ota@Sun.COM case RDS_CMSG_RDMA_MAP:
91312198SEiji.Ota@Sun.COM ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
91412198SEiji.Ota@Sun.COM if (ret)
91512198SEiji.Ota@Sun.COM *allocated_mr = 1;
91612198SEiji.Ota@Sun.COM break;
91712198SEiji.Ota@Sun.COM
91812198SEiji.Ota@Sun.COM default:
91912198SEiji.Ota@Sun.COM return (-EINVAL);
92012198SEiji.Ota@Sun.COM }
92112198SEiji.Ota@Sun.COM
92212198SEiji.Ota@Sun.COM if (ret)
92312198SEiji.Ota@Sun.COM break;
92412198SEiji.Ota@Sun.COM }
92512198SEiji.Ota@Sun.COM
92612198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);
92712198SEiji.Ota@Sun.COM
92812198SEiji.Ota@Sun.COM return (ret);
92912198SEiji.Ota@Sun.COM }
93012198SEiji.Ota@Sun.COM
93112794SGiri.Adari@Sun.COM extern unsigned long rdsv3_max_bcopy_size;
93212794SGiri.Adari@Sun.COM
/*
 * sendmsg() entry point for an RDS socket: copy the payload from the
 * user's uio, process control messages, find/create the connection for
 * the destination, queue the message on both the socket and connection
 * send queues, and kick the transmit path. Returns the payload length
 * on success or a negative errno.
 */
int
rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
    size_t payload_len)
{
	struct rsock *sk = rdsv3_rs_to_sk(rs);
	struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
	uint32_be_t daddr;
	uint16_be_t dport;
	struct rdsv3_message *rm = NULL;
	struct rdsv3_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	/*
	 * NOTE(review): timeo is computed but never referenced below --
	 * the blocking path uses cv_wait_sig() with no timeout. Confirm
	 * whether a timed wait was intended (the OFED original used timeo).
	 */
	long timeo = rdsv3_sndtimeo(sk, nonblock);

	RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);

	/* destination from msg_name if supplied, else the connected addr */
	if (msg->msg_namelen) {
		/* XXX fail non-unicast destination IPs? */
		if (msg->msg_namelen < sizeof (*usin) ||
		    usin->sin_family != AF_INET_OFFLOAD) {
			ret = -EINVAL;
			RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
			goto out;
		}
		daddr = usin->sin_addr.s_addr;
		dport = usin->sin_port;
	} else {
		/* We only care about consistency with ->connect() */
		mutex_enter(&sk->sk_lock);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		mutex_exit(&sk->sk_lock);
	}

	/* racing with another thread binding seems ok here */
	if (daddr == 0 || rs->rs_bound_addr == 0) {
		ret = -ENOTCONN; /* XXX not a great errno */
		RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
		goto out;
	}

	/* bcopy transport caps the payload size */
	if (payload_len > rdsv3_max_bcopy_size) {
		RDSV3_DPRINTF2("rdsv3_sendmsg", "Message too large: %d",
		    payload_len);
		ret = -EMSGSIZE;
		goto out;
	}

	rm = rdsv3_message_copy_from_user(uio, payload_len);
	if (IS_ERR(rm)) {
		ret = PTR_ERR(rm);
		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_message_copy_from_user failed %d", -ret);
		rm = NULL;
		goto out;
	}

	rm->m_daddr = daddr;

	/* Parse any control messages the user may have included. */
	ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
		    rs, rm, msg, ret);
		goto out;
	}

	/*
	 * rdsv3_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot.
	 */
	mutex_enter(&rs->rs_conn_lock);
	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
		conn = rs->rs_conn;
	} else {
		conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
		    daddr, rs->rs_transport, KM_NOSLEEP);
		if (IS_ERR(conn)) {
			mutex_exit(&rs->rs_conn_lock);
			ret = PTR_ERR(conn);
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "rdsv3_conn_create_outgoing failed %d",
			    -ret);
			goto out;
		}
		rs->rs_conn = conn;
	}
	mutex_exit(&rs->rs_conn_lock);

	/* RDMA requests need a transport that implements xmit_rdma */
	if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
	    conn->c_trans->xmit_rdma == NULL) {
		RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p",
		    rm->m_rdma_op, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	/*
	 * If the connection is down, trigger a connect. We may
	 * have scheduled a delayed reconnect however - in this case
	 * we should not interfere.
	 */
	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

	ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		/* remember the congestion and wake anyone polling on it */
		mutex_enter(&rs->rs_congested_lock);
		rs->rs_seen_congestion = 1;
		cv_signal(&rs->rs_congested_cv);
		mutex_exit(&rs->rs_congested_lock);

		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
		goto out;
	}

	(void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
	    &queued);
	if (!queued) {
		/* rdsv3_stats_inc(s_send_queue_full); */
		/* XXX make sure this is reasonable */
		if (payload_len > rdsv3_sk_sndbuf(rs)) {
			ret = -EMSGSIZE;
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "msgsize(%d) too big, returning: %d",
			    payload_len, -ret);
			goto out;
		}
		if (nonblock) {
			ret = -EAGAIN;
			RDSV3_DPRINTF3("rdsv3_sendmsg",
			    "send queue full (%d), returning: %d",
			    payload_len, -ret);
			goto out;
		}

#if 0
		ret = rdsv3_wait_sig(sk->sk_sleep,
		    (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)));
		if (ret == 0) {
			/* signal/timeout pending */
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "woke due to signal: %d", ret);
			ret = -ERESTART;
			goto out;
		}
#else
		/*
		 * Block until the message can be queued; cv_wait_sig()
		 * returns 0 when interrupted by a signal.
		 */
		mutex_enter(&sk->sk_sleep->waitq_mutex);
		sk->sk_sleep->waitq_waiters++;
		while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)) {
			ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
			    &sk->sk_sleep->waitq_mutex);
			if (ret == 0) {
				/* signal/timeout pending */
				RDSV3_DPRINTF2("rdsv3_sendmsg",
				    "woke due to signal: %d", ret);
				ret = -EINTR;
				sk->sk_sleep->waitq_waiters--;
				mutex_exit(&sk->sk_sleep->waitq_mutex);
				goto out;
			}
		}
		sk->sk_sleep->waitq_waiters--;
		mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

		RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
		    queued);

		ASSERT(queued);
		ret = 0;
	}

	/*
	 * By now we've committed to the send. We reuse rdsv3_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rdsv3_stats_inc(s_send_queued);

	if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		(void) rdsv3_send_worker(&conn->c_send_w.work);

	rdsv3_message_put(rm);
	RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
	    rs, payload_len);
	return (payload_len);

out:
	/*
	 * If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
	 * or in any other way, we need to destroy the MR again
	 */
	if (allocated_mr)
		rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
		    1);

	if (rm)
		rdsv3_message_put(rm);
	return (ret);
}
114012198SEiji.Ota@Sun.COM
114112198SEiji.Ota@Sun.COM /*
114212198SEiji.Ota@Sun.COM * Reply to a ping packet.
114312198SEiji.Ota@Sun.COM */
114412198SEiji.Ota@Sun.COM int
rdsv3_send_pong(struct rdsv3_connection * conn,uint16_be_t dport)114512198SEiji.Ota@Sun.COM rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
114612198SEiji.Ota@Sun.COM {
114712198SEiji.Ota@Sun.COM struct rdsv3_message *rm;
114812198SEiji.Ota@Sun.COM int ret = 0;
114912198SEiji.Ota@Sun.COM
115012198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);
115112198SEiji.Ota@Sun.COM
115212198SEiji.Ota@Sun.COM rm = rdsv3_message_alloc(0, KM_NOSLEEP);
115312676SEiji.Ota@Sun.COM if (!rm) {
115412198SEiji.Ota@Sun.COM ret = -ENOMEM;
115512198SEiji.Ota@Sun.COM goto out;
115612198SEiji.Ota@Sun.COM }
115712198SEiji.Ota@Sun.COM
115812198SEiji.Ota@Sun.COM rm->m_daddr = conn->c_faddr;
115912198SEiji.Ota@Sun.COM
116012198SEiji.Ota@Sun.COM /*
116112198SEiji.Ota@Sun.COM * If the connection is down, trigger a connect. We may
116212198SEiji.Ota@Sun.COM * have scheduled a delayed reconnect however - in this case
116312198SEiji.Ota@Sun.COM * we should not interfere.
116412198SEiji.Ota@Sun.COM */
116512198SEiji.Ota@Sun.COM if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
116612198SEiji.Ota@Sun.COM !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
116712198SEiji.Ota@Sun.COM rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);
116812198SEiji.Ota@Sun.COM
116912198SEiji.Ota@Sun.COM ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
117012198SEiji.Ota@Sun.COM if (ret)
117112198SEiji.Ota@Sun.COM goto out;
117212198SEiji.Ota@Sun.COM
117312198SEiji.Ota@Sun.COM mutex_enter(&conn->c_lock);
117412198SEiji.Ota@Sun.COM list_insert_tail(&conn->c_send_queue, rm);
117512198SEiji.Ota@Sun.COM set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
117612198SEiji.Ota@Sun.COM rdsv3_message_addref(rm);
117712198SEiji.Ota@Sun.COM rm->m_inc.i_conn = conn;
117812198SEiji.Ota@Sun.COM
117912198SEiji.Ota@Sun.COM rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
118012198SEiji.Ota@Sun.COM conn->c_next_tx_seq);
118112198SEiji.Ota@Sun.COM conn->c_next_tx_seq++;
118212198SEiji.Ota@Sun.COM mutex_exit(&conn->c_lock);
118312198SEiji.Ota@Sun.COM
118412198SEiji.Ota@Sun.COM rdsv3_stats_inc(s_send_queued);
118512198SEiji.Ota@Sun.COM rdsv3_stats_inc(s_send_pong);
118612198SEiji.Ota@Sun.COM
118712676SEiji.Ota@Sun.COM if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
118812676SEiji.Ota@Sun.COM (void) rdsv3_send_xmit(conn);
118912676SEiji.Ota@Sun.COM
119012198SEiji.Ota@Sun.COM rdsv3_message_put(rm);
119112198SEiji.Ota@Sun.COM
119212198SEiji.Ota@Sun.COM RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
119312198SEiji.Ota@Sun.COM return (0);
119412198SEiji.Ota@Sun.COM
119512198SEiji.Ota@Sun.COM out:
119612198SEiji.Ota@Sun.COM if (rm)
119712198SEiji.Ota@Sun.COM rdsv3_message_put(rm);
119812198SEiji.Ota@Sun.COM return (ret);
119912198SEiji.Ota@Sun.COM }
1200