/*	$NetBSD: pktqueue.c,v 1.8 2014/07/04 01:50:22 ozaki-r Exp $	*/

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Mindaugas Rasiukevicius.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * The packet queue (pktqueue) interface is a lockless IP input queue
 * which also abstracts and handles network ISR scheduling.  It provides
 * a mechanism to enable receiver-side packet steering (RPS).
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.8 2014/07/04 01:50:22 ozaki-r Exp $");

#include <sys/param.h>
#include <sys/types.h>

#include <sys/atomic.h>
#include <sys/cpu.h>
#include <sys/kmem.h>
#include <sys/pcq.h>
#include <sys/intr.h>
#include <sys/mbuf.h>
#include <sys/proc.h>
#include <sys/percpu.h>

#include <net/pktqueue.h>

/*
 * WARNING: update this if struct pktqueue changes.
 */
#define	PKTQ_CLPAD	\
    MAX(COHERENCY_UNIT, COHERENCY_UNIT - sizeof(kmutex_t) - sizeof(u_int))

struct pktqueue {
	/*
	 * The lock used for a barrier mechanism.  The barrier counter,
	 * as well as the drop counter, are managed atomically though.
	 * Ensure this group is in a separate cache line.
	 */
	kmutex_t	pq_lock;
	volatile u_int	pq_barrier;
	uint8_t		_pad[PKTQ_CLPAD];

	/* The size of the queue, counters and the interrupt handler. */
	u_int		pq_maxlen;
	percpu_t *	pq_counters;
	void *		pq_sih;

	/* Finally, per-CPU queues. */
	pcq_t *		pq_queue[];
};

/* The counters of the packet queue. */
#define	PQCNT_ENQUEUE	0
#define	PQCNT_DEQUEUE	1
#define	PQCNT_DROP	2
#define	PQCNT_NCOUNTERS	3

typedef struct {
	uint64_t	count[PQCNT_NCOUNTERS];
} pktq_counters_t;

/* Special marker value used by pktq_barrier() mechanism. */
#define	PKTQ_MARKER	((void *)(~0ULL))
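/*
 * Illustrative sketch (not part of this file): a typical consumer creates
 * one queue for its protocol and drains it from the soft interrupt handler
 * passed to pktq_create().  The handler runs as a softint on the CPU where
 * the packet was enqueued and is the only dequeuer for that per-CPU queue.
 * The "foo_*" names and the use of IFQ_MAXLEN below are hypothetical:
 *
 *	static pktqueue_t *foo_pktq;
 *
 *	static void
 *	foo_intr(void *arg)
 *	{
 *		struct mbuf *m;
 *
 *		while ((m = pktq_dequeue(foo_pktq)) != NULL) {
 *			foo_input(m);
 *		}
 *	}
 *
 *	void
 *	foo_init(void)
 *	{
 *		foo_pktq = pktq_create(IFQ_MAXLEN, foo_intr, NULL);
 *		KASSERT(foo_pktq != NULL);
 *	}
 */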
/*
 * The total size of pktqueue_t which depends on the number of CPUs.
 */
#define	PKTQUEUE_STRUCT_LEN(ncpu)	\
    roundup2(offsetof(pktqueue_t, pq_queue[ncpu]), coherency_unit)

pktqueue_t *
pktq_create(size_t maxlen, void (*intrh)(void *), void *sc)
{
	const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU;
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
	pktqueue_t *pq;
	percpu_t *pc;
	void *sih;

	if ((pc = percpu_alloc(sizeof(pktq_counters_t))) == NULL) {
		return NULL;
	}
	if ((sih = softint_establish(sflags, intrh, sc)) == NULL) {
		percpu_free(pc, sizeof(pktq_counters_t));
		return NULL;
	}

	pq = kmem_zalloc(len, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		pq->pq_queue[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE);
	pq->pq_maxlen = maxlen;
	pq->pq_counters = pc;
	pq->pq_sih = sih;

	return pq;
}

void
pktq_destroy(pktqueue_t *pq)
{
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];
		KASSERT(pcq_peek(q) == NULL);
		pcq_destroy(q);
	}
	percpu_free(pq->pq_counters, sizeof(pktq_counters_t));
	softint_disestablish(pq->pq_sih);
	mutex_destroy(&pq->pq_lock);
	kmem_free(pq, len);
}

/*
 * - pktq_inc_count: increment the counter given an ID.
 * - pktq_collect_counts: handler to sum up the counts from each CPU.
 * - pktq_get_count: return the effective count given an ID.
 */

static inline void
pktq_inc_count(pktqueue_t *pq, u_int i)
{
	percpu_t *pc = pq->pq_counters;
	pktq_counters_t *c;

	c = percpu_getref(pc);
	c->count[i]++;
	percpu_putref(pc);
}

static void
pktq_collect_counts(void *mem, void *arg, struct cpu_info *ci)
{
	const pktq_counters_t *c = mem;
	pktq_counters_t *sum = arg;

	for (u_int i = 0; i < PQCNT_NCOUNTERS; i++) {
		sum->count[i] += c->count[i];
	}
}

uint64_t
pktq_get_count(pktqueue_t *pq, pktq_count_t c)
{
	pktq_counters_t sum;

	if (c != PKTQ_MAXLEN) {
		memset(&sum, 0, sizeof(sum));
		percpu_foreach(pq->pq_counters, pktq_collect_counts, &sum);
	}
	switch (c) {
	case PKTQ_NITEMS:
		return sum.count[PQCNT_ENQUEUE] - sum.count[PQCNT_DEQUEUE];
	case PKTQ_DROPS:
		return sum.count[PQCNT_DROP];
	case PKTQ_MAXLEN:
		return pq->pq_maxlen;
	}
	return 0;
}

uint32_t
pktq_rps_hash(const struct mbuf *m __unused)
{
	/*
	 * XXX: No distribution yet; the softnet_lock contention
	 * XXX: must be eliminated first.
	 */
	return 0;
}

/*
 * pktq_enqueue: inject the packet into the end of the queue.
 *
 * => Must be called from interrupt context or with preemption disabled.
 * => Consumes the packet and returns true on success.
 * => Returns false on failure; the caller is responsible for freeing
 *    the packet.
 */
bool
pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused)
{
#if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI)
	const unsigned cpuid = curcpu()->ci_index;
#else
	const unsigned cpuid = hash % ncpu;
#endif

	KASSERT(kpreempt_disabled());

	if (__predict_false(!pcq_put(pq->pq_queue[cpuid], m))) {
		pktq_inc_count(pq, PQCNT_DROP);
		return false;
	}
	softint_schedule_cpu(pq->pq_sih, cpu_lookup(cpuid));
	pktq_inc_count(pq, PQCNT_ENQUEUE);
	return true;
}
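/*
 * Illustrative sketch (not from this file): the enqueue side as it might
 * look in a driver or protocol input path.  On failure the packet is not
 * consumed, so the caller frees it; preemption is disabled around the call
 * to satisfy the contract above.  "foo_pktq" and "foo_if_input" are
 * hypothetical names:
 *
 *	void
 *	foo_if_input(struct mbuf *m)
 *	{
 *		const uint32_t hash = pktq_rps_hash(m);
 *
 *		kpreempt_disable();
 *		if (__predict_false(!pktq_enqueue(foo_pktq, m, hash))) {
 *			m_freem(m);
 *		}
 *		kpreempt_enable();
 *	}
 */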
/*
 * pktq_dequeue: take a packet from the queue.
 *
 * => Must be called with preemption disabled.
 * => Must ensure there are no concurrent dequeue calls.
 */
struct mbuf *
pktq_dequeue(pktqueue_t *pq)
{
	const struct cpu_info *ci = curcpu();
	const unsigned cpuid = cpu_index(ci);
	struct mbuf *m;

	m = pcq_get(pq->pq_queue[cpuid]);
	if (__predict_false(m == PKTQ_MARKER)) {
		/* Note the marker entry. */
		atomic_inc_uint(&pq->pq_barrier);
		return NULL;
	}
	if (__predict_true(m != NULL)) {
		pktq_inc_count(pq, PQCNT_DEQUEUE);
	}
	return m;
}

/*
 * pktq_barrier: waits for a grace period, i.e. until all packets enqueued
 * at the moment of calling this routine have been processed.  This is used
 * to ensure that e.g. packets referencing some interface were drained.
 */
void
pktq_barrier(pktqueue_t *pq)
{
	u_int pending = 0;

	mutex_enter(&pq->pq_lock);
	KASSERT(pq->pq_barrier == 0);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];

		/* If the queue is empty - nothing to do. */
		if (pcq_peek(q) == NULL) {
			continue;
		}
		/* Otherwise, insert the marker entry. */
		while (!pcq_put(q, PKTQ_MARKER)) {
			kpause("pktqsync", false, 1, NULL);
		}
		kpreempt_disable();
		softint_schedule_cpu(pq->pq_sih, cpu_lookup(i));
		kpreempt_enable();
		pending++;
	}

	/* Wait for each queue to process the markers. */
	while (pq->pq_barrier != pending) {
		kpause("pktqsync", false, 1, NULL);
	}
	pq->pq_barrier = 0;
	mutex_exit(&pq->pq_lock);
}

/*
 * pktq_flush: free mbufs in all queues.
 *
 * => The caller must ensure there are no concurrent writers or flush calls.
 */
void
pktq_flush(pktqueue_t *pq)
{
	struct mbuf *m;

	for (u_int i = 0; i < ncpu; i++) {
		while ((m = pcq_get(pq->pq_queue[i])) != NULL) {
			pktq_inc_count(pq, PQCNT_DEQUEUE);
			m_freem(m);
		}
	}
}
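/*
 * Illustrative sketch (not from this file): a detach path might use the
 * barrier to make sure no enqueued packet still references the departing
 * interface before its resources are released.  In the sketch,
 * foo_stop_input() is a hypothetical routine that stops new enqueues;
 * after pktq_barrier() returns, every packet enqueued before the call has
 * been processed, so per-interface state can be freed:
 *
 *	void
 *	foo_detach(struct foo_softc *sc)
 *	{
 *		foo_stop_input(sc);
 *		pktq_barrier(foo_pktq);
 *		foo_free_resources(sc);
 *	}
 */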
/*
 * pktq_set_maxlen: create per-CPU queues using a new size and replace
 * the existing queues without losing any packets.
 */
int
pktq_set_maxlen(pktqueue_t *pq, size_t maxlen)
{
	const u_int slotbytes = ncpu * sizeof(pcq_t *);
	pcq_t **qs;

	if (!maxlen || maxlen > PCQ_MAXLEN)
		return EINVAL;
	if (pq->pq_maxlen == maxlen)
		return 0;

	/* First, allocate the new queues and replace them. */
	qs = kmem_zalloc(slotbytes, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		qs[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_enter(&pq->pq_lock);
	for (u_int i = 0; i < ncpu; i++) {
		/* Swap: store of a word is atomic. */
		pcq_t *q = pq->pq_queue[i];
		pq->pq_queue[i] = qs[i];
		qs[i] = q;
	}
	pq->pq_maxlen = maxlen;
	mutex_exit(&pq->pq_lock);

	/*
	 * At this point, the new packets are flowing into the new
	 * queues.  However, the old queues may have some packets
	 * present which are no longer being processed.  We are going
	 * to re-enqueue them.  This may change the order of packet
	 * arrival, but it is not considered an issue.
	 *
	 * There may be in-flight interrupts calling pktq_dequeue()
	 * which reference the old queues.  Issue a barrier to ensure
	 * that we are going to be the only pcq_get() callers on the
	 * old queues.
	 */
	pktq_barrier(pq);

	for (u_int i = 0; i < ncpu; i++) {
		struct mbuf *m;

		while ((m = pcq_get(qs[i])) != NULL) {
			while (!pcq_put(pq->pq_queue[i], m)) {
				kpause("pktqrenq", false, 1, NULL);
			}
		}
		pcq_destroy(qs[i]);
	}

	/* Well, that was fun. */
	kmem_free(qs, slotbytes);
	return 0;
}

int
sysctl_pktq_maxlen(SYSCTLFN_ARGS, pktqueue_t *pq)
{
	u_int nmaxlen = pktq_get_count(pq, PKTQ_MAXLEN);
	struct sysctlnode node = *rnode;
	int error;

	node.sysctl_data = &nmaxlen;
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error || newp == NULL)
		return error;
	return pktq_set_maxlen(pq, nmaxlen);
}

int
sysctl_pktq_count(SYSCTLFN_ARGS, pktqueue_t *pq, u_int count_id)
{
	int count = pktq_get_count(pq, count_id);
	struct sysctlnode node = *rnode;

	node.sysctl_data = &count;
	return sysctl_lookup(SYSCTLFN_CALL(&node));
}
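/*
 * Illustrative sketch (not from this file): because the handlers above take
 * an extra pktqueue_t argument, a subsystem typically wires them into its
 * sysctl tree through a small wrapper with the standard sysctl handler
 * signature.  The wrapper names and "foo_pktq" below are hypothetical:
 *
 *	static int
 *	sysctl_foo_pktq_maxlen(SYSCTLFN_ARGS)
 *	{
 *		return sysctl_pktq_maxlen(SYSCTLFN_CALL(rnode), foo_pktq);
 *	}
 *
 *	static int
 *	sysctl_foo_pktq_drops(SYSCTLFN_ARGS)
 *	{
 *		return sysctl_pktq_count(SYSCTLFN_CALL(rnode), foo_pktq,
 *		    PKTQ_DROPS);
 *	}
 */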