1 #include <u.h> 2 #include <libc.h> 3 #include "assert.h" 4 #include "threadimpl.h" 5 6 /* Channel structure. S is the size of the buffer. For unbuffered channels 7 * s is zero. v is an array of s values. If s is zero, v is unused. 8 * f and n represent the state of the queue pointed to by v. 9 * rcvrs and sndrs must be initialized to nil and should not be touched 10 * by code outside channel.c 11 */ 12 13 struct Channel { 14 int s; // Size of the channel (may be zero) 15 uint f; // Extraction point (insertion pt: (f + n) % s) 16 uint n; // Number of values in the channel 17 int e; // Element size 18 int freed; // Set when channel is being deleted 19 ulong qused; // Bitmap of used entries in rcvrs 20 Alt *qentry[32]; // Receivers/senders waiting 21 uchar v[1]; // Array of max(1, s) values in the channel 22 }; 23 24 static Lock chanlock; // Central channel access lock 25 26 ulong rendezvouses; 27 28 void 29 chanfree(Channel *c) { 30 31 lock(&chanlock); 32 if (c->qused == 0) { 33 free(c); 34 } else { 35 c->freed = 1; 36 } 37 unlock(&chanlock); 38 } 39 40 Channel * 41 chancreate(int elemsize, int elemcnt) { 42 Channel *c; 43 44 if(elemcnt < 0 || elemsize <= 0) 45 return nil; 46 c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize); 47 if(c == nil) 48 return c; 49 c->f = 0; 50 c->n = 0; 51 c->freed = 0; 52 c->qused = 0; 53 c->s = elemcnt; 54 c->e = elemsize; 55 _threaddebug(DBGCHAN, "chancreate %lux", c); 56 return c; 57 } 58 59 int 60 alt(Alt *alts) { 61 Alt *a, *xa; 62 Channel *c; 63 uchar *v; 64 int i, n, entry; 65 Proc *p; 66 Thread *t; 67 68 lock(&chanlock); 69 70 (*procp)->curthread->alt = alts; 71 (*procp)->curthread->call = Callalt; 72 repeat: 73 74 // Test which channels can proceed 75 n = 1; 76 a = nil; 77 entry = -1; 78 for (xa = alts; xa->op; xa++) { 79 if (xa->op == CHANNOP) continue; 80 if (xa->op == CHANNOBLK) { 81 if (a == nil) { 82 (*procp)->curthread->call = Callnil; 83 unlock(&chanlock); 84 return xa - alts; 85 } else 86 continue; 87 } 88 89 c = xa->c; 90 if ((xa->op == CHANSND && c->n < c->s) || 91 (xa->op == CHANRCV && c->n)) { 92 // There's room to send in the channel 93 if (nrand(n) == 0) { 94 a = xa; 95 entry = -1; 96 } 97 n++; 98 } else { 99 // Test for blocked senders or receivers 100 for (i = 0; i < 32; i++) { 101 // Is it claimed? 102 if ( 103 (c->qused & (1 << i)) 104 && xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op 105 // complementary op 106 && *c->qentry[i]->tag == nil 107 ) { 108 // No 109 if (nrand(n) == 0) { 110 a = xa; 111 entry = i; 112 } 113 n++; 114 break; 115 } 116 } 117 } 118 } 119 120 if (a == nil) { 121 // Nothing can proceed, enqueue on all channels 122 c = nil; 123 for (a = alts; a->op; a++) { 124 Channel *cp; 125 126 if (a->op == CHANNOP || a->op == CHANNOBLK) continue; 127 cp = a->c; 128 a->tag = &c; 129 for (i = 0; i < 32; i++) { 130 if ((cp->qused & (1 << i)) == 0) { 131 cp->qentry[i] = a; 132 cp->qused |= a->q = 1 << i; 133 break; 134 } 135 } 136 threadassert(i != 32); 137 } 138 139 // And wait for the rendez vous 140 rendezvouses++; 141 unlock(&chanlock); 142 if (threadrendezvous((ulong)&c, 0) == ~0) { 143 (*procp)->curthread->call = Callnil; 144 return -1; 145 } 146 147 lock(&chanlock); 148 149 /* We rendezed-vous on channel c, dequeue from all channels 150 * and find the Alt struct to which c belongs 151 */ 152 a = nil; 153 for (xa = alts; xa->op; xa++) { 154 Channel *xc; 155 156 if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue; 157 xc = xa->c; 158 xc->qused &= ~xa->q; 159 if (xc == c) 160 a = xa; 161 162 } 163 164 if (c->s) { 165 // Buffered channel, try again 166 sleep(0); 167 goto repeat; 168 } 169 170 unlock(&chanlock); 171 172 if (c->freed) chanfree(c); 173 174 p = *procp; 175 t = p->curthread; 176 if (t->exiting) 177 threadexits(nil); 178 (*procp)->curthread->call = Callnil; 179 return a - alts; 180 } 181 182 c = a->c; 183 // Channel c can proceed 184 185 if (c->s) { 186 // Send or receive via the buffered channel 187 if (a->op == CHANSND) { 188 v = c->v + ((c->f + c->n) % c->s) * c->e; 189 if (a->v) 190 memmove(v, a->v, c->e); 191 else 192 memset(v, 0, c->e); 193 c->n++; 194 } else { 195 if (a->v) { 196 v = c->v + (c->f % c->s) * c->e; 197 memmove(a->v, v, c->e); 198 } 199 c->n--; 200 c->f++; 201 } 202 } 203 if (entry < 0) 204 for (i = 0; i < 32; i++) { 205 if ( 206 (c->qused & (1 << i)) 207 && c->qentry[i]->op == (CHANSND+CHANRCV) - a->op 208 && *c->qentry[i]->tag == nil 209 ) { 210 // Unblock peer process 211 xa = c->qentry[i]; 212 *xa->tag = c; 213 214 rendezvouses++; 215 unlock(&chanlock); 216 if (threadrendezvous((ulong)xa->tag, 0) == ~0) { 217 (*procp)->curthread->call = Callnil; 218 return -1; 219 } 220 (*procp)->curthread->call = Callnil; 221 return a - alts; 222 } 223 } 224 if (entry >= 0) { 225 xa = c->qentry[entry]; 226 if (a->op == CHANSND) { 227 if (xa->v) { 228 if (a->v) 229 memmove(xa->v, a->v, c->e); 230 else 231 memset(xa->v, 0, c->e); 232 } 233 } else { 234 if (a->v) { 235 if (xa->v) 236 memmove(a->v, xa->v, c->e); 237 else 238 memset(a->v, 0, c->e); 239 } 240 } 241 *xa->tag = c; 242 243 rendezvouses++; 244 unlock(&chanlock); 245 if (threadrendezvous((ulong)xa->tag, 0) == ~0) { 246 (*procp)->curthread->call = Callnil; 247 return -1; 248 } 249 (*procp)->curthread->call = Callnil; 250 return a - alts; 251 } 252 unlock(&chanlock); 253 yield(); 254 (*procp)->curthread->call = Callnil; 255 return a - alts; 256 } 257 258 int 259 nbrecv(Channel *c, void *v) { 260 Alt *a; 261 int i; 262 263 lock(&chanlock); 264 if (c->qused) // There's somebody waiting 265 for (i = 0; i < 32; i++) { 266 a = c->qentry[i]; 267 if ( 268 (c->qused & (1 << i)) 269 && a->op == CHANSND 270 && *a->tag == nil 271 ) { 272 *a->tag = c; 273 if (c->n) { 274 // There's an item to receive in the buffered channel 275 if (v) 276 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 277 c->n--; 278 c->f++; 279 } else { 280 if (v) { 281 if (a->v) 282 memmove(v, a->v, c->e); 283 else 284 memset(v, 0, c->e); 285 } 286 } 287 288 rendezvouses++; 289 unlock(&chanlock); 290 if (threadrendezvous((ulong)a->tag, 0) == ~0) 291 return -1; 292 return 1; 293 } 294 } 295 if (c->n) { 296 // There's an item to receive in the buffered channel 297 if (v) 298 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 299 c->n--; 300 c->f++; 301 unlock(&chanlock); 302 yield(); 303 return 1; 304 } 305 unlock(&chanlock); 306 return 0; 307 } 308 309 int 310 recv(Channel *c, void *v) { 311 Alt a, *xa; 312 Channel *tag; 313 int i; 314 Proc *p; 315 Thread *t; 316 317 318 lock(&chanlock); 319 retry: 320 if (c->qused) // There's somebody waiting 321 for (i = 0; i < 32; i++) { 322 xa = c->qentry[i]; 323 if ( 324 (c->qused & (1 << i)) 325 && xa->op == CHANSND 326 && *xa->tag == nil 327 ) { 328 *xa->tag = c; 329 if (c->n) { 330 // There's an item to receive in the buffered channel 331 if (v) 332 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 333 c->n--; 334 c->f++; 335 } else { 336 if (v) { 337 if (xa->v) 338 memmove(v, xa->v, c->e); 339 else 340 memset(v, 0, c->e); 341 } 342 } 343 344 rendezvouses++; 345 unlock(&chanlock); 346 if (threadrendezvous((ulong)xa->tag, 0) == ~0) 347 return -1; 348 return 1; 349 } 350 } 351 if (c->n) { 352 // There's an item to receive in the buffered channel 353 if (v) 354 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 355 c->n--; 356 c->f++; 357 unlock(&chanlock); 358 yield(); 359 return 1; 360 } 361 // Unbuffered, or buffered but empty 362 tag = nil; 363 a.c = c; 364 a.v = v; 365 a.tag = &tag; 366 a.op = CHANRCV; 367 p = *procp; 368 t = p->curthread; 369 t->alt = &a; 370 t->call = Callrcv; 371 372 // enqueue on the channel 373 for (i = 0; i < 32; i++) 374 if ((c->qused & (1 << i)) == 0) { 375 c->qentry[i] = &a; 376 c->qused |= a.q = 1 << i; 377 break; 378 } 379 380 rendezvouses++; 381 unlock(&chanlock); 382 if (threadrendezvous((ulong)&tag, 0) == ~0) { 383 t->call = Callnil; 384 return -1; 385 } 386 lock(&chanlock); 387 388 // dequeue from the channel 389 c->qused &= ~a.q; 390 if (c->s) goto retry; // Buffered channels: try the queue again 391 unlock(&chanlock); 392 if (c->freed) chanfree(c); 393 t->call = Callnil; 394 if (t->exiting) 395 threadexits(nil); 396 return 1; 397 } 398 399 int 400 nbsend(Channel *c, void *v) { 401 Alt *a; 402 int i; 403 404 lock(&chanlock); 405 if (c->qused) // Anybody waiting? 406 for (i = 0; i < 32; i++) { 407 a = c->qentry[i]; 408 if ( 409 (c->qused & (1 << i)) 410 && a->op == CHANRCV 411 && *a->tag == nil 412 ) { 413 *a->tag = c; 414 if (c->n < c->s) { 415 // There's room to send in the buffered channel 416 if (v) 417 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 418 else 419 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 420 c->n++; 421 } else { 422 if (a->v) { 423 if (v) 424 memmove(a->v, v, c->e); 425 else 426 memset(a->v, 0, c->e); 427 } 428 } 429 430 rendezvouses++; 431 unlock(&chanlock); 432 if (threadrendezvous((ulong)a->tag, 0) == ~0) 433 return -1; 434 return 1; 435 } 436 } 437 if (c->n < c->s) { 438 // There's room to send in the buffered channel 439 if (v) 440 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 441 else 442 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 443 c->n++; 444 unlock(&chanlock); 445 yield(); 446 return 1; 447 } 448 unlock(&chanlock); 449 return 0; 450 } 451 452 int 453 send(Channel *c, void *v) { 454 Alt a, *xa; 455 Channel *tag; 456 int i; 457 Proc *p; 458 Thread *t; 459 460 lock(&chanlock); 461 retry: 462 if (c->qused) // Anybody waiting? 463 for (i = 0; i < 32; i++) { 464 xa = c->qentry[i]; 465 threadassert(!(c->qused & (1 << i)) || xa != nil); 466 if ( 467 (c->qused & (1 << i)) 468 && xa->op == CHANRCV 469 && *xa->tag == nil 470 ) { 471 *xa->tag = c; 472 if (c->n < c->s) { 473 // There's room to send in the buffered channel 474 if (v) 475 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 476 else 477 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 478 c->n++; 479 } else { 480 if (xa->v) { 481 if (v) 482 memmove(xa->v, v, c->e); 483 else 484 memset(xa->v, 0, c->e); 485 } 486 } 487 488 rendezvouses++; 489 unlock(&chanlock); 490 if (threadrendezvous((ulong)xa->tag, 0) == ~0) 491 return -1; 492 return 1; 493 } 494 } 495 if (c->n < c->s) { 496 // There's room to send in the buffered channel 497 if (v) 498 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 499 else 500 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 501 c->n++; 502 unlock(&chanlock); 503 yield(); 504 return 1; 505 } 506 tag = nil; 507 a.c = c; 508 a.v = v; 509 a.tag = &tag; 510 a.op = CHANSND; 511 p = *procp; 512 t = p->curthread; 513 t->alt = &a; 514 t->call = Callsnd; 515 516 // enqueue on the channel 517 for (i = 0; i < 32; i++) 518 if ((c->qused & (1 << i)) == 0) { 519 c->qentry[i] = &a; 520 c->qused |= a.q = 1 << i; 521 break; 522 } 523 rendezvouses++; 524 unlock(&chanlock); 525 if (threadrendezvous((ulong)&tag, 0) == ~0) { 526 t->call = Callnil; 527 return -1; 528 } 529 // Unbuffered channels: data was transferred; dequeue 530 lock(&chanlock); 531 // dequeue from the channel 532 c->qused &= ~a.q; 533 if (c->s) goto retry; // Buffered channels: try the queue again 534 unlock(&chanlock); 535 if (c->freed) chanfree(c); 536 t->call = Callnil; 537 if (t->exiting) 538 threadexits(nil); 539 return 1; 540 } 541 542 int 543 sendul(Channel *c, ulong v) { 544 threadassert(c->e == sizeof(ulong)); 545 return send(c, &v); 546 } 547 548 ulong 549 recvul(Channel *c) { 550 ulong v; 551 552 threadassert(c->e == sizeof(ulong)); 553 recv(c, &v); 554 return v; 555 } 556 557 int 558 sendp(Channel *c, void *v) { 559 threadassert(c->e == sizeof(void *)); 560 return send(c, &v); 561 } 562 563 void * 564 recvp(Channel *c) { 565 void * v; 566 567 threadassert(c->e == sizeof(void *)); 568 recv(c, &v); 569 return v; 570 } 571 572 int 573 nbsendul(Channel *c, ulong v) { 574 threadassert(c->e == sizeof(ulong)); 575 return nbsend(c, &v); 576 } 577 578 ulong 579 nbrecvul(Channel *c) { 580 ulong v; 581 582 threadassert(c->e == sizeof(ulong)); 583 if (nbrecv(c, &v) == 0) 584 return 0; 585 return v; 586 } 587 588 int 589 nbsendp(Channel *c, void *v) { 590 threadassert(c->e == sizeof(void *)); 591 return nbsend(c, &v); 592 } 593 594 void * 595 nbrecvp(Channel *c) { 596 void * v; 597 598 threadassert(c->e == sizeof(void *)); 599 if (nbrecv(c, &v) == 0) 600 return nil; 601 return v; 602 } 603