/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * This file contains code imported from the OFED rds source file send.c
 * Oracle elects to have and use the contents of send.c under and governed
 * by the OpenIB.org BSD license (see below for full license text). However,
 * the following notice accompanied the original version of this file:
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/stropts.h>
#include <sys/systm.h>

#include <sys/rds.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;

extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
/*
 * Reset the send state. Caller must hold c_send_lock when calling here.
 */
void
rdsv3_send_reset(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_rdma_op *ro;

	RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);

	ASSERT(MUTEX_HELD(&conn->c_send_lock));

	if (conn->c_xmit_rm) {
		rm = conn->c_xmit_rm;
		ro = rm->m_rdma_op;
		if (ro && ro->r_mapped) {
			RDSV3_DPRINTF2("rdsv3_send_reset",
			    "rm %p mflg 0x%x map %d mihdl %p sgl %p",
			    rm, rm->m_flags, ro->r_mapped,
			    ro->r_rdma_sg[0].mihdl,
			    ro->r_rdma_sg[0].swr.wr_sgl);
			rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
		}
		/*
		 * Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory
		 */
		rdsv3_message_unmapped(conn->c_xmit_rm);
		rdsv3_message_put(conn->c_xmit_rm);
		conn->c_xmit_rm = NULL;
	}

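	/*
	 * Rewind the partial-transmit cursor so the next message starts
	 * from a clean state.
	 */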
	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_map_queued = 0;

	conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	mutex_enter(&conn->c_lock);
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
		if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
			RDSV3_DPRINTF4("_send_reset",
			    "RT rm %p mflg 0x%x sgl %p",
			    rm, rm->m_flags,
			    rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
		}
	}
	list_move_tail(&conn->c_send_queue, &conn->c_retrans);
	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 *   Pro:
 *      - tx queueing is a simple fifo list
 *      - reassembly is optional and easily done by transports per conn
 *      - no per flow rx lookup at all, straight to the socket
 *      - less per-frag memory and wire overhead
 *   Con:
 *      - queued acks can be delayed behind large messages
 *   Depends:
 *      - small message latency is higher behind queued large messages
 *      - large message latency isn't starved by intervening small sends
 */
int
rdsv3_send_xmit(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm;
	unsigned int tmp;
	unsigned int send_quota = send_batch_count;
	struct rdsv3_scatterlist *sg;
	int ret = 0;
	int was_empty = 0;
	list_t to_be_dropped;

restart:
	if (!rdsv3_conn_up(conn))
		goto out;

	RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);

	list_create(&to_be_dropped, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));
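	/*
	 * Messages carrying RDMA ops that we refuse to retransmit are
	 * collected on to_be_dropped and completed below with
	 * RDS_RDMA_DROPPED.
	 */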

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!mutex_tryenter(&conn->c_send_lock)) {
		RDSV3_DPRINTF4("rdsv3_send_xmit",
		    "Another thread running(conn: %p)", conn);
		rdsv3_stats_inc(s_send_sem_contention);
		ret = -ENOMEM;
		goto out;
	}
	atomic_add_32(&conn->c_senders, 1);
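	/*
	 * c_senders counts threads currently inside the send path; the
	 * teardown side presumably waits for this to drain before the
	 * connection state is torn down.
	 */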

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (--send_quota) {
		/*
		 * See if we need to send a congestion map update if we're
		 * between sending messages.  The send_sem protects our sole
		 * use of c_map_offset and _bytes.
		 * Note this is used only by transports that define a special
		 * xmit_cong_map function. For all others, we allocate
		 * a cong_map message and treat it just like any other send.
		 */
		if (conn->c_map_bytes) {
			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
			    conn->c_map_offset);
			if (ret <= 0)
				break;

			conn->c_map_offset += ret;
			conn->c_map_bytes -= ret;
			if (conn->c_map_bytes)
				continue;
		}

		/*
		 * If we're done sending the current message, clear the
		 * offset and S/G temporaries.
		 */
		rm = conn->c_xmit_rm;
		if (rm != NULL &&
		    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
		    conn->c_xmit_sg == rm->m_nents) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;

			/* Release the reference to the previous message. */
			rdsv3_message_put(rm);
			rm = NULL;
		}

		/* If we're asked to send a cong map update, do so. */
		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
			if (conn->c_trans->xmit_cong_map != NULL) {
				conn->c_map_offset = 0;
				conn->c_map_bytes =
				    sizeof (struct rdsv3_header) +
				    RDSV3_CONG_MAP_BYTES;
				continue;
			}

			rm = rdsv3_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Grab the next message from the send queue, if there is one.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem; rdsv3_send_reset() is serialized with it.
		 */
		if (rm == NULL) {
			unsigned int len;

			mutex_enter(&conn->c_lock);

			if (!list_is_empty(&conn->c_send_queue)) {
				rm = list_remove_head(&conn->c_send_queue);
				rdsv3_message_addref(rm);

				/*
				 * Move the message from the send queue to
				 * the retransmit list right away.
				 */
				list_insert_tail(&conn->c_retrans, rm);
			}

			mutex_exit(&conn->c_lock);

			if (rm == NULL) {
				was_empty = 1;
				break;
			}

			/*
			 * Unfortunately, the way InfiniBand deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with
			 * RDMA ops.
			 */
			if (rm->m_rdma_op &&
			    test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
				mutex_enter(&conn->c_lock);
				if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
				    &rm->m_flags)) {
					list_remove_node(&rm->m_conn_item);
					list_insert_tail(&to_be_dropped, rm);
				}
				mutex_exit(&conn->c_lock);
				rdsv3_message_put(rm);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets =
				    rdsv3_sysctl_max_unacked_packets;
				conn->c_unacked_bytes =
				    rdsv3_sysctl_max_unacked_bytes;
				rdsv3_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Try and send an rdma message.  Let's see if we can
		 * keep this simple and require that the transport either
		 * send the whole rdma or none of it.
		 */
		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
			if (ret)
				break;
			conn->c_xmit_rdma_sent = 1;
			/*
			 * The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
		}

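		/*
		 * The transport reports how many bytes it accepted; charge
		 * them first against any unsent header bytes, then walk the
		 * scatterlist, advancing c_xmit_data_off within the current
		 * entry and c_xmit_sg to the next entry as each one is
		 * fully consumed.
		 */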
		if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
		    conn->c_xmit_sg < rm->m_nents) {
			ret = conn->c_trans->xmit(conn, rm,
			    conn->c_xmit_hdr_off,
			    conn->c_xmit_sg,
			    conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			if (conn->c_xmit_hdr_off <
			    sizeof (struct rdsv3_header)) {
				tmp = min(ret,
				    sizeof (struct rdsv3_header) -
				    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->m_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min(ret, rdsv3_sg_len(sg) -
				    conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					ASSERT(!(ret != 0 &&
					    conn->c_xmit_sg == rm->m_nents));
				}
			}
		}
	}

	/* Nuke any messages we decided not to retransmit. */
	if (!list_is_empty(&to_be_dropped))
		rdsv3_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);

	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);

	/*
	 * We might be racing with another sender who queued a message but
	 * backed off on noticing that we held the c_send_lock.  If we check
	 * for queued messages after dropping the sem then either we'll
	 * see the queued message or the queuer will get the sem.  If we
	 * notice the queued message then we trigger an immediate retry.
	 *
	 * We need to be careful only to do this when we stopped processing
	 * the send queue because it was empty.  It's the only way we
	 * stop processing the loop when the transport hasn't taken
	 * responsibility for forward progress.
	 */
	mutex_exit(&conn->c_send_lock);

	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
		/*
		 * We exhausted the send quota, but there's work left to
		 * do. Return and (re-)schedule the send worker.
		 */
		ret = -EAGAIN;
	}

	atomic_dec_32(&conn->c_senders);

	if (ret == 0 && was_empty) {
		/*
		 * A simple bit test would be way faster than taking the
		 * spin lock
		 */
		mutex_enter(&conn->c_lock);
		if (!list_is_empty(&conn->c_send_queue)) {
			rdsv3_stats_inc(s_send_sem_queue_raced);
			ret = -EAGAIN;
		}
		mutex_exit(&conn->c_lock);
	}

out:
	RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
	    conn, ret);
	return (ret);
}

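/*
 * Drop a message's payload bytes from the socket's send-buffer accounting.
 * Called with rs_lock held.
 */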
static void
rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
{
	uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);

	ASSERT(mutex_owned(&rs->rs_lock));

	ASSERT(rs->rs_snd_bytes >= len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rdsv3_stats_inc(s_send_queue_empty);
}

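/*
 * A message counts as acked either when the transport's is_acked callback
 * says so or, by default, when its sequence number is at or below the
 * acked sequence number.
 */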
static inline int
rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
    is_acked_func is_acked)
{
	if (is_acked)
		return (is_acked(rm, ack));
	return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
}

/*
 * Returns true if there are no messages on the send and retransmit queues
 * which have a sequence number greater than or equal to the given sequence
 * number.
 */
int
rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
{
	struct rdsv3_message *rm;
	int ret = 1;

	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);

	mutex_enter(&conn->c_lock);

	/* XXX - original code spits out warning */
	rm = list_head(&conn->c_retrans);
	if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
		ret = 0;

	/* XXX - original code spits out warning */
	rm = list_head(&conn->c_send_queue);
	if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
		ret = 0;

	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);

	return (ret);
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void
rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_rdma_op *ro;
	struct rdsv3_notifier *notifier;

	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);

	mutex_enter(&rm->m_rs_lock);

	ro = rm->m_rdma_op;
	if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
	    ro && ro->r_notify && ro->r_notifier) {
		notifier = ro->r_notifier;
		rs = rm->m_rs;
		rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));

		notifier->n_status = status;
		mutex_enter(&rs->rs_lock);
		list_insert_tail(&rs->rs_notify_queue, notifier);
		mutex_exit(&rs->rs_lock);
		ro->r_notifier = NULL;
	}

	mutex_exit(&rm->m_rs_lock);

	if (rs) {
		struct rsock *sk = rdsv3_rs_to_sk(rs);
		int error;

		rdsv3_wake_sk_sleep(rs);

		/* wake up anyone waiting in poll */
		sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
		    0, 0, &error, NULL);
		if (error != 0) {
			RDSV3_DPRINTF2("rdsv3_rdma_send_complete",
			    "su_recv returned: %d", error);
		}
52112794SGiri.Adari@Sun.COM 
52212198SEiji.Ota@Sun.COM 		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
52312198SEiji.Ota@Sun.COM 	}
52412198SEiji.Ota@Sun.COM 
52512198SEiji.Ota@Sun.COM 	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
52612198SEiji.Ota@Sun.COM }
52712198SEiji.Ota@Sun.COM 
52812198SEiji.Ota@Sun.COM /*
52912198SEiji.Ota@Sun.COM  * This is the same as rdsv3_rdma_send_complete except we
53012198SEiji.Ota@Sun.COM  * don't do any locking - we have all the ingredients (message,
53112198SEiji.Ota@Sun.COM  * socket, socket lock) and can just move the notifier.
53212198SEiji.Ota@Sun.COM  */
53312198SEiji.Ota@Sun.COM static inline void
__rdsv3_rdma_send_complete(struct rdsv3_sock * rs,struct rdsv3_message * rm,int status)53412198SEiji.Ota@Sun.COM __rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
53512198SEiji.Ota@Sun.COM     int status)
53612198SEiji.Ota@Sun.COM {
53712198SEiji.Ota@Sun.COM 	struct rdsv3_rdma_op *ro;
53812198SEiji.Ota@Sun.COM 	void *ic;
53912198SEiji.Ota@Sun.COM 
54012198SEiji.Ota@Sun.COM 	RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
54112198SEiji.Ota@Sun.COM 	    "Enter(rs: %p, rm: %p)", rs, rm);
54212198SEiji.Ota@Sun.COM 
54312198SEiji.Ota@Sun.COM 	ro = rm->m_rdma_op;
54412198SEiji.Ota@Sun.COM 	if (ro && ro->r_notify && ro->r_notifier) {
54512198SEiji.Ota@Sun.COM 		ro->r_notifier->n_status = status;
54612198SEiji.Ota@Sun.COM 		list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
54712198SEiji.Ota@Sun.COM 		ro->r_notifier = NULL;
54812198SEiji.Ota@Sun.COM 	}
54912198SEiji.Ota@Sun.COM 
55012198SEiji.Ota@Sun.COM 	/* No need to wake the app - caller does this */
55112198SEiji.Ota@Sun.COM }
55212198SEiji.Ota@Sun.COM 
/*
 * This is called from the IB send completion when we detect
 * an RDMA operation that failed with a remote access error.
 * So speed is not an issue here.
 */
struct rdsv3_message *
rdsv3_send_get_message(struct rdsv3_connection *conn,
    struct rdsv3_rdma_op *op)
{
	struct rdsv3_message *rm, *tmp, *found = NULL;

	RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);

	mutex_enter(&conn->c_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (rm->m_rdma_op == op) {
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			goto out;
		}
	}

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
	    m_conn_item) {
		if (rm->m_rdma_op == op) {
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			break;
		}
	}

out:
	mutex_exit(&conn->c_lock);

	return (found);
}

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of whether it
 * found the messages on the socket list or not.
 */
void
rdsv3_send_remove_from_sock(struct list *messages, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_message *rm;

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");

	while (!list_is_empty(messages)) {
		int was_on_sock = 0;
		rm = list_remove_head(messages);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		mutex_enter(&rm->m_rs_lock);
		if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rdsv3_wake_sk_sleep(rs);
				rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
		}

		mutex_enter(&rs->rs_lock);
		if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
			struct rdsv3_rdma_op *ro = rm->m_rdma_op;
			struct rdsv3_notifier *notifier;

			list_remove_node(&rm->m_sock_item);
			rdsv3_send_sndbuf_remove(rs, rm);
			if (ro && ro->r_notifier &&
			    (status || ro->r_notify)) {
				notifier = ro->r_notifier;
				list_insert_tail(&rs->rs_notify_queue,
				    notifier);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->m_rdma_op->r_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		mutex_exit(&rs->rs_lock);

unlock_and_drop:
		mutex_exit(&rm->m_rs_lock);
		rdsv3_message_put(rm);
		if (was_on_sock)
			rdsv3_message_put(rm);
	}

	if (rs) {
		rdsv3_wake_sk_sleep(rs);
		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
	}

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rdsv3_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction.  Maybe it should bail if it sees SOCK_DEAD.
 */
void
rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
    is_acked_func is_acked)
{
	struct rdsv3_message *rm, *tmp;
	list_t list;

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	mutex_enter(&conn->c_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (!rdsv3_send_is_acked(rm, ack, is_acked))
			break;

		list_remove_node(&rm->m_conn_item);
		list_insert_tail(&list, rm);
		clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
	}

#if 0
XXX
	/* order flag updates with spin locks */
	if (!list_is_empty(&list))
		smp_mb__after_clear_bit();
#endif

	mutex_exit(&conn->c_lock);

	/* now remove the messages from the sock list as needed */
	rdsv3_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
}

void
rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_connection *conn;
	list_t list;
	int wake = 0;

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_sock_item));

	/* get all the messages we're dropping under the rs lock */
	mutex_enter(&rs->rs_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
	    m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;
		wake = 1;
		list_remove(&rs->rs_send_queue, rm);
		list_insert_tail(&list, rm);
		rdsv3_send_sndbuf_remove(rs, rm);
		clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
	}

	mutex_exit(&rs->rs_lock);

	conn = NULL;

	/* now remove the messages from the conn list as needed */
	RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
		/*
		 * We do this here rather than in the loop above, so that
		 * we don't have to nest m_rs_lock under rs->rs_lock
		 */
		mutex_enter(&rm->m_rs_lock);
		/* If this is an RDMA operation, notify the app. */
		__rdsv3_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
		rm->m_rs = NULL;
		mutex_exit(&rm->m_rs_lock);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the conn.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 */
		if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
			continue;

		if (conn != rm->m_inc.i_conn) {
			if (conn)
				mutex_exit(&conn->c_lock);
			conn = rm->m_inc.i_conn;
			mutex_enter(&conn->c_lock);
		}

		if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
			list_remove_node(&rm->m_conn_item);
			rdsv3_message_put(rm);
		}
	}

	if (conn)
		mutex_exit(&conn->c_lock);

	if (wake)
		rdsv3_wake_sk_sleep(rs);

	while (!list_is_empty(&list)) {
		rm = list_remove_head(&list);

		rdsv3_message_wait(rm);
		rdsv3_message_put(rm);
	}

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
}

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDSV3_CANCEL_SENT_TO.
 */
static int
rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
    struct rdsv3_message *rm, uint16_be_t sport,
    uint16_be_t dport, int *queued)
{
	uint32_t len;

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm);

	if (*queued)
		goto out;

	len = ntohl(rm->m_inc.i_hdr.h_len);

	/*
	 * this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock
	 */
	mutex_enter(&rs->rs_lock);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/*
		 * let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
			set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

		list_insert_tail(&rs->rs_send_queue, rm);
		set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);

		rdsv3_message_addref(rm);
		rm->m_rs = rs;

		/*
		 * The code ordering is a little weird, but we're
		 * trying to minimize the time we hold c_lock
		 */
		rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
		    dport, 0);
		rm->m_inc.i_conn = conn;
		rdsv3_message_addref(rm);	/* XXX - called twice */

		mutex_enter(&conn->c_lock);
		rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
		list_insert_tail(&conn->c_send_queue, rm);
		set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
		mutex_exit(&conn->c_lock);

		RDSV3_DPRINTF5("rdsv3_send_queue_rm",
		    "queued msg %p len %d, rs %p bytes %d seq %llu",
		    rm, len, rs, rs->rs_snd_bytes,
		    (unsigned long long)ntohll(
		    rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	mutex_exit(&rs->rs_lock);

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
out:
	return (*queued);
}

static int
rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    struct msghdr *msg, int *allocated_mr)
{
	struct cmsghdr *cmsg;
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
		    cmsg, rm, cmsg->cmsg_type);
		/*
		 * As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->m_rdma_cookie and rm->m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
			if (ret)
				*allocated_mr = 1;
			break;

		default:
			return (-EINVAL);
		}

		if (ret)
			break;
	}

	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);

	return (ret);
}

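/*
 * rdsv3_max_bcopy_size (defined elsewhere in the driver) caps the payload
 * size accepted by rdsv3_sendmsg(); larger sends fail with -EMSGSIZE below.
 */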
93112794SGiri.Adari@Sun.COM extern unsigned long rdsv3_max_bcopy_size;
93212794SGiri.Adari@Sun.COM 
93312198SEiji.Ota@Sun.COM int
rdsv3_sendmsg(struct rdsv3_sock * rs,uio_t * uio,struct nmsghdr * msg,size_t payload_len)93412198SEiji.Ota@Sun.COM rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
93512198SEiji.Ota@Sun.COM     size_t payload_len)
93612198SEiji.Ota@Sun.COM {
93712198SEiji.Ota@Sun.COM 	struct rsock *sk = rdsv3_rs_to_sk(rs);
93812198SEiji.Ota@Sun.COM 	struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
93912198SEiji.Ota@Sun.COM 	uint32_be_t daddr;
94012198SEiji.Ota@Sun.COM 	uint16_be_t dport;
94112198SEiji.Ota@Sun.COM 	struct rdsv3_message *rm = NULL;
94212198SEiji.Ota@Sun.COM 	struct rdsv3_connection *conn;
94312198SEiji.Ota@Sun.COM 	int ret = 0;
94412198SEiji.Ota@Sun.COM 	int queued = 0, allocated_mr = 0;
94512198SEiji.Ota@Sun.COM 	int nonblock = msg->msg_flags & MSG_DONTWAIT;
94612414SEiji.Ota@Sun.COM 	long timeo = rdsv3_sndtimeo(sk, nonblock);
94712198SEiji.Ota@Sun.COM 
94812198SEiji.Ota@Sun.COM 	RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);
94912198SEiji.Ota@Sun.COM 
95012198SEiji.Ota@Sun.COM 	if (msg->msg_namelen) {
95112198SEiji.Ota@Sun.COM 		/* XXX fail non-unicast destination IPs? */
95212198SEiji.Ota@Sun.COM 		if (msg->msg_namelen < sizeof (*usin) ||
95312198SEiji.Ota@Sun.COM 		    usin->sin_family != AF_INET_OFFLOAD) {
95412198SEiji.Ota@Sun.COM 			ret = -EINVAL;
95512198SEiji.Ota@Sun.COM 			RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
95612198SEiji.Ota@Sun.COM 			goto out;
95712198SEiji.Ota@Sun.COM 		}
95812198SEiji.Ota@Sun.COM 		daddr = usin->sin_addr.s_addr;
95912198SEiji.Ota@Sun.COM 		dport = usin->sin_port;
96012198SEiji.Ota@Sun.COM 	} else {
96112198SEiji.Ota@Sun.COM 		/* We only care about consistency with ->connect() */
96212198SEiji.Ota@Sun.COM 		mutex_enter(&sk->sk_lock);
96312198SEiji.Ota@Sun.COM 		daddr = rs->rs_conn_addr;
96412198SEiji.Ota@Sun.COM 		dport = rs->rs_conn_port;
96512198SEiji.Ota@Sun.COM 		mutex_exit(&sk->sk_lock);
96612198SEiji.Ota@Sun.COM 	}
96712198SEiji.Ota@Sun.COM 
96812198SEiji.Ota@Sun.COM 	/* racing with another thread binding seems ok here */
96912198SEiji.Ota@Sun.COM 	if (daddr == 0 || rs->rs_bound_addr == 0) {
97012198SEiji.Ota@Sun.COM 		ret = -ENOTCONN; /* XXX not a great errno */
97112198SEiji.Ota@Sun.COM 		RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
97212198SEiji.Ota@Sun.COM 		goto out;
97312198SEiji.Ota@Sun.COM 	}
97412198SEiji.Ota@Sun.COM 
97512794SGiri.Adari@Sun.COM 	if (payload_len > rdsv3_max_bcopy_size) {
97612794SGiri.Adari@Sun.COM 		RDSV3_DPRINTF2("rdsv3_sendmsg", "Message too large: %d",
97712794SGiri.Adari@Sun.COM 		    payload_len);
97812794SGiri.Adari@Sun.COM 		ret = -EMSGSIZE;
97912794SGiri.Adari@Sun.COM 		goto out;
98012794SGiri.Adari@Sun.COM 	}
98112794SGiri.Adari@Sun.COM 
98212198SEiji.Ota@Sun.COM 	rm = rdsv3_message_copy_from_user(uio, payload_len);
98312198SEiji.Ota@Sun.COM 	if (IS_ERR(rm)) {
98412198SEiji.Ota@Sun.COM 		ret = PTR_ERR(rm);
98512198SEiji.Ota@Sun.COM 		RDSV3_DPRINTF2("rdsv3_sendmsg",
98612198SEiji.Ota@Sun.COM 		    "rdsv3_message_copy_from_user failed %d", -ret);
98712198SEiji.Ota@Sun.COM 		rm = NULL;
98812198SEiji.Ota@Sun.COM 		goto out;
98912198SEiji.Ota@Sun.COM 	}
99012198SEiji.Ota@Sun.COM 
99112198SEiji.Ota@Sun.COM 	rm->m_daddr = daddr;
99212198SEiji.Ota@Sun.COM 
99312414SEiji.Ota@Sun.COM 	/* Parse any control messages the user may have included. */
99412414SEiji.Ota@Sun.COM 	ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
99512414SEiji.Ota@Sun.COM 	if (ret) {
99612414SEiji.Ota@Sun.COM 		RDSV3_DPRINTF2("rdsv3_sendmsg",
99712414SEiji.Ota@Sun.COM 		    "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
99812414SEiji.Ota@Sun.COM 		    rs, rm, msg, ret);
99912414SEiji.Ota@Sun.COM 		goto out;
100012414SEiji.Ota@Sun.COM 	}
100112414SEiji.Ota@Sun.COM 
100212198SEiji.Ota@Sun.COM 	/*
100312198SEiji.Ota@Sun.COM 	 * rdsv3_conn_create has a spinlock that runs with IRQ off.
100412198SEiji.Ota@Sun.COM 	 * Caching the conn in the socket helps a lot.
100512198SEiji.Ota@Sun.COM 	 */
100612198SEiji.Ota@Sun.COM 	mutex_enter(&rs->rs_conn_lock);
100712198SEiji.Ota@Sun.COM 	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
100812198SEiji.Ota@Sun.COM 		conn = rs->rs_conn;
100912198SEiji.Ota@Sun.COM 	} else {
101012198SEiji.Ota@Sun.COM 		conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
101112198SEiji.Ota@Sun.COM 		    daddr, rs->rs_transport, KM_NOSLEEP);
101212198SEiji.Ota@Sun.COM 		if (IS_ERR(conn)) {
101312198SEiji.Ota@Sun.COM 			mutex_exit(&rs->rs_conn_lock);
101412198SEiji.Ota@Sun.COM 			ret = PTR_ERR(conn);
101512198SEiji.Ota@Sun.COM 			RDSV3_DPRINTF2("rdsv3_sendmsg",
101612198SEiji.Ota@Sun.COM 			    "rdsv3_conn_create_outgoing failed %d",
101712198SEiji.Ota@Sun.COM 			    -ret);
101812198SEiji.Ota@Sun.COM 			goto out;
101912198SEiji.Ota@Sun.COM 		}
102012198SEiji.Ota@Sun.COM 		rs->rs_conn = conn;
102112198SEiji.Ota@Sun.COM 	}
102212198SEiji.Ota@Sun.COM 	mutex_exit(&rs->rs_conn_lock);
102312198SEiji.Ota@Sun.COM 
102412198SEiji.Ota@Sun.COM 	if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
102512198SEiji.Ota@Sun.COM 	    conn->c_trans->xmit_rdma == NULL) {
102612320SGiri.Adari@Sun.COM 		RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p",
102712198SEiji.Ota@Sun.COM 		    rm->m_rdma_op, conn->c_trans->xmit_rdma);
102812198SEiji.Ota@Sun.COM 		ret = -EOPNOTSUPP;
102912198SEiji.Ota@Sun.COM 		goto out;
103012198SEiji.Ota@Sun.COM 	}
103112198SEiji.Ota@Sun.COM 
103212198SEiji.Ota@Sun.COM 	/*
103312198SEiji.Ota@Sun.COM 	 * If the connection is down, trigger a connect. We may
103412198SEiji.Ota@Sun.COM 	 * have scheduled a delayed reconnect however - in this case
103512198SEiji.Ota@Sun.COM 	 * we should not interfere.
103612198SEiji.Ota@Sun.COM 	 */
	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

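	/*
	 * Honor RDS congestion control: consult the peer's congestion
	 * map for dport and, for blocking sockets, wait until the
	 * destination port is no longer marked congested.
	 */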
	ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		mutex_enter(&rs->rs_congested_lock);
		rs->rs_seen_congestion = 1;
		cv_signal(&rs->rs_congested_cv);
		mutex_exit(&rs->rs_congested_lock);

		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
		goto out;
	}

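	/*
	 * First attempt to queue the message: rdsv3_send_queue_rm()
	 * charges it against the socket's send buffer and, if it fits,
	 * links it onto the send queues; 'queued' reports success.
	 */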
	(void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
	    &queued);
	if (!queued) {
		/* rdsv3_stats_inc(s_send_queue_full); */
		/* XXX make sure this is reasonable */
		if (payload_len > rdsv3_sk_sndbuf(rs)) {
			ret = -EMSGSIZE;
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "msgsize(%d) too big, returning: %d",
			    payload_len, -ret);
			goto out;
		}
		if (nonblock) {
			ret = -EAGAIN;
			RDSV3_DPRINTF3("rdsv3_sendmsg",
			    "send queue full (%d), returning: %d",
			    payload_len, -ret);
			goto out;
		}

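		/*
		 * Blocking socket: sleep on the socket's wait queue and
		 * retry the queueing until it succeeds, bailing out if a
		 * signal is delivered while we wait.
		 */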
#if 0
		ret = rdsv3_wait_sig(sk->sk_sleep,
		    (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)));
		if (ret == 0) {
			/* signal/timeout pending */
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "woke due to signal: %d", ret);
			ret = -ERESTART;
			goto out;
		}
#else
		mutex_enter(&sk->sk_sleep->waitq_mutex);
		sk->sk_sleep->waitq_waiters++;
		while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)) {
			ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
			    &sk->sk_sleep->waitq_mutex);
			if (ret == 0) {
				/* signal/timeout pending */
				RDSV3_DPRINTF2("rdsv3_sendmsg",
				    "woke due to signal: %d", ret);
				ret = -EINTR;
				sk->sk_sleep->waitq_waiters--;
				mutex_exit(&sk->sk_sleep->waitq_mutex);
				goto out;
			}
		}
		sk->sk_sleep->waitq_waiters--;
		mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

		RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
		    queued);

		ASSERT(queued);
		ret = 0;
	}

	/*
	 * By now we've committed to the send.  We reuse rdsv3_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rdsv3_stats_inc(s_send_queued);

	if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		(void) rdsv3_send_worker(&conn->c_send_w.work);

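	/*
	 * Drop our local reference; the send queues hold their own
	 * references to rm until transmission completes.
	 */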
	rdsv3_message_put(rm);
	RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
	    rs, payload_len);
	return (payload_len);

out:
	/*
	 * If the user included an RDMA_MAP cmsg, we allocated an MR on the
	 * fly. If the sendmsg goes through, we keep the MR; if it fails with
	 * EAGAIN or in any other way, we need to destroy the MR again.
	 */
	if (allocated_mr)
		rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
		    1);

	if (rm)
		rdsv3_message_put(rm);
	return (ret);
}

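/*
 * Usage sketch (illustrative only, not part of this driver): a minimal
 * userland caller of the bcopy send path handled above. It assumes the
 * Linux-style PF_RDS socket interface; the address family constant, the
 * SOCK_SEQPACKET type, and the addresses and ports below are assumptions
 * and placeholders that may differ on this platform.
 *
 *	#include <sys/socket.h>
 *	#include <netinet/in.h>
 *	#include <arpa/inet.h>
 *	#include <string.h>
 *
 *	int
 *	rds_send_example(void)
 *	{
 *		struct sockaddr_in laddr, faddr;
 *		char payload[] = "hello";
 *		int fd;
 *
 *		if ((fd = socket(PF_RDS, SOCK_SEQPACKET, 0)) < 0)
 *			return (-1);
 *
 *		(void) memset(&laddr, 0, sizeof (laddr));
 *		laddr.sin_family = AF_INET;
 *		laddr.sin_addr.s_addr = inet_addr("192.168.1.10");
 *		laddr.sin_port = htons(4000);
 *		if (bind(fd, (struct sockaddr *)&laddr, sizeof (laddr)) < 0)
 *			return (-1);
 *
 *		(void) memset(&faddr, 0, sizeof (faddr));
 *		faddr.sin_family = AF_INET;
 *		faddr.sin_addr.s_addr = inet_addr("192.168.1.20");
 *		faddr.sin_port = htons(4000);
 *
 *		return (sendto(fd, payload, sizeof (payload), 0,
 *		    (struct sockaddr *)&faddr, sizeof (faddr)));
 *	}
 *
 * A payload larger than rdsv3_max_bcopy_size would fail with EMSGSIZE,
 * and a non-blocking socket whose send buffer is full would see EAGAIN,
 * matching the checks in rdsv3_sendmsg() above.
 */
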
/*
 * Reply to a ping packet.
 */
int
rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
{
	struct rdsv3_message *rm;
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);

	rm = rdsv3_message_alloc(0, KM_NOSLEEP);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = conn->c_faddr;

	/*
	 * If the connection is down, trigger a connect. We may,
	 * however, already have scheduled a delayed reconnect -
	 * in that case we should not interfere.
	 */
	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

	ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

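	/*
	 * Queue the pong directly on the connection: a zero-length
	 * message sent from port 0 to dport, stamped with the
	 * connection's next TX sequence number.
	 */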
	mutex_enter(&conn->c_lock);
	list_insert_tail(&conn->c_send_queue, rm);
	set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
	rdsv3_message_addref(rm);
	rm->m_inc.i_conn = conn;

	rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
	    conn->c_next_tx_seq);
	conn->c_next_tx_seq++;
	mutex_exit(&conn->c_lock);

	rdsv3_stats_inc(s_send_queued);
	rdsv3_stats_inc(s_send_pong);

	if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		(void) rdsv3_send_xmit(conn);

	rdsv3_message_put(rm);

	RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
	return (0);

out:
	if (rm)
		rdsv3_message_put(rm);
	return (ret);
}