1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2019 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/pipe.h" 7 #include "spdk/util.h" 8 #include "spdk/queue.h" 9 #include "spdk/log.h" 10 11 struct spdk_pipe_buf { 12 SLIST_ENTRY(spdk_pipe_buf) link; 13 uint32_t sz; 14 }; 15 16 struct spdk_pipe_group { 17 SLIST_HEAD(, spdk_pipe_buf) bufs; 18 }; 19 20 struct spdk_pipe { 21 uint8_t *buf; 22 uint32_t sz; 23 24 uint32_t write; 25 uint32_t read; 26 bool full; 27 28 struct spdk_pipe_group *group; 29 }; 30 31 struct spdk_pipe * 32 spdk_pipe_create(void *buf, uint32_t sz) 33 { 34 struct spdk_pipe *pipe; 35 36 pipe = calloc(1, sizeof(*pipe)); 37 if (pipe == NULL) { 38 return NULL; 39 } 40 41 pipe->buf = buf; 42 pipe->sz = sz; 43 44 return pipe; 45 } 46 47 void * 48 spdk_pipe_destroy(struct spdk_pipe *pipe) 49 { 50 void *buf; 51 52 if (pipe == NULL) { 53 return NULL; 54 } 55 56 if (pipe->group) { 57 spdk_pipe_group_remove(pipe->group, pipe); 58 } 59 60 buf = pipe->buf; 61 free(pipe); 62 return buf; 63 } 64 65 static void 66 pipe_alloc_buf_from_group(struct spdk_pipe *pipe) 67 { 68 struct spdk_pipe_buf *buf; 69 struct spdk_pipe_group *group; 70 71 assert(pipe->group != NULL); 72 group = pipe->group; 73 74 /* We have to pick a buffer that's the correct size. It's almost always 75 * the first one. */ 76 buf = SLIST_FIRST(&group->bufs); 77 while (buf != NULL) { 78 if (buf->sz == pipe->sz) { 79 /* TODO: Could track the previous and do an SLIST_REMOVE_AFTER */ 80 SLIST_REMOVE(&pipe->group->bufs, buf, spdk_pipe_buf, link); 81 pipe->buf = (void *)buf; 82 return; 83 } 84 buf = SLIST_NEXT(buf, link); 85 } 86 /* Should never get here. */ 87 assert(false); 88 } 89 90 int 91 spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs) 92 { 93 uint32_t sz; 94 uint32_t read; 95 uint32_t write; 96 97 read = pipe->read; 98 write = pipe->write; 99 100 if (pipe->full || requested_sz == 0) { 101 iovs[0].iov_base = NULL; 102 iovs[0].iov_len = 0; 103 return 0; 104 } 105 106 if (pipe->buf == NULL) { 107 pipe_alloc_buf_from_group(pipe); 108 } 109 110 if (read <= write) { 111 sz = spdk_min(requested_sz, pipe->sz - write); 112 113 iovs[0].iov_base = pipe->buf + write; 114 iovs[0].iov_len = sz; 115 116 requested_sz -= sz; 117 118 if (requested_sz > 0) { 119 sz = spdk_min(requested_sz, read); 120 121 iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf; 122 iovs[1].iov_len = sz; 123 } else { 124 iovs[1].iov_base = NULL; 125 iovs[1].iov_len = 0; 126 } 127 } else { 128 sz = spdk_min(requested_sz, read - write); 129 130 iovs[0].iov_base = pipe->buf + write; 131 iovs[0].iov_len = sz; 132 iovs[1].iov_base = NULL; 133 iovs[1].iov_len = 0; 134 } 135 136 return iovs[0].iov_len + iovs[1].iov_len; 137 } 138 139 int 140 spdk_pipe_writer_advance(struct spdk_pipe *pipe, uint32_t requested_sz) 141 { 142 uint32_t sz; 143 uint32_t read; 144 uint32_t write; 145 146 read = pipe->read; 147 write = pipe->write; 148 149 if (requested_sz > pipe->sz || pipe->full) { 150 return -EINVAL; 151 } 152 153 if (read <= write) { 154 if (requested_sz > (pipe->sz - write) + read) { 155 return -EINVAL; 156 } 157 158 sz = spdk_min(requested_sz, pipe->sz - write); 159 160 write += sz; 161 if (write == pipe->sz) { 162 write = 0; 163 } 164 requested_sz -= sz; 165 166 if (requested_sz > 0) { 167 write = requested_sz; 168 } 169 } else { 170 if (requested_sz > (read - write)) { 171 return -EINVAL; 172 } 173 174 write += requested_sz; 175 } 176 177 if (read == write) { 178 pipe->full = true; 179 } 180 pipe->write = write; 181 182 return 0; 183 } 184 185 uint32_t 186 spdk_pipe_reader_bytes_available(struct spdk_pipe *pipe) 187 { 188 uint32_t read; 189 uint32_t write; 190 191 read = pipe->read; 192 write = pipe->write; 193 194 if (read == write && !pipe->full) { 195 return 0; 196 } else if (read < write) { 197 return write - read; 198 } else { 199 return (pipe->sz - read) + write; 200 } 201 } 202 203 int 204 spdk_pipe_reader_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs) 205 { 206 uint32_t sz; 207 uint32_t read; 208 uint32_t write; 209 210 read = pipe->read; 211 write = pipe->write; 212 213 if ((read == write && !pipe->full) || requested_sz == 0) { 214 iovs[0].iov_base = NULL; 215 iovs[0].iov_len = 0; 216 iovs[1].iov_base = NULL; 217 iovs[1].iov_len = 0; 218 } else if (read < write) { 219 sz = spdk_min(requested_sz, write - read); 220 221 iovs[0].iov_base = pipe->buf + read; 222 iovs[0].iov_len = sz; 223 iovs[1].iov_base = NULL; 224 iovs[1].iov_len = 0; 225 } else { 226 sz = spdk_min(requested_sz, pipe->sz - read); 227 228 iovs[0].iov_base = pipe->buf + read; 229 iovs[0].iov_len = sz; 230 231 requested_sz -= sz; 232 233 if (requested_sz > 0) { 234 sz = spdk_min(requested_sz, write); 235 iovs[1].iov_base = (sz == 0) ? NULL : pipe->buf; 236 iovs[1].iov_len = sz; 237 } else { 238 iovs[1].iov_base = NULL; 239 iovs[1].iov_len = 0; 240 } 241 } 242 243 return iovs[0].iov_len + iovs[1].iov_len; 244 } 245 246 int 247 spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz) 248 { 249 uint32_t sz; 250 uint32_t read; 251 uint32_t write; 252 253 read = pipe->read; 254 write = pipe->write; 255 256 if (requested_sz == 0) { 257 return 0; 258 } 259 260 if (read < write) { 261 if (requested_sz > (write - read)) { 262 return -EINVAL; 263 } 264 265 read += requested_sz; 266 } else { 267 sz = spdk_min(requested_sz, pipe->sz - read); 268 269 read += sz; 270 if (read == pipe->sz) { 271 read = 0; 272 } 273 requested_sz -= sz; 274 275 if (requested_sz > 0) { 276 if (requested_sz > write) { 277 return -EINVAL; 278 } 279 280 read = requested_sz; 281 } 282 } 283 284 /* We know we advanced at least one byte, so the pipe isn't full. */ 285 pipe->full = false; 286 287 if (read == write) { 288 /* The pipe is empty. To re-use the same memory more frequently, jump 289 * both pointers back to the beginning of the pipe. */ 290 read = 0; 291 pipe->write = 0; 292 293 /* Additionally, release the buffer to the shared pool */ 294 if (pipe->group) { 295 struct spdk_pipe_buf *buf = (struct spdk_pipe_buf *)pipe->buf; 296 buf->sz = pipe->sz; 297 SLIST_INSERT_HEAD(&pipe->group->bufs, buf, link); 298 pipe->buf = NULL; 299 } 300 } 301 302 pipe->read = read; 303 304 return 0; 305 } 306 307 struct spdk_pipe_group * 308 spdk_pipe_group_create(void) 309 { 310 struct spdk_pipe_group *group; 311 312 group = calloc(1, sizeof(*group)); 313 if (!group) { 314 return NULL; 315 } 316 317 SLIST_INIT(&group->bufs); 318 319 return group; 320 } 321 322 void 323 spdk_pipe_group_destroy(struct spdk_pipe_group *group) 324 { 325 if (!SLIST_EMPTY(&group->bufs)) { 326 SPDK_ERRLOG("Destroying a pipe group that still has buffers!\n"); 327 assert(false); 328 } 329 330 free(group); 331 } 332 333 int 334 spdk_pipe_group_add(struct spdk_pipe_group *group, struct spdk_pipe *pipe) 335 { 336 struct spdk_pipe_buf *buf; 337 338 assert(pipe->group == NULL); 339 340 pipe->group = group; 341 if (pipe->read != pipe->write || pipe->full) { 342 /* Pipe currently has valid data, so keep the buffer attached 343 * to the pipe for now. We can move it to the group's SLIST 344 * later when it gets emptied. 345 */ 346 return 0; 347 } 348 349 buf = (struct spdk_pipe_buf *)pipe->buf; 350 buf->sz = pipe->sz; 351 SLIST_INSERT_HEAD(&group->bufs, buf, link); 352 pipe->buf = NULL; 353 return 0; 354 } 355 356 int 357 spdk_pipe_group_remove(struct spdk_pipe_group *group, struct spdk_pipe *pipe) 358 { 359 assert(pipe->group == group); 360 361 if (pipe->buf == NULL) { 362 /* Associate a buffer with the pipe before returning. */ 363 pipe_alloc_buf_from_group(pipe); 364 assert(pipe->buf != NULL); 365 } 366 367 pipe->group = NULL; 368 return 0; 369 } 370