/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg(). What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches. For message sizes that are multiples of
 *     PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using kmem_alloc().
 *     For all other message sizes, dblk and its associated data is allocated
 *     as a single contiguous chunk of memory.
 *     Objects in these caches consist of a dblk plus its associated data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc"). gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb(). freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref(). The dblk's original free method is retained
 * in db_lastfree. dblk_decref() decrements the reference count on each
 * freeb(). If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance. This makes the code simple and compact, but
 * also a bit abstruse in some places. The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up. In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads. They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common. We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
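
/*
 * Illustrative sketch of the lifecycle described above (not compiled;
 * the 100-byte size is arbitrary, and the cache named is the one a
 * 100-byte request maps to on _LP64):
 *
 *	mblk_t *mp = allocb(100, BPRI_MED);	dblk + data come from the
 *						streams_dblk_144 cache
 *	mblk_t *mp2 = dupb(mp);			db_ref 1 -> 2; db_free is
 *						now dblk_decref()
 *	freeb(mp);				db_ref 2 -> 1; only the mblk
 *						returns to mblk_cache
 *	freeb(mp2);				last reference: db_lastfree
 *						returns the dblk to its cache
 */
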
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}

/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
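
/*
 * A typical allocb() caller, for illustration only (not compiled here;
 * `len', `src' and `q' are hypothetical). On failure a real driver
 * would drop the data or schedule a bufcall(), below:
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return;
 *	bcopy(src, mp->b_wptr, len);
 *	mp->b_wptr += len;
 *	putnext(q, mp);
 */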

/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none found we return NULL.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds. Thus we can only assert for the zoneid being the
		 * same. Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}
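
/*
 * Illustrative use of msg_getcred() (not compiled; `mp' is a message a
 * hypothetical module is inspecting). Since no hold is taken, the cred
 * is only valid as long as the message is:
 *
 *	pid_t cpid;
 *	cred_t *cr = msg_getcred(mp, &cpid);
 *
 *	if (cr != NULL && crgetuid(cr) != 0)
 *		reject the request; cpid names the sending process
 */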

/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds. Thus we can only assert for the zoneid being the
		 * same. Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}
/*
 * Get the label for a message. Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
extern struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use. Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it. Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference. Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
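
/*
 * Illustration of dupb() semantics (not compiled): the duplicate shares
 * the dblk and therefore the data, so each copy may be queued or passed
 * on independently, but the bytes themselves must be treated as
 * read-only while db_ref > 1:
 *
 *	mblk_t *retained = dupb(mp);	shared data, own b_rptr/b_wptr
 *
 *	if (retained != NULL) {
 *		putnext(q, mp);		the consumer frees its reference
 *		... keep `retained' for possible retransmission ...
 *	}
 */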

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
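
/*
 * Illustrative esballoc() caller (not compiled; the receive-buffer
 * structure and xx_rbuf_recycle() routine are hypothetical). The free
 * routine runs once the last reference to the message is freed, letting
 * a driver loan out and later reclaim its own buffers:
 *
 *	rbuf->frtn.free_func = xx_rbuf_recycle;
 *	rbuf->frtn.free_arg = (caddr_t)rbuf;
 *
 *	mp = esballoc(rbuf->buf, rbuf->len, BPRI_MED, &rbuf->frtn);
 *	if (mp == NULL)
 *		xx_rbuf_recycle((caddr_t)rbuf);	reclaim it ourselves
 */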

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}
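
/*
 * Illustrative bcache usage (not compiled; "xx_rx" is a hypothetical
 * cache name). A caller gets fixed-size, aligned buffers whose dblks
 * are pre-constructed, and bcache_destroy() defers teardown until the
 * last outstanding dblk comes home via bcache_dblk_lastfree():
 *
 *	bcache_t *bcp = bcache_create("xx_rx", 2048, 64);
 *	...
 *	mp = bcache_allocb(bcp, BPRI_MED);
 *	...
 *	bcache_destroy(bcp);
 */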

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures). It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}
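
/*
 * Illustrative mkiocb() caller (not compiled): a module issuing an
 * internal ioctl downstream and matching the reply by ioc_id (XX_CMD
 * and the per-instance state are hypothetical):
 *
 *	if ((mp = mkiocb(XX_CMD)) == NULL)
 *		return (ENOMEM);
 *	xxp->xx_pending_id = ((struct iocblk *)mp->b_rptr)->ioc_id;
 *	putnext(WR(q), mp);		expect M_IOCACK/M_IOCNAK back
 */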

/*
 * test if block of given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls(). Since bcp_id field is returned, we save its value in
	 * the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}
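
/*
 * The classic bufcall() recovery pattern, for illustration (not
 * compiled; xx_reenable() and the per-instance state are hypothetical).
 * When an allocation fails, a service procedure parks the message and
 * asks to be called back when memory is likely to be available:
 *
 *	if ((bp = allocb(size, BPRI_MED)) == NULL) {
 *		(void) putbq(q, mp);
 *		xxp->xx_bufcall_id = bufcall(size, BPRI_MED,
 *		    xx_reenable, (void *)xxp);
 *		return;
 *	}
 *
 * where xx_reenable() typically just calls qenable(); the saved id must
 * be passed to unbufcall() if the instance is torn down first.
 */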

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block. Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue. If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data. On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks. Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
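
/*
 * Illustrative copy-on-write pattern built on copymsg() (not compiled):
 * a module that wants to modify data it received must take a private
 * copy when the dblk is shared:
 *
 *	if (DB_REF(mp) > 1) {
 *		mblk_t *nmp;
 *
 *		if ((nmp = copymsg(mp)) == NULL) {
 *			freemsg(mp);
 *			return;
 *		}
 *		freemsg(mp);
 *		mp = nmp;	now safe to modify in place
 *	}
 */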

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type. Len == -1 means concat everything.
 * Returns 1 on success, 0 on failure
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
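
/*
 * Illustrative pullupmsg() caller (not compiled): ensuring a protocol
 * header is contiguous and aligned before casting b_rptr (struct xx_hdr
 * is hypothetical):
 *
 *	if (MBLKL(mp) < sizeof (struct xx_hdr) &&
 *	    !pullupmsg(mp, sizeof (struct xx_hdr))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (struct xx_hdr *)mp->b_rptr;
 */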

/*
 * Concatenate and align at least the first len bytes of common message
 * type. Len == -1 means concatenate everything. The original message is
 * unaltered. Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}
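
/*
 * Illustrative msgpullup() caller (not compiled): like the pullupmsg()
 * sketch above, but non-destructive; the original message survives if
 * the allocation fails:
 *
 *	if ((nmp = msgpullup(mp, -1)) == NULL)
 *		return (ENOMEM);	`mp' is still intact here
 *	freemsg(mp);
 *	mp = nmp;
 */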

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}
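
/*
 * Illustrative adjmsg() caller (not compiled): stripping a header from
 * the front and a trailer from the back of a message (struct xx_hdr and
 * XX_TRAILER_LEN are hypothetical):
 *
 *	if (!adjmsg(mp, sizeof (struct xx_hdr)) ||
 *	    !adjmsg(mp, -XX_TRAILER_LEN)) {
 *		freemsg(mp);	message shorter than the requested trim
 *		return;
 *	}
 */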
1784
1785 /*
1786 * get number of data bytes in message
1787 */
1788 size_t
msgdsize(mblk_t * bp)1789 msgdsize(mblk_t *bp)
1790 {
1791 size_t count = 0;
1792
1793 for (; bp; bp = bp->b_cont)
1794 if (bp->b_datap->db_type == M_DATA) {
1795 ASSERT(bp->b_wptr >= bp->b_rptr);
1796 count += bp->b_wptr - bp->b_rptr;
1797 }
1798 return (count);
1799 }
1800
1801 /*
1802 * Get a message off head of queue
1803 *
1804 * If queue has no buffers then mark queue
1805 * with QWANTR. (queue wants to be read by
1806 * someone when data becomes available)
1807 *
1808 * If there is something to take off then do so.
1809 * If queue falls below hi water mark turn off QFULL
1810 * flag. Decrement weighted count of queue.
1811 * Also turn off QWANTR because queue is being read.
1812 *
1813 * The queue count is maintained on a per-band basis.
1814 * Priority band 0 (normal messages) uses q_count,
1815 * q_lowat, etc. Non-zero priority bands use the
1816 * fields in their respective qband structures
1817 * (qb_count, qb_lowat, etc.) All messages appear
1818 * on the same list, linked via their b_next pointers.
1819 * q_first is the head of the list. q_count does
1820 * not reflect the size of all the messages on the
1821 * queue. It only reflects those messages in the
1822 * normal band of flow. The one exception to this
1823 * deals with high priority messages. They are in
1824 * their own conceptual "band", but are accounted
1825 * against q_count.
1826 *
1827 * If queue count is below the lo water mark and QWANTW
1828 * is set, enable the closest backq which has a service
1829 * procedure and turn off the QWANTW flag.
1830 *
1831 * getq could be built on top of rmvq, but isn't because
1832 * of performance considerations.
1833 *
1834 * A note on the use of q_count and q_mblkcnt:
1835 * q_count is the traditional byte count for messages that
1836 * have been put on a queue. Documentation tells us that
1837 * we shouldn't rely on that count, but some drivers/modules
1838 * do. What was needed, however, is a mechanism to prevent
1839 * runaway streams from consuming all of the resources,
1840 * and particularly be able to flow control zero-length
1841 * messages. q_mblkcnt is used for this purpose. It
1842 * counts the number of mblk's that are being put on
1843 * the queue. The intention here, is that each mblk should
1844 * contain one byte of data and, for the purpose of
1845 * flow-control, logically does. A queue will become
1846 * full when EITHER of these values (q_count and q_mblkcnt)
1847 * reach the highwater mark. It will clear when BOTH
1848 * of them drop below the highwater mark. And it will
1849 * backenable when BOTH of them drop below the lowwater
1850 * mark.
1851 * With this algorithm, a driver/module might be able
1852 * to find a reasonably accurate q_count, and the
1853 * framework can still try and limit resource usage.
1854 */
1855 mblk_t *
getq(queue_t * q)1856 getq(queue_t *q)
1857 {
1858 mblk_t *bp;
1859 uchar_t band = 0;
1860
1861 bp = getq_noenab(q, 0);
1862 if (bp != NULL)
1863 band = bp->b_band;
1864
1865 /*
1866 * Inlined from qbackenable().
1867 * Quick check without holding the lock.
1868 */
1869 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1870 return (bp);
1871
1872 qbackenable(q, band);
1873 return (bp);
1874 }
1875
1876 /*
1877 * Calculate number of data bytes in a single data message block taking
1878 * multidata messages into account.
1879 */
1880
1881 #define ADD_MBLK_SIZE(mp, size) \
1882 if (DB_TYPE(mp) != M_MULTIDATA) { \
1883 (size) += MBLKL(mp); \
1884 } else { \
1885 uint_t pinuse; \
1886 \
1887 mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
1888 (size) += pinuse; \
1889 }
1890
1891 /*
1892 * Returns the number of bytes in a message (a message is defined as a
1893 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1894 * also return the number of distinct mblks in the message.
1895 */
1896 int
mp_cont_len(mblk_t * bp,int * mblkcnt)1897 mp_cont_len(mblk_t *bp, int *mblkcnt)
1898 {
1899 mblk_t *mp;
1900 int mblks = 0;
1901 int bytes = 0;
1902
1903 for (mp = bp; mp != NULL; mp = mp->b_cont) {
1904 ADD_MBLK_SIZE(mp, bytes);
1905 mblks++;
1906 }
1907
1908 if (mblkcnt != NULL)
1909 *mblkcnt = mblks;
1910
1911 return (bytes);
1912 }
1913
1914 /*
1915 * Like getq() but does not backenable. This is used by the stream
1916 * head when a putback() is likely. The caller must call qbackenable()
1917 * after it is done with accessing the queue.
1918 * The rbytes arguments to getq_noneab() allows callers to specify a
1919 * the maximum number of bytes to return. If the current amount on the
1920 * queue is less than this then the entire message will be returned.
1921 * A value of 0 returns the entire message and is equivalent to the old
1922 * default behaviour prior to the addition of the rbytes argument.
1923 */
1924 mblk_t *
getq_noenab(queue_t * q,ssize_t rbytes)1925 getq_noenab(queue_t *q, ssize_t rbytes)
1926 {
1927 mblk_t *bp, *mp1;
1928 mblk_t *mp2 = NULL;
1929 qband_t *qbp;
1930 kthread_id_t freezer;
1931 int bytecnt = 0, mblkcnt = 0;
1932
1933 /* freezestr should allow its caller to call getq/putq */
1934 freezer = STREAM(q)->sd_freezer;
1935 if (freezer == curthread) {
1936 ASSERT(frozenstr(q));
1937 ASSERT(MUTEX_HELD(QLOCK(q)));
1938 } else
1939 mutex_enter(QLOCK(q));
1940
1941 if ((bp = q->q_first) == 0) {
1942 q->q_flag |= QWANTR;
1943 } else {
1944 /*
1945 * If the caller supplied a byte threshold and there is
1946 * more than this amount on the queue then break up the
1947 * the message appropriately. We can only safely do
1948 * this for M_DATA messages.
1949 */
1950 if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1951 (q->q_count > rbytes)) {
1952 /*
1953 * Inline version of mp_cont_len() which terminates
1954 * when we meet or exceed rbytes.
1955 */
1956 for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1957 mblkcnt++;
1958 ADD_MBLK_SIZE(mp1, bytecnt);
1959 if (bytecnt >= rbytes)
1960 break;
1961 }
1962 /*
1963 * We need to account for the following scenarios:
1964 *
1965 * 1) Too much data in the first message:
1966 * mp1 will be the mblk which puts us over our
1967 * byte limit.
1968 * 2) Not enough data in the first message:
1969 * mp1 will be NULL.
1970 * 3) Exactly the right amount of data contained within
1971 * whole mblks:
1972 * mp1->b_cont will be where we break the message.
1973 */
1974 if (bytecnt > rbytes) {
1975 /*
1976 * Dup/copy mp1 and put what we don't need
1977 * back onto the queue. Adjust the read/write
1978 * and continuation pointers appropriately
1979 * and decrement the current mblk count to
1980 * reflect we are putting an mblk back onto
1981 * the queue.
1982 * When adjusting the message pointers, it's
1983 * OK to use the existing bytecnt and the
1984 * requested amount (rbytes) to calculate the
1985 				 * new write offset (b_wptr) of what we
1986 * are taking. However, we cannot use these
1987 * values when calculating the read offset of
1988 * the mblk we are putting back on the queue.
1989 				 * This is because the beginning (b_rptr) of the
1990 * mblk represents some arbitrary point within
1991 * the message.
1992 * It's simplest to do this by advancing b_rptr
1993 * by the new length of mp1 as we don't have to
1994 * remember any intermediate state.
1995 */
1996 ASSERT(mp1 != NULL);
1997 mblkcnt--;
1998 if ((mp2 = dupb(mp1)) == NULL &&
1999 (mp2 = copyb(mp1)) == NULL) {
2000 bytecnt = mblkcnt = 0;
2001 goto dup_failed;
2002 }
2003 mp2->b_cont = mp1->b_cont;
2004 mp1->b_wptr -= bytecnt - rbytes;
2005 mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2006 mp1->b_cont = NULL;
2007 bytecnt = rbytes;
2008 } else {
2009 /*
2010 * Either there is not enough data in the first
2011 * message or there is no excess data to deal
2012 * with. If mp1 is NULL, we are taking the
2013 * whole message. No need to do anything.
2014 * Otherwise we assign mp1->b_cont to mp2 as
2015 * we will be putting this back onto the head of
2016 * the queue.
2017 */
2018 if (mp1 != NULL) {
2019 mp2 = mp1->b_cont;
2020 mp1->b_cont = NULL;
2021 }
2022 }
2023 /*
2024 * If mp2 is not NULL then we have part of the message
2025 * to put back onto the queue.
2026 */
2027 if (mp2 != NULL) {
2028 if ((mp2->b_next = bp->b_next) == NULL)
2029 q->q_last = mp2;
2030 else
2031 bp->b_next->b_prev = mp2;
2032 q->q_first = mp2;
2033 } else {
2034 if ((q->q_first = bp->b_next) == NULL)
2035 q->q_last = NULL;
2036 else
2037 q->q_first->b_prev = NULL;
2038 }
2039 } else {
2040 /*
2041 * Either no byte threshold was supplied, there is
2042 * not enough on the queue or we failed to
2043 * duplicate/copy a data block. In these cases we
2044 * just take the entire first message.
2045 */
2046 dup_failed:
2047 bytecnt = mp_cont_len(bp, &mblkcnt);
2048 if ((q->q_first = bp->b_next) == NULL)
2049 q->q_last = NULL;
2050 else
2051 q->q_first->b_prev = NULL;
2052 }
2053 if (bp->b_band == 0) {
2054 q->q_count -= bytecnt;
2055 q->q_mblkcnt -= mblkcnt;
2056 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2057 (q->q_mblkcnt < q->q_hiwat))) {
2058 q->q_flag &= ~QFULL;
2059 }
2060 } else {
2061 int i;
2062
2063 ASSERT(bp->b_band <= q->q_nband);
2064 ASSERT(q->q_bandp != NULL);
2065 ASSERT(MUTEX_HELD(QLOCK(q)));
2066 qbp = q->q_bandp;
2067 i = bp->b_band;
2068 while (--i > 0)
2069 qbp = qbp->qb_next;
2070 if (qbp->qb_first == qbp->qb_last) {
2071 qbp->qb_first = NULL;
2072 qbp->qb_last = NULL;
2073 } else {
2074 qbp->qb_first = bp->b_next;
2075 }
2076 qbp->qb_count -= bytecnt;
2077 qbp->qb_mblkcnt -= mblkcnt;
2078 if (qbp->qb_mblkcnt == 0 ||
2079 ((qbp->qb_count < qbp->qb_hiwat) &&
2080 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2081 qbp->qb_flag &= ~QB_FULL;
2082 }
2083 }
2084 q->q_flag &= ~QWANTR;
2085 bp->b_next = NULL;
2086 bp->b_prev = NULL;
2087 }
2088 if (freezer != curthread)
2089 mutex_exit(QLOCK(q));
2090
2091 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2092
2093 return (bp);
2094 }
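
/*
 * A sketch of the intended calling pattern (modelled on, but not verbatim
 * from, the stream head):
 *
 *	mp = getq_noenab(q, uiop->uio_resid);
 *	... consume mp, possibly putbq() an unread remainder ...
 *	qbackenable(q, band);
 *
 * where band is the b_band of the message that was removed (0 if none).
 */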
2095
2096 /*
2097 * Determine if a backenable is needed after removing a message in the
2098 * specified band.
2099  * NOTE: This routine assumes that something like getq_noenab() has
2100  * already been called.
2101 *
2102 * For the read side it is ok to hold sd_lock across calling this (and the
2103 * stream head often does).
2104 * But for the write side strwakeq might be invoked and it acquires sd_lock.
2105 */
2106 void
2107 qbackenable(queue_t *q, uchar_t band)
2108 {
2109 int backenab = 0;
2110 qband_t *qbp;
2111 kthread_id_t freezer;
2112
2113 ASSERT(q);
2114 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2115
2116 /*
2117 * Quick check without holding the lock.
2118 * OK since after getq() has lowered the q_count these flags
2119 * would not change unless either the qbackenable() is done by
2120 * another thread (which is ok) or the queue has gotten QFULL
2121 * in which case another backenable will take place when the queue
2122 * drops below q_lowat.
2123 */
2124 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2125 return;
2126
2127 /* freezestr should allow its caller to call getq/putq */
2128 freezer = STREAM(q)->sd_freezer;
2129 if (freezer == curthread) {
2130 ASSERT(frozenstr(q));
2131 ASSERT(MUTEX_HELD(QLOCK(q)));
2132 } else
2133 mutex_enter(QLOCK(q));
2134
2135 if (band == 0) {
2136 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2137 q->q_mblkcnt < q->q_lowat)) {
2138 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2139 }
2140 } else {
2141 int i;
2142
2143 ASSERT((unsigned)band <= q->q_nband);
2144 ASSERT(q->q_bandp != NULL);
2145
2146 qbp = q->q_bandp;
2147 i = band;
2148 while (--i > 0)
2149 qbp = qbp->qb_next;
2150
2151 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2152 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2153 backenab = qbp->qb_flag & QB_WANTW;
2154 }
2155 }
2156
2157 if (backenab == 0) {
2158 if (freezer != curthread)
2159 mutex_exit(QLOCK(q));
2160 return;
2161 }
2162
2163 /* Have to drop the lock across strwakeq and backenable */
2164 if (backenab & QWANTWSYNC)
2165 q->q_flag &= ~QWANTWSYNC;
2166 if (backenab & (QWANTW|QB_WANTW)) {
2167 if (band != 0)
2168 qbp->qb_flag &= ~QB_WANTW;
2169 else {
2170 q->q_flag &= ~QWANTW;
2171 }
2172 }
2173
2174 if (freezer != curthread)
2175 mutex_exit(QLOCK(q));
2176
2177 if (backenab & QWANTWSYNC)
2178 strwakeq(q, QWANTWSYNC);
2179 if (backenab & (QWANTW|QB_WANTW))
2180 backenable(q, band);
2181 }
2182
2183 /*
2184 * Remove a message from a queue. The queue count and other
2185 * flow control parameters are adjusted and the back queue
2186 * enabled if necessary.
2187 *
2188 * rmvq can be called with the stream frozen, but other utility functions
2189 * holding QLOCK, and by streams modules without any locks/frozen.
2190 */
2191 void
2192 rmvq(queue_t *q, mblk_t *mp)
2193 {
2194 ASSERT(mp != NULL);
2195
2196 rmvq_noenab(q, mp);
2197 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2198 /*
2199 * qbackenable can handle a frozen stream but not a "random"
2200 * qlock being held. Drop lock across qbackenable.
2201 */
2202 mutex_exit(QLOCK(q));
2203 qbackenable(q, mp->b_band);
2204 mutex_enter(QLOCK(q));
2205 } else {
2206 qbackenable(q, mp->b_band);
2207 }
2208 }
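
/*
 * rmvq() is typically paired with freezestr() when a module must search a
 * queue and pluck out one particular message. A minimal sketch (match()
 * is a hypothetical predicate):
 *
 *	mblk_t *mp;
 *
 *	freezestr(q);
 *	for (mp = q->q_first; mp != NULL; mp = mp->b_next) {
 *		if (match(mp)) {
 *			rmvq(q, mp);
 *			break;
 *		}
 *	}
 *	unfreezestr(q);
 */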
2209
2210 /*
2211 * Like rmvq() but without any backenabling.
2212 * This exists to handle SR_CONSOL_DATA in strrput().
2213 */
2214 void
2215 rmvq_noenab(queue_t *q, mblk_t *mp)
2216 {
2217 int i;
2218 qband_t *qbp = NULL;
2219 kthread_id_t freezer;
2220 int bytecnt = 0, mblkcnt = 0;
2221
2222 freezer = STREAM(q)->sd_freezer;
2223 if (freezer == curthread) {
2224 ASSERT(frozenstr(q));
2225 ASSERT(MUTEX_HELD(QLOCK(q)));
2226 } else if (MUTEX_HELD(QLOCK(q))) {
2227 /* Don't drop lock on exit */
2228 freezer = curthread;
2229 } else
2230 mutex_enter(QLOCK(q));
2231
2232 ASSERT(mp->b_band <= q->q_nband);
2233 if (mp->b_band != 0) { /* Adjust band pointers */
2234 ASSERT(q->q_bandp != NULL);
2235 qbp = q->q_bandp;
2236 i = mp->b_band;
2237 while (--i > 0)
2238 qbp = qbp->qb_next;
2239 if (mp == qbp->qb_first) {
2240 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2241 qbp->qb_first = mp->b_next;
2242 else
2243 qbp->qb_first = NULL;
2244 }
2245 if (mp == qbp->qb_last) {
2246 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2247 qbp->qb_last = mp->b_prev;
2248 else
2249 qbp->qb_last = NULL;
2250 }
2251 }
2252
2253 /*
2254 * Remove the message from the list.
2255 */
2256 if (mp->b_prev)
2257 mp->b_prev->b_next = mp->b_next;
2258 else
2259 q->q_first = mp->b_next;
2260 if (mp->b_next)
2261 mp->b_next->b_prev = mp->b_prev;
2262 else
2263 q->q_last = mp->b_prev;
2264 mp->b_next = NULL;
2265 mp->b_prev = NULL;
2266
2267 /* Get the size of the message for q_count accounting */
2268 bytecnt = mp_cont_len(mp, &mblkcnt);
2269
2270 if (mp->b_band == 0) { /* Perform q_count accounting */
2271 q->q_count -= bytecnt;
2272 q->q_mblkcnt -= mblkcnt;
2273 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2274 (q->q_mblkcnt < q->q_hiwat))) {
2275 q->q_flag &= ~QFULL;
2276 }
2277 } else { /* Perform qb_count accounting */
2278 qbp->qb_count -= bytecnt;
2279 qbp->qb_mblkcnt -= mblkcnt;
2280 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2281 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2282 qbp->qb_flag &= ~QB_FULL;
2283 }
2284 }
2285 if (freezer != curthread)
2286 mutex_exit(QLOCK(q));
2287
2288 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2289 }
2290
2291 /*
2292 * Empty a queue.
2293 * If flag is set, remove all messages. Otherwise, remove
2294 * only non-control messages. If queue falls below its low
2295 * water mark, and QWANTW is set, enable the nearest upstream
2296 * service procedure.
2297 *
2298 * Historical note: when merging the M_FLUSH code in strrput with this
2299 * code one difference was discovered. flushq did not have a check
2300 * for q_lowat == 0 in the backenabling test.
2301 *
2302 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2303 * if one exists on the queue.
2304 */
2305 void
2306 flushq_common(queue_t *q, int flag, int pcproto_flag)
2307 {
2308 mblk_t *mp, *nmp;
2309 qband_t *qbp;
2310 int backenab = 0;
2311 unsigned char bpri;
2312 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2313
2314 if (q->q_first == NULL)
2315 return;
2316
2317 mutex_enter(QLOCK(q));
2318 mp = q->q_first;
2319 q->q_first = NULL;
2320 q->q_last = NULL;
2321 q->q_count = 0;
2322 q->q_mblkcnt = 0;
2323 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2324 qbp->qb_first = NULL;
2325 qbp->qb_last = NULL;
2326 qbp->qb_count = 0;
2327 qbp->qb_mblkcnt = 0;
2328 qbp->qb_flag &= ~QB_FULL;
2329 }
2330 q->q_flag &= ~QFULL;
2331 mutex_exit(QLOCK(q));
2332 while (mp) {
2333 nmp = mp->b_next;
2334 mp->b_next = mp->b_prev = NULL;
2335
2336 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2337
2338 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2339 (void) putq(q, mp);
2340 else if (flag || datamsg(mp->b_datap->db_type))
2341 freemsg(mp);
2342 else
2343 (void) putq(q, mp);
2344 mp = nmp;
2345 }
2346 bpri = 1;
2347 mutex_enter(QLOCK(q));
2348 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2349 if ((qbp->qb_flag & QB_WANTW) &&
2350 (((qbp->qb_count < qbp->qb_lowat) &&
2351 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2352 qbp->qb_lowat == 0)) {
2353 qbp->qb_flag &= ~QB_WANTW;
2354 backenab = 1;
2355 qbf[bpri] = 1;
2356 } else
2357 qbf[bpri] = 0;
2358 bpri++;
2359 }
2360 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2361 if ((q->q_flag & QWANTW) &&
2362 (((q->q_count < q->q_lowat) &&
2363 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2364 q->q_flag &= ~QWANTW;
2365 backenab = 1;
2366 qbf[0] = 1;
2367 } else
2368 qbf[0] = 0;
2369
2370 /*
2371 * If any band can now be written to, and there is a writer
2372 * for that band, then backenable the closest service procedure.
2373 */
2374 if (backenab) {
2375 mutex_exit(QLOCK(q));
2376 for (bpri = q->q_nband; bpri != 0; bpri--)
2377 if (qbf[bpri])
2378 backenable(q, bpri);
2379 if (qbf[0])
2380 backenable(q, 0);
2381 } else
2382 mutex_exit(QLOCK(q));
2383 }
2384
2385 /*
2386 * The real flushing takes place in flushq_common. This is done so that
2387  * a flag can specify whether or not M_PCPROTO messages should be
2388  * flushed. Currently the only place that uses this flag is the stream head.
2389 */
2390 void
2391 flushq(queue_t *q, int flag)
2392 {
2393 flushq_common(q, flag, 0);
2394 }
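
/*
 * The canonical caller is a put procedure handling M_FLUSH; a driver's
 * write-side handler is the standard example (a sketch, not tied to any
 * particular driver):
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW)
 *			flushq(q, FLUSHDATA);
 *		if (*mp->b_rptr & FLUSHR) {
 *			flushq(RD(q), FLUSHDATA);
 *			*mp->b_rptr &= ~FLUSHW;
 *			qreply(q, mp);
 *		} else {
 *			freemsg(mp);
 *		}
 *		break;
 */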
2395
2396 /*
2397 * Flush the queue of messages of the given priority band.
2398 * There is some duplication of code between flushq and flushband.
2399 * This is because we want to optimize the code as much as possible.
2400 * The assumption is that there will be more messages in the normal
2401 * (priority 0) band than in any other.
2402 *
2403 * Historical note: when merging the M_FLUSH code in strrput with this
2404 * code one difference was discovered. flushband had an extra check for
2405  * (mp->b_datap->db_type < QPCTL) in the band 0
2406  * case. That check does not match the man page for flushband and was not
2407  * in the strrput flush code, hence it was removed.
2408 */
2409 void
2410 flushband(queue_t *q, unsigned char pri, int flag)
2411 {
2412 mblk_t *mp;
2413 mblk_t *nmp;
2414 mblk_t *last;
2415 qband_t *qbp;
2416 int band;
2417
2418 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2419 if (pri > q->q_nband) {
2420 return;
2421 }
2422 mutex_enter(QLOCK(q));
2423 if (pri == 0) {
2424 mp = q->q_first;
2425 q->q_first = NULL;
2426 q->q_last = NULL;
2427 q->q_count = 0;
2428 q->q_mblkcnt = 0;
2429 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2430 qbp->qb_first = NULL;
2431 qbp->qb_last = NULL;
2432 qbp->qb_count = 0;
2433 qbp->qb_mblkcnt = 0;
2434 qbp->qb_flag &= ~QB_FULL;
2435 }
2436 q->q_flag &= ~QFULL;
2437 mutex_exit(QLOCK(q));
2438 while (mp) {
2439 nmp = mp->b_next;
2440 mp->b_next = mp->b_prev = NULL;
2441 if ((mp->b_band == 0) &&
2442 ((flag == FLUSHALL) ||
2443 datamsg(mp->b_datap->db_type)))
2444 freemsg(mp);
2445 else
2446 (void) putq(q, mp);
2447 mp = nmp;
2448 }
2449 mutex_enter(QLOCK(q));
2450 if ((q->q_flag & QWANTW) &&
2451 (((q->q_count < q->q_lowat) &&
2452 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2453 q->q_flag &= ~QWANTW;
2454 mutex_exit(QLOCK(q));
2455
2456 backenable(q, pri);
2457 } else
2458 mutex_exit(QLOCK(q));
2459 } else { /* pri != 0 */
2460 boolean_t flushed = B_FALSE;
2461 band = pri;
2462
2463 ASSERT(MUTEX_HELD(QLOCK(q)));
2464 qbp = q->q_bandp;
2465 while (--band > 0)
2466 qbp = qbp->qb_next;
2467 mp = qbp->qb_first;
2468 if (mp == NULL) {
2469 mutex_exit(QLOCK(q));
2470 return;
2471 }
2472 last = qbp->qb_last->b_next;
2473 /*
2474 * rmvq_noenab() and freemsg() are called for each mblk that
2475 * meets the criteria. The loop is executed until the last
2476 * mblk has been processed.
2477 */
2478 while (mp != last) {
2479 ASSERT(mp->b_band == pri);
2480 nmp = mp->b_next;
2481 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2482 rmvq_noenab(q, mp);
2483 freemsg(mp);
2484 flushed = B_TRUE;
2485 }
2486 mp = nmp;
2487 }
2488 mutex_exit(QLOCK(q));
2489
2490 /*
2491 * If any mblk(s) has been freed, we know that qbackenable()
2492 * will need to be called.
2493 */
2494 if (flushed)
2495 qbackenable(q, pri);
2496 }
2497 }
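
/*
 * flushband() is normally driven by an M_FLUSH message carrying the
 * FLUSHBAND flag, in which case the band number follows the flag byte
 * (a standard sketch):
 *
 *	if (*mp->b_rptr & FLUSHBAND)
 *		flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
 *	else
 *		flushq(q, FLUSHDATA);
 */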
2498
2499 /*
2500 * Return 1 if the queue is not full. If the queue is full, return
2501 * 0 (may not put message) and set QWANTW flag (caller wants to write
2502 * to the queue).
2503 */
2504 int
2505 canput(queue_t *q)
2506 {
2507 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2508
2509 /* this is for loopback transports, they should not do a canput */
2510 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2511
2512 /* Find next forward module that has a service procedure */
2513 q = q->q_nfsrv;
2514
2515 if (!(q->q_flag & QFULL)) {
2516 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2517 return (1);
2518 }
2519 mutex_enter(QLOCK(q));
2520 if (q->q_flag & QFULL) {
2521 q->q_flag |= QWANTW;
2522 mutex_exit(QLOCK(q));
2523 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2524 return (0);
2525 }
2526 mutex_exit(QLOCK(q));
2527 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2528 return (1);
2529 }
2530
2531 /*
2532 * This is the new canput for use with priority bands. Return 1 if the
2533 * band is not full. If the band is full, return 0 (may not put message)
2534  * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (caller
2535  * wants to write to the queue).
2536 */
2537 int
2538 bcanput(queue_t *q, unsigned char pri)
2539 {
2540 qband_t *qbp;
2541
2542 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2543 if (!q)
2544 return (0);
2545
2546 /* Find next forward module that has a service procedure */
2547 q = q->q_nfsrv;
2548
2549 mutex_enter(QLOCK(q));
2550 if (pri == 0) {
2551 if (q->q_flag & QFULL) {
2552 q->q_flag |= QWANTW;
2553 mutex_exit(QLOCK(q));
2554 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2555 "bcanput:%p %X %d", q, pri, 0);
2556 return (0);
2557 }
2558 } else { /* pri != 0 */
2559 if (pri > q->q_nband) {
2560 /*
2561 * No band exists yet, so return success.
2562 */
2563 mutex_exit(QLOCK(q));
2564 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2565 "bcanput:%p %X %d", q, pri, 1);
2566 return (1);
2567 }
2568 qbp = q->q_bandp;
2569 while (--pri)
2570 qbp = qbp->qb_next;
2571 if (qbp->qb_flag & QB_FULL) {
2572 qbp->qb_flag |= QB_WANTW;
2573 mutex_exit(QLOCK(q));
2574 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2575 "bcanput:%p %X %d", q, pri, 0);
2576 return (0);
2577 }
2578 }
2579 mutex_exit(QLOCK(q));
2580 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2581 "bcanput:%p %X %d", q, pri, 1);
2582 return (1);
2583 }
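
/*
 * A typical banded-data forwarding sketch (bcanputnext(q, pri) is the
 * usual wrapper for bcanput(q->q_next, pri)):
 *
 *	if (bcanputnext(q, mp->b_band))
 *		putnext(q, mp);
 *	else
 *		(void) putbq(q, mp);
 */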
2584
2585 /*
2586 * Put a message on a queue.
2587 *
2588 * Messages are enqueued on a priority basis. The priority classes
2589 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2590 * and B_NORMAL (type < QPCTL && band == 0).
2591 *
2592 * Add appropriate weighted data block sizes to queue count.
2593 * If queue hits high water mark then set QFULL flag.
2594 *
2595 * If QNOENAB is not set (putq is allowed to enable the queue),
2596 * enable the queue only if the message is PRIORITY,
2597 * or the QWANTR flag is set (indicating that the service procedure
2598  * is ready to read the queue). This implies that a service
2599 * procedure must NEVER put a high priority message back on its own
2600 * queue, as this would result in an infinite loop (!).
2601 */
2602 int
2603 putq(queue_t *q, mblk_t *bp)
2604 {
2605 mblk_t *tmp;
2606 qband_t *qbp = NULL;
2607 int mcls = (int)queclass(bp);
2608 kthread_id_t freezer;
2609 int bytecnt = 0, mblkcnt = 0;
2610
2611 freezer = STREAM(q)->sd_freezer;
2612 if (freezer == curthread) {
2613 ASSERT(frozenstr(q));
2614 ASSERT(MUTEX_HELD(QLOCK(q)));
2615 } else
2616 mutex_enter(QLOCK(q));
2617
2618 /*
2619 	 * Perform sanity checks and, if the qband structure is not
2620 	 * yet allocated, allocate it.
2621 */
2622 if (mcls == QPCTL) {
2623 if (bp->b_band != 0)
2624 bp->b_band = 0; /* force to be correct */
2625 } else if (bp->b_band != 0) {
2626 int i;
2627 qband_t **qbpp;
2628
2629 if (bp->b_band > q->q_nband) {
2630
2631 /*
2632 * The qband structure for this priority band is
2633 * not on the queue yet, so we have to allocate
2634 * one on the fly. It would be wasteful to
2635 * associate the qband structures with every
2636 * queue when the queues are allocated. This is
2637 * because most queues will only need the normal
2638 * band of flow which can be described entirely
2639 * by the queue itself.
2640 */
2641 qbpp = &q->q_bandp;
2642 while (*qbpp)
2643 qbpp = &(*qbpp)->qb_next;
2644 while (bp->b_band > q->q_nband) {
2645 if ((*qbpp = allocband()) == NULL) {
2646 if (freezer != curthread)
2647 mutex_exit(QLOCK(q));
2648 return (0);
2649 }
2650 (*qbpp)->qb_hiwat = q->q_hiwat;
2651 (*qbpp)->qb_lowat = q->q_lowat;
2652 q->q_nband++;
2653 qbpp = &(*qbpp)->qb_next;
2654 }
2655 }
2656 ASSERT(MUTEX_HELD(QLOCK(q)));
2657 qbp = q->q_bandp;
2658 i = bp->b_band;
2659 while (--i)
2660 qbp = qbp->qb_next;
2661 }
2662
2663 /*
2664 * If queue is empty, add the message and initialize the pointers.
2665 * Otherwise, adjust message pointers and queue pointers based on
2666 * the type of the message and where it belongs on the queue. Some
2667 * code is duplicated to minimize the number of conditionals and
2668 * hopefully minimize the amount of time this routine takes.
2669 */
2670 if (!q->q_first) {
2671 bp->b_next = NULL;
2672 bp->b_prev = NULL;
2673 q->q_first = bp;
2674 q->q_last = bp;
2675 if (qbp) {
2676 qbp->qb_first = bp;
2677 qbp->qb_last = bp;
2678 }
2679 } else if (!qbp) { /* bp->b_band == 0 */
2680
2681 /*
2682 * If queue class of message is less than or equal to
2683 * that of the last one on the queue, tack on to the end.
2684 */
2685 tmp = q->q_last;
2686 if (mcls <= (int)queclass(tmp)) {
2687 bp->b_next = NULL;
2688 bp->b_prev = tmp;
2689 tmp->b_next = bp;
2690 q->q_last = bp;
2691 } else {
2692 tmp = q->q_first;
2693 while ((int)queclass(tmp) >= mcls)
2694 tmp = tmp->b_next;
2695
2696 /*
2697 * Insert bp before tmp.
2698 */
2699 bp->b_next = tmp;
2700 bp->b_prev = tmp->b_prev;
2701 if (tmp->b_prev)
2702 tmp->b_prev->b_next = bp;
2703 else
2704 q->q_first = bp;
2705 tmp->b_prev = bp;
2706 }
2707 } else { /* bp->b_band != 0 */
2708 if (qbp->qb_first) {
2709 tmp = qbp->qb_last;
2710
2711 /*
2712 * Insert bp after the last message in this band.
2713 */
2714 bp->b_next = tmp->b_next;
2715 if (tmp->b_next)
2716 tmp->b_next->b_prev = bp;
2717 else
2718 q->q_last = bp;
2719 bp->b_prev = tmp;
2720 tmp->b_next = bp;
2721 } else {
2722 tmp = q->q_last;
2723 if ((mcls < (int)queclass(tmp)) ||
2724 (bp->b_band <= tmp->b_band)) {
2725
2726 /*
2727 * Tack bp on end of queue.
2728 */
2729 bp->b_next = NULL;
2730 bp->b_prev = tmp;
2731 tmp->b_next = bp;
2732 q->q_last = bp;
2733 } else {
2734 tmp = q->q_first;
2735 while (tmp->b_datap->db_type >= QPCTL)
2736 tmp = tmp->b_next;
2737 while (tmp->b_band >= bp->b_band)
2738 tmp = tmp->b_next;
2739
2740 /*
2741 * Insert bp before tmp.
2742 */
2743 bp->b_next = tmp;
2744 bp->b_prev = tmp->b_prev;
2745 if (tmp->b_prev)
2746 tmp->b_prev->b_next = bp;
2747 else
2748 q->q_first = bp;
2749 tmp->b_prev = bp;
2750 }
2751 qbp->qb_first = bp;
2752 }
2753 qbp->qb_last = bp;
2754 }
2755
2756 /* Get message byte count for q_count accounting */
2757 bytecnt = mp_cont_len(bp, &mblkcnt);
2758
2759 if (qbp) {
2760 qbp->qb_count += bytecnt;
2761 qbp->qb_mblkcnt += mblkcnt;
2762 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2763 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2764 qbp->qb_flag |= QB_FULL;
2765 }
2766 } else {
2767 q->q_count += bytecnt;
2768 q->q_mblkcnt += mblkcnt;
2769 if ((q->q_count >= q->q_hiwat) ||
2770 (q->q_mblkcnt >= q->q_hiwat)) {
2771 q->q_flag |= QFULL;
2772 }
2773 }
2774
2775 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2776
2777 if ((mcls > QNORM) ||
2778 (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2779 qenable_locked(q);
2780 ASSERT(MUTEX_HELD(QLOCK(q)));
2781 if (freezer != curthread)
2782 mutex_exit(QLOCK(q));
2783
2784 return (1);
2785 }
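
/*
 * The simplest putq() caller is a put procedure that defers everything to
 * its service procedure (a sketch; the "xx" prefix is hypothetical):
 *
 *	static int
 *	xxput(queue_t *q, mblk_t *mp)
 *	{
 *		(void) putq(q, mp);
 *		return (0);
 *	}
 *
 * Note that putq() returns 0 only when a qband structure cannot be
 * allocated for a banded message; callers that care must free or retry
 * the message themselves.
 */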
2786
2787 /*
2788 * Put stuff back at beginning of Q according to priority order.
2789 * See comment on putq above for details.
2790 */
2791 int
2792 putbq(queue_t *q, mblk_t *bp)
2793 {
2794 mblk_t *tmp;
2795 qband_t *qbp = NULL;
2796 int mcls = (int)queclass(bp);
2797 kthread_id_t freezer;
2798 int bytecnt = 0, mblkcnt = 0;
2799
2800 ASSERT(q && bp);
2801 ASSERT(bp->b_next == NULL);
2802 freezer = STREAM(q)->sd_freezer;
2803 if (freezer == curthread) {
2804 ASSERT(frozenstr(q));
2805 ASSERT(MUTEX_HELD(QLOCK(q)));
2806 } else
2807 mutex_enter(QLOCK(q));
2808
2809 /*
2810 	 * Perform sanity checks and, if the qband structure is not
2811 	 * yet allocated, allocate it.
2812 */
2813 if (mcls == QPCTL) {
2814 if (bp->b_band != 0)
2815 bp->b_band = 0; /* force to be correct */
2816 } else if (bp->b_band != 0) {
2817 int i;
2818 qband_t **qbpp;
2819
2820 if (bp->b_band > q->q_nband) {
2821 qbpp = &q->q_bandp;
2822 while (*qbpp)
2823 qbpp = &(*qbpp)->qb_next;
2824 while (bp->b_band > q->q_nband) {
2825 if ((*qbpp = allocband()) == NULL) {
2826 if (freezer != curthread)
2827 mutex_exit(QLOCK(q));
2828 return (0);
2829 }
2830 (*qbpp)->qb_hiwat = q->q_hiwat;
2831 (*qbpp)->qb_lowat = q->q_lowat;
2832 q->q_nband++;
2833 qbpp = &(*qbpp)->qb_next;
2834 }
2835 }
2836 qbp = q->q_bandp;
2837 i = bp->b_band;
2838 while (--i)
2839 qbp = qbp->qb_next;
2840 }
2841
2842 /*
2843 * If queue is empty or if message is high priority,
2844 * place on the front of the queue.
2845 */
2846 tmp = q->q_first;
2847 if ((!tmp) || (mcls == QPCTL)) {
2848 bp->b_next = tmp;
2849 if (tmp)
2850 tmp->b_prev = bp;
2851 else
2852 q->q_last = bp;
2853 q->q_first = bp;
2854 bp->b_prev = NULL;
2855 if (qbp) {
2856 qbp->qb_first = bp;
2857 qbp->qb_last = bp;
2858 }
2859 } else if (qbp) { /* bp->b_band != 0 */
2860 tmp = qbp->qb_first;
2861 if (tmp) {
2862
2863 /*
2864 * Insert bp before the first message in this band.
2865 */
2866 bp->b_next = tmp;
2867 bp->b_prev = tmp->b_prev;
2868 if (tmp->b_prev)
2869 tmp->b_prev->b_next = bp;
2870 else
2871 q->q_first = bp;
2872 tmp->b_prev = bp;
2873 } else {
2874 tmp = q->q_last;
2875 if ((mcls < (int)queclass(tmp)) ||
2876 (bp->b_band < tmp->b_band)) {
2877
2878 /*
2879 * Tack bp on end of queue.
2880 */
2881 bp->b_next = NULL;
2882 bp->b_prev = tmp;
2883 tmp->b_next = bp;
2884 q->q_last = bp;
2885 } else {
2886 tmp = q->q_first;
2887 while (tmp->b_datap->db_type >= QPCTL)
2888 tmp = tmp->b_next;
2889 while (tmp->b_band > bp->b_band)
2890 tmp = tmp->b_next;
2891
2892 /*
2893 * Insert bp before tmp.
2894 */
2895 bp->b_next = tmp;
2896 bp->b_prev = tmp->b_prev;
2897 if (tmp->b_prev)
2898 tmp->b_prev->b_next = bp;
2899 else
2900 q->q_first = bp;
2901 tmp->b_prev = bp;
2902 }
2903 qbp->qb_last = bp;
2904 }
2905 qbp->qb_first = bp;
2906 } else { /* bp->b_band == 0 && !QPCTL */
2907
2908 /*
2909 * If the queue class or band is less than that of the last
2910 * message on the queue, tack bp on the end of the queue.
2911 */
2912 tmp = q->q_last;
2913 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2914 bp->b_next = NULL;
2915 bp->b_prev = tmp;
2916 tmp->b_next = bp;
2917 q->q_last = bp;
2918 } else {
2919 tmp = q->q_first;
2920 while (tmp->b_datap->db_type >= QPCTL)
2921 tmp = tmp->b_next;
2922 while (tmp->b_band > bp->b_band)
2923 tmp = tmp->b_next;
2924
2925 /*
2926 * Insert bp before tmp.
2927 */
2928 bp->b_next = tmp;
2929 bp->b_prev = tmp->b_prev;
2930 if (tmp->b_prev)
2931 tmp->b_prev->b_next = bp;
2932 else
2933 q->q_first = bp;
2934 tmp->b_prev = bp;
2935 }
2936 }
2937
2938 /* Get message byte count for q_count accounting */
2939 bytecnt = mp_cont_len(bp, &mblkcnt);
2940
2941 if (qbp) {
2942 qbp->qb_count += bytecnt;
2943 qbp->qb_mblkcnt += mblkcnt;
2944 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2945 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2946 qbp->qb_flag |= QB_FULL;
2947 }
2948 } else {
2949 q->q_count += bytecnt;
2950 q->q_mblkcnt += mblkcnt;
2951 if ((q->q_count >= q->q_hiwat) ||
2952 (q->q_mblkcnt >= q->q_hiwat)) {
2953 q->q_flag |= QFULL;
2954 }
2955 }
2956
2957 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2958
2959 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2960 qenable_locked(q);
2961 ASSERT(MUTEX_HELD(QLOCK(q)));
2962 if (freezer != curthread)
2963 mutex_exit(QLOCK(q));
2964
2965 return (1);
2966 }
2967
2968 /*
2969 * Insert a message before an existing message on the queue. If the
2970  * existing message is NULL, the new message is placed on the end of
2971 * the queue. The queue class of the new message is ignored. However,
2972 * the priority band of the new message must adhere to the following
2973 * ordering:
2974 *
2975 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2976 *
2977 * All flow control parameters are updated.
2978 *
2979 * insq can be called with the stream frozen, but other utility functions
2980 * holding QLOCK, and by streams modules without any locks/frozen.
2981 */
2982 int
2983 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2984 {
2985 mblk_t *tmp;
2986 qband_t *qbp = NULL;
2987 int mcls = (int)queclass(mp);
2988 kthread_id_t freezer;
2989 int bytecnt = 0, mblkcnt = 0;
2990
2991 freezer = STREAM(q)->sd_freezer;
2992 if (freezer == curthread) {
2993 ASSERT(frozenstr(q));
2994 ASSERT(MUTEX_HELD(QLOCK(q)));
2995 } else if (MUTEX_HELD(QLOCK(q))) {
2996 /* Don't drop lock on exit */
2997 freezer = curthread;
2998 } else
2999 mutex_enter(QLOCK(q));
3000
3001 if (mcls == QPCTL) {
3002 if (mp->b_band != 0)
3003 mp->b_band = 0; /* force to be correct */
3004 if (emp && emp->b_prev &&
3005 (emp->b_prev->b_datap->db_type < QPCTL))
3006 goto badord;
3007 }
3008 if (emp) {
3009 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3010 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3011 (emp->b_prev->b_band < mp->b_band))) {
3012 goto badord;
3013 }
3014 } else {
3015 tmp = q->q_last;
3016 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3017 badord:
3018 cmn_err(CE_WARN,
3019 "insq: attempt to insert message out of order "
3020 "on q %p", (void *)q);
3021 if (freezer != curthread)
3022 mutex_exit(QLOCK(q));
3023 return (0);
3024 }
3025 }
3026
3027 if (mp->b_band != 0) {
3028 int i;
3029 qband_t **qbpp;
3030
3031 if (mp->b_band > q->q_nband) {
3032 qbpp = &q->q_bandp;
3033 while (*qbpp)
3034 qbpp = &(*qbpp)->qb_next;
3035 while (mp->b_band > q->q_nband) {
3036 if ((*qbpp = allocband()) == NULL) {
3037 if (freezer != curthread)
3038 mutex_exit(QLOCK(q));
3039 return (0);
3040 }
3041 (*qbpp)->qb_hiwat = q->q_hiwat;
3042 (*qbpp)->qb_lowat = q->q_lowat;
3043 q->q_nband++;
3044 qbpp = &(*qbpp)->qb_next;
3045 }
3046 }
3047 qbp = q->q_bandp;
3048 i = mp->b_band;
3049 while (--i)
3050 qbp = qbp->qb_next;
3051 }
3052
3053 if ((mp->b_next = emp) != NULL) {
3054 if ((mp->b_prev = emp->b_prev) != NULL)
3055 emp->b_prev->b_next = mp;
3056 else
3057 q->q_first = mp;
3058 emp->b_prev = mp;
3059 } else {
3060 if ((mp->b_prev = q->q_last) != NULL)
3061 q->q_last->b_next = mp;
3062 else
3063 q->q_first = mp;
3064 q->q_last = mp;
3065 }
3066
3067 /* Get mblk and byte count for q_count accounting */
3068 bytecnt = mp_cont_len(mp, &mblkcnt);
3069
3070 if (qbp) { /* adjust qband pointers and count */
3071 if (!qbp->qb_first) {
3072 qbp->qb_first = mp;
3073 qbp->qb_last = mp;
3074 } else {
3075 if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3076 (mp->b_prev->b_band != mp->b_band)))
3077 qbp->qb_first = mp;
3078 else if (mp->b_next == NULL || (mp->b_next != NULL &&
3079 (mp->b_next->b_band != mp->b_band)))
3080 qbp->qb_last = mp;
3081 }
3082 qbp->qb_count += bytecnt;
3083 qbp->qb_mblkcnt += mblkcnt;
3084 if ((qbp->qb_count >= qbp->qb_hiwat) ||
3085 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3086 qbp->qb_flag |= QB_FULL;
3087 }
3088 } else {
3089 q->q_count += bytecnt;
3090 q->q_mblkcnt += mblkcnt;
3091 if ((q->q_count >= q->q_hiwat) ||
3092 (q->q_mblkcnt >= q->q_hiwat)) {
3093 q->q_flag |= QFULL;
3094 }
3095 }
3096
3097 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3098
3099 if (canenable(q) && (q->q_flag & QWANTR))
3100 qenable_locked(q);
3101
3102 ASSERT(MUTEX_HELD(QLOCK(q)));
3103 if (freezer != curthread)
3104 mutex_exit(QLOCK(q));
3105
3106 return (1);
3107 }
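
/*
 * Callers of insq() typically freeze the stream while locating the
 * existing message, so the queue cannot change underneath them
 * (a sketch; emp was found by walking q->q_first):
 *
 *	freezestr(q);
 *	if (!insq(q, emp, mp))
 *		freemsg(mp);
 *	unfreezestr(q);
 */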
3108
3109 /*
3110 * Create and put a control message on queue.
3111 */
3112 int
3113 putctl(queue_t *q, int type)
3114 {
3115 mblk_t *bp;
3116
3117 if ((datamsg(type) && (type != M_DELAY)) ||
3118 (bp = allocb_tryhard(0)) == NULL)
3119 return (0);
3120 bp->b_datap->db_type = (unsigned char) type;
3121
3122 put(q, bp);
3123
3124 return (1);
3125 }
3126
3127 /*
3128 * Control message with a single-byte parameter
3129 */
3130 int
3131 putctl1(queue_t *q, int type, int param)
3132 {
3133 mblk_t *bp;
3134
3135 if ((datamsg(type) && (type != M_DELAY)) ||
3136 (bp = allocb_tryhard(1)) == NULL)
3137 return (0);
3138 bp->b_datap->db_type = (unsigned char)type;
3139 *bp->b_wptr++ = (unsigned char)param;
3140
3141 put(q, bp);
3142
3143 return (1);
3144 }
3145
3146 int
3147 putnextctl1(queue_t *q, int type, int param)
3148 {
3149 mblk_t *bp;
3150
3151 if ((datamsg(type) && (type != M_DELAY)) ||
3152 ((bp = allocb_tryhard(1)) == NULL))
3153 return (0);
3154
3155 bp->b_datap->db_type = (unsigned char)type;
3156 *bp->b_wptr++ = (unsigned char)param;
3157
3158 putnext(q, bp);
3159
3160 return (1);
3161 }
3162
3163 int
3164 putnextctl(queue_t *q, int type)
3165 {
3166 mblk_t *bp;
3167
3168 if ((datamsg(type) && (type != M_DELAY)) ||
3169 ((bp = allocb_tryhard(0)) == NULL))
3170 return (0);
3171 bp->b_datap->db_type = (unsigned char)type;
3172
3173 putnext(q, bp);
3174
3175 return (1);
3176 }
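
/*
 * These helpers cover the common "send a zero- or one-byte control
 * message" cases, for example on a read-side queue (sketches):
 *
 *	(void) putnextctl(q, M_HANGUP);
 *	(void) putnextctl1(q, M_ERROR, EIO);
 *
 * A zero return means the message could not be allocated; the caller must
 * recover, e.g. via qbufcall() and a retry.
 */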
3177
3178 /*
3179 * Return the queue upstream from this one
3180 */
3181 queue_t *
3182 backq(queue_t *q)
3183 {
3184 q = _OTHERQ(q);
3185 if (q->q_next) {
3186 q = q->q_next;
3187 return (_OTHERQ(q));
3188 }
3189 return (NULL);
3190 }
3191
3192 /*
3193 * Send a block back up the queue in reverse from this
3194 * one (e.g. to respond to ioctls)
3195 */
3196 void
3197 qreply(queue_t *q, mblk_t *bp)
3198 {
3199 ASSERT(q && bp);
3200
3201 putnext(_OTHERQ(q), bp);
3202 }
3203
3204 /*
3205 * Streams Queue Scheduling
3206 *
3207 * Queues are enabled through qenable() when they have messages to
3208 * process. They are serviced by queuerun(), which runs each enabled
3209 * queue's service procedure. The call to queuerun() is processor
3210 * dependent - the general principle is that it be run whenever a queue
3211 * is enabled but before returning to user level. For system calls,
3212 * the function runqueues() is called if their action causes a queue
3213 * to be enabled. For device interrupts, queuerun() should be
3214 * called before returning from the last level of interrupt. Beyond
3215 * this, no timing assumptions should be made about queue scheduling.
3216 */
3217
3218 /*
3219 * Enable a queue: put it on list of those whose service procedures are
3220 * ready to run and set up the scheduling mechanism.
3221  * The broadcast is done outside the mutex to avoid the woken thread
3222  * contending with the mutex. This is OK because the queue has been
3223 * enqueued on the runlist and flagged safely at this point.
3224 */
3225 void
3226 qenable(queue_t *q)
3227 {
3228 mutex_enter(QLOCK(q));
3229 qenable_locked(q);
3230 mutex_exit(QLOCK(q));
3231 }
3232 /*
3233 * Return number of messages on queue
3234 */
3235 int
3236 qsize(queue_t *qp)
3237 {
3238 int count = 0;
3239 mblk_t *mp;
3240
3241 mutex_enter(QLOCK(qp));
3242 for (mp = qp->q_first; mp; mp = mp->b_next)
3243 count++;
3244 mutex_exit(QLOCK(qp));
3245 return (count);
3246 }
3247
3248 /*
3249 * noenable - set queue so that putq() will not enable it.
3250 * enableok - set queue so that putq() can enable it.
3251 */
3252 void
3253 noenable(queue_t *q)
3254 {
3255 mutex_enter(QLOCK(q));
3256 q->q_flag |= QNOENB;
3257 mutex_exit(QLOCK(q));
3258 }
3259
3260 void
3261 enableok(queue_t *q)
3262 {
3263 mutex_enter(QLOCK(q));
3264 q->q_flag &= ~QNOENB;
3265 mutex_exit(QLOCK(q));
3266 }
3267
3268 /*
3269 * Set queue fields.
3270 */
3271 int
3272 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3273 {
3274 qband_t *qbp = NULL;
3275 queue_t *wrq;
3276 int error = 0;
3277 kthread_id_t freezer;
3278
3279 freezer = STREAM(q)->sd_freezer;
3280 if (freezer == curthread) {
3281 ASSERT(frozenstr(q));
3282 ASSERT(MUTEX_HELD(QLOCK(q)));
3283 } else
3284 mutex_enter(QLOCK(q));
3285
3286 if (what >= QBAD) {
3287 error = EINVAL;
3288 goto done;
3289 }
3290 if (pri != 0) {
3291 int i;
3292 qband_t **qbpp;
3293
3294 if (pri > q->q_nband) {
3295 qbpp = &q->q_bandp;
3296 while (*qbpp)
3297 qbpp = &(*qbpp)->qb_next;
3298 while (pri > q->q_nband) {
3299 if ((*qbpp = allocband()) == NULL) {
3300 error = EAGAIN;
3301 goto done;
3302 }
3303 (*qbpp)->qb_hiwat = q->q_hiwat;
3304 (*qbpp)->qb_lowat = q->q_lowat;
3305 q->q_nband++;
3306 qbpp = &(*qbpp)->qb_next;
3307 }
3308 }
3309 qbp = q->q_bandp;
3310 i = pri;
3311 while (--i)
3312 qbp = qbp->qb_next;
3313 }
3314 switch (what) {
3315
3316 case QHIWAT:
3317 if (qbp)
3318 qbp->qb_hiwat = (size_t)val;
3319 else
3320 q->q_hiwat = (size_t)val;
3321 break;
3322
3323 case QLOWAT:
3324 if (qbp)
3325 qbp->qb_lowat = (size_t)val;
3326 else
3327 q->q_lowat = (size_t)val;
3328 break;
3329
3330 case QMAXPSZ:
3331 if (qbp)
3332 error = EINVAL;
3333 else
3334 q->q_maxpsz = (ssize_t)val;
3335
3336 /*
3337 		 * Performance concern: strwrite looks at the module below
3338 		 * the stream head for the maxpsz each time it does a write,
3339 		 * so we now cache it at the stream head. Check to see if this
3340 * queue is sitting directly below the stream head.
3341 */
3342 wrq = STREAM(q)->sd_wrq;
3343 if (q != wrq->q_next)
3344 break;
3345
3346 /*
3347 * If the stream is not frozen drop the current QLOCK and
3348 * acquire the sd_wrq QLOCK which protects sd_qn_*
3349 */
3350 if (freezer != curthread) {
3351 mutex_exit(QLOCK(q));
3352 mutex_enter(QLOCK(wrq));
3353 }
3354 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3355
3356 if (strmsgsz != 0) {
3357 if (val == INFPSZ)
3358 val = strmsgsz;
3359 else {
3360 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3361 val = MIN(PIPE_BUF, val);
3362 else
3363 val = MIN(strmsgsz, val);
3364 }
3365 }
3366 STREAM(q)->sd_qn_maxpsz = val;
3367 if (freezer != curthread) {
3368 mutex_exit(QLOCK(wrq));
3369 mutex_enter(QLOCK(q));
3370 }
3371 break;
3372
3373 case QMINPSZ:
3374 if (qbp)
3375 error = EINVAL;
3376 else
3377 q->q_minpsz = (ssize_t)val;
3378
3379 /*
3380 		 * Performance concern: strwrite looks at the module below
3381 		 * the stream head for the maxpsz each time it does a write,
3382 		 * so we now cache it at the stream head. Check to see if this
3383 * queue is sitting directly below the stream head.
3384 */
3385 wrq = STREAM(q)->sd_wrq;
3386 if (q != wrq->q_next)
3387 break;
3388
3389 /*
3390 * If the stream is not frozen drop the current QLOCK and
3391 * acquire the sd_wrq QLOCK which protects sd_qn_*
3392 */
3393 if (freezer != curthread) {
3394 mutex_exit(QLOCK(q));
3395 mutex_enter(QLOCK(wrq));
3396 }
3397 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3398
3399 if (freezer != curthread) {
3400 mutex_exit(QLOCK(wrq));
3401 mutex_enter(QLOCK(q));
3402 }
3403 break;
3404
3405 case QSTRUIOT:
3406 if (qbp)
3407 error = EINVAL;
3408 else
3409 q->q_struiot = (ushort_t)val;
3410 break;
3411
3412 case QCOUNT:
3413 case QFIRST:
3414 case QLAST:
3415 case QFLAG:
3416 error = EPERM;
3417 break;
3418
3419 default:
3420 error = EINVAL;
3421 break;
3422 }
3423 done:
3424 if (freezer != curthread)
3425 mutex_exit(QLOCK(q));
3426 return (error);
3427 }
3428
3429 /*
3430 * Get queue fields.
3431 */
3432 int
3433 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3434 {
3435 qband_t *qbp = NULL;
3436 int error = 0;
3437 kthread_id_t freezer;
3438
3439 freezer = STREAM(q)->sd_freezer;
3440 if (freezer == curthread) {
3441 ASSERT(frozenstr(q));
3442 ASSERT(MUTEX_HELD(QLOCK(q)));
3443 } else
3444 mutex_enter(QLOCK(q));
3445 if (what >= QBAD) {
3446 error = EINVAL;
3447 goto done;
3448 }
3449 if (pri != 0) {
3450 int i;
3451 qband_t **qbpp;
3452
3453 if (pri > q->q_nband) {
3454 qbpp = &q->q_bandp;
3455 while (*qbpp)
3456 qbpp = &(*qbpp)->qb_next;
3457 while (pri > q->q_nband) {
3458 if ((*qbpp = allocband()) == NULL) {
3459 error = EAGAIN;
3460 goto done;
3461 }
3462 (*qbpp)->qb_hiwat = q->q_hiwat;
3463 (*qbpp)->qb_lowat = q->q_lowat;
3464 q->q_nband++;
3465 qbpp = &(*qbpp)->qb_next;
3466 }
3467 }
3468 qbp = q->q_bandp;
3469 i = pri;
3470 while (--i)
3471 qbp = qbp->qb_next;
3472 }
3473 switch (what) {
3474 case QHIWAT:
3475 if (qbp)
3476 *(size_t *)valp = qbp->qb_hiwat;
3477 else
3478 *(size_t *)valp = q->q_hiwat;
3479 break;
3480
3481 case QLOWAT:
3482 if (qbp)
3483 *(size_t *)valp = qbp->qb_lowat;
3484 else
3485 *(size_t *)valp = q->q_lowat;
3486 break;
3487
3488 case QMAXPSZ:
3489 if (qbp)
3490 error = EINVAL;
3491 else
3492 *(ssize_t *)valp = q->q_maxpsz;
3493 break;
3494
3495 case QMINPSZ:
3496 if (qbp)
3497 error = EINVAL;
3498 else
3499 *(ssize_t *)valp = q->q_minpsz;
3500 break;
3501
3502 case QCOUNT:
3503 if (qbp)
3504 *(size_t *)valp = qbp->qb_count;
3505 else
3506 *(size_t *)valp = q->q_count;
3507 break;
3508
3509 case QFIRST:
3510 if (qbp)
3511 *(mblk_t **)valp = qbp->qb_first;
3512 else
3513 *(mblk_t **)valp = q->q_first;
3514 break;
3515
3516 case QLAST:
3517 if (qbp)
3518 *(mblk_t **)valp = qbp->qb_last;
3519 else
3520 *(mblk_t **)valp = q->q_last;
3521 break;
3522
3523 case QFLAG:
3524 if (qbp)
3525 *(uint_t *)valp = qbp->qb_flag;
3526 else
3527 *(uint_t *)valp = q->q_flag;
3528 break;
3529
3530 case QSTRUIOT:
3531 if (qbp)
3532 error = EINVAL;
3533 else
3534 *(short *)valp = q->q_struiot;
3535 break;
3536
3537 default:
3538 error = EINVAL;
3539 break;
3540 }
3541 done:
3542 if (freezer != curthread)
3543 mutex_exit(QLOCK(q));
3544 return (error);
3545 }
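
/*
 * strqget() and strqset() pair up for read-modify-write of queue fields,
 * e.g. doubling the high-water mark of the normal band (a sketch, error
 * handling elided):
 *
 *	size_t hiwat;
 *
 *	(void) strqget(q, QHIWAT, 0, &hiwat);
 *	(void) strqset(q, QHIWAT, 0, (intptr_t)(hiwat * 2));
 */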
3546
3547 /*
3548  * This function wakes all threads sleeping in cv_wait/sigwait/pollwait
3549  * on one of: QWANTWSYNC, QWANTR or QWANTW.
3550 *
3551 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3552 * deferred wakeup will be done. Also if strpoll() in progress then a
3553 * deferred pollwakeup will be done.
3554 */
3555 void
3556 strwakeq(queue_t *q, int flag)
3557 {
3558 stdata_t *stp = STREAM(q);
3559 pollhead_t *pl;
3560
3561 mutex_enter(&stp->sd_lock);
3562 pl = &stp->sd_pollist;
3563 if (flag & QWANTWSYNC) {
3564 ASSERT(!(q->q_flag & QREADR));
3565 if (stp->sd_flag & WSLEEP) {
3566 stp->sd_flag &= ~WSLEEP;
3567 cv_broadcast(&stp->sd_wrq->q_wait);
3568 } else {
3569 stp->sd_wakeq |= WSLEEP;
3570 }
3571
3572 mutex_exit(&stp->sd_lock);
3573 pollwakeup(pl, POLLWRNORM);
3574 mutex_enter(&stp->sd_lock);
3575
3576 if (stp->sd_sigflags & S_WRNORM)
3577 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3578 } else if (flag & QWANTR) {
3579 if (stp->sd_flag & RSLEEP) {
3580 stp->sd_flag &= ~RSLEEP;
3581 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3582 } else {
3583 stp->sd_wakeq |= RSLEEP;
3584 }
3585
3586 mutex_exit(&stp->sd_lock);
3587 pollwakeup(pl, POLLIN | POLLRDNORM);
3588 mutex_enter(&stp->sd_lock);
3589
3590 {
3591 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3592
3593 if (events)
3594 strsendsig(stp->sd_siglist, events, 0, 0);
3595 }
3596 } else {
3597 if (stp->sd_flag & WSLEEP) {
3598 stp->sd_flag &= ~WSLEEP;
3599 cv_broadcast(&stp->sd_wrq->q_wait);
3600 }
3601
3602 mutex_exit(&stp->sd_lock);
3603 pollwakeup(pl, POLLWRNORM);
3604 mutex_enter(&stp->sd_lock);
3605
3606 if (stp->sd_sigflags & S_WRNORM)
3607 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3608 }
3609 mutex_exit(&stp->sd_lock);
3610 }
3611
3612 int
3613 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3614 {
3615 stdata_t *stp = STREAM(q);
3616 int typ = STRUIOT_STANDARD;
3617 uio_t *uiop = &dp->d_uio;
3618 dblk_t *dbp;
3619 ssize_t uiocnt;
3620 ssize_t cnt;
3621 unsigned char *ptr;
3622 ssize_t resid;
3623 int error = 0;
3624 on_trap_data_t otd;
3625 queue_t *stwrq;
3626
3627 /*
3628 * Plumbing may change while taking the type so store the
3629 * queue in a temporary variable. It doesn't matter even
3630 	 * if we take the type from the previous plumbing;
3631 	 * that is because if the plumbing has changed while we were
3632 	 * holding the queue in a temporary variable, we can continue
3633 	 * processing the message the way it would have been processed
3634 	 * in the old plumbing, without any side effects beyond a bit of
3635 	 * extra processing for the partial ip header checksum.
3636 *
3637 * This has been done to avoid holding the sd_lock which is
3638 * very hot.
3639 */
3640
3641 stwrq = stp->sd_struiowrq;
3642 if (stwrq)
3643 typ = stwrq->q_struiot;
3644
3645 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3646 dbp = mp->b_datap;
3647 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3648 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3649 cnt = MIN(uiocnt, uiop->uio_resid);
3650 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3651 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3652 /*
3653 * Either this mblk has already been processed
3654 * or there is no more room in this mblk (?).
3655 */
3656 continue;
3657 }
3658 switch (typ) {
3659 case STRUIOT_STANDARD:
3660 if (noblock) {
3661 if (on_trap(&otd, OT_DATA_ACCESS)) {
3662 no_trap();
3663 error = EWOULDBLOCK;
3664 goto out;
3665 }
3666 }
3667 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3668 if (noblock)
3669 no_trap();
3670 goto out;
3671 }
3672 if (noblock)
3673 no_trap();
3674 break;
3675
3676 default:
3677 error = EIO;
3678 goto out;
3679 }
3680 dbp->db_struioflag |= STRUIO_DONE;
3681 dbp->db_cksumstuff += cnt;
3682 }
3683 out:
3684 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3685 /*
3686 		 * A fault has occurred and some bytes were moved to the
3687 * current mblk, the uio_t has already been updated by
3688 * the appropriate uio routine, so also update the mblk
3689 * to reflect this in case this same mblk chain is used
3690 * again (after the fault has been handled).
3691 */
3692 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3693 if (uiocnt >= resid)
3694 dbp->db_cksumstuff += resid;
3695 }
3696 return (error);
3697 }
3698
3699 /*
3700  * Try to enter the queue synchronously. Any attempt to enter a closing queue
3701  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3702 * that removeq() will not try to close the queue while a thread is inside the
3703 * queue.
3704 */
3705 static boolean_t
3706 rwnext_enter(queue_t *qp)
3707 {
3708 mutex_enter(QLOCK(qp));
3709 if (qp->q_flag & QWCLOSE) {
3710 mutex_exit(QLOCK(qp));
3711 return (B_FALSE);
3712 }
3713 qp->q_rwcnt++;
3714 ASSERT(qp->q_rwcnt != 0);
3715 mutex_exit(QLOCK(qp));
3716 return (B_TRUE);
3717 }
3718
3719 /*
3720 * Decrease the count of threads running in sync stream queue and wake up any
3721 * threads blocked in removeq().
3722 */
3723 static void
3724 rwnext_exit(queue_t *qp)
3725 {
3726 mutex_enter(QLOCK(qp));
3727 qp->q_rwcnt--;
3728 if (qp->q_flag & QWANTRMQSYNC) {
3729 qp->q_flag &= ~QWANTRMQSYNC;
3730 cv_broadcast(&qp->q_wait);
3731 }
3732 mutex_exit(QLOCK(qp));
3733 }
3734
3735 /*
3736  * The purpose of rwnext() is to call the rw procedure of the next
3737  * (downstream) module's queue.
3738  *
3739  * Treated as a put entrypoint for perimeter synchronization.
3740 *
3741 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3742 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3743 * not matter if any regular put entrypoints have been already entered. We
3744 * can't increment one of the sq_putcounts (instead of sq_count) because
3745 * qwait_rw won't know which counter to decrement.
3746 *
3747 * It would be reasonable to add the lockless FASTPUT logic.
3748 */
3749 int
3750 rwnext(queue_t *qp, struiod_t *dp)
3751 {
3752 queue_t *nqp;
3753 syncq_t *sq;
3754 uint16_t count;
3755 uint16_t flags;
3756 struct qinit *qi;
3757 int (*proc)();
3758 struct stdata *stp;
3759 int isread;
3760 int rval;
3761
3762 stp = STREAM(qp);
3763 /*
3764 * Prevent q_next from changing by holding sd_lock until acquiring
3765 * SQLOCK. Note that a read-side rwnext from the streamhead will
3766 * already have sd_lock acquired. In either case sd_lock is always
3767 * released after acquiring SQLOCK.
3768 *
3769 * The streamhead read-side holding sd_lock when calling rwnext is
3770  * required to prevent a race condition where M_DATA mblks flowing
3771 * up the read-side of the stream could be bypassed by a rwnext()
3772 * down-call. In this case sd_lock acts as the streamhead perimeter.
3773 */
3774 if ((nqp = _WR(qp)) == qp) {
3775 isread = 0;
3776 mutex_enter(&stp->sd_lock);
3777 qp = nqp->q_next;
3778 } else {
3779 isread = 1;
3780 if (nqp != stp->sd_wrq)
3781 /* Not streamhead */
3782 mutex_enter(&stp->sd_lock);
3783 qp = _RD(nqp->q_next);
3784 }
3785 qi = qp->q_qinfo;
3786 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3787 /*
3788 * Not a synchronous module or no r/w procedure for this
3789 * queue, so just return EINVAL and let the caller handle it.
3790 */
3791 mutex_exit(&stp->sd_lock);
3792 return (EINVAL);
3793 }
3794
3795 if (rwnext_enter(qp) == B_FALSE) {
3796 mutex_exit(&stp->sd_lock);
3797 return (EINVAL);
3798 }
3799
3800 sq = qp->q_syncq;
3801 mutex_enter(SQLOCK(sq));
3802 mutex_exit(&stp->sd_lock);
3803 count = sq->sq_count;
3804 flags = sq->sq_flags;
3805 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3806
3807 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3808 /*
3809 * if this queue is being closed, return.
3810 */
3811 if (qp->q_flag & QWCLOSE) {
3812 mutex_exit(SQLOCK(sq));
3813 rwnext_exit(qp);
3814 return (EINVAL);
3815 }
3816
3817 /*
3818 * Wait until we can enter the inner perimeter.
3819 */
3820 sq->sq_flags = flags | SQ_WANTWAKEUP;
3821 cv_wait(&sq->sq_wait, SQLOCK(sq));
3822 count = sq->sq_count;
3823 flags = sq->sq_flags;
3824 }
3825
3826 if (isread == 0 && stp->sd_struiowrq == NULL ||
3827 isread == 1 && stp->sd_struiordq == NULL) {
3828 /*
3829 * Stream plumbing changed while waiting for inner perimeter
3830 * so just return EINVAL and let the caller handle it.
3831 */
3832 mutex_exit(SQLOCK(sq));
3833 rwnext_exit(qp);
3834 return (EINVAL);
3835 }
3836 if (!(flags & SQ_CIPUT))
3837 sq->sq_flags = flags | SQ_EXCL;
3838 sq->sq_count = count + 1;
3839 ASSERT(sq->sq_count != 0); /* Wraparound */
3840 /*
3841 * Note: The only message ordering guarantee that rwnext() makes is
3842 * for the write queue flow-control case. All others (r/w queue
3843 	 * with q_count > 0 (or q_first != 0)) are the responsibility of
3844 	 * the queue's rw procedure. This could be generalized here by
3845 	 * running the queue's service procedure, but that wouldn't be
3846 	 * the most efficient for all cases.
3847 */
3848 mutex_exit(SQLOCK(sq));
3849 if (! isread && (qp->q_flag & QFULL)) {
3850 /*
3851 * Write queue may be flow controlled. If so,
3852 * mark the queue for wakeup when it's not.
3853 */
3854 mutex_enter(QLOCK(qp));
3855 if (qp->q_flag & QFULL) {
3856 qp->q_flag |= QWANTWSYNC;
3857 mutex_exit(QLOCK(qp));
3858 rval = EWOULDBLOCK;
3859 goto out;
3860 }
3861 mutex_exit(QLOCK(qp));
3862 }
3863
3864 if (! isread && dp->d_mp)
3865 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3866 dp->d_mp->b_datap->db_base);
3867
3868 rval = (*proc)(qp, dp);
3869
3870 if (isread && dp->d_mp)
3871 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3872 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3873 out:
3874 /*
3875 * The queue is protected from being freed by sq_count, so it is
3876 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3877 */
3878 rwnext_exit(qp);
3879
3880 mutex_enter(SQLOCK(sq));
3881 flags = sq->sq_flags;
3882 ASSERT(sq->sq_count != 0);
3883 sq->sq_count--;
3884 if (flags & SQ_TAIL) {
3885 putnext_tail(sq, qp, flags);
3886 /*
3887 * The only purpose of this ASSERT is to preserve calling stack
3888 * in DEBUG kernel.
3889 */
3890 ASSERT(flags & SQ_TAIL);
3891 return (rval);
3892 }
3893 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3894 /*
3895 * Safe to always drop SQ_EXCL:
3896 * Not SQ_CIPUT means we set SQ_EXCL above
3897 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3898 * did a qwriter(INNER) in which case nobody else
3899 * is in the inner perimeter and we are exiting.
3900 *
3901 * I would like to make the following assertion:
3902 *
3903 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3904 * sq->sq_count == 0);
3905 *
3906 * which indicates that if we are both putshared and exclusive,
3907 * we became exclusive while executing the putproc, and the only
3908 * claim on the syncq was the one we dropped a few lines above.
3909 * But other threads that enter putnext while the syncq is exclusive
3910 * need to make a claim as they may need to drop SQLOCK in the
3911 * has_writers case to avoid deadlocks. If these threads are
3912 * delayed or preempted, it is possible that the writer thread can
3913 * find out that there are other claims making the (sq_count == 0)
3914 * test invalid.
3915 */
3916
3917 sq->sq_flags = flags & ~SQ_EXCL;
3918 if (sq->sq_flags & SQ_WANTWAKEUP) {
3919 sq->sq_flags &= ~SQ_WANTWAKEUP;
3920 cv_broadcast(&sq->sq_wait);
3921 }
3922 mutex_exit(SQLOCK(sq));
3923 return (rval);
3924 }
3925
3926 /*
3927  * The purpose of infonext() is to call the info procedure of the next
3928  * (downstream) module's queue.
3929  *
3930  * Treated as a put entrypoint for perimeter synchronization.
3931 *
3932 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3933 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3934 * it does not matter if any regular put entrypoints have been already
3935 * entered.
3936 */
3937 int
3938 infonext(queue_t *qp, infod_t *idp)
3939 {
3940 queue_t *nqp;
3941 syncq_t *sq;
3942 uint16_t count;
3943 uint16_t flags;
3944 struct qinit *qi;
3945 int (*proc)();
3946 struct stdata *stp;
3947 int rval;
3948
3949 stp = STREAM(qp);
3950 /*
3951 * Prevent q_next from changing by holding sd_lock until
3952 * acquiring SQLOCK.
3953 */
3954 mutex_enter(&stp->sd_lock);
3955 if ((nqp = _WR(qp)) == qp) {
3956 qp = nqp->q_next;
3957 } else {
3958 qp = _RD(nqp->q_next);
3959 }
3960 qi = qp->q_qinfo;
3961 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3962 mutex_exit(&stp->sd_lock);
3963 return (EINVAL);
3964 }
3965 sq = qp->q_syncq;
3966 mutex_enter(SQLOCK(sq));
3967 mutex_exit(&stp->sd_lock);
3968 count = sq->sq_count;
3969 flags = sq->sq_flags;
3970 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3971
3972 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3973 /*
3974 * Wait until we can enter the inner perimeter.
3975 */
3976 sq->sq_flags = flags | SQ_WANTWAKEUP;
3977 cv_wait(&sq->sq_wait, SQLOCK(sq));
3978 count = sq->sq_count;
3979 flags = sq->sq_flags;
3980 }
3981
3982 if (! (flags & SQ_CIPUT))
3983 sq->sq_flags = flags | SQ_EXCL;
3984 sq->sq_count = count + 1;
3985 ASSERT(sq->sq_count != 0); /* Wraparound */
3986 mutex_exit(SQLOCK(sq));
3987
3988 rval = (*proc)(qp, idp);
3989
3990 mutex_enter(SQLOCK(sq));
3991 flags = sq->sq_flags;
3992 ASSERT(sq->sq_count != 0);
3993 sq->sq_count--;
3994 if (flags & SQ_TAIL) {
3995 putnext_tail(sq, qp, flags);
3996 /*
3997 * The only purpose of this ASSERT is to preserve calling stack
3998 * in DEBUG kernel.
3999 */
4000 ASSERT(flags & SQ_TAIL);
4001 return (rval);
4002 }
4003 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4004 /*
4005 * XXXX
4006 * I am not certain the next comment is correct here. I need to consider
4007  * why infonext is called, and whether dropping SQ_EXCL when non-CIPUT
4008 * might cause other problems. It just might be safer to drop it if
4009 * !SQ_CIPUT because that is when we set it.
4010 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above.
	 *	For SQ_CIPUT, SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER), in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks. If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims, making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}
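
/*
 * For illustration only: a minimal sketch of the info entrypoint that
 * infonext() dispatches to through qi_infop. The "xx" module names and
 * the body are hypothetical; a real info procedure fills in the infod_t
 * according to its own struio handling. Note that infonext() returns
 * EINVAL unless the target queue was configured with a qi_struiot other
 * than STRUIOT_NONE and a non-NULL qi_infop:
 *
 *	static int
 *	xxinfo(queue_t *q, infod_t *idp)
 *	{
 *		... runs as a put entrypoint inside q's perimeter ...
 *		return (0);
 *	}
 *
 *	static struct qinit xxrinit = {
 *		xxrput, xxrsrv, xxopen, xxclose, NULL, &xxminfo,
 *		NULL, NULL, xxinfo, STRUIOT_STANDARD
 *	};
 */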

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * Called by create_putlocks().
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t *sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext() checks sq_ciputctrl without
				 * holding SQLOCK. If it is not NULL, putnext()
				 * assumes sq_nciputctrl is initialized. The
				 * membar below ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}
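
/*
 * The sq_ciputctrl setup above is an instance of a common pattern in
 * this code: allocate outside the lock, re-check under the lock, and
 * publish with membar_producer() so that lock-free readers that see the
 * pointer also see the fields it guards. A distilled sketch (the names
 * obj, ptr, count and lock are hypothetical):
 *
 *	new = kmem_cache_alloc(cache, KM_SLEEP);
 *	mutex_enter(&lock);
 *	if (obj->ptr != NULL) {
 *		mutex_exit(&lock);
 *		kmem_cache_free(cache, new);	... lost the race ...
 *	} else {
 *		obj->count = n;			... guarded field first ...
 *		membar_producer();		... count before ptr ...
 *		obj->ptr = new;			... publish ...
 *		mutex_exit(&lock);
 *	}
 *
 * A reader that loads obj->ptr without the lock relies on this ordering
 * (plus membar_consumer() on weakly ordered machines) to observe a
 * fully initialized count.
 */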

/*
 * If the stream argument is 0, only create per-CPU sq_putlocks/sq_putcounts
 * for the syncq of q. If the stream argument is not 0, create per-CPU
 * stream_putlocks for the stream of q and per-CPU sq_putlocks/sq_putcounts
 * for all syncqs starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of the stream
 * geometry. It should be called from a driver/module open routine after the
 * qprocson() call. It is also called from the nfs syscall path, where it is
 * known that the stream is configured and won't change its geometry during
 * the create_putlocks() call.
 *
 * A caller normally passes 0 for the stream argument to speed up MT putnext
 * into the perimeter of q, for example because its perimeter is per module
 * (e.g. IP).
 *
 * A caller normally passes a non-zero value for the stream argument to hint
 * to the system that the stream of q is a very contended global system
 * stream (e.g. NFS/UDP) and that the part of the stream from q to the
 * driver is particularly MT hot.
 *
 * The caller ensures stream plumbing won't happen while we are here, and
 * therefore q_next can be safely used.
 */
void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t *cip;
	struct stdata *stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext() checks sd_ciputctrl without holding
			 * sd_lock. If it is not NULL, putnext() assumes
			 * sd_nciputctrl is initialized. The membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}
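
/*
 * Typical usage, per the block comment above: a driver or module open
 * routine calls create_putlocks() once the queue pair is plumbed (the
 * "xx" names here are hypothetical):
 *
 *	static int
 *	xxopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 *	{
 *		...
 *		qprocson(q);
 *		create_putlocks(q, 0);	... per-module perimeter, e.g. IP ...
 *		return (0);
 *	}
 *
 * Passing a nonzero stream argument instead creates putlocks from q all
 * the way down to the driver, for known-hot global streams such as
 * NFS/UDP.
 */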

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event are a timestamp, the module/driver
 * name, the downstream module/driver name, an optional call stack, the
 * event type, and a per-type datum. Much of the STREAMS framework is
 * instrumented for automatic flow tracing (when enabled). Events can also
 * be defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data.
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace headers.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
int str_ftstack = 0;	/* Don't record event call stacks */
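
/*
 * Flow tracing is compiled in but off by default. One way to enable it
 * (an administrative sketch, not a committed interface) is to clear
 * str_ftnever, and optionally set str_ftstack to capture call stacks,
 * e.g. in /etc/system:
 *
 *	set str_ftnever = 0
 *	set str_ftstack = 1
 *
 * Note that str_ftevent() below bumps str_ftnever again if it ever
 * fails to allocate a trace block, turning tracing back off.
 */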

void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t *bp;
	ftblk_t *nbp;
	ftevnt_t *ep;
	int ix, nix;

	ASSERT(hp != NULL);
	bp = hp->tail;

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it. To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1). When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail. If this succeeds, this is the new
			 * "next" block. Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
			if (nbp == NULL) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (casptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;

		ep->mid = Q2NAME(qp);

		/*
		 * We only record the next queue name for FTEV_PUTNEXT since
		 * that's the only time we *really* need it, and the putnext()
		 * code ensures that qp->q_next won't vanish. (We could use
		 * claimstr()/releasestr() but at a performance cost.)
		 */
		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
			ep->midnext = Q2NAME(qp->q_next);
		else
			ep->midnext = NULL;
	} else {
		ep->mid = p;
		ep->midnext = NULL;
	}

	if (ep->stk != NULL)
		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	/*
	 * Fold the event into the header's running hash: multiply the hash
	 * by 513, then mix in the event, datum and module name.
	 */
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}
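
/*
 * The loop in str_ftevent() is a lock-free multi-producer append:
 * writers claim an event slot with a 32-bit CAS on bp->ix and grow the
 * chain by CASing a freshly initialized block into hp->tail, with
 * membar_producer() making the new block's contents visible before the
 * block itself. casptr() and cas32() are essentially the older
 * spellings of what later code calls atomic_cas_ptr() and
 * atomic_cas_32().
 */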

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t *hp = dbp->db_fthdr;
	ftblk_t *bp = &hp->first;
	ftblk_t *nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}
