1eda14cbcSMatt Macy /* 2eda14cbcSMatt Macy * CDDL HEADER START 3eda14cbcSMatt Macy * 4eda14cbcSMatt Macy * This file and its contents are supplied under the terms of the 5eda14cbcSMatt Macy * Common Development and Distribution License ("CDDL"), version 1.0. 6eda14cbcSMatt Macy * You may only use this file in accordance with the terms of version 7eda14cbcSMatt Macy * 1.0 of the CDDL. 8eda14cbcSMatt Macy * 9eda14cbcSMatt Macy * A full copy of the text of the CDDL should have accompanied this 10eda14cbcSMatt Macy * source. A copy of the CDDL is also available via the Internet at 11eda14cbcSMatt Macy * http://www.illumos.org/license/CDDL. 12eda14cbcSMatt Macy * 13eda14cbcSMatt Macy * CDDL HEADER END 14eda14cbcSMatt Macy */ 15eda14cbcSMatt Macy /* 16eda14cbcSMatt Macy * Copyright (c) 2014, 2018 by Delphix. All rights reserved. 17eda14cbcSMatt Macy */ 18eda14cbcSMatt Macy 19eda14cbcSMatt Macy #include <sys/bqueue.h> 20eda14cbcSMatt Macy #include <sys/zfs_context.h> 21eda14cbcSMatt Macy 22eda14cbcSMatt Macy static inline bqueue_node_t * 23eda14cbcSMatt Macy obj2node(bqueue_t *q, void *data) 24eda14cbcSMatt Macy { 25eda14cbcSMatt Macy return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); 26eda14cbcSMatt Macy } 27eda14cbcSMatt Macy 28eda14cbcSMatt Macy /* 29eda14cbcSMatt Macy * Initialize a blocking queue The maximum capacity of the queue is set to 30eda14cbcSMatt Macy * size. Types that are stored in a bqueue must contain a bqueue_node_t, 31eda14cbcSMatt Macy * and node_offset must be its offset from the start of the struct. 32eda14cbcSMatt Macy * fill_fraction is a performance tuning value; when the queue is full, any 33eda14cbcSMatt Macy * threads attempting to enqueue records will block. They will block until 34eda14cbcSMatt Macy * they're signaled, which will occur when the queue is at least 1/fill_fraction 35eda14cbcSMatt Macy * empty. Similar behavior occurs on dequeue; if the queue is empty, threads 36eda14cbcSMatt Macy * block. They will be signalled when the queue has 1/fill_fraction full, or 37eda14cbcSMatt Macy * when bqueue_flush is called. As a result, you must call bqueue_flush when 38eda14cbcSMatt Macy * you enqueue your final record on a thread, in case the dequeueing threads are 39eda14cbcSMatt Macy * currently blocked and that enqueue does not cause them to be awoken. 40eda14cbcSMatt Macy * Alternatively, this behavior can be disabled (causing signaling to happen 41eda14cbcSMatt Macy * immediately) by setting fill_fraction to any value larger than size. 42eda14cbcSMatt Macy * Return 0 on success, or -1 on failure. 43eda14cbcSMatt Macy */ 44eda14cbcSMatt Macy int 45*c7046f76SMartin Matuska bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset) 46eda14cbcSMatt Macy { 47eda14cbcSMatt Macy if (fill_fraction == 0) { 48eda14cbcSMatt Macy return (-1); 49eda14cbcSMatt Macy } 50eda14cbcSMatt Macy list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), 51eda14cbcSMatt Macy node_offset + offsetof(bqueue_node_t, bqn_node)); 52eda14cbcSMatt Macy cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); 53eda14cbcSMatt Macy cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); 54eda14cbcSMatt Macy mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); 55eda14cbcSMatt Macy q->bq_node_offset = node_offset; 56eda14cbcSMatt Macy q->bq_size = 0; 57eda14cbcSMatt Macy q->bq_maxsize = size; 58eda14cbcSMatt Macy q->bq_fill_fraction = fill_fraction; 59eda14cbcSMatt Macy return (0); 60eda14cbcSMatt Macy } 61eda14cbcSMatt Macy 62eda14cbcSMatt Macy /* 63eda14cbcSMatt Macy * Destroy a blocking queue. This function asserts that there are no 64eda14cbcSMatt Macy * elements in the queue, and no one is blocked on the condition 65eda14cbcSMatt Macy * variables. 66eda14cbcSMatt Macy */ 67eda14cbcSMatt Macy void 68eda14cbcSMatt Macy bqueue_destroy(bqueue_t *q) 69eda14cbcSMatt Macy { 70eda14cbcSMatt Macy mutex_enter(&q->bq_lock); 71eda14cbcSMatt Macy ASSERT0(q->bq_size); 72eda14cbcSMatt Macy cv_destroy(&q->bq_add_cv); 73eda14cbcSMatt Macy cv_destroy(&q->bq_pop_cv); 74eda14cbcSMatt Macy list_destroy(&q->bq_list); 75eda14cbcSMatt Macy mutex_exit(&q->bq_lock); 76eda14cbcSMatt Macy mutex_destroy(&q->bq_lock); 77eda14cbcSMatt Macy } 78eda14cbcSMatt Macy 79eda14cbcSMatt Macy static void 80*c7046f76SMartin Matuska bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush) 81eda14cbcSMatt Macy { 82eda14cbcSMatt Macy ASSERT3U(item_size, >, 0); 83eda14cbcSMatt Macy ASSERT3U(item_size, <=, q->bq_maxsize); 84eda14cbcSMatt Macy mutex_enter(&q->bq_lock); 85eda14cbcSMatt Macy obj2node(q, data)->bqn_size = item_size; 86*c7046f76SMartin Matuska while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) { 87*c7046f76SMartin Matuska /* 88*c7046f76SMartin Matuska * Wake up bqueue_dequeue() thread if already sleeping in order 89*c7046f76SMartin Matuska * to prevent the deadlock condition 90*c7046f76SMartin Matuska */ 91*c7046f76SMartin Matuska cv_signal(&q->bq_pop_cv); 92eda14cbcSMatt Macy cv_wait_sig(&q->bq_add_cv, &q->bq_lock); 93eda14cbcSMatt Macy } 94eda14cbcSMatt Macy q->bq_size += item_size; 95eda14cbcSMatt Macy list_insert_tail(&q->bq_list, data); 96eda14cbcSMatt Macy if (flush) 97eda14cbcSMatt Macy cv_broadcast(&q->bq_pop_cv); 98*c7046f76SMartin Matuska else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction) 99*c7046f76SMartin Matuska cv_signal(&q->bq_pop_cv); 100eda14cbcSMatt Macy mutex_exit(&q->bq_lock); 101eda14cbcSMatt Macy } 102eda14cbcSMatt Macy 103eda14cbcSMatt Macy /* 104eda14cbcSMatt Macy * Add data to q, consuming size units of capacity. If there is insufficient 105eda14cbcSMatt Macy * capacity to consume size units, block until capacity exists. Asserts size is 106eda14cbcSMatt Macy * > 0. 107eda14cbcSMatt Macy */ 108eda14cbcSMatt Macy void 109*c7046f76SMartin Matuska bqueue_enqueue(bqueue_t *q, void *data, size_t item_size) 110eda14cbcSMatt Macy { 111eda14cbcSMatt Macy bqueue_enqueue_impl(q, data, item_size, B_FALSE); 112eda14cbcSMatt Macy } 113eda14cbcSMatt Macy 114eda14cbcSMatt Macy /* 115eda14cbcSMatt Macy * Enqueue an entry, and then flush the queue. This forces the popping threads 116eda14cbcSMatt Macy * to wake up, even if we're below the fill fraction. We have this in a single 117eda14cbcSMatt Macy * function, rather than having a separate call, because it prevents race 118eda14cbcSMatt Macy * conditions between the enqueuing thread and the dequeueing thread, where the 119eda14cbcSMatt Macy * enqueueing thread will wake up the dequeueing thread, that thread will 120eda14cbcSMatt Macy * destroy the condvar before the enqueuing thread is done. 121eda14cbcSMatt Macy */ 122eda14cbcSMatt Macy void 123*c7046f76SMartin Matuska bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size) 124eda14cbcSMatt Macy { 125eda14cbcSMatt Macy bqueue_enqueue_impl(q, data, item_size, B_TRUE); 126eda14cbcSMatt Macy } 127eda14cbcSMatt Macy 128eda14cbcSMatt Macy /* 129eda14cbcSMatt Macy * Take the first element off of q. If there are no elements on the queue, wait 130eda14cbcSMatt Macy * until one is put there. Return the removed element. 131eda14cbcSMatt Macy */ 132eda14cbcSMatt Macy void * 133eda14cbcSMatt Macy bqueue_dequeue(bqueue_t *q) 134eda14cbcSMatt Macy { 135eda14cbcSMatt Macy void *ret = NULL; 136*c7046f76SMartin Matuska size_t item_size; 137eda14cbcSMatt Macy mutex_enter(&q->bq_lock); 138eda14cbcSMatt Macy while (q->bq_size == 0) { 139eda14cbcSMatt Macy cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); 140eda14cbcSMatt Macy } 141eda14cbcSMatt Macy ret = list_remove_head(&q->bq_list); 142eda14cbcSMatt Macy ASSERT3P(ret, !=, NULL); 143eda14cbcSMatt Macy item_size = obj2node(q, ret)->bqn_size; 144eda14cbcSMatt Macy q->bq_size -= item_size; 145eda14cbcSMatt Macy if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction)) 146eda14cbcSMatt Macy cv_signal(&q->bq_add_cv); 147eda14cbcSMatt Macy mutex_exit(&q->bq_lock); 148eda14cbcSMatt Macy return (ret); 149eda14cbcSMatt Macy } 150eda14cbcSMatt Macy 151eda14cbcSMatt Macy /* 152eda14cbcSMatt Macy * Returns true if the space used is 0. 153eda14cbcSMatt Macy */ 154eda14cbcSMatt Macy boolean_t 155eda14cbcSMatt Macy bqueue_empty(bqueue_t *q) 156eda14cbcSMatt Macy { 157eda14cbcSMatt Macy return (q->bq_size == 0); 158eda14cbcSMatt Macy } 159