/* $NetBSD: subr_pcq.c,v 1.10 2018/02/08 09:05:20 dholland Exp $ */

/*-
 * Copyright (c) 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Lockless producer/consumer queue.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.10 2018/02/08 09:05:20 dholland Exp $");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/kmem.h>

#include <sys/pcq.h>

/*
 * Internal producer-consumer queue structure.  Note: a separate cache
 * line is provided for both pcq_t::pcq_pc and pcq_t::pcq_items.
 */
struct pcq {
        u_int                   pcq_nitems;
        uint8_t                 pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
        volatile uint32_t       pcq_pc;
        uint8_t                 pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
        void * volatile         pcq_items[];
};

/*
 * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
 * Consumer (c) - stored in the upper 16 bits.
 *
 * The 16-bit width limits the queue to at most 0xffff items; the
 * PCQ_MAXLEN constant is set accordingly.
 */

static inline void
pcq_split(uint32_t v, u_int *p, u_int *c)
{

        *p = v & 0xffff;
        *c = v >> 16;
}

static inline uint32_t
pcq_combine(u_int p, u_int c)
{

        return p | (c << 16);
}

static inline u_int
pcq_advance(pcq_t *pcq, u_int pc)
{

        if (__predict_false(++pc == pcq->pcq_nitems)) {
                return 0;
        }
        return pc;
}

/*
 * pcq_put: place an item at the end of the queue.
 */
bool
pcq_put(pcq_t *pcq, void *item)
{
        uint32_t v, nv;
        u_int op, p, c;

        KASSERT(item != NULL);

        do {
                v = pcq->pcq_pc;
                pcq_split(v, &op, &c);
                p = pcq_advance(pcq, op);
                if (p == c) {
                        /* Queue is full. */
                        return false;
                }
                nv = pcq_combine(p, c);
        } while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);

        /*
         * Ensure that the update to pcq_pc is globally visible before the
         * data item.  See pcq_get().  This also ensures that any changes
         * that the caller made to the data item are globally visible
         * before we put it onto the list.
         */
#ifndef __HAVE_ATOMIC_AS_MEMBAR
        membar_producer();
#endif
        pcq->pcq_items[op] = item;

        /*
         * Synchronization activity to wake up the consumer will ensure
         * that the update to pcq_items[] is visible before the wakeup
         * arrives.  So, we do not need an additional memory barrier here.
         */
        return true;
}
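/*
 * Illustrative sketch of a typical producer path, assuming a hypothetical
 * driver softc that owns a pcq and a softint handle.  It pairs pcq_put()
 * with softint_schedule(), which is the kind of wakeup activity the
 * comment above relies on for ordering.  All example_* names are invented
 * for illustration only.
 */
#if 0   /* illustrative only; not compiled */
#include <sys/intr.h>

struct example_softc {
        pcq_t           *sc_queue;      /* from pcq_create() */
        void            *sc_sih;        /* from softint_establish() */
        uint64_t        sc_dropped;     /* items dropped on overflow */
};

static void
example_enqueue(struct example_softc *sc, void *work)
{

        if (!pcq_put(sc->sc_queue, work)) {
                /* Queue is full: drop (or otherwise defer) the item. */
                sc->sc_dropped++;
                return;
        }
        /* Kick the consumer; the wakeup also provides the needed ordering. */
        softint_schedule(sc->sc_sih);
}
#endif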
/*
 * pcq_peek: return the next item from the queue without removal.
 */
void *
pcq_peek(pcq_t *pcq)
{
        const uint32_t v = pcq->pcq_pc;
        u_int p, c;

        pcq_split(v, &p, &c);

        /* See comment on race below in pcq_get(). */
        return (p == c) ? NULL :
            (membar_datadep_consumer(), pcq->pcq_items[c]);
}

/*
 * pcq_get: remove and return the next item for consumption or NULL if empty.
 *
 * => The caller must prevent concurrent gets from occurring.
 */
void *
pcq_get(pcq_t *pcq)
{
        uint32_t v, nv;
        u_int p, c;
        void *item;

        v = pcq->pcq_pc;
        pcq_split(v, &p, &c);
        if (p == c) {
                /* Queue is empty: nothing to return. */
                return NULL;
        }
        /* Make sure we read pcq->pcq_pc before pcq->pcq_items[c]. */
        membar_datadep_consumer();
        item = pcq->pcq_items[c];
        if (item == NULL) {
                /*
                 * Raced with sender: we rely on a notification (e.g. softint
                 * or wakeup) being generated after the producer's pcq_put(),
                 * causing us to retry pcq_get() later.
                 */
                return NULL;
        }
        pcq->pcq_items[c] = NULL;
        c = pcq_advance(pcq, c);
        nv = pcq_combine(p, c);

        /*
         * Ensure that the update to pcq_items[] becomes globally visible
         * before the update to pcq_pc.  If it were reordered to occur
         * after it, we could in theory wipe out a modification made
         * to pcq_items[] by pcq_put().
         */
#ifndef __HAVE_ATOMIC_AS_MEMBAR
        membar_producer();
#endif
        while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
                v = pcq->pcq_pc;
                pcq_split(v, &p, &c);
                c = pcq_advance(pcq, c);
                nv = pcq_combine(p, c);
        }
        return item;
}

pcq_t *
pcq_create(size_t nitems, km_flag_t kmflags)
{
        pcq_t *pcq;

        KASSERT(nitems > 0 && nitems <= PCQ_MAXLEN);

        pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
        if (pcq == NULL) {
                return NULL;
        }
        pcq->pcq_nitems = nitems;
        return pcq;
}

void
pcq_destroy(pcq_t *pcq)
{

        kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));
}

size_t
pcq_maxitems(pcq_t *pcq)
{

        return pcq->pcq_nitems;
}
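/*
 * Illustrative sketch of a consumer that drains the queue from a softint
 * handler, continuing the hypothetical example_softc above.  The handler
 * is the single consumer, which satisfies pcq_get()'s requirement that
 * concurrent gets be prevented; a NULL return after losing the race
 * described in pcq_get() is harmless, because the producer's
 * softint_schedule() will cause the handler to run again.  The example_*
 * names (including example_process()) are invented for illustration only.
 */
#if 0   /* illustrative only; not compiled */
static void
example_softintr(void *arg)
{
        struct example_softc *sc = arg;
        void *work;

        /* Drain every item currently visible in the queue. */
        while ((work = pcq_get(sc->sc_queue)) != NULL) {
                example_process(sc, work);
        }
}

static struct example_softc *
example_attach(void)
{
        struct example_softc *sc;

        sc = kmem_zalloc(sizeof(*sc), KM_SLEEP);
        sc->sc_queue = pcq_create(PCQ_MAXLEN, KM_SLEEP);
        sc->sc_sih = softint_establish(SOFTINT_NET, example_softintr, sc);
        return sc;
}
#endif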