1 #include <u.h> 2 #include <libc.h> 3 #include <thread.h> 4 #include "threadimpl.h" 5 6 static Lock chanlock; /* central channel access lock */ 7 8 static void enqueue(Alt*, Channel**); 9 static void dequeue(Alt*); 10 static int canexec(Alt*); 11 static int altexec(Alt*, int); 12 13 void 14 chanfree(Channel *c) 15 { 16 int i, inuse; 17 18 lock(&chanlock); 19 inuse = 0; 20 for(i = 0; i < c->nentry; i++) 21 if(c->qentry[i]) 22 inuse = 1; 23 if(inuse) 24 c->freed = 1; 25 else{ 26 if(c->qentry) 27 free((void*)c->qentry); 28 free(c); 29 } 30 unlock(&chanlock); 31 } 32 33 int 34 chaninit(Channel *c, int elemsize, int elemcnt) 35 { 36 if(elemcnt < 0 || elemsize <= 0 || c == nil) 37 return -1; 38 c->f = 0; 39 c->n = 0; 40 c->freed = 0; 41 c->e = elemsize; 42 c->s = elemcnt; 43 _threaddebug(DBGCHAN, "chaninit %p", c); 44 return 1; 45 } 46 47 Channel* 48 chancreate(int elemsize, int elemcnt) 49 { 50 Channel *c; 51 52 if(elemcnt < 0 || elemsize <= 0) 53 return nil; 54 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1); 55 c->e = elemsize; 56 c->s = elemcnt; 57 _threaddebug(DBGCHAN, "chancreate %p", c); 58 return c; 59 } 60 61 int 62 alt(Alt *alts) 63 { 64 Alt *a, *xa; 65 Channel *c; 66 int n, s; 67 ulong r; 68 Thread *t; 69 70 /* 71 * The point of going splhi here is that note handlers 72 * might reasonably want to use channel operations, 73 * but that will hang if the note comes while we hold the 74 * chanlock. Instead, we delay the note until we've dropped 75 * the lock. 76 */ 77 t = _threadgetproc()->thread; 78 if(t->moribund || _threadexitsallstatus) 79 yield(); /* won't return */ 80 s = _procsplhi(); 81 lock(&chanlock); 82 t->alt = alts; 83 t->chan = Chanalt; 84 85 /* test whether any channels can proceed */ 86 n = 0; 87 a = nil; 88 89 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){ 90 xa->entryno = -1; 91 if(xa->op == CHANNOP) 92 continue; 93 94 c = xa->c; 95 if(c==nil) 96 sysfatal("alt: nil channel in entry %ld", xa-alts); 97 if(canexec(xa)) 98 if(nrand(++n) == 0) 99 a = xa; 100 } 101 102 if(a==nil){ 103 /* nothing can proceed */ 104 if(xa->op == CHANNOBLK){ 105 unlock(&chanlock); 106 _procsplx(s); 107 t->chan = Channone; 108 return xa - alts; 109 } 110 111 /* enqueue on all channels. */ 112 c = nil; 113 for(xa=alts; xa->op!=CHANEND; xa++){ 114 if(xa->op==CHANNOP) 115 continue; 116 enqueue(xa, &c); 117 } 118 119 /* 120 * wait for successful rendezvous. 121 * we can't just give up if the rendezvous 122 * is interrupted -- someone else might come 123 * along and try to rendezvous with us, so 124 * we need to be here. 125 */ 126 Again: 127 unlock(&chanlock); 128 _procsplx(s); 129 r = _threadrendezvous((ulong)&c, 0); 130 s = _procsplhi(); 131 lock(&chanlock); 132 133 if(r==~0){ /* interrupted */ 134 if(c!=nil) /* someone will meet us; go back */ 135 goto Again; 136 c = (Channel*)~0; /* so no one tries to meet us */ 137 } 138 139 /* dequeue from channels, find selected one */ 140 a = nil; 141 for(xa=alts; xa->op!=CHANEND; xa++){ 142 if(xa->op==CHANNOP) 143 continue; 144 if(xa->c == c) 145 a = xa; 146 dequeue(xa); 147 } 148 unlock(&chanlock); 149 _procsplx(s); 150 if(a == nil){ /* we were interrupted */ 151 assert(c==(Channel*)~0); 152 return -1; 153 } 154 if(c->freed) 155 chanfree(c); 156 }else{ 157 altexec(a, s); /* unlocks chanlock, does splx */ 158 } 159 _sched(); 160 t->chan = Channone; 161 return a - alts; 162 } 163 164 static int 165 runop(int op, Channel *c, void *v, int nb) 166 { 167 int r; 168 Alt a[2]; 169 170 /* 171 * we could do this without calling alt, 172 * but the only reason would be performance, 173 * and i'm not convinced it matters. 174 */ 175 a[0].op = op; 176 a[0].c = c; 177 a[0].v = v; 178 a[1].op = CHANEND; 179 if(nb) 180 a[1].op = CHANNOBLK; 181 switch(r=alt(a)){ 182 case -1: /* interrupted */ 183 return -1; 184 case 1: /* nonblocking, didn't accomplish anything */ 185 assert(nb); 186 return 0; 187 case 0: 188 return 1; 189 default: 190 fprint(2, "ERROR: channel alt returned %d\n", r); 191 abort(); 192 return -1; 193 } 194 } 195 196 int 197 recv(Channel *c, void *v) 198 { 199 return runop(CHANRCV, c, v, 0); 200 } 201 202 int 203 nbrecv(Channel *c, void *v) 204 { 205 return runop(CHANRCV, c, v, 1); 206 } 207 208 int 209 send(Channel *c, void *v) 210 { 211 return runop(CHANSND, c, v, 0); 212 } 213 214 int 215 nbsend(Channel *c, void *v) 216 { 217 return runop(CHANSND, c, v, 1); 218 } 219 220 static void 221 channelsize(Channel *c, int sz) 222 { 223 if(c->e != sz){ 224 fprint(2, "expected channel with elements of size %d, got size %d", 225 sz, c->e); 226 abort(); 227 } 228 } 229 230 int 231 sendul(Channel *c, ulong v) 232 { 233 channelsize(c, sizeof(ulong)); 234 return send(c, &v); 235 } 236 237 ulong 238 recvul(Channel *c) 239 { 240 ulong v; 241 242 channelsize(c, sizeof(ulong)); 243 if(recv(c, &v) < 0) 244 return ~0; 245 return v; 246 } 247 248 int 249 sendp(Channel *c, void *v) 250 { 251 channelsize(c, sizeof(void*)); 252 return send(c, &v); 253 } 254 255 void* 256 recvp(Channel *c) 257 { 258 void *v; 259 260 channelsize(c, sizeof(void*)); 261 if(recv(c, &v) < 0) 262 return nil; 263 return v; 264 } 265 266 int 267 nbsendul(Channel *c, ulong v) 268 { 269 channelsize(c, sizeof(ulong)); 270 return nbsend(c, &v); 271 } 272 273 ulong 274 nbrecvul(Channel *c) 275 { 276 ulong v; 277 278 channelsize(c, sizeof(ulong)); 279 if(nbrecv(c, &v) == 0) 280 return 0; 281 return v; 282 } 283 284 int 285 nbsendp(Channel *c, void *v) 286 { 287 channelsize(c, sizeof(void*)); 288 return nbsend(c, &v); 289 } 290 291 void* 292 nbrecvp(Channel *c) 293 { 294 void *v; 295 296 channelsize(c, sizeof(void*)); 297 if(nbrecv(c, &v) == 0) 298 return nil; 299 return v; 300 } 301 302 static int 303 emptyentry(Channel *c) 304 { 305 int i, extra; 306 307 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry)); 308 309 for(i=0; i<c->nentry; i++) 310 if(c->qentry[i]==nil) 311 return i; 312 313 if(i==0) 314 extra=1; 315 else{ 316 extra=i; 317 if(extra > 16) 318 extra = 16; 319 } 320 c->nentry += extra; 321 c->qentry = (volatile void*)realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0])); 322 if(c->qentry == nil) 323 sysfatal("realloc channel entries: %r"); 324 memset((void*)&c->qentry[i], 0, extra*sizeof(c->qentry[0])); 325 return i; 326 } 327 328 static void 329 enqueue(Alt *a, Channel **c) 330 { 331 int i; 332 333 _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c); 334 a->tag = c; 335 i = emptyentry(a->c); 336 a->c->qentry[i] = a; 337 } 338 339 static void 340 dequeue(Alt *a) 341 { 342 int i; 343 Channel *c; 344 345 c = a->c; 346 for(i=0; i<c->nentry; i++) 347 if(c->qentry[i]==a){ 348 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c); 349 c->qentry[i] = nil; 350 return; 351 } 352 } 353 354 static int 355 canexec(Alt *a) 356 { 357 int i, otherop; 358 Channel *c; 359 360 c = a->c; 361 /* are there senders or receivers blocked? */ 362 otherop = (CHANSND+CHANRCV) - a->op; 363 for(i=0; i<c->nentry; i++) 364 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){ 365 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c); 366 return 1; 367 } 368 369 /* is there room in the channel? */ 370 if((a->op==CHANSND && c->n < c->s) 371 || (a->op==CHANRCV && c->n > 0)){ 372 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c); 373 return 1; 374 } 375 376 return 0; 377 } 378 379 static void* 380 altexecbuffered(Alt *a, int willreplace) 381 { 382 uchar *v; 383 Channel *c; 384 385 c = a->c; 386 /* use buffered channel queue */ 387 if(a->op==CHANRCV && c->n > 0){ 388 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c); 389 v = c->v + c->e*(c->f%c->s); 390 if(!willreplace) 391 c->n--; 392 c->f++; 393 return v; 394 } 395 if(a->op==CHANSND && c->n < c->s){ 396 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c); 397 v = c->v + c->e*((c->f+c->n)%c->s); 398 if(!willreplace) 399 c->n++; 400 return v; 401 } 402 abort(); 403 return nil; 404 } 405 406 static void 407 altcopy(void *dst, void *src, int sz) 408 { 409 if(dst){ 410 if(src) 411 memmove(dst, src, sz); 412 else 413 memset(dst, 0, sz); 414 } 415 } 416 417 static int 418 altexec(Alt *a, int spl) 419 { 420 volatile Alt *b; 421 int i, n, otherop; 422 Channel *c; 423 void *me, *waiter, *buf; 424 425 c = a->c; 426 427 /* rendezvous with others */ 428 otherop = (CHANSND+CHANRCV) - a->op; 429 n = 0; 430 b = nil; 431 me = a->v; 432 for(i=0; i<c->nentry; i++) 433 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil) 434 if(nrand(++n) == 0) 435 b = c->qentry[i]; 436 if(b != nil){ 437 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b); 438 waiter = b->v; 439 if(c->s && c->n){ 440 /* 441 * if buffer is full and there are waiters 442 * and we're meeting a waiter, 443 * we must be receiving. 444 * 445 * we use the value in the channel buffer, 446 * copy the waiter's value into the channel buffer 447 * on behalf of the waiter, and then wake the waiter. 448 */ 449 if(a->op!=CHANRCV) 450 abort(); 451 buf = altexecbuffered(a, 1); 452 altcopy(me, buf, c->e); 453 altcopy(buf, waiter, c->e); 454 }else{ 455 if(a->op==CHANRCV) 456 altcopy(me, waiter, c->e); 457 else 458 altcopy(waiter, me, c->e); 459 } 460 *b->tag = c; /* commits us to rendezvous */ 461 _threaddebug(DBGCHAN, "unlocking the chanlock"); 462 unlock(&chanlock); 463 _procsplx(spl); 464 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock); 465 while(_threadrendezvous((ulong)b->tag, 0) == ~0) 466 ; 467 return 1; 468 } 469 470 buf = altexecbuffered(a, 0); 471 if(a->op==CHANRCV) 472 altcopy(me, buf, c->e); 473 else 474 altcopy(buf, me, c->e); 475 476 unlock(&chanlock); 477 _procsplx(spl); 478 return 1; 479 } 480