1 #include <u.h> 2 #include <libc.h> 3 #include <thread.h> 4 #include "threadimpl.h" 5 6 static Lock chanlock; /* central channel access lock */ 7 8 static void enqueue(Alt*, Channel**); 9 static void dequeue(Alt*); 10 static int canexec(Alt*); 11 static int altexec(Alt*, int); 12 13 static void 14 _chanfree(Channel *c) 15 { 16 int i, inuse; 17 18 inuse = 0; 19 for(i = 0; i < c->nentry; i++) 20 if(c->qentry[i]) 21 inuse = 1; 22 if(inuse) 23 c->freed = 1; 24 else{ 25 if(c->qentry) 26 free(c->qentry); 27 free(c); 28 } 29 } 30 31 void 32 chanfree(Channel *c) 33 { 34 lock(&chanlock); 35 _chanfree(c); 36 unlock(&chanlock); 37 } 38 39 int 40 chaninit(Channel *c, int elemsize, int elemcnt) 41 { 42 if(elemcnt < 0 || elemsize <= 0 || c == nil) 43 return -1; 44 c->f = 0; 45 c->n = 0; 46 c->freed = 0; 47 c->e = elemsize; 48 c->s = elemcnt; 49 _threaddebug(DBGCHAN, "chaninit %p", c); 50 return 1; 51 } 52 53 Channel* 54 chancreate(int elemsize, int elemcnt) 55 { 56 Channel *c; 57 58 if(elemcnt < 0 || elemsize <= 0) 59 return nil; 60 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1); 61 c->e = elemsize; 62 c->s = elemcnt; 63 _threaddebug(DBGCHAN, "chancreate %p", c); 64 return c; 65 } 66 67 int 68 alt(Alt *alts) 69 { 70 Alt *a, *xa; 71 Channel volatile *c; 72 int n, s; 73 void* r; 74 Thread *t; 75 76 /* 77 * The point of going splhi here is that note handlers 78 * might reasonably want to use channel operations, 79 * but that will hang if the note comes while we hold the 80 * chanlock. Instead, we delay the note until we've dropped 81 * the lock. 82 */ 83 t = _threadgetproc()->thread; 84 if(t->moribund || _threadexitsallstatus) 85 yield(); /* won't return */ 86 s = _procsplhi(); 87 lock(&chanlock); 88 t->alt = alts; 89 t->chan = Chanalt; 90 91 /* test whether any channels can proceed */ 92 n = 0; 93 a = nil; 94 95 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){ 96 xa->entryno = -1; 97 if(xa->op == CHANNOP) 98 continue; 99 100 c = xa->c; 101 if(c==nil){ 102 unlock(&chanlock); 103 _procsplx(s); 104 t->chan = Channone; 105 return -1; 106 } 107 108 if(canexec(xa)) 109 if(nrand(++n) == 0) 110 a = xa; 111 } 112 113 if(a==nil){ 114 /* nothing can proceed */ 115 if(xa->op == CHANNOBLK){ 116 unlock(&chanlock); 117 _procsplx(s); 118 t->chan = Channone; 119 return xa - alts; 120 } 121 122 /* enqueue on all channels. */ 123 c = nil; 124 for(xa=alts; xa->op!=CHANEND; xa++){ 125 if(xa->op==CHANNOP) 126 continue; 127 enqueue(xa, &c); 128 } 129 130 /* 131 * wait for successful rendezvous. 132 * we can't just give up if the rendezvous 133 * is interrupted -- someone else might come 134 * along and try to rendezvous with us, so 135 * we need to be here. 136 */ 137 Again: 138 unlock(&chanlock); 139 _procsplx(s); 140 r = _threadrendezvous(&c, 0); 141 s = _procsplhi(); 142 lock(&chanlock); 143 144 if(r==(void*)~0){ /* interrupted */ 145 if(c!=nil) /* someone will meet us; go back */ 146 goto Again; 147 c = (Channel*)~0; /* so no one tries to meet us */ 148 } 149 150 /* dequeue from channels, find selected one */ 151 a = nil; 152 for(xa=alts; xa->op!=CHANEND; xa++){ 153 if(xa->op==CHANNOP) 154 continue; 155 if(xa->c == c) 156 a = xa; 157 dequeue(xa); 158 } 159 unlock(&chanlock); 160 _procsplx(s); 161 if(a == nil){ /* we were interrupted */ 162 assert(c==(Channel*)~0); 163 return -1; 164 } 165 }else{ 166 altexec(a, s); /* unlocks chanlock, does splx */ 167 } 168 _sched(); 169 t->chan = Channone; 170 return a - alts; 171 } 172 173 static int 174 runop(int op, Channel *c, void *v, int nb) 175 { 176 int r; 177 Alt a[2]; 178 179 /* 180 * we could do this without calling alt, 181 * but the only reason would be performance, 182 * and i'm not convinced it matters. 183 */ 184 a[0].op = op; 185 a[0].c = c; 186 a[0].v = v; 187 a[1].op = CHANEND; 188 if(nb) 189 a[1].op = CHANNOBLK; 190 switch(r=alt(a)){ 191 case -1: /* interrupted */ 192 return -1; 193 case 1: /* nonblocking, didn't accomplish anything */ 194 assert(nb); 195 return 0; 196 case 0: 197 return 1; 198 default: 199 fprint(2, "ERROR: channel alt returned %d\n", r); 200 abort(); 201 return -1; 202 } 203 } 204 205 int 206 recv(Channel *c, void *v) 207 { 208 return runop(CHANRCV, c, v, 0); 209 } 210 211 int 212 nbrecv(Channel *c, void *v) 213 { 214 return runop(CHANRCV, c, v, 1); 215 } 216 217 int 218 send(Channel *c, void *v) 219 { 220 return runop(CHANSND, c, v, 0); 221 } 222 223 int 224 nbsend(Channel *c, void *v) 225 { 226 return runop(CHANSND, c, v, 1); 227 } 228 229 static void 230 channelsize(Channel *c, int sz) 231 { 232 if(c->e != sz){ 233 fprint(2, "expected channel with elements of size %d, got size %d\n", 234 sz, c->e); 235 abort(); 236 } 237 } 238 239 int 240 sendul(Channel *c, ulong v) 241 { 242 channelsize(c, sizeof(ulong)); 243 return send(c, &v); 244 } 245 246 ulong 247 recvul(Channel *c) 248 { 249 ulong v; 250 251 channelsize(c, sizeof(ulong)); 252 if(recv(c, &v) < 0) 253 return ~0; 254 return v; 255 } 256 257 int 258 sendp(Channel *c, void *v) 259 { 260 channelsize(c, sizeof(void*)); 261 return send(c, &v); 262 } 263 264 void* 265 recvp(Channel *c) 266 { 267 void *v; 268 269 channelsize(c, sizeof(void*)); 270 if(recv(c, &v) < 0) 271 return nil; 272 return v; 273 } 274 275 int 276 nbsendul(Channel *c, ulong v) 277 { 278 channelsize(c, sizeof(ulong)); 279 return nbsend(c, &v); 280 } 281 282 ulong 283 nbrecvul(Channel *c) 284 { 285 ulong v; 286 287 channelsize(c, sizeof(ulong)); 288 if(nbrecv(c, &v) == 0) 289 return 0; 290 return v; 291 } 292 293 int 294 nbsendp(Channel *c, void *v) 295 { 296 channelsize(c, sizeof(void*)); 297 return nbsend(c, &v); 298 } 299 300 void* 301 nbrecvp(Channel *c) 302 { 303 void *v; 304 305 channelsize(c, sizeof(void*)); 306 if(nbrecv(c, &v) == 0) 307 return nil; 308 return v; 309 } 310 311 static int 312 emptyentry(Channel *c) 313 { 314 int i, extra; 315 316 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry)); 317 318 for(i=0; i<c->nentry; i++) 319 if(c->qentry[i]==nil) 320 return i; 321 322 extra = 16; 323 c->nentry += extra; 324 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0])); 325 if(c->qentry == nil) 326 sysfatal("realloc channel entries: %r"); 327 memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0])); 328 return i; 329 } 330 331 static void 332 enqueue(Alt *a, Channel **c) 333 { 334 int i; 335 336 _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c); 337 a->tag = c; 338 i = emptyentry(a->c); 339 a->c->qentry[i] = a; 340 } 341 342 static void 343 dequeue(Alt *a) 344 { 345 int i; 346 Channel *c; 347 348 c = a->c; 349 for(i=0; i<c->nentry; i++) 350 if(c->qentry[i]==a){ 351 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c); 352 c->qentry[i] = nil; 353 if(c->freed) 354 _chanfree(c); 355 return; 356 } 357 } 358 359 static int 360 canexec(Alt *a) 361 { 362 int i, otherop; 363 Channel *c; 364 365 c = a->c; 366 /* are there senders or receivers blocked? */ 367 otherop = (CHANSND+CHANRCV) - a->op; 368 for(i=0; i<c->nentry; i++) 369 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){ 370 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c); 371 return 1; 372 } 373 374 /* is there room in the channel? */ 375 if((a->op==CHANSND && c->n < c->s) 376 || (a->op==CHANRCV && c->n > 0)){ 377 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c); 378 return 1; 379 } 380 381 return 0; 382 } 383 384 static void* 385 altexecbuffered(Alt *a, int willreplace) 386 { 387 uchar *v; 388 Channel *c; 389 390 c = a->c; 391 /* use buffered channel queue */ 392 if(a->op==CHANRCV && c->n > 0){ 393 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c); 394 v = c->v + c->e*(c->f%c->s); 395 if(!willreplace) 396 c->n--; 397 c->f++; 398 return v; 399 } 400 if(a->op==CHANSND && c->n < c->s){ 401 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c); 402 v = c->v + c->e*((c->f+c->n)%c->s); 403 if(!willreplace) 404 c->n++; 405 return v; 406 } 407 abort(); 408 return nil; 409 } 410 411 static void 412 altcopy(void *dst, void *src, int sz) 413 { 414 if(dst){ 415 if(src) 416 memmove(dst, src, sz); 417 else 418 memset(dst, 0, sz); 419 } 420 } 421 422 static int 423 altexec(Alt *a, int spl) 424 { 425 volatile Alt *b; 426 int i, n, otherop; 427 Channel *c; 428 void *me, *waiter, *buf; 429 430 c = a->c; 431 432 /* rendezvous with others */ 433 otherop = (CHANSND+CHANRCV) - a->op; 434 n = 0; 435 b = nil; 436 me = a->v; 437 for(i=0; i<c->nentry; i++) 438 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil) 439 if(nrand(++n) == 0) 440 b = c->qentry[i]; 441 if(b != nil){ 442 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b); 443 waiter = b->v; 444 if(c->s && c->n){ 445 /* 446 * if buffer is full and there are waiters 447 * and we're meeting a waiter, 448 * we must be receiving. 449 * 450 * we use the value in the channel buffer, 451 * copy the waiter's value into the channel buffer 452 * on behalf of the waiter, and then wake the waiter. 453 */ 454 if(a->op!=CHANRCV) 455 abort(); 456 buf = altexecbuffered(a, 1); 457 altcopy(me, buf, c->e); 458 altcopy(buf, waiter, c->e); 459 }else{ 460 if(a->op==CHANRCV) 461 altcopy(me, waiter, c->e); 462 else 463 altcopy(waiter, me, c->e); 464 } 465 *b->tag = c; /* commits us to rendezvous */ 466 _threaddebug(DBGCHAN, "unlocking the chanlock"); 467 unlock(&chanlock); 468 _procsplx(spl); 469 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock); 470 while(_threadrendezvous(b->tag, 0) == (void*)~0) 471 ; 472 return 1; 473 } 474 475 buf = altexecbuffered(a, 0); 476 if(a->op==CHANRCV) 477 altcopy(me, buf, c->e); 478 else 479 altcopy(buf, me, c->e); 480 481 unlock(&chanlock); 482 _procsplx(spl); 483 return 1; 484 } 485