1 /* $NetBSD: pktqueue.c,v 1.16 2021/12/21 04:09:32 knakahara Exp $ */ 2 3 /*- 4 * Copyright (c) 2014 The NetBSD Foundation, Inc. 5 * All rights reserved. 6 * 7 * This code is derived from software contributed to The NetBSD Foundation 8 * by Mindaugas Rasiukevicius. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 1. Redistributions of source code must retain the above copyright 14 * notice, this list of conditions and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 * POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32 /* 33 * The packet queue (pktqueue) interface is a lockless IP input queue 34 * which also abstracts and handles network ISR scheduling. It provides 35 * a mechanism to enable receiver-side packet steering (RPS). 36 */ 37 38 #include <sys/cdefs.h> 39 __KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.16 2021/12/21 04:09:32 knakahara Exp $"); 40 41 #ifdef _KERNEL_OPT 42 #include "opt_net_mpsafe.h" 43 #endif 44 45 #include <sys/param.h> 46 #include <sys/types.h> 47 48 #include <sys/atomic.h> 49 #include <sys/cpu.h> 50 #include <sys/pcq.h> 51 #include <sys/intr.h> 52 #include <sys/mbuf.h> 53 #include <sys/proc.h> 54 #include <sys/percpu.h> 55 #include <sys/xcall.h> 56 57 #include <net/pktqueue.h> 58 #include <net/rss_config.h> 59 60 #include <netinet/in.h> 61 #include <netinet/ip.h> 62 #include <netinet/ip6.h> 63 64 struct pktqueue { 65 /* 66 * The lock used for a barrier mechanism. The barrier counter, 67 * as well as the drop counter, are managed atomically though. 68 * Ensure this group is in a separate cache line. 69 */ 70 union { 71 struct { 72 kmutex_t pq_lock; 73 volatile u_int pq_barrier; 74 }; 75 uint8_t _pad[COHERENCY_UNIT]; 76 }; 77 78 /* The size of the queue, counters and the interrupt handler. */ 79 u_int pq_maxlen; 80 percpu_t * pq_counters; 81 void * pq_sih; 82 83 /* Finally, per-CPU queues. */ 84 struct percpu * pq_pcq; /* struct pcq * */ 85 }; 86 87 /* The counters of the packet queue. */ 88 #define PQCNT_ENQUEUE 0 89 #define PQCNT_DEQUEUE 1 90 #define PQCNT_DROP 2 91 #define PQCNT_NCOUNTERS 3 92 93 typedef struct { 94 uint64_t count[PQCNT_NCOUNTERS]; 95 } pktq_counters_t; 96 97 /* Special marker value used by pktq_barrier() mechanism. */ 98 #define PKTQ_MARKER ((void *)(~0ULL)) 99 100 static void 101 pktq_init_cpu(void *vqp, void *vpq, struct cpu_info *ci) 102 { 103 struct pcq **qp = vqp; 104 struct pktqueue *pq = vpq; 105 106 *qp = pcq_create(pq->pq_maxlen, KM_SLEEP); 107 } 108 109 static void 110 pktq_fini_cpu(void *vqp, void *vpq, struct cpu_info *ci) 111 { 112 struct pcq **qp = vqp, *q = *qp; 113 114 KASSERT(pcq_peek(q) == NULL); 115 pcq_destroy(q); 116 *qp = NULL; /* paranoia */ 117 } 118 119 static struct pcq * 120 pktq_pcq(struct pktqueue *pq, struct cpu_info *ci) 121 { 122 struct pcq **qp, *q; 123 124 /* 125 * As long as preemption is disabled, the xcall to swap percpu 126 * buffers can't complete, so it is safe to read the pointer. 127 */ 128 KASSERT(kpreempt_disabled()); 129 130 qp = percpu_getptr_remote(pq->pq_pcq, ci); 131 q = *qp; 132 133 return q; 134 } 135 136 pktqueue_t * 137 pktq_create(size_t maxlen, void (*intrh)(void *), void *sc) 138 { 139 const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU; 140 pktqueue_t *pq; 141 percpu_t *pc; 142 void *sih; 143 144 pc = percpu_alloc(sizeof(pktq_counters_t)); 145 if ((sih = softint_establish(sflags, intrh, sc)) == NULL) { 146 percpu_free(pc, sizeof(pktq_counters_t)); 147 return NULL; 148 } 149 150 pq = kmem_zalloc(sizeof(*pq), KM_SLEEP); 151 mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE); 152 pq->pq_maxlen = maxlen; 153 pq->pq_counters = pc; 154 pq->pq_sih = sih; 155 pq->pq_pcq = percpu_create(sizeof(struct pcq *), 156 pktq_init_cpu, pktq_fini_cpu, pq); 157 158 return pq; 159 } 160 161 void 162 pktq_destroy(pktqueue_t *pq) 163 { 164 165 percpu_free(pq->pq_pcq, sizeof(struct pcq *)); 166 percpu_free(pq->pq_counters, sizeof(pktq_counters_t)); 167 softint_disestablish(pq->pq_sih); 168 mutex_destroy(&pq->pq_lock); 169 kmem_free(pq, sizeof(*pq)); 170 } 171 172 /* 173 * - pktq_inc_counter: increment the counter given an ID. 174 * - pktq_collect_counts: handler to sum up the counts from each CPU. 175 * - pktq_getcount: return the effective count given an ID. 176 */ 177 178 static inline void 179 pktq_inc_count(pktqueue_t *pq, u_int i) 180 { 181 percpu_t *pc = pq->pq_counters; 182 pktq_counters_t *c; 183 184 c = percpu_getref(pc); 185 c->count[i]++; 186 percpu_putref(pc); 187 } 188 189 static void 190 pktq_collect_counts(void *mem, void *arg, struct cpu_info *ci) 191 { 192 const pktq_counters_t *c = mem; 193 pktq_counters_t *sum = arg; 194 195 int s = splnet(); 196 197 for (u_int i = 0; i < PQCNT_NCOUNTERS; i++) { 198 sum->count[i] += c->count[i]; 199 } 200 201 splx(s); 202 } 203 204 uint64_t 205 pktq_get_count(pktqueue_t *pq, pktq_count_t c) 206 { 207 pktq_counters_t sum; 208 209 if (c != PKTQ_MAXLEN) { 210 memset(&sum, 0, sizeof(sum)); 211 percpu_foreach_xcall(pq->pq_counters, 212 XC_HIGHPRI_IPL(IPL_SOFTNET), pktq_collect_counts, &sum); 213 } 214 switch (c) { 215 case PKTQ_NITEMS: 216 return sum.count[PQCNT_ENQUEUE] - sum.count[PQCNT_DEQUEUE]; 217 case PKTQ_DROPS: 218 return sum.count[PQCNT_DROP]; 219 case PKTQ_MAXLEN: 220 return pq->pq_maxlen; 221 } 222 return 0; 223 } 224 225 uint32_t 226 pktq_rps_hash(pktq_rps_hash_func_t *funcp, const struct mbuf *m) 227 { 228 pktq_rps_hash_func_t func = atomic_load_relaxed(funcp); 229 230 KASSERT(func != NULL); 231 232 return (*func)(m); 233 } 234 235 static uint32_t 236 pktq_rps_hash_zero(const struct mbuf *m __unused) 237 { 238 239 return 0; 240 } 241 242 static uint32_t 243 pktq_rps_hash_curcpu(const struct mbuf *m __unused) 244 { 245 246 return cpu_index(curcpu()); 247 } 248 249 static uint32_t 250 pktq_rps_hash_toeplitz(const struct mbuf *m) 251 { 252 struct ip *ip; 253 /* 254 * Disable UDP port - IP fragments aren't currently being handled 255 * and so we end up with a mix of 2-tuple and 4-tuple 256 * traffic. 257 */ 258 const u_int flag = RSS_TOEPLITZ_USE_TCP_PORT; 259 260 /* glance IP version */ 261 if ((m->m_flags & M_PKTHDR) == 0) 262 return 0; 263 264 ip = mtod(m, struct ip *); 265 if (ip->ip_v == IPVERSION) { 266 if (__predict_false(m->m_len < sizeof(struct ip))) 267 return 0; 268 return rss_toeplitz_hash_from_mbuf_ipv4(m, flag); 269 } else if (ip->ip_v == 6) { 270 if (__predict_false(m->m_len < sizeof(struct ip6_hdr))) 271 return 0; 272 return rss_toeplitz_hash_from_mbuf_ipv6(m, flag); 273 } 274 275 return 0; 276 } 277 278 /* 279 * toeplitz without curcpu. 280 * Generally, this has better performance than toeplitz. 281 */ 282 static uint32_t 283 pktq_rps_hash_toeplitz_othercpus(const struct mbuf *m) 284 { 285 uint32_t hash; 286 287 if (ncpu == 1) 288 return 0; 289 290 hash = pktq_rps_hash_toeplitz(m); 291 hash %= ncpu - 1; 292 if (hash >= cpu_index(curcpu())) 293 return hash + 1; 294 else 295 return hash; 296 } 297 298 static struct pktq_rps_hash_table { 299 const char* prh_type; 300 pktq_rps_hash_func_t prh_func; 301 } const pktq_rps_hash_tab[] = { 302 { "zero", pktq_rps_hash_zero }, 303 { "curcpu", pktq_rps_hash_curcpu }, 304 { "toeplitz", pktq_rps_hash_toeplitz }, 305 { "toeplitz-othercpus", pktq_rps_hash_toeplitz_othercpus }, 306 }; 307 const pktq_rps_hash_func_t pktq_rps_hash_default = 308 #ifdef NET_MPSAFE 309 pktq_rps_hash_curcpu; 310 #else 311 pktq_rps_hash_zero; 312 #endif 313 314 static const char * 315 pktq_get_rps_hash_type(pktq_rps_hash_func_t func) 316 { 317 318 for (int i = 0; i < __arraycount(pktq_rps_hash_tab); i++) { 319 if (func == pktq_rps_hash_tab[i].prh_func) { 320 return pktq_rps_hash_tab[i].prh_type; 321 } 322 } 323 324 return NULL; 325 } 326 327 static int 328 pktq_set_rps_hash_type(pktq_rps_hash_func_t *func, const char *type) 329 { 330 331 if (strcmp(type, pktq_get_rps_hash_type(*func)) == 0) 332 return 0; 333 334 for (int i = 0; i < __arraycount(pktq_rps_hash_tab); i++) { 335 if (strcmp(type, pktq_rps_hash_tab[i].prh_type) == 0) { 336 atomic_store_relaxed(func, pktq_rps_hash_tab[i].prh_func); 337 return 0; 338 } 339 } 340 341 return ENOENT; 342 } 343 344 int 345 sysctl_pktq_rps_hash_handler(SYSCTLFN_ARGS) 346 { 347 struct sysctlnode node; 348 pktq_rps_hash_func_t *func; 349 int error; 350 char type[PKTQ_RPS_HASH_NAME_LEN]; 351 352 node = *rnode; 353 func = node.sysctl_data; 354 355 strlcpy(type, pktq_get_rps_hash_type(*func), PKTQ_RPS_HASH_NAME_LEN); 356 357 node.sysctl_data = &type; 358 node.sysctl_size = sizeof(type); 359 error = sysctl_lookup(SYSCTLFN_CALL(&node)); 360 if (error || newp == NULL) 361 return error; 362 363 error = pktq_set_rps_hash_type(func, type); 364 365 return error; 366 } 367 368 /* 369 * pktq_enqueue: inject the packet into the end of the queue. 370 * 371 * => Must be called from the interrupt or with the preemption disabled. 372 * => Consumes the packet and returns true on success. 373 * => Returns false on failure; caller is responsible to free the packet. 374 */ 375 bool 376 pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused) 377 { 378 #if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI) 379 struct cpu_info *ci = curcpu(); 380 #else 381 struct cpu_info *ci = cpu_lookup(hash % ncpu); 382 #endif 383 384 KASSERT(kpreempt_disabled()); 385 386 if (__predict_false(!pcq_put(pktq_pcq(pq, ci), m))) { 387 pktq_inc_count(pq, PQCNT_DROP); 388 return false; 389 } 390 softint_schedule_cpu(pq->pq_sih, ci); 391 pktq_inc_count(pq, PQCNT_ENQUEUE); 392 return true; 393 } 394 395 /* 396 * pktq_dequeue: take a packet from the queue. 397 * 398 * => Must be called with preemption disabled. 399 * => Must ensure there are not concurrent dequeue calls. 400 */ 401 struct mbuf * 402 pktq_dequeue(pktqueue_t *pq) 403 { 404 struct cpu_info *ci = curcpu(); 405 struct mbuf *m; 406 407 KASSERT(kpreempt_disabled()); 408 409 m = pcq_get(pktq_pcq(pq, ci)); 410 if (__predict_false(m == PKTQ_MARKER)) { 411 /* Note the marker entry. */ 412 atomic_inc_uint(&pq->pq_barrier); 413 return NULL; 414 } 415 if (__predict_true(m != NULL)) { 416 pktq_inc_count(pq, PQCNT_DEQUEUE); 417 } 418 return m; 419 } 420 421 /* 422 * pktq_barrier: waits for a grace period when all packets enqueued at 423 * the moment of calling this routine will be processed. This is used 424 * to ensure that e.g. packets referencing some interface were drained. 425 */ 426 void 427 pktq_barrier(pktqueue_t *pq) 428 { 429 CPU_INFO_ITERATOR cii; 430 struct cpu_info *ci; 431 u_int pending = 0; 432 433 mutex_enter(&pq->pq_lock); 434 KASSERT(pq->pq_barrier == 0); 435 436 for (CPU_INFO_FOREACH(cii, ci)) { 437 struct pcq *q; 438 439 kpreempt_disable(); 440 q = pktq_pcq(pq, ci); 441 kpreempt_enable(); 442 443 /* If the queue is empty - nothing to do. */ 444 if (pcq_peek(q) == NULL) { 445 continue; 446 } 447 /* Otherwise, put the marker and entry. */ 448 while (!pcq_put(q, PKTQ_MARKER)) { 449 kpause("pktqsync", false, 1, NULL); 450 } 451 kpreempt_disable(); 452 softint_schedule_cpu(pq->pq_sih, ci); 453 kpreempt_enable(); 454 pending++; 455 } 456 457 /* Wait for each queue to process the markers. */ 458 while (pq->pq_barrier != pending) { 459 kpause("pktqsync", false, 1, NULL); 460 } 461 pq->pq_barrier = 0; 462 mutex_exit(&pq->pq_lock); 463 } 464 465 /* 466 * pktq_flush: free mbufs in all queues. 467 * 468 * => The caller must ensure there are no concurrent writers or flush calls. 469 */ 470 void 471 pktq_flush(pktqueue_t *pq) 472 { 473 CPU_INFO_ITERATOR cii; 474 struct cpu_info *ci; 475 struct mbuf *m; 476 477 for (CPU_INFO_FOREACH(cii, ci)) { 478 struct pcq *q; 479 480 kpreempt_disable(); 481 q = pktq_pcq(pq, ci); 482 kpreempt_enable(); 483 484 /* 485 * XXX This can't be right -- if the softint is running 486 * then pcq_get isn't safe here. 487 */ 488 while ((m = pcq_get(q)) != NULL) { 489 pktq_inc_count(pq, PQCNT_DEQUEUE); 490 m_freem(m); 491 } 492 } 493 } 494 495 static void 496 pktq_set_maxlen_cpu(void *vpq, void *vqs) 497 { 498 struct pktqueue *pq = vpq; 499 struct pcq **qp, *q, **qs = vqs; 500 unsigned i = cpu_index(curcpu()); 501 int s; 502 503 s = splnet(); 504 qp = percpu_getref(pq->pq_pcq); 505 q = *qp; 506 *qp = qs[i]; 507 qs[i] = q; 508 percpu_putref(pq->pq_pcq); 509 splx(s); 510 } 511 512 /* 513 * pktq_set_maxlen: create per-CPU queues using a new size and replace 514 * the existing queues without losing any packets. 515 * 516 * XXX ncpu must remain stable throughout. 517 */ 518 int 519 pktq_set_maxlen(pktqueue_t *pq, size_t maxlen) 520 { 521 const u_int slotbytes = ncpu * sizeof(pcq_t *); 522 pcq_t **qs; 523 524 if (!maxlen || maxlen > PCQ_MAXLEN) 525 return EINVAL; 526 if (pq->pq_maxlen == maxlen) 527 return 0; 528 529 /* First, allocate the new queues. */ 530 qs = kmem_zalloc(slotbytes, KM_SLEEP); 531 for (u_int i = 0; i < ncpu; i++) { 532 qs[i] = pcq_create(maxlen, KM_SLEEP); 533 } 534 535 /* 536 * Issue an xcall to replace the queue pointers on each CPU. 537 * This implies all the necessary memory barriers. 538 */ 539 mutex_enter(&pq->pq_lock); 540 xc_wait(xc_broadcast(XC_HIGHPRI, pktq_set_maxlen_cpu, pq, qs)); 541 pq->pq_maxlen = maxlen; 542 mutex_exit(&pq->pq_lock); 543 544 /* 545 * At this point, the new packets are flowing into the new 546 * queues. However, the old queues may have some packets 547 * present which are no longer being processed. We are going 548 * to re-enqueue them. This may change the order of packet 549 * arrival, but it is not considered an issue. 550 * 551 * There may be in-flight interrupts calling pktq_dequeue() 552 * which reference the old queues. Issue a barrier to ensure 553 * that we are going to be the only pcq_get() callers on the 554 * old queues. 555 */ 556 pktq_barrier(pq); 557 558 for (u_int i = 0; i < ncpu; i++) { 559 struct pcq *q; 560 struct mbuf *m; 561 562 kpreempt_disable(); 563 q = pktq_pcq(pq, cpu_lookup(i)); 564 kpreempt_enable(); 565 566 while ((m = pcq_get(qs[i])) != NULL) { 567 while (!pcq_put(q, m)) { 568 kpause("pktqrenq", false, 1, NULL); 569 } 570 } 571 pcq_destroy(qs[i]); 572 } 573 574 /* Well, that was fun. */ 575 kmem_free(qs, slotbytes); 576 return 0; 577 } 578 579 int 580 sysctl_pktq_maxlen(SYSCTLFN_ARGS, pktqueue_t *pq) 581 { 582 u_int nmaxlen = pktq_get_count(pq, PKTQ_MAXLEN); 583 struct sysctlnode node = *rnode; 584 int error; 585 586 node.sysctl_data = &nmaxlen; 587 error = sysctl_lookup(SYSCTLFN_CALL(&node)); 588 if (error || newp == NULL) 589 return error; 590 return pktq_set_maxlen(pq, nmaxlen); 591 } 592 593 int 594 sysctl_pktq_count(SYSCTLFN_ARGS, pktqueue_t *pq, u_int count_id) 595 { 596 uint64_t count = pktq_get_count(pq, count_id); 597 struct sysctlnode node = *rnode; 598 599 node.sysctl_data = &count; 600 return sysctl_lookup(SYSCTLFN_CALL(&node)); 601 } 602