1 #include <u.h> 2 #include <libc.h> 3 #include "assert.h" 4 #include "threadimpl.h" 5 6 static Lock chanlock; // Central channel access lock 7 8 void 9 chanfree(Channel *c) { 10 11 lock(&chanlock); 12 if (c->qused == 0) { 13 free(c); 14 } else { 15 c->freed = 1; 16 } 17 unlock(&chanlock); 18 } 19 20 int 21 chaninit(Channel *c, int elemsize, int elemcnt) { 22 if(elemcnt < 0 || elemsize <= 0 || c == nil) 23 return -1; 24 c->f = 0; 25 c->n = 0; 26 c->freed = 0; 27 c->qused = 0; 28 c->s = elemcnt; 29 c->e = elemsize; 30 _threaddebug(DBGCHAN, "chaninit %lux", c); 31 return 1; 32 } 33 34 Channel * 35 chancreate(int elemsize, int elemcnt) { 36 Channel *c; 37 38 if(elemcnt < 0 || elemsize <= 0) 39 return nil; 40 c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize); 41 if(c == nil) 42 return c; 43 c->f = 0; 44 c->n = 0; 45 c->freed = 0; 46 c->qused = 0; 47 c->s = elemcnt; 48 c->e = elemsize; 49 _threaddebug(DBGCHAN, "chancreate %lux", c); 50 return c; 51 } 52 53 int 54 alt(Alt *alts) { 55 Alt *a, *xa; 56 Channel *c; 57 uchar *v; 58 int i, n, entry; 59 Proc *p; 60 Thread *t; 61 62 lock(&chanlock); 63 64 (*procp)->curthread->alt = alts; 65 (*procp)->curthread->call = Callalt; 66 repeat: 67 68 // Test which channels can proceed 69 n = 1; 70 a = nil; 71 entry = -1; 72 for (xa = alts; xa->op; xa++) { 73 if (xa->op == CHANNOP) continue; 74 if (xa->op == CHANNOBLK) { 75 if (a == nil) { 76 (*procp)->curthread->call = Callnil; 77 unlock(&chanlock); 78 return xa - alts; 79 } else 80 break; 81 } 82 83 c = xa->c; 84 if ((xa->op == CHANSND && c->n < c->s) || 85 (xa->op == CHANRCV && c->n)) { 86 // There's room to send in the channel 87 if (nrand(n) == 0) { 88 a = xa; 89 entry = -1; 90 } 91 n++; 92 } else { 93 // Test for blocked senders or receivers 94 for (i = 0; i < 32; i++) { 95 // Is it claimed? 96 if ( 97 (c->qused & (1 << i)) 98 && xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op 99 // complementary op 100 && *c->qentry[i]->tag == nil 101 ) { 102 // No 103 if (nrand(n) == 0) { 104 a = xa; 105 entry = i; 106 } 107 n++; 108 break; 109 } 110 } 111 } 112 } 113 114 if (a == nil) { 115 // Nothing can proceed, enqueue on all channels 116 c = nil; 117 for (a = alts; a->op; a++) { 118 Channel *cp; 119 120 if (a->op == CHANNOP || a->op == CHANNOBLK) continue; 121 cp = a->c; 122 a->tag = &c; 123 for (i = 0; i < 32; i++) { 124 if ((cp->qused & (1 << i)) == 0) { 125 cp->qentry[i] = a; 126 cp->qused |= a->q = 1 << i; 127 break; 128 } 129 } 130 threadassert(i != 32); 131 } 132 133 // And wait for the rendez vous 134 unlock(&chanlock); 135 if (_threadrendezvous((ulong)&c, 0) == ~0) { 136 (*procp)->curthread->call = Callnil; 137 return -1; 138 } 139 140 lock(&chanlock); 141 142 /* We rendezed-vous on channel c, dequeue from all channels 143 * and find the Alt struct to which c belongs 144 */ 145 a = nil; 146 for (xa = alts; xa->op; xa++) { 147 Channel *xc; 148 149 if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue; 150 xc = xa->c; 151 xc->qused &= ~xa->q; 152 if (xc == c) 153 a = xa; 154 155 } 156 157 if (c->s) { 158 // Buffered channel, try again 159 sleep(0); 160 goto repeat; 161 } 162 163 unlock(&chanlock); 164 165 if (c->freed) chanfree(c); 166 167 p = *procp; 168 t = p->curthread; 169 if (t->exiting) 170 threadexits(nil); 171 (*procp)->curthread->call = Callnil; 172 return a - alts; 173 } 174 175 c = a->c; 176 // Channel c can proceed 177 178 if (c->s) { 179 // Send or receive via the buffered channel 180 if (a->op == CHANSND) { 181 v = c->v + ((c->f + c->n) % c->s) * c->e; 182 if (a->v) 183 memmove(v, a->v, c->e); 184 else 185 memset(v, 0, c->e); 186 c->n++; 187 } else { 188 if (a->v) { 189 v = c->v + (c->f % c->s) * c->e; 190 memmove(a->v, v, c->e); 191 } 192 c->n--; 193 c->f++; 194 } 195 } 196 if (entry < 0) 197 for (i = 0; i < 32; i++) { 198 if ( 199 (c->qused & (1 << i)) 200 && c->qentry[i]->op == (CHANSND+CHANRCV) - a->op 201 && *c->qentry[i]->tag == nil 202 ) { 203 // Unblock peer process 204 xa = c->qentry[i]; 205 *xa->tag = c; 206 207 unlock(&chanlock); 208 if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { 209 (*procp)->curthread->call = Callnil; 210 return -1; 211 } 212 (*procp)->curthread->call = Callnil; 213 return a - alts; 214 } 215 } 216 if (entry >= 0) { 217 xa = c->qentry[entry]; 218 if (a->op == CHANSND) { 219 if (xa->v) { 220 if (a->v) 221 memmove(xa->v, a->v, c->e); 222 else 223 memset(xa->v, 0, c->e); 224 } 225 } else { 226 if (a->v) { 227 if (xa->v) 228 memmove(a->v, xa->v, c->e); 229 else 230 memset(a->v, 0, c->e); 231 } 232 } 233 *xa->tag = c; 234 235 unlock(&chanlock); 236 if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { 237 (*procp)->curthread->call = Callnil; 238 return -1; 239 } 240 (*procp)->curthread->call = Callnil; 241 return a - alts; 242 } 243 unlock(&chanlock); 244 yield(); 245 (*procp)->curthread->call = Callnil; 246 return a - alts; 247 } 248 249 int 250 nbrecv(Channel *c, void *v) { 251 Alt *a; 252 int i; 253 254 lock(&chanlock); 255 if (c->qused) // There's somebody waiting 256 for (i = 0; i < 32; i++) { 257 a = c->qentry[i]; 258 if ( 259 (c->qused & (1 << i)) 260 && a->op == CHANSND 261 && *a->tag == nil 262 ) { 263 *a->tag = c; 264 if (c->n) { 265 // There's an item to receive in the buffered channel 266 if (v) 267 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 268 c->n--; 269 c->f++; 270 } else { 271 if (v) { 272 if (a->v) 273 memmove(v, a->v, c->e); 274 else 275 memset(v, 0, c->e); 276 } 277 } 278 279 unlock(&chanlock); 280 if (_threadrendezvous((ulong)a->tag, 0) == ~0) 281 return -1; 282 return 1; 283 } 284 } 285 if (c->n) { 286 // There's an item to receive in the buffered channel 287 if (v) 288 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 289 c->n--; 290 c->f++; 291 unlock(&chanlock); 292 yield(); 293 return 1; 294 } 295 unlock(&chanlock); 296 return 0; 297 } 298 299 int 300 recv(Channel *c, void *v) { 301 Alt a, *xa; 302 Channel *tag; 303 int i; 304 Proc *p; 305 Thread *t; 306 307 308 lock(&chanlock); 309 retry: 310 if (c->qused) // There's somebody waiting 311 for (i = 0; i < 32; i++) { 312 xa = c->qentry[i]; 313 if ( 314 (c->qused & (1 << i)) 315 && xa->op == CHANSND 316 && *xa->tag == nil 317 ) { 318 *xa->tag = c; 319 if (c->n) { 320 // There's an item to receive in the buffered channel 321 if (v) 322 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 323 c->n--; 324 c->f++; 325 } else { 326 if (v) { 327 if (xa->v) 328 memmove(v, xa->v, c->e); 329 else 330 memset(v, 0, c->e); 331 } 332 } 333 334 unlock(&chanlock); 335 if (_threadrendezvous((ulong)xa->tag, 0) == ~0) 336 return -1; 337 return 1; 338 } 339 } 340 if (c->n) { 341 // There's an item to receive in the buffered channel 342 if (v) 343 memmove(v, c->v + (c->f % c->s) * c->e, c->e); 344 c->n--; 345 c->f++; 346 unlock(&chanlock); 347 yield(); 348 return 1; 349 } 350 // Unbuffered, or buffered but empty 351 tag = nil; 352 a.c = c; 353 a.v = v; 354 a.tag = &tag; 355 a.op = CHANRCV; 356 p = *procp; 357 t = p->curthread; 358 t->alt = &a; 359 t->call = Callrcv; 360 361 // enqueue on the channel 362 for (i = 0; i < 32; i++) 363 if ((c->qused & (1 << i)) == 0) { 364 c->qentry[i] = &a; 365 c->qused |= a.q = 1 << i; 366 break; 367 } 368 369 unlock(&chanlock); 370 if (_threadrendezvous((ulong)&tag, 0) == ~0) { 371 t->call = Callnil; 372 return -1; 373 } 374 lock(&chanlock); 375 376 // dequeue from the channel 377 c->qused &= ~a.q; 378 if (c->s) goto retry; // Buffered channels: try the queue again 379 unlock(&chanlock); 380 if (c->freed) chanfree(c); 381 t->call = Callnil; 382 if (t->exiting) 383 threadexits(nil); 384 return 1; 385 } 386 387 int 388 nbsend(Channel *c, void *v) { 389 Alt *a; 390 int i; 391 392 lock(&chanlock); 393 if (c->qused) // Anybody waiting? 394 for (i = 0; i < 32; i++) { 395 a = c->qentry[i]; 396 if ( 397 (c->qused & (1 << i)) 398 && a->op == CHANRCV 399 && *a->tag == nil 400 ) { 401 *a->tag = c; 402 if (c->n < c->s) { 403 // There's room to send in the buffered channel 404 if (v) 405 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 406 else 407 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 408 c->n++; 409 } else { 410 if (a->v) { 411 if (v) 412 memmove(a->v, v, c->e); 413 else 414 memset(a->v, 0, c->e); 415 } 416 } 417 418 unlock(&chanlock); 419 if (_threadrendezvous((ulong)a->tag, 0) == ~0) 420 return -1; 421 return 1; 422 } 423 } 424 if (c->n < c->s) { 425 // There's room to send in the buffered channel 426 if (v) 427 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 428 else 429 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 430 c->n++; 431 unlock(&chanlock); 432 yield(); 433 return 1; 434 } 435 unlock(&chanlock); 436 return 0; 437 } 438 439 int 440 send(Channel *c, void *v) { 441 Alt a, *xa; 442 Channel *tag; 443 int i; 444 Proc *p; 445 Thread *t; 446 447 lock(&chanlock); 448 retry: 449 if (c->qused) // Anybody waiting? 450 for (i = 0; i < 32; i++) { 451 xa = c->qentry[i]; 452 threadassert(!(c->qused & (1 << i)) || xa != nil); 453 if ( 454 (c->qused & (1 << i)) 455 && xa->op == CHANRCV 456 && *xa->tag == nil 457 ) { 458 *xa->tag = c; 459 if (c->n < c->s) { 460 // There's room to send in the buffered channel 461 if (v) 462 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 463 else 464 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 465 c->n++; 466 } else { 467 if (xa->v) { 468 if (v) 469 memmove(xa->v, v, c->e); 470 else 471 memset(xa->v, 0, c->e); 472 } 473 } 474 475 unlock(&chanlock); 476 if (_threadrendezvous((ulong)xa->tag, 0) == ~0) 477 return -1; 478 return 1; 479 } 480 } 481 if (c->n < c->s) { 482 // There's room to send in the buffered channel 483 if (v) 484 memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); 485 else 486 memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); 487 c->n++; 488 unlock(&chanlock); 489 yield(); 490 return 1; 491 } 492 tag = nil; 493 a.c = c; 494 a.v = v; 495 a.tag = &tag; 496 a.op = CHANSND; 497 p = *procp; 498 t = p->curthread; 499 t->alt = &a; 500 t->call = Callsnd; 501 502 // enqueue on the channel 503 for (i = 0; i < 32; i++) 504 if ((c->qused & (1 << i)) == 0) { 505 c->qentry[i] = &a; 506 c->qused |= a.q = 1 << i; 507 break; 508 } 509 unlock(&chanlock); 510 if (_threadrendezvous((ulong)&tag, 0) == ~0) { 511 t->call = Callnil; 512 return -1; 513 } 514 // Unbuffered channels: data was transferred; dequeue 515 lock(&chanlock); 516 // dequeue from the channel 517 c->qused &= ~a.q; 518 if (c->s) goto retry; // Buffered channels: try the queue again 519 unlock(&chanlock); 520 if (c->freed) chanfree(c); 521 t->call = Callnil; 522 if (t->exiting) 523 threadexits(nil); 524 return 1; 525 } 526 527 int 528 sendul(Channel *c, ulong v) { 529 threadassert(c->e == sizeof(ulong)); 530 return send(c, &v); 531 } 532 533 ulong 534 recvul(Channel *c) { 535 ulong v; 536 537 threadassert(c->e == sizeof(ulong)); 538 recv(c, &v); 539 return v; 540 } 541 542 int 543 sendp(Channel *c, void *v) { 544 threadassert(c->e == sizeof(void *)); 545 return send(c, &v); 546 } 547 548 void * 549 recvp(Channel *c) { 550 void * v; 551 552 threadassert(c->e == sizeof(void *)); 553 recv(c, &v); 554 return v; 555 } 556 557 int 558 nbsendul(Channel *c, ulong v) { 559 threadassert(c->e == sizeof(ulong)); 560 return nbsend(c, &v); 561 } 562 563 ulong 564 nbrecvul(Channel *c) { 565 ulong v; 566 567 threadassert(c->e == sizeof(ulong)); 568 if (nbrecv(c, &v) == 0) 569 return 0; 570 return v; 571 } 572 573 int 574 nbsendp(Channel *c, void *v) { 575 threadassert(c->e == sizeof(void *)); 576 return nbsend(c, &v); 577 } 578 579 void * 580 nbrecvp(Channel *c) { 581 void * v; 582 583 threadassert(c->e == sizeof(void *)); 584 if (nbrecv(c, &v) == 0) 585 return nil; 586 return v; 587 } 588