1 /*	$NetBSD: rwlock.c,v 1.1 2024/02/18 20:57:50 christos Exp $	*/
2 
3 /*
4  * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
5  *
6  * SPDX-License-Identifier: MPL-2.0
7  *
8  * This Source Code Form is subject to the terms of the Mozilla Public
9  * License, v. 2.0. If a copy of the MPL was not distributed with this
10  * file, you can obtain one at https://mozilla.org/MPL/2.0/.
11  *
12  * See the COPYRIGHT file distributed with this work for additional
13  * information regarding copyright ownership.
14  */
15 
16 /*! \file */
17 
18 #include <inttypes.h>
19 #include <stdbool.h>
20 #include <stddef.h>
21 
22 #if defined(sun) && (defined(__sparc) || defined(__sparc__))
23 #include <synch.h> /* for smt_pause(3c) */
24 #endif /* if defined(sun) && (defined(__sparc) || defined(__sparc__)) */
25 
26 #include <isc/atomic.h>
27 #include <isc/magic.h>
28 #include <isc/platform.h>
29 #include <isc/print.h>
30 #include <isc/rwlock.h>
31 #include <isc/util.h>
32 
33 #if USE_PTHREAD_RWLOCK
34 
35 #include <errno.h>
36 #include <pthread.h>
37 
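/*
 * In this branch, isc_rwlock_t is a thin wrapper around a POSIX
 * pthread_rwlock_t plus an atomic "downgrade" flag.  The flag is the only
 * extra state: pthread rwlocks cannot atomically turn a write lock into a
 * read lock, so isc_rwlock_downgrade() raises the flag to make competing
 * writers back off while the conversion is in progress.
 */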
38 void
39 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
40 		unsigned int write_quota) {
41 	UNUSED(read_quota);
42 	UNUSED(write_quota);
43 	REQUIRE(pthread_rwlock_init(&rwl->rwlock, NULL) == 0);
44 	atomic_init(&rwl->downgrade, false);
45 }
46 
47 isc_result_t
48 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
49 	switch (type) {
50 	case isc_rwlocktype_read:
51 		REQUIRE(pthread_rwlock_rdlock(&rwl->rwlock) == 0);
52 		break;
53 	case isc_rwlocktype_write:
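		/*
		 * A writer must not keep the lock while a downgrade is in
		 * progress: acquire the write lock, and if the downgrade
		 * flag is set, release it and spin until the downgrading
		 * thread has reacquired the lock for reading.
		 */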
54 		while (true) {
55 			REQUIRE(pthread_rwlock_wrlock(&rwl->rwlock) == 0);
56 			/* Unlock if in middle of downgrade operation */
57 			if (atomic_load_acquire(&rwl->downgrade)) {
58 				REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) ==
59 					0);
60 				while (atomic_load_acquire(&rwl->downgrade)) {
61 				}
62 				continue;
63 			}
64 			break;
65 		}
66 		break;
67 	default:
68 		UNREACHABLE();
69 	}
70 	return (ISC_R_SUCCESS);
71 }
72 
73 isc_result_t
74 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
75 	int ret = 0;
76 	switch (type) {
77 	case isc_rwlocktype_read:
78 		ret = pthread_rwlock_tryrdlock(&rwl->rwlock);
79 		break;
80 	case isc_rwlocktype_write:
81 		ret = pthread_rwlock_trywrlock(&rwl->rwlock);
82 		if ((ret == 0) && atomic_load_acquire(&rwl->downgrade)) {
83 			isc_rwlock_unlock(rwl, type);
84 			return (ISC_R_LOCKBUSY);
85 		}
86 		break;
87 	default:
88 		UNREACHABLE();
89 	}
90 
91 	switch (ret) {
92 	case 0:
93 		return (ISC_R_SUCCESS);
94 	case EBUSY:
95 		return (ISC_R_LOCKBUSY);
96 	case EAGAIN:
97 		return (ISC_R_LOCKBUSY);
98 	default:
99 		UNREACHABLE();
100 	}
101 }
102 
103 isc_result_t
104 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
105 	UNUSED(type);
106 	REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) == 0);
107 	return (ISC_R_SUCCESS);
108 }
109 
110 isc_result_t
111 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
112 	UNUSED(rwl);
113 	return (ISC_R_LOCKBUSY);
114 }
115 
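/*
 * Downgrade from write to read by releasing the write lock and immediately
 * reacquiring it for reading.  The "downgrade" flag stays true across the
 * gap so that any writer that wins the lock in between releases it again
 * (see isc_rwlock_lock()), preventing the downgrading thread from being
 * starved by a stream of writers.
 */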
116 void
117 isc_rwlock_downgrade(isc_rwlock_t *rwl) {
118 	isc_result_t result;
119 	atomic_store_release(&rwl->downgrade, true);
120 	result = isc_rwlock_unlock(rwl, isc_rwlocktype_write);
121 	RUNTIME_CHECK(result == ISC_R_SUCCESS);
122 	result = isc_rwlock_lock(rwl, isc_rwlocktype_read);
123 	RUNTIME_CHECK(result == ISC_R_SUCCESS);
124 	atomic_store_release(&rwl->downgrade, false);
125 }
126 
127 void
128 isc_rwlock_destroy(isc_rwlock_t *rwl) {
129 	pthread_rwlock_destroy(&rwl->rwlock);
130 }
131 
132 #else /* if USE_PTHREAD_RWLOCK */
133 
134 #define RWLOCK_MAGIC	  ISC_MAGIC('R', 'W', 'L', 'k')
135 #define VALID_RWLOCK(rwl) ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)
136 
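/*
 * Tunables: RWLOCK_DEFAULT_WRITE_QUOTA is how many consecutive write locks
 * may be granted while readers are waiting before the readers are woken up;
 * RWLOCK_MAX_ADAPTIVE_COUNT is used by isc_rwlock_lock() to decide how long
 * to spin on the fast path before falling back to the blocking slow path.
 * RWLOCK_DEFAULT_READ_QUOTA is defined but unused here; isc_rwlock_init()
 * rejects a nonzero read quota.
 */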
137 #ifndef RWLOCK_DEFAULT_READ_QUOTA
138 #define RWLOCK_DEFAULT_READ_QUOTA 4
139 #endif /* ifndef RWLOCK_DEFAULT_READ_QUOTA */
140 
141 #ifndef RWLOCK_DEFAULT_WRITE_QUOTA
142 #define RWLOCK_DEFAULT_WRITE_QUOTA 4
143 #endif /* ifndef RWLOCK_DEFAULT_WRITE_QUOTA */
144 
145 #ifndef RWLOCK_MAX_ADAPTIVE_COUNT
146 #define RWLOCK_MAX_ADAPTIVE_COUNT 100
147 #endif /* ifndef RWLOCK_MAX_ADAPTIVE_COUNT */
148 
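/*
 * isc_rwlock_pause() issues a CPU-specific "pause"/"yield" hint (or nothing
 * where no such instruction is available) so that a spinning thread burns
 * less power and leaves more execution resources to the thread that
 * actually holds the lock.
 */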
149 #ifdef __lint__
150 # define isc_rwlock_pause()
151 #else
152 #if defined(_MSC_VER)
153 #include <intrin.h>
154 #define isc_rwlock_pause() YieldProcessor()
155 #elif defined(__x86_64__)
156 #include <immintrin.h>
157 #define isc_rwlock_pause() _mm_pause()
158 #elif defined(__i386__)
159 #define isc_rwlock_pause() __asm__ __volatile__("rep; nop")
160 #elif defined(__ia64__)
161 #define isc_rwlock_pause() __asm__ __volatile__("hint @pause")
162 #elif defined(__arm__) && (defined(_ARM_ARCH_6) || HAVE_ARM_YIELD)
163 #define isc_rwlock_pause() __asm__ __volatile__("yield")
164 #elif defined(sun) && (defined(__sparc) || defined(__sparc__))
165 #define isc_rwlock_pause() smt_pause()
166 #elif (defined(__sparc) || defined(__sparc__)) && HAVE_SPARC_PAUSE
167 #define isc_rwlock_pause() __asm__ __volatile__("pause")
168 #elif defined(__ppc__) || defined(_ARCH_PPC) || defined(_ARCH_PWR) || \
169 	defined(_ARCH_PWR2) || defined(_POWER)
170 #define isc_rwlock_pause() __asm__ volatile("or 27,27,27")
171 #else /* if defined(_MSC_VER) */
172 #define isc_rwlock_pause()
173 #endif /* if defined(_MSC_VER) */
174 #endif
175 
176 static isc_result_t
177 isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type);
178 
179 #ifdef ISC_RWLOCK_TRACE
180 #include <stdio.h> /* Required for fprintf/stderr. */
181 
182 #include <isc/thread.h> /* Required for isc_thread_self(). */
183 
184 static void
185 print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
186 	fprintf(stderr,
187 		"rwlock %p thread %" PRIuPTR " %s(%s): "
188 		"write_requests=%u, write_completions=%u, "
189 		"cnt_and_flag=0x%x, readers_waiting=%u, "
190 		"write_granted=%u, write_quota=%u\n",
191 		rwl, isc_thread_self(), operation,
192 		(type == isc_rwlocktype_read ? "read" : "write"),
193 		atomic_load_acquire(&rwl->write_requests),
194 		atomic_load_acquire(&rwl->write_completions),
195 		atomic_load_acquire(&rwl->cnt_and_flag), rwl->readers_waiting,
196 		atomic_load_acquire(&rwl->write_granted), rwl->write_quota);
197 }
198 #endif			/* ISC_RWLOCK_TRACE */
199 
200 void
201 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
202 		unsigned int write_quota) {
203 	REQUIRE(rwl != NULL);
204 
205 	/*
206 	 * In case there's trouble initializing, we zero magic now.  If all
207 	 * goes well, we'll set it to RWLOCK_MAGIC.
208 	 */
209 	rwl->magic = 0;
210 
211 	atomic_init(&rwl->spins, 0);
212 	atomic_init(&rwl->write_requests, 0);
213 	atomic_init(&rwl->write_completions, 0);
214 	atomic_init(&rwl->cnt_and_flag, 0);
215 	rwl->readers_waiting = 0;
216 	atomic_init(&rwl->write_granted, 0);
217 	if (read_quota != 0) {
218 		UNEXPECTED_ERROR(__FILE__, __LINE__,
219 				 "read quota is not supported");
220 	}
221 	if (write_quota == 0) {
222 		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
223 	}
224 	rwl->write_quota = write_quota;
225 
226 	isc_mutex_init(&rwl->lock);
227 
228 	isc_condition_init(&rwl->readable);
229 	isc_condition_init(&rwl->writeable);
230 
231 	rwl->magic = RWLOCK_MAGIC;
232 }
233 
234 void
235 isc_rwlock_destroy(isc_rwlock_t *rwl) {
236 	REQUIRE(VALID_RWLOCK(rwl));
237 
238 	REQUIRE(atomic_load_acquire(&rwl->write_requests) ==
239 			atomic_load_acquire(&rwl->write_completions) &&
240 		atomic_load_acquire(&rwl->cnt_and_flag) == 0 &&
241 		rwl->readers_waiting == 0);
242 
243 	rwl->magic = 0;
244 	(void)isc_condition_destroy(&rwl->readable);
245 	(void)isc_condition_destroy(&rwl->writeable);
246 	isc_mutex_destroy(&rwl->lock);
247 }
248 
249 /*
250  * When some architecture-dependent atomic operations are available,
251  * rwlock can be more efficient than the generic algorithm defined below.
252  * The basic algorithm is described in the following URL:
253  *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
254  *
255  * The key is to use the following integer variables modified atomically:
256  *   write_requests, write_completions, and cnt_and_flag.
257  *
258  * write_requests and write_completions act as a waiting queue for writers
259  * in order to ensure the FIFO order.  Both variables begin with the initial
260  * value of 0.  When a new writer tries to get a write lock, it increments
261  * write_requests and gets the previous value of the variable as a "ticket".
262  * When write_completions reaches the ticket number, the new writer can start
263  * writing.  When the writer completes its work, it increments
264  * write_completions so that the next writer can start working.  If
265  * write_requests is not equal to write_completions, a writer is currently
266  * working or waiting.  In this case, new readers cannot start reading; in
267  * other words, this algorithm basically prefers writers.
268  *
269  * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
270  * variable is a kind of structure with two members: writer_flag (1 bit) and
271  * reader_count (31 bits).  The writer_flag shows whether a writer is working,
272  * and the reader_count shows the number of readers currently working or almost
273  * ready for working.  A writer who has the current "ticket" tries to get the
274  * lock by exclusively setting the writer_flag to 1, provided that the whole
275  * 32-bit is 0 (meaning no readers or writers working).  On the other hand,
276  * a new reader tries to increment the "reader_count" field provided that
277  * the writer_flag is 0 (meaning there is no writer working).
278  *
279  * If some of the above operations fail, the reader or the writer sleeps
280  * until the related condition changes.  When a working reader or writer
281  * completes its work and there are sleeping readers or writers whose
282  * suspending condition has now changed, it wakes up those sleeping
283  * readers or writers.
284  *
285  * As already noted, this algorithm basically prefers writers.  In order to
286  * prevent readers from starving, however, the algorithm also introduces the
287  * "writer quota" (Q).  When Q consecutive writers have completed their work
288  * while readers were kept waiting, the last writer wakes up the readers,
289  * even if a new writer is waiting.
290  *
291  * Implementation specific note: due to the combination of atomic operations
292  * and a mutex lock, ordering between the atomic operation and locks can be
293  * very sensitive in some cases.  In particular, it is generally very important
294  * to check the atomic variable that requires a reader or writer to sleep after
295  * locking the mutex and before actually sleeping; otherwise, a deadlock is
296  * very likely.  For example, assume "var" is a variable
297  * atomically modified, then the corresponding code would be:
298  *	if (var == need_sleep) {
299  *		LOCK(lock);
300  *		if (var == need_sleep)
301  *			WAIT(cond, lock);
302  *		UNLOCK(lock);
303  *	}
304  * The second check is important, since "var" is protected by the atomic
305  * operation, not by the mutex, and can be changed just before sleeping.
306  * (The first "if" could be omitted, but this is also important in order to
307  * make the code efficient by avoiding the use of the mutex unless it is
308  * really necessary.)
309  */
310 
311 #define WRITER_ACTIVE 0x1
312 #define READER_INCR   0x2
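/*
 * Encoding example: the writer flag occupies bit 0 and the reader count the
 * remaining bits, so three active readers give cnt_and_flag ==
 * 3 * READER_INCR == 0x6, a lone writer gives 0x1, and 0x0 means the lock
 * is completely idle.  For the writer queue, three writers arriving in
 * order take tickets 0, 1 and 2 from write_requests; each may proceed only
 * once write_completions has reached its ticket value.
 */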
313 
314 static isc_result_t
315 isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
316 	int32_t cntflag;
317 
318 	REQUIRE(VALID_RWLOCK(rwl));
319 
320 #ifdef ISC_RWLOCK_TRACE
321 	print_lock("prelock", rwl, type);
322 #endif /* ifdef ISC_RWLOCK_TRACE */
323 
324 	if (type == isc_rwlocktype_read) {
325 		if (atomic_load_acquire(&rwl->write_requests) !=
326 		    atomic_load_acquire(&rwl->write_completions))
327 		{
328 			/* there is a waiting or active writer */
329 			LOCK(&rwl->lock);
330 			if (atomic_load_acquire(&rwl->write_requests) !=
331 			    atomic_load_acquire(&rwl->write_completions))
332 			{
333 				rwl->readers_waiting++;
334 				WAIT(&rwl->readable, &rwl->lock);
335 				rwl->readers_waiting--;
336 			}
337 			UNLOCK(&rwl->lock);
338 		}
339 
340 		cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
341 						   READER_INCR);
342 		POST(cntflag);
343 		while (1) {
344 			if ((atomic_load_acquire(&rwl->cnt_and_flag) &
345 			     WRITER_ACTIVE) == 0)
346 			{
347 				break;
348 			}
349 
350 			/* A writer is still working */
351 			LOCK(&rwl->lock);
352 			rwl->readers_waiting++;
353 			if ((atomic_load_acquire(&rwl->cnt_and_flag) &
354 			     WRITER_ACTIVE) != 0)
355 			{
356 				WAIT(&rwl->readable, &rwl->lock);
357 			}
358 			rwl->readers_waiting--;
359 			UNLOCK(&rwl->lock);
360 
361 			/*
362 			 * Typically, the reader should be able to get a lock
363 			 * at this stage:
364 			 *   (1) there should have been no pending writer when
365 			 *       the reader was trying to increment the
366 			 *       counter; otherwise, the writer should be in
367 			 *       the waiting queue, preventing the reader from
368 			 *       proceeding to this point.
369 			 *   (2) once the reader increments the counter, no
370 			 *       more writer can get a lock.
371 			 * Still, it is possible another writer can work at
372 			 * this point, e.g. in the following scenario:
373 			 *   A previous writer unlocks the writer lock.
374 			 *   This reader proceeds to point (1).
375 			 *   A new writer appears, and gets a new lock before
376 			 *   the reader increments the counter.
377 			 *   The reader then increments the counter.
378 			 *   The previous writer notices there is a waiting
379 			 *   reader who is almost ready, and wakes it up.
380 			 * So, the reader needs to confirm whether it can now
381 			 * read explicitly (thus we loop).  Note that this is
382 			 * not an infinite process, since the reader has
383 			 * incremented the counter at this point.
384 			 */
385 		}
386 
387 		/*
388 	 * If we are temporarily preferred over writers due to the writer
389 		 * quota, reset the condition (race among readers doesn't
390 		 * matter).
391 		 */
392 		atomic_store_release(&rwl->write_granted, 0);
393 	} else {
394 		int32_t prev_writer;
395 
396 		/* enter the waiting queue, and wait for our turn */
397 		prev_writer = atomic_fetch_add_release(&rwl->write_requests, 1);
398 		while (atomic_load_acquire(&rwl->write_completions) !=
399 		       prev_writer)
400 		{
401 			LOCK(&rwl->lock);
402 			if (atomic_load_acquire(&rwl->write_completions) !=
403 			    prev_writer)
404 			{
405 				WAIT(&rwl->writeable, &rwl->lock);
406 				UNLOCK(&rwl->lock);
407 				continue;
408 			}
409 			UNLOCK(&rwl->lock);
410 			break;
411 		}
412 
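		/*
		 * Our ticket has come up; now wait until no reader (and no
		 * stale writer flag) remains by atomically swapping
		 * cnt_and_flag from 0 to WRITER_ACTIVE.
		 */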
413 		while (!atomic_compare_exchange_weak_acq_rel(
414 			&rwl->cnt_and_flag, &(int_fast32_t){ 0 },
415 			WRITER_ACTIVE))
416 		{
417 			/* Another active reader or writer is working. */
418 			LOCK(&rwl->lock);
419 			if (atomic_load_acquire(&rwl->cnt_and_flag) != 0) {
420 				WAIT(&rwl->writeable, &rwl->lock);
421 			}
422 			UNLOCK(&rwl->lock);
423 		}
424 
425 		INSIST((atomic_load_acquire(&rwl->cnt_and_flag) &
426 			WRITER_ACTIVE));
427 		atomic_fetch_add_release(&rwl->write_granted, 1);
428 	}
429 
430 #ifdef ISC_RWLOCK_TRACE
431 	print_lock("postlock", rwl, type);
432 #endif /* ifdef ISC_RWLOCK_TRACE */
433 
434 	return (ISC_R_SUCCESS);
435 }
436 
437 isc_result_t
438 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
439 	int32_t cnt = 0;
440 	int32_t spins = atomic_load_acquire(&rwl->spins) * 2 + 10;
441 	int32_t max_cnt = ISC_MAX(spins, RWLOCK_MAX_ADAPTIVE_COUNT);
442 	isc_result_t result = ISC_R_SUCCESS;
443 
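	/*
	 * Adaptive path: spin on the non-blocking trylock, pausing between
	 * attempts, and fall back to the blocking isc__rwlock_lock() only
	 * after max_cnt iterations.  rwl->spins keeps a smoothed estimate of
	 * how many iterations recent acquisitions needed: it moves toward
	 * the latest count by 1/8 of the difference and seeds the next spin
	 * budget.
	 */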
444 	do {
445 		if (cnt++ >= max_cnt) {
446 			result = isc__rwlock_lock(rwl, type);
447 			break;
448 		}
449 		isc_rwlock_pause();
450 	} while (isc_rwlock_trylock(rwl, type) != ISC_R_SUCCESS);
451 
452 	atomic_fetch_add_release(&rwl->spins, (cnt - spins) / 8);
453 
454 	return (result);
455 }
456 
457 isc_result_t
458 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
459 	int32_t cntflag;
460 
461 	REQUIRE(VALID_RWLOCK(rwl));
462 
463 #ifdef ISC_RWLOCK_TRACE
464 	print_lock("prelock", rwl, type);
465 #endif /* ifdef ISC_RWLOCK_TRACE */
466 
467 	if (type == isc_rwlocktype_read) {
468 		/* If a writer is waiting or working, we fail. */
469 		if (atomic_load_acquire(&rwl->write_requests) !=
470 		    atomic_load_acquire(&rwl->write_completions))
471 		{
472 			return (ISC_R_LOCKBUSY);
473 		}
474 
475 		/* Otherwise, be ready for reading. */
476 		cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
477 						   READER_INCR);
478 		if ((cntflag & WRITER_ACTIVE) != 0) {
479 			/*
480 			 * A writer is working.  We lose, and cancel the read
481 			 * request.
482 			 */
483 			cntflag = atomic_fetch_sub_release(&rwl->cnt_and_flag,
484 							   READER_INCR);
485 			/*
486 			 * If we were the only pending reader and writers have
487 			 * been blocked during this short window, wake them up.
488 			 */
489 			if (cntflag == READER_INCR &&
490 			    atomic_load_acquire(&rwl->write_completions) !=
491 				    atomic_load_acquire(&rwl->write_requests))
492 			{
493 				LOCK(&rwl->lock);
494 				BROADCAST(&rwl->writeable);
495 				UNLOCK(&rwl->lock);
496 			}
497 
498 			return (ISC_R_LOCKBUSY);
499 		}
500 	} else {
501 		/* Try locking without entering the waiting queue. */
502 		int_fast32_t zero = 0;
503 		if (!atomic_compare_exchange_strong_acq_rel(
504 			    &rwl->cnt_and_flag, &zero, WRITER_ACTIVE))
505 		{
506 			return (ISC_R_LOCKBUSY);
507 		}
508 
509 		/*
510 		 * XXXJT: jump into the queue, possibly breaking the writer
511 		 * order.
512 		 */
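		/*
		 * Decrementing write_completions has the same effect as
		 * taking a ticket: write_requests and write_completions now
		 * differ by one more, so readers and queued writers see an
		 * active writer, and the increment in isc_rwlock_unlock()
		 * rebalances the counters when we release the lock.
		 */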
513 		atomic_fetch_sub_release(&rwl->write_completions, 1);
514 		atomic_fetch_add_release(&rwl->write_granted, 1);
515 	}
516 
517 #ifdef ISC_RWLOCK_TRACE
518 	print_lock("postlock", rwl, type);
519 #endif /* ifdef ISC_RWLOCK_TRACE */
520 
521 	return (ISC_R_SUCCESS);
522 }
523 
524 isc_result_t
525 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
526 	REQUIRE(VALID_RWLOCK(rwl));
527 
528 	int_fast32_t reader_incr = READER_INCR;
529 
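	/*
	 * The upgrade can succeed only if this thread is the sole reader:
	 * the compare-exchange swaps cnt_and_flag from exactly READER_INCR
	 * (one reader, no writer) to WRITER_ACTIVE.  On failure, reader_incr
	 * is overwritten with the value that was actually observed.
	 */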
530 	/* Try to acquire write access. */
531 	atomic_compare_exchange_strong_acq_rel(&rwl->cnt_and_flag, &reader_incr,
532 					       WRITER_ACTIVE);
533 	/*
534 	 * There must have been no writer, and there must have
535 	 * been at least one reader.
536 	 */
537 	INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
538 	       (reader_incr & ~WRITER_ACTIVE) != 0);
539 
540 	if (reader_incr == READER_INCR) {
541 		/*
542 		 * We are the only reader and have been upgraded.
543 		 * Now jump into the head of the writer waiting queue.
544 		 */
545 		atomic_fetch_sub_release(&rwl->write_completions, 1);
546 	} else {
547 		return (ISC_R_LOCKBUSY);
548 	}
549 
550 	return (ISC_R_SUCCESS);
551 }
552 
553 void
554 isc_rwlock_downgrade(isc_rwlock_t *rwl) {
555 	int32_t prev_readers;
556 
557 	REQUIRE(VALID_RWLOCK(rwl));
558 
559 	/* Become an active reader. */
560 	prev_readers = atomic_fetch_add_release(&rwl->cnt_and_flag,
561 						READER_INCR);
562 	/* We must have been a writer. */
563 	INSIST((prev_readers & WRITER_ACTIVE) != 0);
564 
565 	/* Complete write */
566 	atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
567 	atomic_fetch_add_release(&rwl->write_completions, 1);
568 
569 	/* Resume other readers */
570 	LOCK(&rwl->lock);
571 	if (rwl->readers_waiting > 0) {
572 		BROADCAST(&rwl->readable);
573 	}
574 	UNLOCK(&rwl->lock);
575 }
576 
577 isc_result_t
578 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
579 	int32_t prev_cnt;
580 
581 	REQUIRE(VALID_RWLOCK(rwl));
582 
583 #ifdef ISC_RWLOCK_TRACE
584 	print_lock("preunlock", rwl, type);
585 #endif /* ifdef ISC_RWLOCK_TRACE */
586 
587 	if (type == isc_rwlocktype_read) {
588 		prev_cnt = atomic_fetch_sub_release(&rwl->cnt_and_flag,
589 						    READER_INCR);
590 		/*
591 		 * If we're the last reader and any writers are waiting, wake
592 		 * them up.  We need to wake up all of them to ensure the
593 		 * FIFO order.
594 		 */
595 		if (prev_cnt == READER_INCR &&
596 		    atomic_load_acquire(&rwl->write_completions) !=
597 			    atomic_load_acquire(&rwl->write_requests))
598 		{
599 			LOCK(&rwl->lock);
600 			BROADCAST(&rwl->writeable);
601 			UNLOCK(&rwl->lock);
602 		}
603 	} else {
604 		bool wakeup_writers = true;
605 
606 		/*
607 		 * Reset the flag, and (implicitly) tell other writers
608 		 * we are done.
609 		 */
610 		atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
611 		atomic_fetch_add_release(&rwl->write_completions, 1);
612 
613 		if ((atomic_load_acquire(&rwl->write_granted) >=
614 		     rwl->write_quota) ||
615 		    (atomic_load_acquire(&rwl->write_requests) ==
616 		     atomic_load_acquire(&rwl->write_completions)) ||
617 		    (atomic_load_acquire(&rwl->cnt_and_flag) & ~WRITER_ACTIVE))
618 		{
619 			/*
620 			 * We have passed the write quota, no writer is
621 			 * waiting, or some readers are almost ready to run
622 			 * despite possible pending writers.  Note that the
623 			 * last case can happen even if write_requests !=
624 			 * write_completions (which means a new writer is in
625 			 * the queue), so we need to catch the case explicitly.
626 			 */
627 			LOCK(&rwl->lock);
628 			if (rwl->readers_waiting > 0) {
629 				wakeup_writers = false;
630 				BROADCAST(&rwl->readable);
631 			}
632 			UNLOCK(&rwl->lock);
633 		}
634 
635 		if ((atomic_load_acquire(&rwl->write_requests) !=
636 		     atomic_load_acquire(&rwl->write_completions)) &&
637 		    wakeup_writers)
638 		{
639 			LOCK(&rwl->lock);
640 			BROADCAST(&rwl->writeable);
641 			UNLOCK(&rwl->lock);
642 		}
643 	}
644 
645 #ifdef ISC_RWLOCK_TRACE
646 	print_lock("postunlock", rwl, type);
647 #endif /* ifdef ISC_RWLOCK_TRACE */
648 
649 	return (ISC_R_SUCCESS);
650 }
651 
652 #endif /* USE_PTHREAD_RWLOCK */
653