xref: /spdk/lib/util/pipe.c (revision 8130039ee5287100d9eb93eb886967645da3d545)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/pipe.h"
7 #include "spdk/util.h"
8 #include "spdk/queue.h"
9 #include "spdk/log.h"
10 
11 struct spdk_pipe_buf {
12 	SLIST_ENTRY(spdk_pipe_buf)	link;
13 	uint32_t			sz;
14 };
15 
16 struct spdk_pipe_group {
17 	SLIST_HEAD(, spdk_pipe_buf) bufs;
18 };
19 
20 struct spdk_pipe {
21 	uint8_t	*buf;
22 	uint32_t sz;
23 
24 	uint32_t write;
25 	uint32_t read;
26 	bool full;
27 
28 	struct spdk_pipe_group *group;
29 };
30 
31 struct spdk_pipe *
32 spdk_pipe_create(void *buf, uint32_t sz)
33 {
34 	struct spdk_pipe *pipe;
35 
36 	pipe = calloc(1, sizeof(*pipe));
37 	if (pipe == NULL) {
38 		return NULL;
39 	}
40 
41 	pipe->buf = buf;
42 	pipe->sz = sz;
43 
44 	return pipe;
45 }
46 
47 void *
48 spdk_pipe_destroy(struct spdk_pipe *pipe)
49 {
50 	void *buf;
51 
52 	if (pipe == NULL) {
53 		return NULL;
54 	}
55 
56 	if (pipe->group) {
57 		spdk_pipe_group_remove(pipe->group, pipe);
58 	}
59 
60 	buf = pipe->buf;
61 	free(pipe);
62 	return buf;
63 }
64 
65 static void
66 pipe_alloc_buf_from_group(struct spdk_pipe *pipe)
67 {
68 	struct spdk_pipe_buf *buf;
69 	struct spdk_pipe_group *group;
70 
71 	assert(pipe->group != NULL);
72 	group = pipe->group;
73 
74 	/* We have to pick a buffer that's the correct size. It's almost always
75 	 * the first one. */
76 	buf = SLIST_FIRST(&group->bufs);
77 	while (buf != NULL) {
78 		if (buf->sz == pipe->sz) {
79 			/* TODO: Could track the previous and do an SLIST_REMOVE_AFTER */
80 			SLIST_REMOVE(&pipe->group->bufs, buf, spdk_pipe_buf, link);
81 			pipe->buf = (void *)buf;
82 			return;
83 		}
84 		buf = SLIST_NEXT(buf, link);
85 	}
86 	/* Should never get here. */
87 	assert(false);
88 }
89 
90 int
91 spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
92 {
93 	uint32_t sz;
94 	uint32_t read;
95 	uint32_t write;
96 
97 	read = pipe->read;
98 	write = pipe->write;
99 
100 	if (pipe->full || requested_sz == 0) {
101 		iovs[0].iov_base = NULL;
102 		iovs[0].iov_len = 0;
103 		return 0;
104 	}
105 
106 	if (pipe->buf == NULL) {
107 		pipe_alloc_buf_from_group(pipe);
108 	}
109 
110 	if (read <= write) {
111 		sz = spdk_min(requested_sz, pipe->sz - write);
112 
113 		iovs[0].iov_base = pipe->buf + write;
114 		iovs[0].iov_len = sz;
115 
116 		requested_sz -= sz;
117 
118 		if (requested_sz > 0) {
119 			sz = spdk_min(requested_sz, read);
120 
121 			iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
122 			iovs[1].iov_len = sz;
123 		} else {
124 			iovs[1].iov_base = NULL;
125 			iovs[1].iov_len = 0;
126 		}
127 	} else {
128 		sz = spdk_min(requested_sz, read - write);
129 
130 		iovs[0].iov_base = pipe->buf + write;
131 		iovs[0].iov_len = sz;
132 		iovs[1].iov_base = NULL;
133 		iovs[1].iov_len = 0;
134 	}
135 
136 	return iovs[0].iov_len + iovs[1].iov_len;
137 }
138 
139 int
140 spdk_pipe_writer_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
141 {
142 	uint32_t sz;
143 	uint32_t read;
144 	uint32_t write;
145 
146 	read = pipe->read;
147 	write = pipe->write;
148 
149 	if (requested_sz > pipe->sz || pipe->full) {
150 		return -EINVAL;
151 	}
152 
153 	if (read <= write) {
154 		if (requested_sz > (pipe->sz - write) + read) {
155 			return -EINVAL;
156 		}
157 
158 		sz = spdk_min(requested_sz, pipe->sz - write);
159 
160 		write += sz;
161 		if (write == pipe->sz) {
162 			write = 0;
163 		}
164 		requested_sz -= sz;
165 
166 		if (requested_sz > 0) {
167 			write = requested_sz;
168 		}
169 	} else {
170 		if (requested_sz > (read - write)) {
171 			return -EINVAL;
172 		}
173 
174 		write += requested_sz;
175 	}
176 
177 	if (read == write) {
178 		pipe->full = true;
179 	}
180 	pipe->write = write;
181 
182 	return 0;
183 }
184 
185 uint32_t
186 spdk_pipe_reader_bytes_available(struct spdk_pipe *pipe)
187 {
188 	uint32_t read;
189 	uint32_t write;
190 
191 	read = pipe->read;
192 	write = pipe->write;
193 
194 	if (read == write && !pipe->full) {
195 		return 0;
196 	} else if (read < write) {
197 		return write - read;
198 	} else {
199 		return (pipe->sz - read) + write;
200 	}
201 }
202 
203 int
204 spdk_pipe_reader_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
205 {
206 	uint32_t sz;
207 	uint32_t read;
208 	uint32_t write;
209 
210 	read = pipe->read;
211 	write = pipe->write;
212 
213 	if ((read == write && !pipe->full) || requested_sz == 0) {
214 		iovs[0].iov_base = NULL;
215 		iovs[0].iov_len = 0;
216 		iovs[1].iov_base = NULL;
217 		iovs[1].iov_len = 0;
218 	} else if (read < write) {
219 		sz = spdk_min(requested_sz, write - read);
220 
221 		iovs[0].iov_base = pipe->buf + read;
222 		iovs[0].iov_len = sz;
223 		iovs[1].iov_base = NULL;
224 		iovs[1].iov_len = 0;
225 	} else {
226 		sz = spdk_min(requested_sz, pipe->sz - read);
227 
228 		iovs[0].iov_base = pipe->buf + read;
229 		iovs[0].iov_len = sz;
230 
231 		requested_sz -= sz;
232 
233 		if (requested_sz > 0) {
234 			sz = spdk_min(requested_sz, write);
235 			iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
236 			iovs[1].iov_len = sz;
237 		} else {
238 			iovs[1].iov_base = NULL;
239 			iovs[1].iov_len = 0;
240 		}
241 	}
242 
243 	return iovs[0].iov_len + iovs[1].iov_len;
244 }
245 
246 int
247 spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
248 {
249 	uint32_t sz;
250 	uint32_t read;
251 	uint32_t write;
252 
253 	read = pipe->read;
254 	write = pipe->write;
255 
256 	if (requested_sz == 0) {
257 		return 0;
258 	}
259 
260 	if (read < write) {
261 		if (requested_sz > (write - read)) {
262 			return -EINVAL;
263 		}
264 
265 		read += requested_sz;
266 	} else {
267 		sz = spdk_min(requested_sz, pipe->sz - read);
268 
269 		read += sz;
270 		if (read == pipe->sz) {
271 			read = 0;
272 		}
273 		requested_sz -= sz;
274 
275 		if (requested_sz > 0) {
276 			if (requested_sz > write) {
277 				return -EINVAL;
278 			}
279 
280 			read = requested_sz;
281 		}
282 	}
283 
284 	/* We know we advanced at least one byte, so the pipe isn't full. */
285 	pipe->full = false;
286 
287 	if (read == write) {
288 		/* The pipe is empty. To re-use the same memory more frequently, jump
289 		 * both pointers back to the beginning of the pipe. */
290 		read = 0;
291 		pipe->write = 0;
292 
293 		/* Additionally, release the buffer to the shared pool */
294 		if (pipe->group) {
295 			struct spdk_pipe_buf *buf = (struct spdk_pipe_buf *)pipe->buf;
296 			buf->sz = pipe->sz;
297 			SLIST_INSERT_HEAD(&pipe->group->bufs, buf, link);
298 			pipe->buf = NULL;
299 		}
300 	}
301 
302 	pipe->read = read;
303 
304 	return 0;
305 }
306 
307 struct spdk_pipe_group *
308 spdk_pipe_group_create(void)
309 {
310 	struct spdk_pipe_group *group;
311 
312 	group = calloc(1, sizeof(*group));
313 	if (!group) {
314 		return NULL;
315 	}
316 
317 	SLIST_INIT(&group->bufs);
318 
319 	return group;
320 }
321 
322 void
323 spdk_pipe_group_destroy(struct spdk_pipe_group *group)
324 {
325 	if (!SLIST_EMPTY(&group->bufs)) {
326 		SPDK_ERRLOG("Destroying a pipe group that still has buffers!\n");
327 		assert(false);
328 	}
329 
330 	free(group);
331 }
332 
333 int
334 spdk_pipe_group_add(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
335 {
336 	struct spdk_pipe_buf *buf;
337 
338 	assert(pipe->group == NULL);
339 
340 	pipe->group = group;
341 	if (pipe->read != pipe->write || pipe->full) {
342 		/* Pipe currently has valid data, so keep the buffer attached
343 		 * to the pipe for now.  We can move it to the group's SLIST
344 		 * later when it gets emptied.
345 		 */
346 		return 0;
347 	}
348 
349 	buf = (struct spdk_pipe_buf *)pipe->buf;
350 	buf->sz = pipe->sz;
351 	SLIST_INSERT_HEAD(&group->bufs, buf, link);
352 	pipe->buf = NULL;
353 	return 0;
354 }
355 
356 int
357 spdk_pipe_group_remove(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
358 {
359 	assert(pipe->group == group);
360 
361 	if (pipe->buf == NULL) {
362 		/* Associate a buffer with the pipe before returning. */
363 		pipe_alloc_buf_from_group(pipe);
364 		assert(pipe->buf != NULL);
365 	}
366 
367 	pipe->group = NULL;
368 	return 0;
369 }
370