/*	$NetBSD: subr_pcq.c,v 1.8 2014/06/09 12:44:06 rmind Exp $	*/

/*-
 * Copyright (c) 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Lockless producer/consumer queue.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.8 2014/06/09 12:44:06 rmind Exp $");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/kmem.h>

#include <sys/pcq.h>

/*
 * Internal producer-consumer queue structure.  Note: providing a separate
 * cache-line both for pcq_t::pcq_pc and pcq_t::pcq_items.
 */
struct pcq {
	u_int			pcq_nitems;
	uint8_t			pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
	volatile uint32_t	pcq_pc;
	uint8_t			pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
	void * volatile		pcq_items[];
};

/*
 * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
 * Consumer (c) - in the higher 16 bits.
 *
 * We have a limitation of 16 bits i.e. 0xffff items in the queue.
 * The PCQ_MAXLEN constant is set accordingly.
 */

static inline void
pcq_split(uint32_t v, u_int *p, u_int *c)
{

	*p = v & 0xffff;
	*c = v >> 16;
}

static inline uint32_t
pcq_combine(u_int p, u_int c)
{

	return p | (c << 16);
}

static inline u_int
pcq_advance(pcq_t *pcq, u_int pc)
{

	if (__predict_false(++pc == pcq->pcq_nitems)) {
		return 0;
	}
	return pc;
}
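
/*
 * Worked example of the encoding (illustrative only, not used by the
 * code): with pcq_pc == 0x00050003, pcq_split() yields the producer
 * index p == 3 and the consumer index c == 5, and pcq_combine(3, 5)
 * reconstructs 0x00050003.
 */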

/*
 * pcq_put: place an item at the end of the queue.
 */
bool
pcq_put(pcq_t *pcq, void *item)
{
	uint32_t v, nv;
	u_int op, p, c;

	KASSERT(item != NULL);

	do {
		v = pcq->pcq_pc;
		pcq_split(v, &op, &c);
		p = pcq_advance(pcq, op);
		if (p == c) {
			/* Queue is full. */
			return false;
		}
		nv = pcq_combine(p, c);
	} while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);

	/*
	 * Ensure that the update to pcq_pc is globally visible before the
	 * data item.  See pcq_get().  This also ensures that any changes
	 * that the caller made to the data item are globally visible
	 * before we put it onto the list.
	 */
#ifndef __HAVE_ATOMIC_AS_MEMBAR
	membar_producer();
#endif
	pcq->pcq_items[op] = item;

	/*
	 * Synchronization activity to wake up the consumer will ensure
	 * that the update to pcq_items[] is visible before the wakeup
	 * arrives.  So, we do not need an additional memory barrier here.
	 */
	return true;
}

/*
 * pcq_peek: return the next item from the queue without removal.
 */
void *
pcq_peek(pcq_t *pcq)
{
	const uint32_t v = pcq->pcq_pc;
	u_int p, c;

	pcq_split(v, &p, &c);

	/* See comment on race below in pcq_get(). */
	return (p == c) ? NULL : pcq->pcq_items[c];
}

/*
 * pcq_get: remove and return the next item for consumption or NULL if empty.
 *
 * => The caller must prevent concurrent gets from occurring.
 */
void *
pcq_get(pcq_t *pcq)
{
	uint32_t v, nv;
	u_int p, c;
	void *item;

	v = pcq->pcq_pc;
	pcq_split(v, &p, &c);
	if (p == c) {
		/* Queue is empty: nothing to return. */
		return NULL;
	}
	item = pcq->pcq_items[c];
	if (item == NULL) {
		/*
		 * Raced with sender: we rely on a notification (e.g. softint
		 * or wakeup) being generated after the producer's pcq_put(),
		 * causing us to retry pcq_get() later.
		 */
		return NULL;
	}
	pcq->pcq_items[c] = NULL;
	c = pcq_advance(pcq, c);
	nv = pcq_combine(p, c);

	/*
	 * Ensure that the update to pcq_items[] becomes globally visible
	 * before the update to pcq_pc.  If it were reordered to occur
	 * after it, we could in theory wipe out a modification made
	 * to pcq_items[] by pcq_put().
	 */
#ifndef __HAVE_ATOMIC_AS_MEMBAR
	membar_producer();
#endif
	while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
		v = pcq->pcq_pc;
		pcq_split(v, &p, &c);
		c = pcq_advance(pcq, c);
		nv = pcq_combine(p, c);
	}
	return item;
}

pcq_t *
pcq_create(size_t nitems, km_flag_t kmflags)
{
	pcq_t *pcq;

	KASSERT(nitems > 0 && nitems <= PCQ_MAXLEN);

	pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
	if (pcq == NULL) {
		return NULL;
	}
	pcq->pcq_nitems = nitems;
	return pcq;
}

void
pcq_destroy(pcq_t *pcq)
{

	kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));
}

size_t
pcq_maxitems(pcq_t *pcq)
{

	return pcq->pcq_nitems;
}
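
/*
 * Illustrative usage sketch (not part of the pcq(9) implementation and
 * compiled out): a minimal producer/consumer pattern on top of the
 * interface above.  The example_* identifiers and the queue length are
 * hypothetical; the single consumer would typically be a softint or a
 * thread that is notified after each pcq_put().
 */
#if 0
static pcq_t *example_pcq;

static void
example_init(void)
{

	/* Room for up to 128 items; may sleep waiting for memory. */
	example_pcq = pcq_create(128, KM_SLEEP);
}

/* Producer side: never blocks; returns false if the queue is full. */
static bool
example_produce(void *item)
{

	return pcq_put(example_pcq, item);
}

/* Consumer side: only one consumer may run at a time. */
static void
example_consume(void)
{
	void *item;

	while ((item = pcq_get(example_pcq)) != NULL) {
		/* ... process the item ... */
	}
}

static void
example_fini(void)
{

	pcq_destroy(example_pcq);
}
#endif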