1 #include <u.h>
2 #include <libc.h>
3 #include <thread.h>
4 #include "threadimpl.h"
5
6 /* Value to indicate the channel is closed */
7 enum {
8 CHANCLOSD = 0xc105ed,
9 };
10
11 static char errcl[] = "channel was closed";
12 static Lock chanlock; /* central channel access lock */
13
14 static void enqueue(Alt*, Channel**);
15 static void dequeue(Alt*);
16 static int canexec(Alt*);
17 static int altexec(Alt*, int);
18
19 #define Closed ((void*)CHANCLOSD)
20 #define Intred ((void*)~0) /* interrupted */
21
22 static void
_chanfree(Channel * c)23 _chanfree(Channel *c)
24 {
25 int i, inuse;
26
27 if(c->closed == 1) /* chanclose is ongoing */
28 inuse = 1;
29 else{
30 inuse = 0;
31 for(i = 0; i < c->nentry; i++) /* alt ongoing */
32 if(c->qentry[i])
33 inuse = 1;
34 }
35 if(inuse)
36 c->freed = 1;
37 else{
38 if(c->qentry)
39 free(c->qentry);
40 free(c);
41 }
42 }
43
44 void
chanfree(Channel * c)45 chanfree(Channel *c)
46 {
47 lock(&chanlock);
48 _chanfree(c);
49 unlock(&chanlock);
50 }
51
52 int
chaninit(Channel * c,int elemsize,int elemcnt)53 chaninit(Channel *c, int elemsize, int elemcnt)
54 {
55 if(elemcnt < 0 || elemsize <= 0 || c == nil)
56 return -1;
57 c->f = 0;
58 c->n = 0;
59 c->closed = 0;
60 c->freed = 0;
61 c->e = elemsize;
62 c->s = elemcnt;
63 _threaddebug(DBGCHAN, "chaninit %p", c);
64 return 1;
65 }
66
67 Channel*
chancreate(int elemsize,int elemcnt)68 chancreate(int elemsize, int elemcnt)
69 {
70 Channel *c;
71
72 if(elemcnt < 0 || elemsize <= 0)
73 return nil;
74 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
75 c->e = elemsize;
76 c->s = elemcnt;
77 _threaddebug(DBGCHAN, "chancreate %p", c);
78 return c;
79 }
80
81 static int
isopenfor(Channel * c,int op)82 isopenfor(Channel *c, int op)
83 {
84 return c->closed == 0 || (op == CHANRCV && c->n > 0);
85 }
86
87 int
alt(Alt * alts)88 alt(Alt *alts)
89 {
90 Alt *a, *xa, *ca;
91 Channel volatile *c;
92 int n, s, waiting, allreadycl;
93 void* r;
94 Thread *t;
95
96 /*
97 * The point of going splhi here is that note handlers
98 * might reasonably want to use channel operations,
99 * but that will hang if the note comes while we hold the
100 * chanlock. Instead, we delay the note until we've dropped
101 * the lock.
102 */
103 t = _threadgetproc()->thread;
104 if(t->moribund || _threadexitsallstatus)
105 yield(); /* won't return */
106 s = _procsplhi();
107 lock(&chanlock);
108 t->alt = alts;
109 t->chan = Chanalt;
110
111 /* test whether any channels can proceed */
112 n = 0;
113 a = nil;
114
115 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
116 xa->entryno = -1;
117 if(xa->op == CHANNOP)
118 continue;
119
120 c = xa->c;
121 if(c==nil){
122 unlock(&chanlock);
123 _procsplx(s);
124 t->chan = Channone;
125 return -1;
126 }
127
128 if(isopenfor(c, xa->op) && canexec(xa))
129 if(nrand(++n) == 0)
130 a = xa;
131 }
132
133
134 if(a==nil){
135 /* nothing can proceed */
136 if(xa->op == CHANNOBLK){
137 unlock(&chanlock);
138 _procsplx(s);
139 t->chan = Channone;
140 if(xa->op == CHANNOBLK)
141 return xa - alts;
142 }
143
144 /* enqueue on all channels open for us. */
145 c = nil;
146 ca = nil;
147 waiting = 0;
148 allreadycl = 0;
149 for(xa=alts; xa->op!=CHANEND; xa++)
150 if(xa->op==CHANNOP)
151 continue;
152 else if(isopenfor(xa->c, xa->op)){
153 waiting = 1;
154 enqueue(xa, &c);
155 } else if(xa->err != errcl)
156 ca = xa;
157 else
158 allreadycl = 1;
159
160 if(waiting == 0)
161 if(ca != nil){
162 /* everything was closed, select last channel */
163 ca->err = errcl;
164 unlock(&chanlock);
165 _procsplx(s);
166 t->chan = Channone;
167 return ca - alts;
168 } else if(allreadycl){
169 /* everything was already closed */
170 unlock(&chanlock);
171 _procsplx(s);
172 t->chan = Channone;
173 return -1;
174 }
175 /*
176 * wait for successful rendezvous.
177 * we can't just give up if the rendezvous
178 * is interrupted -- someone else might come
179 * along and try to rendezvous with us, so
180 * we need to be here.
181 * if the channel was closed, the op is done
182 * and we flag an error for the entry.
183 */
184 Again:
185 unlock(&chanlock);
186 _procsplx(s);
187 r = _threadrendezvous(&c, 0);
188 s = _procsplhi();
189 lock(&chanlock);
190
191 if(r==Intred){ /* interrupted */
192 if(c!=nil) /* someone will meet us; go back */
193 goto Again;
194 c = (Channel*)~0; /* so no one tries to meet us */
195 }
196
197 /* dequeue from channels, find selected one */
198 a = nil;
199 for(xa=alts; xa->op!=CHANEND; xa++){
200 if(xa->op==CHANNOP)
201 continue;
202 if(xa->c == c){
203 a = xa;
204 a->err = nil;
205 if(r == Closed)
206 a->err = errcl;
207 }
208 dequeue(xa);
209 }
210 unlock(&chanlock);
211 _procsplx(s);
212 if(a == nil){ /* we were interrupted */
213 assert(c==(Channel*)~0);
214 return -1;
215 }
216 }else
217 altexec(a, s); /* unlocks chanlock, does splx */
218 _sched();
219 t->chan = Channone;
220 return a - alts;
221 }
222
223 int
chanclose(Channel * c)224 chanclose(Channel *c)
225 {
226 Alt *a;
227 int i, s;
228
229 s = _procsplhi(); /* note handlers; see :/^alt */
230 lock(&chanlock);
231 if(c->closed){
232 /* Already close; we fail but it's ok. don't print */
233 unlock(&chanlock);
234 _procsplx(s);
235 return -1;
236 }
237 c->closed = 1; /* Being closed */
238 /*
239 * Locate entries that will fail due to close
240 * (send, and receive if nothing buffered) and wake them up.
241 * the situation cannot change because all queries
242 * should be committed by now and new ones will find the channel
243 * closed. We still need to take the lock during the iteration
244 * because we can wake threads on qentrys we have not seen yet
245 * as in alt and there would be a race in the access to *a.
246 */
247 for(i = 0; i < c->nentry; i++){
248 if((a = c->qentry[i]) == nil || *a->tag != nil)
249 continue;
250
251 if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
252 continue;
253 *a->tag = c;
254 unlock(&chanlock);
255 _procsplx(s);
256 while(_threadrendezvous(a->tag, Closed) == Intred)
257 ;
258 s = _procsplhi();
259 lock(&chanlock);
260 }
261
262 c->closed = 2; /* Fully closed */
263 if(c->freed)
264 _chanfree(c);
265 unlock(&chanlock);
266 _procsplx(s);
267 return 0;
268 }
269
270 int
chanclosing(Channel * c)271 chanclosing(Channel *c)
272 {
273 int n, s;
274
275 s = _procsplhi(); /* note handlers; see :/^alt */
276 lock(&chanlock);
277 if(c->closed == 0)
278 n = -1;
279 else
280 n = c->n;
281 unlock(&chanlock);
282 _procsplx(s);
283 return n;
284 }
285
286 /*
287 * superseded by chanclosing
288 int
289 chanisclosed(Channel *c)
290 {
291 return chanisclosing(c) >= 0;
292 }
293 */
294
295 static int
runop(int op,Channel * c,void * v,int nb)296 runop(int op, Channel *c, void *v, int nb)
297 {
298 int r;
299 Alt a[2];
300
301 /*
302 * we could do this without calling alt,
303 * but the only reason would be performance,
304 * and i'm not convinced it matters.
305 */
306 a[0].op = op;
307 a[0].c = c;
308 a[0].v = v;
309 a[0].err = nil;
310 a[1].op = CHANEND;
311 if(nb)
312 a[1].op = CHANNOBLK;
313 switch(r=alt(a)){
314 case -1: /* interrupted */
315 return -1;
316 case 1: /* nonblocking, didn't accomplish anything */
317 assert(nb);
318 return 0;
319 case 0:
320 /*
321 * Okay, but return -1 if the op is done because of a close.
322 */
323 if(a[0].err != nil)
324 return -1;
325 return 1;
326 default:
327 fprint(2, "ERROR: channel alt returned %d\n", r);
328 abort();
329 return -1;
330 }
331 }
332
333 int
recv(Channel * c,void * v)334 recv(Channel *c, void *v)
335 {
336 return runop(CHANRCV, c, v, 0);
337 }
338
339 int
nbrecv(Channel * c,void * v)340 nbrecv(Channel *c, void *v)
341 {
342 return runop(CHANRCV, c, v, 1);
343 }
344
345 int
send(Channel * c,void * v)346 send(Channel *c, void *v)
347 {
348 return runop(CHANSND, c, v, 0);
349 }
350
351 int
nbsend(Channel * c,void * v)352 nbsend(Channel *c, void *v)
353 {
354 return runop(CHANSND, c, v, 1);
355 }
356
357 static void
channelsize(Channel * c,int sz)358 channelsize(Channel *c, int sz)
359 {
360 if(c->e != sz){
361 fprint(2, "expected channel with elements of size %d, got size %d\n",
362 sz, c->e);
363 abort();
364 }
365 }
366
367 int
sendul(Channel * c,ulong v)368 sendul(Channel *c, ulong v)
369 {
370 channelsize(c, sizeof(ulong));
371 return send(c, &v);
372 }
373
374 ulong
recvul(Channel * c)375 recvul(Channel *c)
376 {
377 ulong v;
378
379 channelsize(c, sizeof(ulong));
380 if(recv(c, &v) < 0)
381 return ~0;
382 return v;
383 }
384
385 int
sendp(Channel * c,void * v)386 sendp(Channel *c, void *v)
387 {
388 channelsize(c, sizeof(void*));
389 return send(c, &v);
390 }
391
392 void*
recvp(Channel * c)393 recvp(Channel *c)
394 {
395 void *v;
396
397 channelsize(c, sizeof(void*));
398 if(recv(c, &v) < 0)
399 return nil;
400 return v;
401 }
402
403 int
nbsendul(Channel * c,ulong v)404 nbsendul(Channel *c, ulong v)
405 {
406 channelsize(c, sizeof(ulong));
407 return nbsend(c, &v);
408 }
409
410 ulong
nbrecvul(Channel * c)411 nbrecvul(Channel *c)
412 {
413 ulong v;
414
415 channelsize(c, sizeof(ulong));
416 if(nbrecv(c, &v) == 0)
417 return 0;
418 return v;
419 }
420
421 int
nbsendp(Channel * c,void * v)422 nbsendp(Channel *c, void *v)
423 {
424 channelsize(c, sizeof(void*));
425 return nbsend(c, &v);
426 }
427
428 void*
nbrecvp(Channel * c)429 nbrecvp(Channel *c)
430 {
431 void *v;
432
433 channelsize(c, sizeof(void*));
434 if(nbrecv(c, &v) == 0)
435 return nil;
436 return v;
437 }
438
439 static int
emptyentry(Channel * c)440 emptyentry(Channel *c)
441 {
442 int i, extra;
443
444 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
445
446 for(i=0; i<c->nentry; i++)
447 if(c->qentry[i]==nil)
448 return i;
449
450 extra = 16;
451 c->nentry += extra;
452 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
453 if(c->qentry == nil)
454 sysfatal("realloc channel entries: %r");
455 memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
456 return i;
457 }
458
459 static void
enqueue(Alt * a,Channel ** c)460 enqueue(Alt *a, Channel **c)
461 {
462 int i;
463
464 _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
465 a->tag = c;
466 i = emptyentry(a->c);
467 a->c->qentry[i] = a;
468 }
469
470 static void
dequeue(Alt * a)471 dequeue(Alt *a)
472 {
473 int i;
474 Channel *c;
475
476 c = a->c;
477 for(i=0; i<c->nentry; i++)
478 if(c->qentry[i]==a){
479 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
480 c->qentry[i] = nil;
481 /* release if freed and not closing */
482 if(c->freed && c->closed != 1)
483 _chanfree(c);
484 return;
485 }
486 }
487
488 static int
canexec(Alt * a)489 canexec(Alt *a)
490 {
491 int i, otherop;
492 Channel *c;
493
494 c = a->c;
495 /* are there senders or receivers blocked? */
496 otherop = (CHANSND+CHANRCV) - a->op;
497 for(i=0; i<c->nentry; i++)
498 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
499 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
500 return 1;
501 }
502
503 /* is there room in the channel? */
504 if((a->op==CHANSND && c->n < c->s)
505 || (a->op==CHANRCV && c->n > 0)){
506 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
507 return 1;
508 }
509
510 return 0;
511 }
512
513 static void*
altexecbuffered(Alt * a,int willreplace)514 altexecbuffered(Alt *a, int willreplace)
515 {
516 uchar *v;
517 Channel *c;
518
519 c = a->c;
520 /* use buffered channel queue */
521 if(a->op==CHANRCV && c->n > 0){
522 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
523 v = c->v + c->e*(c->f%c->s);
524 if(!willreplace)
525 c->n--;
526 c->f++;
527 return v;
528 }
529 if(a->op==CHANSND && c->n < c->s){
530 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
531 v = c->v + c->e*((c->f+c->n)%c->s);
532 if(!willreplace)
533 c->n++;
534 return v;
535 }
536 abort();
537 return nil;
538 }
539
540 static void
altcopy(void * dst,void * src,int sz)541 altcopy(void *dst, void *src, int sz)
542 {
543 if(dst){
544 if(src)
545 memmove(dst, src, sz);
546 else
547 memset(dst, 0, sz);
548 }
549 }
550
551 static int
altexec(Alt * a,int spl)552 altexec(Alt *a, int spl)
553 {
554 volatile Alt *b;
555 int i, n, otherop;
556 Channel *c;
557 void *me, *waiter, *buf;
558
559 c = a->c;
560
561 /* rendezvous with others */
562 otherop = (CHANSND+CHANRCV) - a->op;
563 n = 0;
564 b = nil;
565 me = a->v;
566 for(i=0; i<c->nentry; i++)
567 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
568 if(nrand(++n) == 0)
569 b = c->qentry[i];
570 if(b != nil){
571 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
572 waiter = b->v;
573 if(c->s && c->n){
574 /*
575 * if buffer is full and there are waiters
576 * and we're meeting a waiter,
577 * we must be receiving.
578 *
579 * we use the value in the channel buffer,
580 * copy the waiter's value into the channel buffer
581 * on behalf of the waiter, and then wake the waiter.
582 */
583 if(a->op!=CHANRCV)
584 abort();
585 buf = altexecbuffered(a, 1);
586 altcopy(me, buf, c->e);
587 altcopy(buf, waiter, c->e);
588 }else{
589 if(a->op==CHANRCV)
590 altcopy(me, waiter, c->e);
591 else
592 altcopy(waiter, me, c->e);
593 }
594 *b->tag = c; /* commits us to rendezvous */
595 _threaddebug(DBGCHAN, "unlocking the chanlock");
596 unlock(&chanlock);
597 _procsplx(spl);
598 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
599 while(_threadrendezvous(b->tag, 0) == Intred)
600 ;
601 return 1;
602 }
603
604 buf = altexecbuffered(a, 0);
605 if(a->op==CHANRCV)
606 altcopy(me, buf, c->e);
607 else
608 altcopy(buf, me, c->e);
609
610 unlock(&chanlock);
611 _procsplx(spl);
612 return 1;
613 }
614