1 #include <u.h> 2 #include <libc.h> 3 #include <thread.h> 4 #include "threadimpl.h" 5 6 /* Value to indicate the channel is closed */ 7 enum { 8 CHANCLOSD = 0xc105ed, 9 }; 10 11 static char errcl[] = "channel was closed"; 12 static Lock chanlock; /* central channel access lock */ 13 14 static void enqueue(Alt*, Channel**); 15 static void dequeue(Alt*); 16 static int canexec(Alt*); 17 static int altexec(Alt*, int); 18 19 #define Closed ((void*)CHANCLOSD) 20 #define Intred ((void*)~0) /* interrupted */ 21 22 static void 23 _chanfree(Channel *c) 24 { 25 int i, inuse; 26 27 if(c->closed == 1) /* chanclose is ongoing */ 28 inuse = 1; 29 else{ 30 inuse = 0; 31 for(i = 0; i < c->nentry; i++) /* alt ongoing */ 32 if(c->qentry[i]) 33 inuse = 1; 34 } 35 if(inuse) 36 c->freed = 1; 37 else{ 38 if(c->qentry) 39 free(c->qentry); 40 free(c); 41 } 42 } 43 44 void 45 chanfree(Channel *c) 46 { 47 lock(&chanlock); 48 _chanfree(c); 49 unlock(&chanlock); 50 } 51 52 int 53 chaninit(Channel *c, int elemsize, int elemcnt) 54 { 55 if(elemcnt < 0 || elemsize <= 0 || c == nil) 56 return -1; 57 c->f = 0; 58 c->n = 0; 59 c->closed = 0; 60 c->freed = 0; 61 c->e = elemsize; 62 c->s = elemcnt; 63 _threaddebug(DBGCHAN, "chaninit %p", c); 64 return 1; 65 } 66 67 Channel* 68 chancreate(int elemsize, int elemcnt) 69 { 70 Channel *c; 71 72 if(elemcnt < 0 || elemsize <= 0) 73 return nil; 74 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1); 75 c->e = elemsize; 76 c->s = elemcnt; 77 _threaddebug(DBGCHAN, "chancreate %p", c); 78 return c; 79 } 80 81 static int 82 isopenfor(Channel *c, int op) 83 { 84 return c->closed == 0 || (op == CHANRCV && c->n > 0); 85 } 86 87 int 88 alt(Alt *alts) 89 { 90 Alt *a, *xa, *ca; 91 Channel volatile *c; 92 int n, s, waiting, allreadycl; 93 void* r; 94 Thread *t; 95 96 /* 97 * The point of going splhi here is that note handlers 98 * might reasonably want to use channel operations, 99 * but that will hang if the note comes while we hold the 100 * chanlock. Instead, we delay the note until we've dropped 101 * the lock. 102 */ 103 t = _threadgetproc()->thread; 104 if(t->moribund || _threadexitsallstatus) 105 yield(); /* won't return */ 106 s = _procsplhi(); 107 lock(&chanlock); 108 t->alt = alts; 109 t->chan = Chanalt; 110 111 /* test whether any channels can proceed */ 112 n = 0; 113 a = nil; 114 115 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){ 116 xa->entryno = -1; 117 if(xa->op == CHANNOP) 118 continue; 119 120 c = xa->c; 121 if(c==nil){ 122 unlock(&chanlock); 123 _procsplx(s); 124 t->chan = Channone; 125 return -1; 126 } 127 128 if(isopenfor(c, xa->op) && canexec(xa)) 129 if(nrand(++n) == 0) 130 a = xa; 131 } 132 133 134 if(a==nil){ 135 /* nothing can proceed */ 136 if(xa->op == CHANNOBLK){ 137 unlock(&chanlock); 138 _procsplx(s); 139 t->chan = Channone; 140 if(xa->op == CHANNOBLK) 141 return xa - alts; 142 } 143 144 /* enqueue on all channels open for us. */ 145 c = nil; 146 ca = nil; 147 waiting = 0; 148 allreadycl = 0; 149 for(xa=alts; xa->op!=CHANEND; xa++) 150 if(xa->op==CHANNOP) 151 continue; 152 else if(isopenfor(xa->c, xa->op)){ 153 waiting = 1; 154 enqueue(xa, &c); 155 } else if(xa->err != errcl) 156 ca = xa; 157 else 158 allreadycl = 1; 159 160 if(waiting == 0) 161 if(ca != nil){ 162 /* everything was closed, select last channel */ 163 ca->err = errcl; 164 unlock(&chanlock); 165 _procsplx(s); 166 t->chan = Channone; 167 return ca - alts; 168 } else if(allreadycl){ 169 /* everything was already closed */ 170 unlock(&chanlock); 171 _procsplx(s); 172 t->chan = Channone; 173 return -1; 174 } 175 /* 176 * wait for successful rendezvous. 177 * we can't just give up if the rendezvous 178 * is interrupted -- someone else might come 179 * along and try to rendezvous with us, so 180 * we need to be here. 181 * if the channel was closed, the op is done 182 * and we flag an error for the entry. 183 */ 184 Again: 185 unlock(&chanlock); 186 _procsplx(s); 187 r = _threadrendezvous(&c, 0); 188 s = _procsplhi(); 189 lock(&chanlock); 190 191 if(r==Intred){ /* interrupted */ 192 if(c!=nil) /* someone will meet us; go back */ 193 goto Again; 194 c = (Channel*)~0; /* so no one tries to meet us */ 195 } 196 197 /* dequeue from channels, find selected one */ 198 a = nil; 199 for(xa=alts; xa->op!=CHANEND; xa++){ 200 if(xa->op==CHANNOP) 201 continue; 202 if(xa->c == c){ 203 a = xa; 204 a->err = nil; 205 if(r == Closed) 206 a->err = errcl; 207 } 208 dequeue(xa); 209 } 210 unlock(&chanlock); 211 _procsplx(s); 212 if(a == nil){ /* we were interrupted */ 213 assert(c==(Channel*)~0); 214 return -1; 215 } 216 }else 217 altexec(a, s); /* unlocks chanlock, does splx */ 218 _sched(); 219 t->chan = Channone; 220 return a - alts; 221 } 222 223 int 224 chanclose(Channel *c) 225 { 226 Alt *a; 227 int i, s; 228 229 s = _procsplhi(); /* note handlers; see :/^alt */ 230 lock(&chanlock); 231 if(c->closed){ 232 /* Already close; we fail but it's ok. don't print */ 233 unlock(&chanlock); 234 _procsplx(s); 235 return -1; 236 } 237 c->closed = 1; /* Being closed */ 238 /* 239 * Locate entries that will fail due to close 240 * (send, and receive if nothing buffered) and wake them up. 241 * the situation cannot change because all queries 242 * should be committed by now and new ones will find the channel 243 * closed. We still need to take the lock during the iteration 244 * because we can wake threads on qentrys we have not seen yet 245 * as in alt and there would be a race in the access to *a. 246 */ 247 for(i = 0; i < c->nentry; i++){ 248 if((a = c->qentry[i]) == nil || *a->tag != nil) 249 continue; 250 251 if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0)) 252 continue; 253 *a->tag = c; 254 unlock(&chanlock); 255 _procsplx(s); 256 while(_threadrendezvous(a->tag, Closed) == Intred) 257 ; 258 s = _procsplhi(); 259 lock(&chanlock); 260 } 261 262 c->closed = 2; /* Fully closed */ 263 if(c->freed) 264 _chanfree(c); 265 unlock(&chanlock); 266 _procsplx(s); 267 return 0; 268 } 269 270 int 271 chanclosing(Channel *c) 272 { 273 int n, s; 274 275 s = _procsplhi(); /* note handlers; see :/^alt */ 276 lock(&chanlock); 277 if(c->closed == 0) 278 n = -1; 279 else 280 n = c->n; 281 unlock(&chanlock); 282 _procsplx(s); 283 return n; 284 } 285 286 /* 287 * superseded by chanclosing 288 int 289 chanisclosed(Channel *c) 290 { 291 return chanisclosing(c) >= 0; 292 } 293 */ 294 295 static int 296 runop(int op, Channel *c, void *v, int nb) 297 { 298 int r; 299 Alt a[2]; 300 301 /* 302 * we could do this without calling alt, 303 * but the only reason would be performance, 304 * and i'm not convinced it matters. 305 */ 306 a[0].op = op; 307 a[0].c = c; 308 a[0].v = v; 309 a[0].err = nil; 310 a[1].op = CHANEND; 311 if(nb) 312 a[1].op = CHANNOBLK; 313 switch(r=alt(a)){ 314 case -1: /* interrupted */ 315 return -1; 316 case 1: /* nonblocking, didn't accomplish anything */ 317 assert(nb); 318 return 0; 319 case 0: 320 /* 321 * Okay, but return -1 if the op is done because of a close. 322 */ 323 if(a[0].err != nil) 324 return -1; 325 return 1; 326 default: 327 fprint(2, "ERROR: channel alt returned %d\n", r); 328 abort(); 329 return -1; 330 } 331 } 332 333 int 334 recv(Channel *c, void *v) 335 { 336 return runop(CHANRCV, c, v, 0); 337 } 338 339 int 340 nbrecv(Channel *c, void *v) 341 { 342 return runop(CHANRCV, c, v, 1); 343 } 344 345 int 346 send(Channel *c, void *v) 347 { 348 return runop(CHANSND, c, v, 0); 349 } 350 351 int 352 nbsend(Channel *c, void *v) 353 { 354 return runop(CHANSND, c, v, 1); 355 } 356 357 static void 358 channelsize(Channel *c, int sz) 359 { 360 if(c->e != sz){ 361 fprint(2, "expected channel with elements of size %d, got size %d\n", 362 sz, c->e); 363 abort(); 364 } 365 } 366 367 int 368 sendul(Channel *c, ulong v) 369 { 370 channelsize(c, sizeof(ulong)); 371 return send(c, &v); 372 } 373 374 ulong 375 recvul(Channel *c) 376 { 377 ulong v; 378 379 channelsize(c, sizeof(ulong)); 380 if(recv(c, &v) < 0) 381 return ~0; 382 return v; 383 } 384 385 int 386 sendp(Channel *c, void *v) 387 { 388 channelsize(c, sizeof(void*)); 389 return send(c, &v); 390 } 391 392 void* 393 recvp(Channel *c) 394 { 395 void *v; 396 397 channelsize(c, sizeof(void*)); 398 if(recv(c, &v) < 0) 399 return nil; 400 return v; 401 } 402 403 int 404 nbsendul(Channel *c, ulong v) 405 { 406 channelsize(c, sizeof(ulong)); 407 return nbsend(c, &v); 408 } 409 410 ulong 411 nbrecvul(Channel *c) 412 { 413 ulong v; 414 415 channelsize(c, sizeof(ulong)); 416 if(nbrecv(c, &v) == 0) 417 return 0; 418 return v; 419 } 420 421 int 422 nbsendp(Channel *c, void *v) 423 { 424 channelsize(c, sizeof(void*)); 425 return nbsend(c, &v); 426 } 427 428 void* 429 nbrecvp(Channel *c) 430 { 431 void *v; 432 433 channelsize(c, sizeof(void*)); 434 if(nbrecv(c, &v) == 0) 435 return nil; 436 return v; 437 } 438 439 static int 440 emptyentry(Channel *c) 441 { 442 int i, extra; 443 444 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry)); 445 446 for(i=0; i<c->nentry; i++) 447 if(c->qentry[i]==nil) 448 return i; 449 450 extra = 16; 451 c->nentry += extra; 452 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0])); 453 if(c->qentry == nil) 454 sysfatal("realloc channel entries: %r"); 455 memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0])); 456 return i; 457 } 458 459 static void 460 enqueue(Alt *a, Channel **c) 461 { 462 int i; 463 464 _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c); 465 a->tag = c; 466 i = emptyentry(a->c); 467 a->c->qentry[i] = a; 468 } 469 470 static void 471 dequeue(Alt *a) 472 { 473 int i; 474 Channel *c; 475 476 c = a->c; 477 for(i=0; i<c->nentry; i++) 478 if(c->qentry[i]==a){ 479 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c); 480 c->qentry[i] = nil; 481 /* release if freed and not closing */ 482 if(c->freed && c->closed != 1) 483 _chanfree(c); 484 return; 485 } 486 } 487 488 static int 489 canexec(Alt *a) 490 { 491 int i, otherop; 492 Channel *c; 493 494 c = a->c; 495 /* are there senders or receivers blocked? */ 496 otherop = (CHANSND+CHANRCV) - a->op; 497 for(i=0; i<c->nentry; i++) 498 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){ 499 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c); 500 return 1; 501 } 502 503 /* is there room in the channel? */ 504 if((a->op==CHANSND && c->n < c->s) 505 || (a->op==CHANRCV && c->n > 0)){ 506 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c); 507 return 1; 508 } 509 510 return 0; 511 } 512 513 static void* 514 altexecbuffered(Alt *a, int willreplace) 515 { 516 uchar *v; 517 Channel *c; 518 519 c = a->c; 520 /* use buffered channel queue */ 521 if(a->op==CHANRCV && c->n > 0){ 522 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c); 523 v = c->v + c->e*(c->f%c->s); 524 if(!willreplace) 525 c->n--; 526 c->f++; 527 return v; 528 } 529 if(a->op==CHANSND && c->n < c->s){ 530 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c); 531 v = c->v + c->e*((c->f+c->n)%c->s); 532 if(!willreplace) 533 c->n++; 534 return v; 535 } 536 abort(); 537 return nil; 538 } 539 540 static void 541 altcopy(void *dst, void *src, int sz) 542 { 543 if(dst){ 544 if(src) 545 memmove(dst, src, sz); 546 else 547 memset(dst, 0, sz); 548 } 549 } 550 551 static int 552 altexec(Alt *a, int spl) 553 { 554 volatile Alt *b; 555 int i, n, otherop; 556 Channel *c; 557 void *me, *waiter, *buf; 558 559 c = a->c; 560 561 /* rendezvous with others */ 562 otherop = (CHANSND+CHANRCV) - a->op; 563 n = 0; 564 b = nil; 565 me = a->v; 566 for(i=0; i<c->nentry; i++) 567 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil) 568 if(nrand(++n) == 0) 569 b = c->qentry[i]; 570 if(b != nil){ 571 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b); 572 waiter = b->v; 573 if(c->s && c->n){ 574 /* 575 * if buffer is full and there are waiters 576 * and we're meeting a waiter, 577 * we must be receiving. 578 * 579 * we use the value in the channel buffer, 580 * copy the waiter's value into the channel buffer 581 * on behalf of the waiter, and then wake the waiter. 582 */ 583 if(a->op!=CHANRCV) 584 abort(); 585 buf = altexecbuffered(a, 1); 586 altcopy(me, buf, c->e); 587 altcopy(buf, waiter, c->e); 588 }else{ 589 if(a->op==CHANRCV) 590 altcopy(me, waiter, c->e); 591 else 592 altcopy(waiter, me, c->e); 593 } 594 *b->tag = c; /* commits us to rendezvous */ 595 _threaddebug(DBGCHAN, "unlocking the chanlock"); 596 unlock(&chanlock); 597 _procsplx(spl); 598 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock); 599 while(_threadrendezvous(b->tag, 0) == Intred) 600 ; 601 return 1; 602 } 603 604 buf = altexecbuffered(a, 0); 605 if(a->op==CHANRCV) 606 altcopy(me, buf, c->e); 607 else 608 altcopy(buf, me, c->e); 609 610 unlock(&chanlock); 611 _procsplx(spl); 612 return 1; 613 } 614