1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright (C) 2019 Intel Corporation.
3 * All rights reserved.
4 */
5
6 #include "spdk/pipe.h"
7 #include "spdk/util.h"
8 #include "spdk/queue.h"
9 #include "spdk/log.h"
10
/*
 * Header overlaid on the first bytes of a pipe's data buffer while that
 * buffer is idle and parked on a group's list.
 */
struct spdk_pipe_buf {
	/* Linkage within the group's singly-linked list of idle buffers. */
	SLIST_ENTRY(spdk_pipe_buf) link;
	/* Capacity of the pipe this buffer came from; used to match it back up. */
	uint32_t sz;
};
15
/* A set of pipes that share their data buffers while they are empty. */
struct spdk_pipe_group {
	/* Idle buffers donated by member pipes, most recently released first. */
	SLIST_HEAD(, spdk_pipe_buf) bufs;
};
19
/* Ring buffer state for a single-reader / single-writer byte pipe. */
struct spdk_pipe {
	/* Backing storage; NULL while the buffer is parked on the group's list. */
	uint8_t *buf;
	/* Capacity of the pipe in bytes. */
	uint32_t sz;

	/* Offset into buf where the next byte will be written. */
	uint32_t write;
	/* Offset into buf of the next byte to be read. */
	uint32_t read;
	/* Distinguishes "completely full" from "empty" when read == write. */
	bool full;

	/* Group this pipe shares buffers with, or NULL when standalone. */
	struct spdk_pipe_group *group;
};
30
31 struct spdk_pipe *
spdk_pipe_create(void * buf,uint32_t sz)32 spdk_pipe_create(void *buf, uint32_t sz)
33 {
34 struct spdk_pipe *pipe;
35
36 pipe = calloc(1, sizeof(*pipe));
37 if (pipe == NULL) {
38 return NULL;
39 }
40
41 pipe->buf = buf;
42 pipe->sz = sz;
43
44 return pipe;
45 }
46
47 void *
spdk_pipe_destroy(struct spdk_pipe * pipe)48 spdk_pipe_destroy(struct spdk_pipe *pipe)
49 {
50 void *buf;
51
52 if (pipe == NULL) {
53 return NULL;
54 }
55
56 if (pipe->group) {
57 spdk_pipe_group_remove(pipe->group, pipe);
58 }
59
60 buf = pipe->buf;
61 free(pipe);
62 return buf;
63 }
64
65 static void
pipe_alloc_buf_from_group(struct spdk_pipe * pipe)66 pipe_alloc_buf_from_group(struct spdk_pipe *pipe)
67 {
68 struct spdk_pipe_buf *buf;
69 struct spdk_pipe_group *group;
70
71 assert(pipe->group != NULL);
72 group = pipe->group;
73
74 /* We have to pick a buffer that's the correct size. It's almost always
75 * the first one. */
76 buf = SLIST_FIRST(&group->bufs);
77 while (buf != NULL) {
78 if (buf->sz == pipe->sz) {
79 /* TODO: Could track the previous and do an SLIST_REMOVE_AFTER */
80 SLIST_REMOVE(&pipe->group->bufs, buf, spdk_pipe_buf, link);
81 pipe->buf = (void *)buf;
82 return;
83 }
84 buf = SLIST_NEXT(buf, link);
85 }
86 /* Should never get here. */
87 assert(false);
88 }
89
90 int
spdk_pipe_writer_get_buffer(struct spdk_pipe * pipe,uint32_t requested_sz,struct iovec * iovs)91 spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
92 {
93 uint32_t sz;
94 uint32_t read;
95 uint32_t write;
96
97 read = pipe->read;
98 write = pipe->write;
99
100 if (pipe->full || requested_sz == 0) {
101 iovs[0].iov_base = NULL;
102 iovs[0].iov_len = 0;
103 return 0;
104 }
105
106 if (pipe->buf == NULL) {
107 pipe_alloc_buf_from_group(pipe);
108 }
109
110 if (read <= write) {
111 sz = spdk_min(requested_sz, pipe->sz - write);
112
113 iovs[0].iov_base = pipe->buf + write;
114 iovs[0].iov_len = sz;
115
116 requested_sz -= sz;
117
118 if (requested_sz > 0) {
119 sz = spdk_min(requested_sz, read);
120
121 iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
122 iovs[1].iov_len = sz;
123 } else {
124 iovs[1].iov_base = NULL;
125 iovs[1].iov_len = 0;
126 }
127 } else {
128 sz = spdk_min(requested_sz, read - write);
129
130 iovs[0].iov_base = pipe->buf + write;
131 iovs[0].iov_len = sz;
132 iovs[1].iov_base = NULL;
133 iovs[1].iov_len = 0;
134 }
135
136 return iovs[0].iov_len + iovs[1].iov_len;
137 }
138
139 int
spdk_pipe_writer_advance(struct spdk_pipe * pipe,uint32_t requested_sz)140 spdk_pipe_writer_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
141 {
142 uint32_t sz;
143 uint32_t read;
144 uint32_t write;
145
146 read = pipe->read;
147 write = pipe->write;
148
149 if (requested_sz > pipe->sz || pipe->full) {
150 return -EINVAL;
151 }
152
153 if (read <= write) {
154 if (requested_sz > (pipe->sz - write) + read) {
155 return -EINVAL;
156 }
157
158 sz = spdk_min(requested_sz, pipe->sz - write);
159
160 write += sz;
161 if (write == pipe->sz) {
162 write = 0;
163 }
164 requested_sz -= sz;
165
166 if (requested_sz > 0) {
167 write = requested_sz;
168 }
169 } else {
170 if (requested_sz > (read - write)) {
171 return -EINVAL;
172 }
173
174 write += requested_sz;
175 }
176
177 if (read == write) {
178 pipe->full = true;
179 }
180 pipe->write = write;
181
182 return 0;
183 }
184
185 uint32_t
spdk_pipe_reader_bytes_available(struct spdk_pipe * pipe)186 spdk_pipe_reader_bytes_available(struct spdk_pipe *pipe)
187 {
188 uint32_t read;
189 uint32_t write;
190
191 read = pipe->read;
192 write = pipe->write;
193
194 if (read == write && !pipe->full) {
195 return 0;
196 } else if (read < write) {
197 return write - read;
198 } else {
199 return (pipe->sz - read) + write;
200 }
201 }
202
203 int
spdk_pipe_reader_get_buffer(struct spdk_pipe * pipe,uint32_t requested_sz,struct iovec * iovs)204 spdk_pipe_reader_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
205 {
206 uint32_t sz;
207 uint32_t read;
208 uint32_t write;
209
210 read = pipe->read;
211 write = pipe->write;
212
213 if ((read == write && !pipe->full) || requested_sz == 0) {
214 iovs[0].iov_base = NULL;
215 iovs[0].iov_len = 0;
216 iovs[1].iov_base = NULL;
217 iovs[1].iov_len = 0;
218 } else if (read < write) {
219 sz = spdk_min(requested_sz, write - read);
220
221 iovs[0].iov_base = pipe->buf + read;
222 iovs[0].iov_len = sz;
223 iovs[1].iov_base = NULL;
224 iovs[1].iov_len = 0;
225 } else {
226 sz = spdk_min(requested_sz, pipe->sz - read);
227
228 iovs[0].iov_base = pipe->buf + read;
229 iovs[0].iov_len = sz;
230
231 requested_sz -= sz;
232
233 if (requested_sz > 0) {
234 sz = spdk_min(requested_sz, write);
235 iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf;
236 iovs[1].iov_len = sz;
237 } else {
238 iovs[1].iov_base = NULL;
239 iovs[1].iov_len = 0;
240 }
241 }
242
243 return iovs[0].iov_len + iovs[1].iov_len;
244 }
245
246 int
spdk_pipe_reader_advance(struct spdk_pipe * pipe,uint32_t requested_sz)247 spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
248 {
249 uint32_t sz;
250 uint32_t read;
251 uint32_t write;
252
253 read = pipe->read;
254 write = pipe->write;
255
256 if (requested_sz == 0) {
257 return 0;
258 }
259
260 if (read < write) {
261 if (requested_sz > (write - read)) {
262 return -EINVAL;
263 }
264
265 read += requested_sz;
266 } else {
267 sz = spdk_min(requested_sz, pipe->sz - read);
268
269 read += sz;
270 if (read == pipe->sz) {
271 read = 0;
272 }
273 requested_sz -= sz;
274
275 if (requested_sz > 0) {
276 if (requested_sz > write) {
277 return -EINVAL;
278 }
279
280 read = requested_sz;
281 }
282 }
283
284 /* We know we advanced at least one byte, so the pipe isn't full. */
285 pipe->full = false;
286
287 if (read == write) {
288 /* The pipe is empty. To re-use the same memory more frequently, jump
289 * both pointers back to the beginning of the pipe. */
290 read = 0;
291 pipe->write = 0;
292
293 /* Additionally, release the buffer to the shared pool */
294 if (pipe->group) {
295 struct spdk_pipe_buf *buf = (struct spdk_pipe_buf *)pipe->buf;
296 buf->sz = pipe->sz;
297 SLIST_INSERT_HEAD(&pipe->group->bufs, buf, link);
298 pipe->buf = NULL;
299 }
300 }
301
302 pipe->read = read;
303
304 return 0;
305 }
306
307 struct spdk_pipe_group *
spdk_pipe_group_create(void)308 spdk_pipe_group_create(void)
309 {
310 struct spdk_pipe_group *group;
311
312 group = calloc(1, sizeof(*group));
313 if (!group) {
314 return NULL;
315 }
316
317 SLIST_INIT(&group->bufs);
318
319 return group;
320 }
321
322 void
spdk_pipe_group_destroy(struct spdk_pipe_group * group)323 spdk_pipe_group_destroy(struct spdk_pipe_group *group)
324 {
325 if (!SLIST_EMPTY(&group->bufs)) {
326 SPDK_ERRLOG("Destroying a pipe group that still has buffers!\n");
327 assert(false);
328 }
329
330 free(group);
331 }
332
333 int
spdk_pipe_group_add(struct spdk_pipe_group * group,struct spdk_pipe * pipe)334 spdk_pipe_group_add(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
335 {
336 struct spdk_pipe_buf *buf;
337
338 assert(pipe->group == NULL);
339
340 pipe->group = group;
341 if (pipe->read != pipe->write || pipe->full) {
342 /* Pipe currently has valid data, so keep the buffer attached
343 * to the pipe for now. We can move it to the group's SLIST
344 * later when it gets emptied.
345 */
346 return 0;
347 }
348
349 buf = (struct spdk_pipe_buf *)pipe->buf;
350 buf->sz = pipe->sz;
351 SLIST_INSERT_HEAD(&group->bufs, buf, link);
352 pipe->buf = NULL;
353 return 0;
354 }
355
356 int
spdk_pipe_group_remove(struct spdk_pipe_group * group,struct spdk_pipe * pipe)357 spdk_pipe_group_remove(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
358 {
359 assert(pipe->group == group);
360
361 if (pipe->buf == NULL) {
362 /* Associate a buffer with the pipe before returning. */
363 pipe_alloc_buf_from_group(pipe);
364 assert(pipe->buf != NULL);
365 }
366
367 pipe->group = NULL;
368 return 0;
369 }
370