xref: /spdk/lib/util/pipe.c (revision 158d15b7400ae9b4373cf6a6ef6c880436d09a3c)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2019 Intel Corporation.
 *   All rights reserved.
 */
5063252faSBen Walker 
6063252faSBen Walker #include "spdk/pipe.h"
7063252faSBen Walker #include "spdk/util.h"
8*158d15b7SBen Walker #include "spdk/queue.h"
9*158d15b7SBen Walker #include "spdk/log.h"
10*158d15b7SBen Walker 
/*
 * Bookkeeping header that is overlaid on the first bytes of a pipe's data
 * buffer while that buffer is idle and parked on a group's list.
 */
struct spdk_pipe_buf {
	/* Linkage in the owning group's singly-linked list of idle buffers. */
	SLIST_ENTRY(spdk_pipe_buf)	link;
	/* Size of the buffer, copied from the pipe's sz when it is parked. */
	uint32_t			sz;
};
15*158d15b7SBen Walker 
/*
 * A set of pipes that share their data buffers: a pipe that is empty parks
 * its buffer here, and a pipe that needs one takes a buffer of matching size.
 */
struct spdk_pipe_group {
	/* Idle buffers currently not attached to any pipe. */
	SLIST_HEAD(, spdk_pipe_buf) bufs;
};
19063252faSBen Walker 
/* A ring buffer with one read offset and one write offset. */
struct spdk_pipe {
	/* Backing memory; NULL while the buffer is parked on the group's list. */
	uint8_t	*buf;
	/* Capacity of buf in bytes. */
	uint32_t sz;

	/* Offset in buf at which the next written byte goes. */
	uint32_t write;
	/* Offset in buf of the next byte to be read. */
	uint32_t read;
	/* read == write is ambiguous; this flag tells "full" apart from "empty". */
	bool full;

	/* Group this pipe shares buffers with, or NULL if it is standalone. */
	struct spdk_pipe_group *group;
};
30063252faSBen Walker 
31063252faSBen Walker struct spdk_pipe *
spdk_pipe_create(void * buf,uint32_t sz)32063252faSBen Walker spdk_pipe_create(void *buf, uint32_t sz)
33063252faSBen Walker {
34063252faSBen Walker 	struct spdk_pipe *pipe;
35063252faSBen Walker 
36063252faSBen Walker 	pipe = calloc(1, sizeof(*pipe));
37063252faSBen Walker 	if (pipe == NULL) {
38063252faSBen Walker 		return NULL;
39063252faSBen Walker 	}
40063252faSBen Walker 
41063252faSBen Walker 	pipe->buf = buf;
42063252faSBen Walker 	pipe->sz = sz;
43063252faSBen Walker 
44063252faSBen Walker 	return pipe;
45063252faSBen Walker }
46063252faSBen Walker 
4794123731SJim Harris void *
spdk_pipe_destroy(struct spdk_pipe * pipe)48063252faSBen Walker spdk_pipe_destroy(struct spdk_pipe *pipe)
49063252faSBen Walker {
5094123731SJim Harris 	void *buf;
5194123731SJim Harris 
5294123731SJim Harris 	if (pipe == NULL) {
5394123731SJim Harris 		return NULL;
5494123731SJim Harris 	}
5594123731SJim Harris 
56*158d15b7SBen Walker 	if (pipe->group) {
57*158d15b7SBen Walker 		spdk_pipe_group_remove(pipe->group, pipe);
58*158d15b7SBen Walker 	}
59*158d15b7SBen Walker 
6094123731SJim Harris 	buf = pipe->buf;
61063252faSBen Walker 	free(pipe);
6294123731SJim Harris 	return buf;
63063252faSBen Walker }
64063252faSBen Walker 
65*158d15b7SBen Walker static void
pipe_alloc_buf_from_group(struct spdk_pipe * pipe)66*158d15b7SBen Walker pipe_alloc_buf_from_group(struct spdk_pipe *pipe)
67*158d15b7SBen Walker {
68*158d15b7SBen Walker 	struct spdk_pipe_buf *buf;
69*158d15b7SBen Walker 	struct spdk_pipe_group *group;
70*158d15b7SBen Walker 
71*158d15b7SBen Walker 	assert(pipe->group != NULL);
72*158d15b7SBen Walker 	group = pipe->group;
73*158d15b7SBen Walker 
74*158d15b7SBen Walker 	/* We have to pick a buffer that's the correct size. It's almost always
75*158d15b7SBen Walker 	 * the first one. */
76*158d15b7SBen Walker 	buf = SLIST_FIRST(&group->bufs);
77*158d15b7SBen Walker 	while (buf != NULL) {
78*158d15b7SBen Walker 		if (buf->sz == pipe->sz) {
79*158d15b7SBen Walker 			/* TODO: Could track the previous and do an SLIST_REMOVE_AFTER */
80*158d15b7SBen Walker 			SLIST_REMOVE(&pipe->group->bufs, buf, spdk_pipe_buf, link);
81*158d15b7SBen Walker 			pipe->buf = (void *)buf;
82*158d15b7SBen Walker 			return;
83*158d15b7SBen Walker 		}
84*158d15b7SBen Walker 		buf = SLIST_NEXT(buf, link);
85*158d15b7SBen Walker 	}
86*158d15b7SBen Walker 	/* Should never get here. */
87*158d15b7SBen Walker 	assert(false);
88*158d15b7SBen Walker }
89*158d15b7SBen Walker 
90063252faSBen Walker int
spdk_pipe_writer_get_buffer(struct spdk_pipe * pipe,uint32_t requested_sz,struct iovec * iovs)91063252faSBen Walker spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
92063252faSBen Walker {
93063252faSBen Walker 	uint32_t sz;
94063252faSBen Walker 	uint32_t read;
95063252faSBen Walker 	uint32_t write;
96063252faSBen Walker 
97063252faSBen Walker 	read = pipe->read;
98063252faSBen Walker 	write = pipe->write;
99063252faSBen Walker 
1006648ea0fSBen Walker 	if (pipe->full || requested_sz == 0) {
1016648ea0fSBen Walker 		iovs[0].iov_base = NULL;
1026648ea0fSBen Walker 		iovs[0].iov_len = 0;
1036648ea0fSBen Walker 		return 0;
1046648ea0fSBen Walker 	}
105063252faSBen Walker 
106*158d15b7SBen Walker 	if (pipe->buf == NULL) {
107*158d15b7SBen Walker 		pipe_alloc_buf_from_group(pipe);
108*158d15b7SBen Walker 	}
109*158d15b7SBen Walker 
1106648ea0fSBen Walker 	if (read <= write) {
111063252faSBen Walker 		sz = spdk_min(requested_sz, pipe->sz - write);
112063252faSBen Walker 
1136648ea0fSBen Walker 		iovs[0].iov_base = pipe->buf + write;
114063252faSBen Walker 		iovs[0].iov_len = sz;
115063252faSBen Walker 
116063252faSBen Walker 		requested_sz -= sz;
117063252faSBen Walker 
118063252faSBen Walker 		if (requested_sz > 0) {
119063252faSBen Walker 			sz = spdk_min(requested_sz, read);
120063252faSBen Walker 
121d848418dSZiye Yang 			iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
122063252faSBen Walker 			iovs[1].iov_len = sz;
123063252faSBen Walker 		} else {
124063252faSBen Walker 			iovs[1].iov_base = NULL;
125063252faSBen Walker 			iovs[1].iov_len = 0;
126063252faSBen Walker 		}
127063252faSBen Walker 	} else {
1286648ea0fSBen Walker 		sz = spdk_min(requested_sz, read - write);
129063252faSBen Walker 
13030f52282SBen Walker 		iovs[0].iov_base = pipe->buf + write;
131063252faSBen Walker 		iovs[0].iov_len = sz;
132063252faSBen Walker 		iovs[1].iov_base = NULL;
133063252faSBen Walker 		iovs[1].iov_len = 0;
134063252faSBen Walker 	}
135063252faSBen Walker 
136063252faSBen Walker 	return iovs[0].iov_len + iovs[1].iov_len;
137063252faSBen Walker }
138063252faSBen Walker 
139063252faSBen Walker int
spdk_pipe_writer_advance(struct spdk_pipe * pipe,uint32_t requested_sz)140063252faSBen Walker spdk_pipe_writer_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
141063252faSBen Walker {
142063252faSBen Walker 	uint32_t sz;
143063252faSBen Walker 	uint32_t read;
144063252faSBen Walker 	uint32_t write;
145063252faSBen Walker 
146063252faSBen Walker 	read = pipe->read;
147063252faSBen Walker 	write = pipe->write;
148063252faSBen Walker 
1496648ea0fSBen Walker 	if (requested_sz > pipe->sz || pipe->full) {
150063252faSBen Walker 		return -EINVAL;
151063252faSBen Walker 	}
152063252faSBen Walker 
153063252faSBen Walker 	if (read <= write) {
1546648ea0fSBen Walker 		if (requested_sz > (pipe->sz - write) + read) {
155063252faSBen Walker 			return -EINVAL;
156063252faSBen Walker 		}
157063252faSBen Walker 
158063252faSBen Walker 		sz = spdk_min(requested_sz, pipe->sz - write);
159063252faSBen Walker 
160063252faSBen Walker 		write += sz;
1616648ea0fSBen Walker 		if (write == pipe->sz) {
162063252faSBen Walker 			write = 0;
163063252faSBen Walker 		}
164063252faSBen Walker 		requested_sz -= sz;
165063252faSBen Walker 
166063252faSBen Walker 		if (requested_sz > 0) {
167063252faSBen Walker 			write = requested_sz;
168063252faSBen Walker 		}
169063252faSBen Walker 	} else {
1706648ea0fSBen Walker 		if (requested_sz > (read - write)) {
171063252faSBen Walker 			return -EINVAL;
172063252faSBen Walker 		}
173063252faSBen Walker 
174063252faSBen Walker 		write += requested_sz;
175063252faSBen Walker 	}
176063252faSBen Walker 
1776648ea0fSBen Walker 	if (read == write) {
1786648ea0fSBen Walker 		pipe->full = true;
1796648ea0fSBen Walker 	}
180063252faSBen Walker 	pipe->write = write;
181063252faSBen Walker 
182063252faSBen Walker 	return 0;
183063252faSBen Walker }
184063252faSBen Walker 
185063252faSBen Walker uint32_t
spdk_pipe_reader_bytes_available(struct spdk_pipe * pipe)186063252faSBen Walker spdk_pipe_reader_bytes_available(struct spdk_pipe *pipe)
187063252faSBen Walker {
188063252faSBen Walker 	uint32_t read;
189063252faSBen Walker 	uint32_t write;
190063252faSBen Walker 
191063252faSBen Walker 	read = pipe->read;
192063252faSBen Walker 	write = pipe->write;
193063252faSBen Walker 
1946648ea0fSBen Walker 	if (read == write && !pipe->full) {
1956648ea0fSBen Walker 		return 0;
1966648ea0fSBen Walker 	} else if (read < write) {
197063252faSBen Walker 		return write - read;
1986648ea0fSBen Walker 	} else {
1996648ea0fSBen Walker 		return (pipe->sz - read) + write;
200063252faSBen Walker 	}
201063252faSBen Walker }
202063252faSBen Walker 
203063252faSBen Walker int
spdk_pipe_reader_get_buffer(struct spdk_pipe * pipe,uint32_t requested_sz,struct iovec * iovs)204063252faSBen Walker spdk_pipe_reader_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
205063252faSBen Walker {
206063252faSBen Walker 	uint32_t sz;
207063252faSBen Walker 	uint32_t read;
208063252faSBen Walker 	uint32_t write;
209063252faSBen Walker 
210063252faSBen Walker 	read = pipe->read;
211063252faSBen Walker 	write = pipe->write;
212063252faSBen Walker 
21330f52282SBen Walker 	if ((read == write && !pipe->full) || requested_sz == 0) {
2146648ea0fSBen Walker 		iovs[0].iov_base = NULL;
2156648ea0fSBen Walker 		iovs[0].iov_len = 0;
2166648ea0fSBen Walker 		iovs[1].iov_base = NULL;
2176648ea0fSBen Walker 		iovs[1].iov_len = 0;
2186648ea0fSBen Walker 	} else if (read < write) {
219063252faSBen Walker 		sz = spdk_min(requested_sz, write - read);
220063252faSBen Walker 
22130f52282SBen Walker 		iovs[0].iov_base = pipe->buf + read;
222063252faSBen Walker 		iovs[0].iov_len = sz;
223063252faSBen Walker 		iovs[1].iov_base = NULL;
224063252faSBen Walker 		iovs[1].iov_len = 0;
225063252faSBen Walker 	} else {
226063252faSBen Walker 		sz = spdk_min(requested_sz, pipe->sz - read);
227063252faSBen Walker 
22830f52282SBen Walker 		iovs[0].iov_base = pipe->buf + read;
229063252faSBen Walker 		iovs[0].iov_len = sz;
230063252faSBen Walker 
231063252faSBen Walker 		requested_sz -= sz;
232063252faSBen Walker 
233063252faSBen Walker 		if (requested_sz > 0) {
234063252faSBen Walker 			sz = spdk_min(requested_sz, write);
235d848418dSZiye Yang 			iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
236063252faSBen Walker 			iovs[1].iov_len = sz;
237063252faSBen Walker 		} else {
238063252faSBen Walker 			iovs[1].iov_base = NULL;
239063252faSBen Walker 			iovs[1].iov_len = 0;
240063252faSBen Walker 		}
241063252faSBen Walker 	}
242063252faSBen Walker 
243063252faSBen Walker 	return iovs[0].iov_len + iovs[1].iov_len;
244063252faSBen Walker }
245063252faSBen Walker 
246063252faSBen Walker int
spdk_pipe_reader_advance(struct spdk_pipe * pipe,uint32_t requested_sz)247063252faSBen Walker spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
248063252faSBen Walker {
249063252faSBen Walker 	uint32_t sz;
250063252faSBen Walker 	uint32_t read;
251063252faSBen Walker 	uint32_t write;
252063252faSBen Walker 
253063252faSBen Walker 	read = pipe->read;
254063252faSBen Walker 	write = pipe->write;
255063252faSBen Walker 
2566648ea0fSBen Walker 	if (requested_sz == 0) {
2576648ea0fSBen Walker 		return 0;
2586648ea0fSBen Walker 	}
2596648ea0fSBen Walker 
2606648ea0fSBen Walker 	if (read < write) {
261063252faSBen Walker 		if (requested_sz > (write - read)) {
262063252faSBen Walker 			return -EINVAL;
263063252faSBen Walker 		}
264063252faSBen Walker 
265063252faSBen Walker 		read += requested_sz;
266063252faSBen Walker 	} else {
267063252faSBen Walker 		sz = spdk_min(requested_sz, pipe->sz - read);
268063252faSBen Walker 
269063252faSBen Walker 		read += sz;
2706648ea0fSBen Walker 		if (read == pipe->sz) {
271063252faSBen Walker 			read = 0;
272063252faSBen Walker 		}
273063252faSBen Walker 		requested_sz -= sz;
274063252faSBen Walker 
275063252faSBen Walker 		if (requested_sz > 0) {
276063252faSBen Walker 			if (requested_sz > write) {
277063252faSBen Walker 				return -EINVAL;
278063252faSBen Walker 			}
279063252faSBen Walker 
280063252faSBen Walker 			read = requested_sz;
281063252faSBen Walker 		}
282063252faSBen Walker 	}
283063252faSBen Walker 
2846648ea0fSBen Walker 	/* We know we advanced at least one byte, so the pipe isn't full. */
2856648ea0fSBen Walker 	pipe->full = false;
286f4cb9817SBen Walker 
287f4cb9817SBen Walker 	if (read == write) {
288f4cb9817SBen Walker 		/* The pipe is empty. To re-use the same memory more frequently, jump
289f4cb9817SBen Walker 		 * both pointers back to the beginning of the pipe. */
290f4cb9817SBen Walker 		read = 0;
291f4cb9817SBen Walker 		pipe->write = 0;
292*158d15b7SBen Walker 
293*158d15b7SBen Walker 		/* Additionally, release the buffer to the shared pool */
294*158d15b7SBen Walker 		if (pipe->group) {
295*158d15b7SBen Walker 			struct spdk_pipe_buf *buf = (struct spdk_pipe_buf *)pipe->buf;
296*158d15b7SBen Walker 			buf->sz = pipe->sz;
297*158d15b7SBen Walker 			SLIST_INSERT_HEAD(&pipe->group->bufs, buf, link);
298*158d15b7SBen Walker 			pipe->buf = NULL;
299*158d15b7SBen Walker 		}
300f4cb9817SBen Walker 	}
301f4cb9817SBen Walker 
302063252faSBen Walker 	pipe->read = read;
303063252faSBen Walker 
304063252faSBen Walker 	return 0;
305063252faSBen Walker }
306*158d15b7SBen Walker 
307*158d15b7SBen Walker struct spdk_pipe_group *
spdk_pipe_group_create(void)308*158d15b7SBen Walker spdk_pipe_group_create(void)
309*158d15b7SBen Walker {
310*158d15b7SBen Walker 	struct spdk_pipe_group *group;
311*158d15b7SBen Walker 
312*158d15b7SBen Walker 	group = calloc(1, sizeof(*group));
313*158d15b7SBen Walker 	if (!group) {
314*158d15b7SBen Walker 		return NULL;
315*158d15b7SBen Walker 	}
316*158d15b7SBen Walker 
317*158d15b7SBen Walker 	SLIST_INIT(&group->bufs);
318*158d15b7SBen Walker 
319*158d15b7SBen Walker 	return group;
320*158d15b7SBen Walker }
321*158d15b7SBen Walker 
322*158d15b7SBen Walker void
spdk_pipe_group_destroy(struct spdk_pipe_group * group)323*158d15b7SBen Walker spdk_pipe_group_destroy(struct spdk_pipe_group *group)
324*158d15b7SBen Walker {
325*158d15b7SBen Walker 	if (!SLIST_EMPTY(&group->bufs)) {
326*158d15b7SBen Walker 		SPDK_ERRLOG("Destroying a pipe group that still has buffers!\n");
327*158d15b7SBen Walker 		assert(false);
328*158d15b7SBen Walker 	}
329*158d15b7SBen Walker 
330*158d15b7SBen Walker 	free(group);
331*158d15b7SBen Walker }
332*158d15b7SBen Walker 
333*158d15b7SBen Walker int
spdk_pipe_group_add(struct spdk_pipe_group * group,struct spdk_pipe * pipe)334*158d15b7SBen Walker spdk_pipe_group_add(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
335*158d15b7SBen Walker {
336*158d15b7SBen Walker 	struct spdk_pipe_buf *buf;
337*158d15b7SBen Walker 
338*158d15b7SBen Walker 	assert(pipe->group == NULL);
339*158d15b7SBen Walker 
340*158d15b7SBen Walker 	pipe->group = group;
341*158d15b7SBen Walker 	if (pipe->read != pipe->write || pipe->full) {
342*158d15b7SBen Walker 		/* Pipe currently has valid data, so keep the buffer attached
343*158d15b7SBen Walker 		 * to the pipe for now.  We can move it to the group's SLIST
344*158d15b7SBen Walker 		 * later when it gets emptied.
345*158d15b7SBen Walker 		 */
346*158d15b7SBen Walker 		return 0;
347*158d15b7SBen Walker 	}
348*158d15b7SBen Walker 
349*158d15b7SBen Walker 	buf = (struct spdk_pipe_buf *)pipe->buf;
350*158d15b7SBen Walker 	buf->sz = pipe->sz;
351*158d15b7SBen Walker 	SLIST_INSERT_HEAD(&group->bufs, buf, link);
352*158d15b7SBen Walker 	pipe->buf = NULL;
353*158d15b7SBen Walker 	return 0;
354*158d15b7SBen Walker }
355*158d15b7SBen Walker 
356*158d15b7SBen Walker int
spdk_pipe_group_remove(struct spdk_pipe_group * group,struct spdk_pipe * pipe)357*158d15b7SBen Walker spdk_pipe_group_remove(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
358*158d15b7SBen Walker {
359*158d15b7SBen Walker 	assert(pipe->group == group);
360*158d15b7SBen Walker 
361*158d15b7SBen Walker 	if (pipe->buf == NULL) {
362*158d15b7SBen Walker 		/* Associate a buffer with the pipe before returning. */
363*158d15b7SBen Walker 		pipe_alloc_buf_from_group(pipe);
364*158d15b7SBen Walker 		assert(pipe->buf != NULL);
365*158d15b7SBen Walker 	}
366*158d15b7SBen Walker 
367*158d15b7SBen Walker 	pipe->group = NULL;
368*158d15b7SBen Walker 	return 0;
369*158d15b7SBen Walker }
370