/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2019 Intel Corporation.
 *   All rights reserved.
 */
5063252faSBen Walker
6063252faSBen Walker #include "spdk/pipe.h"
7063252faSBen Walker #include "spdk/util.h"
8*158d15b7SBen Walker #include "spdk/queue.h"
9*158d15b7SBen Walker #include "spdk/log.h"
10*158d15b7SBen Walker
/* List node that is overlaid on the start of a pipe's data buffer while that
 * buffer is parked on a group's free list. */
struct spdk_pipe_buf {
	/* Linkage within spdk_pipe_group.bufs */
	SLIST_ENTRY(spdk_pipe_buf)	link;
	/* Size of the parked buffer in bytes (copied from the owning pipe's sz) */
	uint32_t			sz;
};
15*158d15b7SBen Walker
/* A collection of pipes that hand their data buffers to a shared free list
 * whenever they are empty. */
struct spdk_pipe_group {
	/* Buffers currently not attached to any pipe in the group */
	SLIST_HEAD(, spdk_pipe_buf)	bufs;
};
19063252faSBen Walker
/* Ring buffer state for a single pipe. */
struct spdk_pipe {
	/* Data buffer. NULL while the buffer is parked on the group's free list. */
	uint8_t			*buf;
	/* Size of the data buffer in bytes */
	uint32_t		sz;

	/* Offset into buf at which the writer continues */
	uint32_t		write;
	/* Offset into buf at which the reader continues */
	uint32_t		read;
	/* When read == write, tells a completely full pipe apart from an empty one */
	bool			full;

	/* Group this pipe shares buffers with, or NULL if it belongs to none */
	struct spdk_pipe_group	*group;
};
30063252faSBen Walker
31063252faSBen Walker struct spdk_pipe *
spdk_pipe_create(void * buf,uint32_t sz)32063252faSBen Walker spdk_pipe_create(void *buf, uint32_t sz)
33063252faSBen Walker {
34063252faSBen Walker struct spdk_pipe *pipe;
35063252faSBen Walker
36063252faSBen Walker pipe = calloc(1, sizeof(*pipe));
37063252faSBen Walker if (pipe == NULL) {
38063252faSBen Walker return NULL;
39063252faSBen Walker }
40063252faSBen Walker
41063252faSBen Walker pipe->buf = buf;
42063252faSBen Walker pipe->sz = sz;
43063252faSBen Walker
44063252faSBen Walker return pipe;
45063252faSBen Walker }
46063252faSBen Walker
4794123731SJim Harris void *
spdk_pipe_destroy(struct spdk_pipe * pipe)48063252faSBen Walker spdk_pipe_destroy(struct spdk_pipe *pipe)
49063252faSBen Walker {
5094123731SJim Harris void *buf;
5194123731SJim Harris
5294123731SJim Harris if (pipe == NULL) {
5394123731SJim Harris return NULL;
5494123731SJim Harris }
5594123731SJim Harris
56*158d15b7SBen Walker if (pipe->group) {
57*158d15b7SBen Walker spdk_pipe_group_remove(pipe->group, pipe);
58*158d15b7SBen Walker }
59*158d15b7SBen Walker
6094123731SJim Harris buf = pipe->buf;
61063252faSBen Walker free(pipe);
6294123731SJim Harris return buf;
63063252faSBen Walker }
64063252faSBen Walker
65*158d15b7SBen Walker static void
pipe_alloc_buf_from_group(struct spdk_pipe * pipe)66*158d15b7SBen Walker pipe_alloc_buf_from_group(struct spdk_pipe *pipe)
67*158d15b7SBen Walker {
68*158d15b7SBen Walker struct spdk_pipe_buf *buf;
69*158d15b7SBen Walker struct spdk_pipe_group *group;
70*158d15b7SBen Walker
71*158d15b7SBen Walker assert(pipe->group != NULL);
72*158d15b7SBen Walker group = pipe->group;
73*158d15b7SBen Walker
74*158d15b7SBen Walker /* We have to pick a buffer that's the correct size. It's almost always
75*158d15b7SBen Walker * the first one. */
76*158d15b7SBen Walker buf = SLIST_FIRST(&group->bufs);
77*158d15b7SBen Walker while (buf != NULL) {
78*158d15b7SBen Walker if (buf->sz == pipe->sz) {
79*158d15b7SBen Walker /* TODO: Could track the previous and do an SLIST_REMOVE_AFTER */
80*158d15b7SBen Walker SLIST_REMOVE(&pipe->group->bufs, buf, spdk_pipe_buf, link);
81*158d15b7SBen Walker pipe->buf = (void *)buf;
82*158d15b7SBen Walker return;
83*158d15b7SBen Walker }
84*158d15b7SBen Walker buf = SLIST_NEXT(buf, link);
85*158d15b7SBen Walker }
86*158d15b7SBen Walker /* Should never get here. */
87*158d15b7SBen Walker assert(false);
88*158d15b7SBen Walker }
89*158d15b7SBen Walker
90063252faSBen Walker int
spdk_pipe_writer_get_buffer(struct spdk_pipe * pipe,uint32_t requested_sz,struct iovec * iovs)91063252faSBen Walker spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
92063252faSBen Walker {
93063252faSBen Walker uint32_t sz;
94063252faSBen Walker uint32_t read;
95063252faSBen Walker uint32_t write;
96063252faSBen Walker
97063252faSBen Walker read = pipe->read;
98063252faSBen Walker write = pipe->write;
99063252faSBen Walker
1006648ea0fSBen Walker if (pipe->full || requested_sz == 0) {
1016648ea0fSBen Walker iovs[0].iov_base = NULL;
1026648ea0fSBen Walker iovs[0].iov_len = 0;
1036648ea0fSBen Walker return 0;
1046648ea0fSBen Walker }
105063252faSBen Walker
106*158d15b7SBen Walker if (pipe->buf == NULL) {
107*158d15b7SBen Walker pipe_alloc_buf_from_group(pipe);
108*158d15b7SBen Walker }
109*158d15b7SBen Walker
1106648ea0fSBen Walker if (read <= write) {
111063252faSBen Walker sz = spdk_min(requested_sz, pipe->sz - write);
112063252faSBen Walker
1136648ea0fSBen Walker iovs[0].iov_base = pipe->buf + write;
114063252faSBen Walker iovs[0].iov_len = sz;
115063252faSBen Walker
116063252faSBen Walker requested_sz -= sz;
117063252faSBen Walker
118063252faSBen Walker if (requested_sz > 0) {
119063252faSBen Walker sz = spdk_min(requested_sz, read);
120063252faSBen Walker
121d848418dSZiye Yang iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
122063252faSBen Walker iovs[1].iov_len = sz;
123063252faSBen Walker } else {
124063252faSBen Walker iovs[1].iov_base = NULL;
125063252faSBen Walker iovs[1].iov_len = 0;
126063252faSBen Walker }
127063252faSBen Walker } else {
1286648ea0fSBen Walker sz = spdk_min(requested_sz, read - write);
129063252faSBen Walker
13030f52282SBen Walker iovs[0].iov_base = pipe->buf + write;
131063252faSBen Walker iovs[0].iov_len = sz;
132063252faSBen Walker iovs[1].iov_base = NULL;
133063252faSBen Walker iovs[1].iov_len = 0;
134063252faSBen Walker }
135063252faSBen Walker
136063252faSBen Walker return iovs[0].iov_len + iovs[1].iov_len;
137063252faSBen Walker }
138063252faSBen Walker
139063252faSBen Walker int
spdk_pipe_writer_advance(struct spdk_pipe * pipe,uint32_t requested_sz)140063252faSBen Walker spdk_pipe_writer_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
141063252faSBen Walker {
142063252faSBen Walker uint32_t sz;
143063252faSBen Walker uint32_t read;
144063252faSBen Walker uint32_t write;
145063252faSBen Walker
146063252faSBen Walker read = pipe->read;
147063252faSBen Walker write = pipe->write;
148063252faSBen Walker
1496648ea0fSBen Walker if (requested_sz > pipe->sz || pipe->full) {
150063252faSBen Walker return -EINVAL;
151063252faSBen Walker }
152063252faSBen Walker
153063252faSBen Walker if (read <= write) {
1546648ea0fSBen Walker if (requested_sz > (pipe->sz - write) + read) {
155063252faSBen Walker return -EINVAL;
156063252faSBen Walker }
157063252faSBen Walker
158063252faSBen Walker sz = spdk_min(requested_sz, pipe->sz - write);
159063252faSBen Walker
160063252faSBen Walker write += sz;
1616648ea0fSBen Walker if (write == pipe->sz) {
162063252faSBen Walker write = 0;
163063252faSBen Walker }
164063252faSBen Walker requested_sz -= sz;
165063252faSBen Walker
166063252faSBen Walker if (requested_sz > 0) {
167063252faSBen Walker write = requested_sz;
168063252faSBen Walker }
169063252faSBen Walker } else {
1706648ea0fSBen Walker if (requested_sz > (read - write)) {
171063252faSBen Walker return -EINVAL;
172063252faSBen Walker }
173063252faSBen Walker
174063252faSBen Walker write += requested_sz;
175063252faSBen Walker }
176063252faSBen Walker
1776648ea0fSBen Walker if (read == write) {
1786648ea0fSBen Walker pipe->full = true;
1796648ea0fSBen Walker }
180063252faSBen Walker pipe->write = write;
181063252faSBen Walker
182063252faSBen Walker return 0;
183063252faSBen Walker }
184063252faSBen Walker
185063252faSBen Walker uint32_t
spdk_pipe_reader_bytes_available(struct spdk_pipe * pipe)186063252faSBen Walker spdk_pipe_reader_bytes_available(struct spdk_pipe *pipe)
187063252faSBen Walker {
188063252faSBen Walker uint32_t read;
189063252faSBen Walker uint32_t write;
190063252faSBen Walker
191063252faSBen Walker read = pipe->read;
192063252faSBen Walker write = pipe->write;
193063252faSBen Walker
1946648ea0fSBen Walker if (read == write && !pipe->full) {
1956648ea0fSBen Walker return 0;
1966648ea0fSBen Walker } else if (read < write) {
197063252faSBen Walker return write - read;
1986648ea0fSBen Walker } else {
1996648ea0fSBen Walker return (pipe->sz - read) + write;
200063252faSBen Walker }
201063252faSBen Walker }
202063252faSBen Walker
203063252faSBen Walker int
spdk_pipe_reader_get_buffer(struct spdk_pipe * pipe,uint32_t requested_sz,struct iovec * iovs)204063252faSBen Walker spdk_pipe_reader_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
205063252faSBen Walker {
206063252faSBen Walker uint32_t sz;
207063252faSBen Walker uint32_t read;
208063252faSBen Walker uint32_t write;
209063252faSBen Walker
210063252faSBen Walker read = pipe->read;
211063252faSBen Walker write = pipe->write;
212063252faSBen Walker
21330f52282SBen Walker if ((read == write && !pipe->full) || requested_sz == 0) {
2146648ea0fSBen Walker iovs[0].iov_base = NULL;
2156648ea0fSBen Walker iovs[0].iov_len = 0;
2166648ea0fSBen Walker iovs[1].iov_base = NULL;
2176648ea0fSBen Walker iovs[1].iov_len = 0;
2186648ea0fSBen Walker } else if (read < write) {
219063252faSBen Walker sz = spdk_min(requested_sz, write - read);
220063252faSBen Walker
22130f52282SBen Walker iovs[0].iov_base = pipe->buf + read;
222063252faSBen Walker iovs[0].iov_len = sz;
223063252faSBen Walker iovs[1].iov_base = NULL;
224063252faSBen Walker iovs[1].iov_len = 0;
225063252faSBen Walker } else {
226063252faSBen Walker sz = spdk_min(requested_sz, pipe->sz - read);
227063252faSBen Walker
22830f52282SBen Walker iovs[0].iov_base = pipe->buf + read;
229063252faSBen Walker iovs[0].iov_len = sz;
230063252faSBen Walker
231063252faSBen Walker requested_sz -= sz;
232063252faSBen Walker
233063252faSBen Walker if (requested_sz > 0) {
234063252faSBen Walker sz = spdk_min(requested_sz, write);
235d848418dSZiye Yang iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
236063252faSBen Walker iovs[1].iov_len = sz;
237063252faSBen Walker } else {
238063252faSBen Walker iovs[1].iov_base = NULL;
239063252faSBen Walker iovs[1].iov_len = 0;
240063252faSBen Walker }
241063252faSBen Walker }
242063252faSBen Walker
243063252faSBen Walker return iovs[0].iov_len + iovs[1].iov_len;
244063252faSBen Walker }
245063252faSBen Walker
246063252faSBen Walker int
spdk_pipe_reader_advance(struct spdk_pipe * pipe,uint32_t requested_sz)247063252faSBen Walker spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
248063252faSBen Walker {
249063252faSBen Walker uint32_t sz;
250063252faSBen Walker uint32_t read;
251063252faSBen Walker uint32_t write;
252063252faSBen Walker
253063252faSBen Walker read = pipe->read;
254063252faSBen Walker write = pipe->write;
255063252faSBen Walker
2566648ea0fSBen Walker if (requested_sz == 0) {
2576648ea0fSBen Walker return 0;
2586648ea0fSBen Walker }
2596648ea0fSBen Walker
2606648ea0fSBen Walker if (read < write) {
261063252faSBen Walker if (requested_sz > (write - read)) {
262063252faSBen Walker return -EINVAL;
263063252faSBen Walker }
264063252faSBen Walker
265063252faSBen Walker read += requested_sz;
266063252faSBen Walker } else {
267063252faSBen Walker sz = spdk_min(requested_sz, pipe->sz - read);
268063252faSBen Walker
269063252faSBen Walker read += sz;
2706648ea0fSBen Walker if (read == pipe->sz) {
271063252faSBen Walker read = 0;
272063252faSBen Walker }
273063252faSBen Walker requested_sz -= sz;
274063252faSBen Walker
275063252faSBen Walker if (requested_sz > 0) {
276063252faSBen Walker if (requested_sz > write) {
277063252faSBen Walker return -EINVAL;
278063252faSBen Walker }
279063252faSBen Walker
280063252faSBen Walker read = requested_sz;
281063252faSBen Walker }
282063252faSBen Walker }
283063252faSBen Walker
2846648ea0fSBen Walker /* We know we advanced at least one byte, so the pipe isn't full. */
2856648ea0fSBen Walker pipe->full = false;
286f4cb9817SBen Walker
287f4cb9817SBen Walker if (read == write) {
288f4cb9817SBen Walker /* The pipe is empty. To re-use the same memory more frequently, jump
289f4cb9817SBen Walker * both pointers back to the beginning of the pipe. */
290f4cb9817SBen Walker read = 0;
291f4cb9817SBen Walker pipe->write = 0;
292*158d15b7SBen Walker
293*158d15b7SBen Walker /* Additionally, release the buffer to the shared pool */
294*158d15b7SBen Walker if (pipe->group) {
295*158d15b7SBen Walker struct spdk_pipe_buf *buf = (struct spdk_pipe_buf *)pipe->buf;
296*158d15b7SBen Walker buf->sz = pipe->sz;
297*158d15b7SBen Walker SLIST_INSERT_HEAD(&pipe->group->bufs, buf, link);
298*158d15b7SBen Walker pipe->buf = NULL;
299*158d15b7SBen Walker }
300f4cb9817SBen Walker }
301f4cb9817SBen Walker
302063252faSBen Walker pipe->read = read;
303063252faSBen Walker
304063252faSBen Walker return 0;
305063252faSBen Walker }
306*158d15b7SBen Walker
307*158d15b7SBen Walker struct spdk_pipe_group *
spdk_pipe_group_create(void)308*158d15b7SBen Walker spdk_pipe_group_create(void)
309*158d15b7SBen Walker {
310*158d15b7SBen Walker struct spdk_pipe_group *group;
311*158d15b7SBen Walker
312*158d15b7SBen Walker group = calloc(1, sizeof(*group));
313*158d15b7SBen Walker if (!group) {
314*158d15b7SBen Walker return NULL;
315*158d15b7SBen Walker }
316*158d15b7SBen Walker
317*158d15b7SBen Walker SLIST_INIT(&group->bufs);
318*158d15b7SBen Walker
319*158d15b7SBen Walker return group;
320*158d15b7SBen Walker }
321*158d15b7SBen Walker
322*158d15b7SBen Walker void
spdk_pipe_group_destroy(struct spdk_pipe_group * group)323*158d15b7SBen Walker spdk_pipe_group_destroy(struct spdk_pipe_group *group)
324*158d15b7SBen Walker {
325*158d15b7SBen Walker if (!SLIST_EMPTY(&group->bufs)) {
326*158d15b7SBen Walker SPDK_ERRLOG("Destroying a pipe group that still has buffers!\n");
327*158d15b7SBen Walker assert(false);
328*158d15b7SBen Walker }
329*158d15b7SBen Walker
330*158d15b7SBen Walker free(group);
331*158d15b7SBen Walker }
332*158d15b7SBen Walker
333*158d15b7SBen Walker int
spdk_pipe_group_add(struct spdk_pipe_group * group,struct spdk_pipe * pipe)334*158d15b7SBen Walker spdk_pipe_group_add(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
335*158d15b7SBen Walker {
336*158d15b7SBen Walker struct spdk_pipe_buf *buf;
337*158d15b7SBen Walker
338*158d15b7SBen Walker assert(pipe->group == NULL);
339*158d15b7SBen Walker
340*158d15b7SBen Walker pipe->group = group;
341*158d15b7SBen Walker if (pipe->read != pipe->write || pipe->full) {
342*158d15b7SBen Walker /* Pipe currently has valid data, so keep the buffer attached
343*158d15b7SBen Walker * to the pipe for now. We can move it to the group's SLIST
344*158d15b7SBen Walker * later when it gets emptied.
345*158d15b7SBen Walker */
346*158d15b7SBen Walker return 0;
347*158d15b7SBen Walker }
348*158d15b7SBen Walker
349*158d15b7SBen Walker buf = (struct spdk_pipe_buf *)pipe->buf;
350*158d15b7SBen Walker buf->sz = pipe->sz;
351*158d15b7SBen Walker SLIST_INSERT_HEAD(&group->bufs, buf, link);
352*158d15b7SBen Walker pipe->buf = NULL;
353*158d15b7SBen Walker return 0;
354*158d15b7SBen Walker }
355*158d15b7SBen Walker
356*158d15b7SBen Walker int
spdk_pipe_group_remove(struct spdk_pipe_group * group,struct spdk_pipe * pipe)357*158d15b7SBen Walker spdk_pipe_group_remove(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
358*158d15b7SBen Walker {
359*158d15b7SBen Walker assert(pipe->group == group);
360*158d15b7SBen Walker
361*158d15b7SBen Walker if (pipe->buf == NULL) {
362*158d15b7SBen Walker /* Associate a buffer with the pipe before returning. */
363*158d15b7SBen Walker pipe_alloc_buf_from_group(pipe);
364*158d15b7SBen Walker assert(pipe->buf != NULL);
365*158d15b7SBen Walker }
366*158d15b7SBen Walker
367*158d15b7SBen Walker pipe->group = NULL;
368*158d15b7SBen Walker return 0;
369*158d15b7SBen Walker }
370