/* $NetBSD: rwlock.c,v 1.1 2024/02/18 20:57:50 christos Exp $ */

/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

/*! \file */

#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>

#if defined(sun) && (defined(__sparc) || defined(__sparc__))
#include <synch.h> /* for smt_pause(3c) */
#endif /* if defined(sun) && (defined(__sparc) || defined(__sparc__)) */

#include <isc/atomic.h>
#include <isc/magic.h>
#include <isc/platform.h>
#include <isc/print.h>
#include <isc/rwlock.h>
#include <isc/util.h>

#if USE_PTHREAD_RWLOCK

#include <errno.h>
#include <pthread.h>

void
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
                unsigned int write_quota) {
        UNUSED(read_quota);
        UNUSED(write_quota);
        REQUIRE(pthread_rwlock_init(&rwl->rwlock, NULL) == 0);
        atomic_init(&rwl->downgrade, false);
}

isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        switch (type) {
        case isc_rwlocktype_read:
                REQUIRE(pthread_rwlock_rdlock(&rwl->rwlock) == 0);
                break;
        case isc_rwlocktype_write:
                while (true) {
                        REQUIRE(pthread_rwlock_wrlock(&rwl->rwlock) == 0);
                        /* Unlock if in middle of downgrade operation */
                        if (atomic_load_acquire(&rwl->downgrade)) {
                                REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) ==
                                        0);
                                while (atomic_load_acquire(&rwl->downgrade)) {
                                }
                                continue;
                        }
                        break;
                }
                break;
        default:
                UNREACHABLE();
        }
        return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        int ret = 0;
        switch (type) {
        case isc_rwlocktype_read:
                ret = pthread_rwlock_tryrdlock(&rwl->rwlock);
                break;
        case isc_rwlocktype_write:
                ret = pthread_rwlock_trywrlock(&rwl->rwlock);
                if ((ret == 0) && atomic_load_acquire(&rwl->downgrade)) {
                        isc_rwlock_unlock(rwl, type);
                        return (ISC_R_LOCKBUSY);
                }
                break;
        default:
                UNREACHABLE();
        }

        switch (ret) {
        case 0:
                return (ISC_R_SUCCESS);
        case EBUSY:
                return (ISC_R_LOCKBUSY);
        case EAGAIN:
                return (ISC_R_LOCKBUSY);
        default:
                UNREACHABLE();
        }
}

isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        UNUSED(type);
        REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) == 0);
        return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
        UNUSED(rwl);
        return (ISC_R_LOCKBUSY);
}

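/*
 * Note on the downgrade protocol (describing the write-lock loop above and
 * isc_rwlock_downgrade() below): POSIX rwlocks have no native downgrade, so
 * the downgrade must drop the write lock and reacquire a read lock.  The
 * "downgrade" flag closes the window in between: any writer that acquires
 * the pthread write lock while the flag is set immediately releases it and
 * spins until the flag clears, so the downgrading thread cannot lose its
 * place to a competing writer between the unlock and the rdlock.
 */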
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
        isc_result_t result;
        atomic_store_release(&rwl->downgrade, true);
        result = isc_rwlock_unlock(rwl, isc_rwlocktype_write);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);
        result = isc_rwlock_lock(rwl, isc_rwlocktype_read);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);
        atomic_store_release(&rwl->downgrade, false);
}

void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
        pthread_rwlock_destroy(&rwl->rwlock);
}
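
/*
 * Minimal usage sketch of the isc_rwlock API implemented in this file
 * (illustrative only; real callers check results as appropriate):
 *
 *        isc_rwlock_t rwl;
 *        isc_rwlock_init(&rwl, 0, 0);
 *
 *        RUNTIME_CHECK(isc_rwlock_lock(&rwl, isc_rwlocktype_read) ==
 *                      ISC_R_SUCCESS);
 *        ... read shared state ...
 *        RUNTIME_CHECK(isc_rwlock_unlock(&rwl, isc_rwlocktype_read) ==
 *                      ISC_R_SUCCESS);
 *
 *        RUNTIME_CHECK(isc_rwlock_lock(&rwl, isc_rwlocktype_write) ==
 *                      ISC_R_SUCCESS);
 *        ... modify shared state ...
 *        isc_rwlock_downgrade(&rwl);   (keep reading without exclusivity)
 *        RUNTIME_CHECK(isc_rwlock_unlock(&rwl, isc_rwlocktype_read) ==
 *                      ISC_R_SUCCESS);
 *
 *        isc_rwlock_destroy(&rwl);
 */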

#else /* if USE_PTHREAD_RWLOCK */

#define RWLOCK_MAGIC      ISC_MAGIC('R', 'W', 'L', 'k')
#define VALID_RWLOCK(rwl) ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)

#ifndef RWLOCK_DEFAULT_READ_QUOTA
#define RWLOCK_DEFAULT_READ_QUOTA 4
#endif /* ifndef RWLOCK_DEFAULT_READ_QUOTA */

#ifndef RWLOCK_DEFAULT_WRITE_QUOTA
#define RWLOCK_DEFAULT_WRITE_QUOTA 4
#endif /* ifndef RWLOCK_DEFAULT_WRITE_QUOTA */

#ifndef RWLOCK_MAX_ADAPTIVE_COUNT
#define RWLOCK_MAX_ADAPTIVE_COUNT 100
#endif /* ifndef RWLOCK_MAX_ADAPTIVE_COUNT */

#ifdef __lint__
# define isc_rwlock_pause()
#else
#if defined(_MSC_VER)
#include <intrin.h>
#define isc_rwlock_pause() YieldProcessor()
#elif defined(__x86_64__)
#include <immintrin.h>
#define isc_rwlock_pause() _mm_pause()
#elif defined(__i386__)
#define isc_rwlock_pause() __asm__ __volatile__("rep; nop")
#elif defined(__ia64__)
#define isc_rwlock_pause() __asm__ __volatile__("hint @pause")
#elif defined(__arm__) && (defined(_ARM_ARCH_6) || HAVE_ARM_YIELD)
#define isc_rwlock_pause() __asm__ __volatile__("yield")
#elif defined(sun) && (defined(__sparc) || defined(__sparc__))
#define isc_rwlock_pause() smt_pause()
#elif (defined(__sparc) || defined(__sparc__)) && HAVE_SPARC_PAUSE
#define isc_rwlock_pause() __asm__ __volatile__("pause")
#elif defined(__ppc__) || defined(_ARCH_PPC) || defined(_ARCH_PWR) || \
        defined(_ARCH_PWR2) || defined(_POWER)
#define isc_rwlock_pause() __asm__ volatile("or 27,27,27")
#else /* if defined(_MSC_VER) */
#define isc_rwlock_pause()
#endif /* if defined(_MSC_VER) */
#endif

static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type);

#ifdef ISC_RWLOCK_TRACE
#include <stdio.h> /* Required for fprintf/stderr. */

#include <isc/thread.h> /* Required for isc_thread_self(). */

static void
print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        fprintf(stderr,
                "rwlock %p thread %" PRIuPTR " %s(%s): "
                "write_requests=%u, write_completions=%u, "
                "cnt_and_flag=0x%x, readers_waiting=%u, "
                "write_granted=%u, write_quota=%u\n",
                rwl, isc_thread_self(), operation,
                (type == isc_rwlocktype_read ? "read" : "write"),
                atomic_load_acquire(&rwl->write_requests),
                atomic_load_acquire(&rwl->write_completions),
                atomic_load_acquire(&rwl->cnt_and_flag), rwl->readers_waiting,
                atomic_load_acquire(&rwl->write_granted), rwl->write_quota);
}
#endif /* ISC_RWLOCK_TRACE */

void
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
                unsigned int write_quota) {
        REQUIRE(rwl != NULL);

        /*
         * In case there's trouble initializing, we zero magic now.  If all
         * goes well, we'll set it to RWLOCK_MAGIC.
         */
        rwl->magic = 0;

        atomic_init(&rwl->spins, 0);
        atomic_init(&rwl->write_requests, 0);
        atomic_init(&rwl->write_completions, 0);
        atomic_init(&rwl->cnt_and_flag, 0);
        rwl->readers_waiting = 0;
        atomic_init(&rwl->write_granted, 0);
        if (read_quota != 0) {
                UNEXPECTED_ERROR(__FILE__, __LINE__,
                                 "read quota is not supported");
        }
        if (write_quota == 0) {
                write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
        }
        rwl->write_quota = write_quota;

        isc_mutex_init(&rwl->lock);

        isc_condition_init(&rwl->readable);
        isc_condition_init(&rwl->writeable);

        rwl->magic = RWLOCK_MAGIC;
}

void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
        REQUIRE(VALID_RWLOCK(rwl));

        REQUIRE(atomic_load_acquire(&rwl->write_requests) ==
                        atomic_load_acquire(&rwl->write_completions) &&
                atomic_load_acquire(&rwl->cnt_and_flag) == 0 &&
                rwl->readers_waiting == 0);

        rwl->magic = 0;
        (void)isc_condition_destroy(&rwl->readable);
        (void)isc_condition_destroy(&rwl->writeable);
        isc_mutex_destroy(&rwl->lock);
}

/*
 * When some architecture-dependent atomic operations are available,
 * rwlock can be more efficient than the generic algorithm defined below.
 * The basic algorithm is described in the following URL:
 * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
 *
 * The key is to use the following integer variables modified atomically:
 * write_requests, write_completions, and cnt_and_flag.
 *
 * write_requests and write_completions act as a waiting queue for writers
 * in order to ensure the FIFO order.  Both variables begin with the initial
 * value of 0.  When a new writer tries to get a write lock, it increments
 * write_requests and gets the previous value of the variable as a "ticket".
 * When write_completions reaches the ticket number, the new writer can start
 * writing.  When the writer completes its work, it increments
 * write_completions so that another new writer can start working.  If
 * write_requests is not equal to write_completions, it means a writer is now
 * working or waiting.  In this case, new readers cannot start reading; in
 * other words, this algorithm basically prefers writers.
 *
 * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
 * variable is a kind of structure with two members: writer_flag (1 bit) and
 * reader_count (31 bits).  The writer_flag shows whether a writer is working,
 * and the reader_count shows the number of readers currently working or
 * almost ready to work.  A writer who has the current "ticket" tries to get
 * the lock by exclusively setting the writer_flag to 1, provided that the
 * whole 32-bit value is 0 (meaning no readers or writers are working).  On
 * the other hand, a new reader tries to increment the "reader_count" field
 * provided that the writer_flag is 0 (meaning there is no writer working).
 *
 * If one of the above operations fails, the reader or the writer sleeps
 * until the related condition changes.  When a working reader or writer
 * completes its work, and some readers or writers are sleeping whose
 * suspending condition has now changed, it wakes up the sleeping readers or
 * writers.
 *
 * As already noted, this algorithm basically prefers writers.  In order to
 * prevent readers from starving, however, the algorithm also introduces the
 * "writer quota" (Q).  When Q consecutive writers have completed their work
 * while readers are waiting, the last writer wakes up the readers even if a
 * new writer is waiting.
 *
 * Implementation specific note: due to the combination of atomic operations
 * and a mutex lock, ordering between the atomic operations and locks can be
 * very sensitive in some cases.  In particular, it is generally very
 * important to check the atomic variable that requires a reader or writer to
 * sleep after locking the mutex and before actually sleeping; otherwise, it
 * could be very likely to cause a deadlock.  For example, assume "var" is a
 * variable atomically modified, then the corresponding code would be:
 *        if (var == need_sleep) {
 *                LOCK(lock);
 *                if (var == need_sleep)
 *                        WAIT(cond, lock);
 *                UNLOCK(lock);
 *        }
 * The second check is important, since "var" is protected by the atomic
 * operation, not by the mutex, and can be changed just before sleeping.
 * (The first "if" could be omitted, but it is also important in order to
 * make the code efficient by avoiding the use of the mutex unless it is
 * really necessary.)
 *
 * (A worked example of the cnt_and_flag encoding and the writer ticket
 * queue follows the definitions below.)
 */

#define WRITER_ACTIVE 0x1
#define READER_INCR   0x2
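
/*
 * Worked example of the encoding above (informational only): with
 * READER_INCR == 0x2 and WRITER_ACTIVE == 0x1, cnt_and_flag == 0 means the
 * lock is free, 0x1 means a writer holds it, and 0x6 (3 * READER_INCR)
 * means three readers hold (or are about to hold) it.  For the writer
 * queue, if write_requests == 5 and write_completions == 3, then tickets
 * 3 and 4 are outstanding: the writer holding ticket 3 may proceed as soon
 * as cnt_and_flag drops to 0, while ticket 4 must wait until
 * write_completions reaches 4.
 */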

static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        int32_t cntflag;

        REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
        print_lock("prelock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

        if (type == isc_rwlocktype_read) {
                if (atomic_load_acquire(&rwl->write_requests) !=
                    atomic_load_acquire(&rwl->write_completions))
                {
                        /* there is a waiting or active writer */
                        LOCK(&rwl->lock);
                        if (atomic_load_acquire(&rwl->write_requests) !=
                            atomic_load_acquire(&rwl->write_completions))
                        {
                                rwl->readers_waiting++;
                                WAIT(&rwl->readable, &rwl->lock);
                                rwl->readers_waiting--;
                        }
                        UNLOCK(&rwl->lock);
                }

                cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
                                                   READER_INCR);
                POST(cntflag);
                while (1) {
                        if ((atomic_load_acquire(&rwl->cnt_and_flag) &
                             WRITER_ACTIVE) == 0)
                        {
                                break;
                        }

                        /* A writer is still working */
                        LOCK(&rwl->lock);
                        rwl->readers_waiting++;
                        if ((atomic_load_acquire(&rwl->cnt_and_flag) &
                             WRITER_ACTIVE) != 0)
                        {
                                WAIT(&rwl->readable, &rwl->lock);
                        }
                        rwl->readers_waiting--;
                        UNLOCK(&rwl->lock);

                        /*
                         * Typically, the reader should be able to get a lock
                         * at this stage:
                         *   (1) there should have been no pending writer when
                         *       the reader was trying to increment the
                         *       counter; otherwise, the writer should be in
                         *       the waiting queue, preventing the reader from
                         *       proceeding to this point.
                         *   (2) once the reader increments the counter, no
                         *       more writers can get a lock.
                         * Still, it is possible another writer can work at
                         * this point, e.g. in the following scenario:
                         *   A previous writer unlocks the writer lock.
                         *   This reader proceeds to point (1).
                         *   A new writer appears, and gets a new lock before
                         *   the reader increments the counter.
                         *   The reader then increments the counter.
                         *   The previous writer notices there is a waiting
                         *   reader who is almost ready, and wakes it up.
                         * So, the reader needs to confirm whether it can now
                         * read explicitly (thus we loop).  Note that this is
                         * not an infinite process, since the reader has
                         * incremented the counter at this point.
                         */
                }

                /*
                 * If we are temporarily preferred to writers due to the
                 * writer quota, reset the condition (race among readers
                 * doesn't matter).
                 */
                atomic_store_release(&rwl->write_granted, 0);
        } else {
                int32_t prev_writer;

                /* enter the waiting queue, and wait for our turn */
                prev_writer = atomic_fetch_add_release(&rwl->write_requests,
                                                       1);
                while (atomic_load_acquire(&rwl->write_completions) !=
                       prev_writer)
                {
                        LOCK(&rwl->lock);
                        if (atomic_load_acquire(&rwl->write_completions) !=
                            prev_writer)
                        {
                                WAIT(&rwl->writeable, &rwl->lock);
                                UNLOCK(&rwl->lock);
                                continue;
                        }
                        UNLOCK(&rwl->lock);
                        break;
                }

                while (!atomic_compare_exchange_weak_acq_rel(
                        &rwl->cnt_and_flag, &(int_fast32_t){ 0 },
                        WRITER_ACTIVE))
                {
                        /* Another active reader or writer is working. */
                        LOCK(&rwl->lock);
                        if (atomic_load_acquire(&rwl->cnt_and_flag) != 0) {
                                WAIT(&rwl->writeable, &rwl->lock);
                        }
                        UNLOCK(&rwl->lock);
                }

                INSIST((atomic_load_acquire(&rwl->cnt_and_flag) &
                        WRITER_ACTIVE));
                atomic_fetch_add_release(&rwl->write_granted, 1);
        }

#ifdef ISC_RWLOCK_TRACE
        print_lock("postlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

        return (ISC_R_SUCCESS);
}

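/*
 * Adaptive spinning (descriptive note): isc_rwlock_lock() first calls
 * isc_rwlock_trylock() in a loop, pausing between attempts with
 * isc_rwlock_pause(), before falling back to the blocking
 * isc__rwlock_lock() path.  The spin budget is
 * ISC_MAX(2 * rwl->spins + 10, RWLOCK_MAX_ADAPTIVE_COUNT); with the
 * initial rwl->spins of 0 that is max(10, 100) == 100 attempts.  After
 * each acquisition, rwl->spins is adjusted by (cnt - spins) / 8, so the
 * budget adapts toward the number of iterations recently needed.
 */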
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        int32_t cnt = 0;
        int32_t spins = atomic_load_acquire(&rwl->spins) * 2 + 10;
        int32_t max_cnt = ISC_MAX(spins, RWLOCK_MAX_ADAPTIVE_COUNT);
        isc_result_t result = ISC_R_SUCCESS;

        do {
                if (cnt++ >= max_cnt) {
                        result = isc__rwlock_lock(rwl, type);
                        break;
                }
                isc_rwlock_pause();
        } while (isc_rwlock_trylock(rwl, type) != ISC_R_SUCCESS);

        atomic_fetch_add_release(&rwl->spins, (cnt - spins) / 8);

        return (result);
}

isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        int32_t cntflag;

        REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
        print_lock("prelock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

        if (type == isc_rwlocktype_read) {
                /* If a writer is waiting or working, we fail. */
                if (atomic_load_acquire(&rwl->write_requests) !=
                    atomic_load_acquire(&rwl->write_completions))
                {
                        return (ISC_R_LOCKBUSY);
                }

                /* Otherwise, be ready for reading. */
                cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
                                                   READER_INCR);
                if ((cntflag & WRITER_ACTIVE) != 0) {
                        /*
                         * A writer is working.  We lose, and cancel the read
                         * request.
                         */
                        cntflag = atomic_fetch_sub_release(&rwl->cnt_and_flag,
                                                           READER_INCR);
                        /*
                         * If no other readers are waiting and we've suspended
                         * new writers in this short period, wake them up.
                         */
                        if (cntflag == READER_INCR &&
                            atomic_load_acquire(&rwl->write_completions) !=
                                    atomic_load_acquire(&rwl->write_requests))
                        {
                                LOCK(&rwl->lock);
                                BROADCAST(&rwl->writeable);
                                UNLOCK(&rwl->lock);
                        }

                        return (ISC_R_LOCKBUSY);
                }
        } else {
                /* Try locking without entering the waiting queue. */
                int_fast32_t zero = 0;
                if (!atomic_compare_exchange_strong_acq_rel(
                            &rwl->cnt_and_flag, &zero, WRITER_ACTIVE))
                {
                        return (ISC_R_LOCKBUSY);
                }

                /*
                 * XXXJT: jump into the queue, possibly breaking the writer
                 * order.
                 */
                atomic_fetch_sub_release(&rwl->write_completions, 1);
                atomic_fetch_add_release(&rwl->write_granted, 1);
        }

#ifdef ISC_RWLOCK_TRACE
        print_lock("postlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

        return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
        REQUIRE(VALID_RWLOCK(rwl));

        int_fast32_t reader_incr = READER_INCR;

        /* Try to acquire write access. */
        atomic_compare_exchange_strong_acq_rel(&rwl->cnt_and_flag,
                                               &reader_incr, WRITER_ACTIVE);
        /*
         * There must have been no writer, and there must have
         * been at least one reader.
         */
        INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
               (reader_incr & ~WRITER_ACTIVE) != 0);

        if (reader_incr == READER_INCR) {
                /*
                 * We are the only reader and have been upgraded.
                 * Now jump into the head of the writer waiting queue.
                 */
                atomic_fetch_sub_release(&rwl->write_completions, 1);
        } else {
                return (ISC_R_LOCKBUSY);
        }

        return (ISC_R_SUCCESS);
}

void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
        int32_t prev_readers;

        REQUIRE(VALID_RWLOCK(rwl));

        /* Become an active reader. */
        prev_readers = atomic_fetch_add_release(&rwl->cnt_and_flag,
                                                READER_INCR);
        /* We must have been a writer. */
        INSIST((prev_readers & WRITER_ACTIVE) != 0);

        /* Complete write */
        atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
        atomic_fetch_add_release(&rwl->write_completions, 1);

        /* Resume other readers */
        LOCK(&rwl->lock);
        if (rwl->readers_waiting > 0) {
                BROADCAST(&rwl->readable);
        }
        UNLOCK(&rwl->lock);
}

isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
        int32_t prev_cnt;

        REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
        print_lock("preunlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

        if (type == isc_rwlocktype_read) {
                prev_cnt = atomic_fetch_sub_release(&rwl->cnt_and_flag,
                                                    READER_INCR);
                /*
                 * If we're the last reader and any writers are waiting, wake
                 * them up.  We need to wake up all of them to ensure the
                 * FIFO order.
                 */
                if (prev_cnt == READER_INCR &&
                    atomic_load_acquire(&rwl->write_completions) !=
                            atomic_load_acquire(&rwl->write_requests))
                {
                        LOCK(&rwl->lock);
                        BROADCAST(&rwl->writeable);
                        UNLOCK(&rwl->lock);
                }
        } else {
                bool wakeup_writers = true;

                /*
                 * Reset the flag, and (implicitly) tell other writers
                 * we are done.
                 */
                atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
                atomic_fetch_add_release(&rwl->write_completions, 1);

                if ((atomic_load_acquire(&rwl->write_granted) >=
                     rwl->write_quota) ||
                    (atomic_load_acquire(&rwl->write_requests) ==
                     atomic_load_acquire(&rwl->write_completions)) ||
                    (atomic_load_acquire(&rwl->cnt_and_flag) & ~WRITER_ACTIVE))
                {
                        /*
                         * We have passed the write quota, no writer is
                         * waiting, or some readers are almost ready, pending
                         * possible writers.  Note that the last case can
                         * happen even if write_requests != write_completions
                         * (which means a new writer in the queue), so we need
                         * to catch the case explicitly.
                         */
                        LOCK(&rwl->lock);
                        if (rwl->readers_waiting > 0) {
                                wakeup_writers = false;
                                BROADCAST(&rwl->readable);
                        }
                        UNLOCK(&rwl->lock);
                }

                if ((atomic_load_acquire(&rwl->write_requests) !=
                     atomic_load_acquire(&rwl->write_completions)) &&
                    wakeup_writers)
                {
                        LOCK(&rwl->lock);
                        BROADCAST(&rwl->writeable);
                        UNLOCK(&rwl->lock);
                }
        }

#ifdef ISC_RWLOCK_TRACE
        print_lock("postunlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

        return (ISC_R_SUCCESS);
}

#endif /* USE_PTHREAD_RWLOCK */