xref: /plan9/sys/src/libthread/channel.c (revision b39189fd423aed869c5cf5189bc504918cff969b)
1 #include <u.h>
2 #include <libc.h>
3 #include <thread.h>
4 #include "threadimpl.h"
5 
6 /* Value to indicate the channel is closed */
7 enum {
8 	CHANCLOSD = 0xc105ed,
9 };
10 
11 static char errcl[] = "channel was closed";
12 static Lock chanlock;		/* central channel access lock */
13 
14 static void enqueue(Alt*, Channel**);
15 static void dequeue(Alt*);
16 static int canexec(Alt*);
17 static int altexec(Alt*, int);
18 
19 #define Closed	((void*)CHANCLOSD)
20 #define Intred	((void*)~0)		/* interrupted */
21 
22 static void
_chanfree(Channel * c)23 _chanfree(Channel *c)
24 {
25 	int i, inuse;
26 
27 	if(c->closed == 1)			/* chanclose is ongoing */
28 		inuse = 1;
29 	else{
30 		inuse = 0;
31 		for(i = 0; i < c->nentry; i++)	/* alt ongoing */
32 			if(c->qentry[i])
33 				inuse = 1;
34 	}
35 	if(inuse)
36 		c->freed = 1;
37 	else{
38 		if(c->qentry)
39 			free(c->qentry);
40 		free(c);
41 	}
42 }
43 
44 void
chanfree(Channel * c)45 chanfree(Channel *c)
46 {
47 	lock(&chanlock);
48 	_chanfree(c);
49 	unlock(&chanlock);
50 }
51 
52 int
chaninit(Channel * c,int elemsize,int elemcnt)53 chaninit(Channel *c, int elemsize, int elemcnt)
54 {
55 	if(elemcnt < 0 || elemsize <= 0 || c == nil)
56 		return -1;
57 	c->f = 0;
58 	c->n = 0;
59 	c->closed = 0;
60 	c->freed = 0;
61 	c->e = elemsize;
62 	c->s = elemcnt;
63 	_threaddebug(DBGCHAN, "chaninit %p", c);
64 	return 1;
65 }
66 
67 Channel*
chancreate(int elemsize,int elemcnt)68 chancreate(int elemsize, int elemcnt)
69 {
70 	Channel *c;
71 
72 	if(elemcnt < 0 || elemsize <= 0)
73 		return nil;
74 	c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
75 	c->e = elemsize;
76 	c->s = elemcnt;
77 	_threaddebug(DBGCHAN, "chancreate %p", c);
78 	return c;
79 }
80 
81 static int
isopenfor(Channel * c,int op)82 isopenfor(Channel *c, int op)
83 {
84 	return c->closed == 0 || (op == CHANRCV && c->n > 0);
85 }
86 
87 int
alt(Alt * alts)88 alt(Alt *alts)
89 {
90 	Alt *a, *xa, *ca;
91 	Channel volatile *c;
92 	int n, s, waiting, allreadycl;
93 	void* r;
94 	Thread *t;
95 
96 	/*
97 	 * The point of going splhi here is that note handlers
98 	 * might reasonably want to use channel operations,
99 	 * but that will hang if the note comes while we hold the
100 	 * chanlock.  Instead, we delay the note until we've dropped
101 	 * the lock.
102 	 */
103 	t = _threadgetproc()->thread;
104 	if(t->moribund || _threadexitsallstatus)
105 		yield();	/* won't return */
106 	s = _procsplhi();
107 	lock(&chanlock);
108 	t->alt = alts;
109 	t->chan = Chanalt;
110 
111 	/* test whether any channels can proceed */
112 	n = 0;
113 	a = nil;
114 
115 	for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
116 		xa->entryno = -1;
117 		if(xa->op == CHANNOP)
118 			continue;
119 
120 		c = xa->c;
121 		if(c==nil){
122 			unlock(&chanlock);
123 			_procsplx(s);
124 			t->chan = Channone;
125 			return -1;
126 		}
127 
128 		if(isopenfor(c, xa->op) && canexec(xa))
129 			if(nrand(++n) == 0)
130 				a = xa;
131 	}
132 
133 
134 	if(a==nil){
135 		/* nothing can proceed */
136 		if(xa->op == CHANNOBLK){
137 			unlock(&chanlock);
138 			_procsplx(s);
139 			t->chan = Channone;
140 			if(xa->op == CHANNOBLK)
141 				return xa - alts;
142 		}
143 
144 		/* enqueue on all channels open for us. */
145 		c = nil;
146 		ca = nil;
147 		waiting = 0;
148 		allreadycl = 0;
149 		for(xa=alts; xa->op!=CHANEND; xa++)
150 			if(xa->op==CHANNOP)
151 				continue;
152 			else if(isopenfor(xa->c, xa->op)){
153 				waiting = 1;
154 				enqueue(xa, &c);
155 			} else if(xa->err != errcl)
156 				ca = xa;
157 			else
158 				allreadycl = 1;
159 
160 		if(waiting == 0)
161 			if(ca != nil){
162 				/* everything was closed, select last channel */
163 				ca->err = errcl;
164 				unlock(&chanlock);
165 				_procsplx(s);
166 				t->chan = Channone;
167 				return ca - alts;
168 			} else if(allreadycl){
169 				/* everything was already closed */
170 				unlock(&chanlock);
171 				_procsplx(s);
172 				t->chan = Channone;
173 				return -1;
174 			}
175 		/*
176 		 * wait for successful rendezvous.
177 		 * we can't just give up if the rendezvous
178 		 * is interrupted -- someone else might come
179 		 * along and try to rendezvous with us, so
180 		 * we need to be here.
181 		 * if the channel was closed, the op is done
182 		 * and we flag an error for the entry.
183 		 */
184 	    Again:
185 		unlock(&chanlock);
186 		_procsplx(s);
187 		r = _threadrendezvous(&c, 0);
188 		s = _procsplhi();
189 		lock(&chanlock);
190 
191 		if(r==Intred){		/* interrupted */
192 			if(c!=nil)	/* someone will meet us; go back */
193 				goto Again;
194 			c = (Channel*)~0;	/* so no one tries to meet us */
195 		}
196 
197 		/* dequeue from channels, find selected one */
198 		a = nil;
199 		for(xa=alts; xa->op!=CHANEND; xa++){
200 			if(xa->op==CHANNOP)
201 				continue;
202 			if(xa->c == c){
203 				a = xa;
204 				a->err = nil;
205 				if(r == Closed)
206 					a->err = errcl;
207 			}
208 			dequeue(xa);
209 		}
210 		unlock(&chanlock);
211 		_procsplx(s);
212 		if(a == nil){	/* we were interrupted */
213 			assert(c==(Channel*)~0);
214 			return -1;
215 		}
216 	}else
217 		altexec(a, s);	/* unlocks chanlock, does splx */
218 	_sched();
219 	t->chan = Channone;
220 	return a - alts;
221 }
222 
223 int
chanclose(Channel * c)224 chanclose(Channel *c)
225 {
226 	Alt *a;
227 	int i, s;
228 
229 	s = _procsplhi();	/* note handlers; see :/^alt */
230 	lock(&chanlock);
231 	if(c->closed){
232 		/* Already close; we fail but it's ok. don't print */
233 		unlock(&chanlock);
234 		_procsplx(s);
235 		return -1;
236 	}
237 	c->closed = 1;		/* Being closed */
238 	/*
239 	 * Locate entries that will fail due to close
240 	 * (send, and receive if nothing buffered) and wake them up.
241 	 * the situation cannot change because all queries
242 	 * should be committed by now and new ones will find the channel
243 	 * closed.  We still need to take the lock during the iteration
244 	 * because we can wake threads on qentrys we have not seen yet
245 	 * as in alt and there would be a race in the access to *a.
246 	 */
247 	for(i = 0; i < c->nentry; i++){
248 		if((a = c->qentry[i]) == nil || *a->tag != nil)
249 			continue;
250 
251 		if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
252 			continue;
253 		*a->tag = c;
254 		unlock(&chanlock);
255 		_procsplx(s);
256 		while(_threadrendezvous(a->tag, Closed) == Intred)
257 			;
258 		s = _procsplhi();
259 		lock(&chanlock);
260 	}
261 
262 	c->closed = 2;		/* Fully closed */
263 	if(c->freed)
264 		_chanfree(c);
265 	unlock(&chanlock);
266 	_procsplx(s);
267 	return 0;
268 }
269 
270 int
chanclosing(Channel * c)271 chanclosing(Channel *c)
272 {
273 	int n, s;
274 
275 	s = _procsplhi();	/* note handlers; see :/^alt */
276 	lock(&chanlock);
277 	if(c->closed == 0)
278 		n = -1;
279 	else
280 		n = c->n;
281 	unlock(&chanlock);
282 	_procsplx(s);
283 	return n;
284 }
285 
286 /*
287  * superseded by chanclosing
288 int
289 chanisclosed(Channel *c)
290 {
291 	return chanisclosing(c) >= 0;
292 }
293  */
294 
295 static int
runop(int op,Channel * c,void * v,int nb)296 runop(int op, Channel *c, void *v, int nb)
297 {
298 	int r;
299 	Alt a[2];
300 
301 	/*
302 	 * we could do this without calling alt,
303 	 * but the only reason would be performance,
304 	 * and i'm not convinced it matters.
305 	 */
306 	a[0].op = op;
307 	a[0].c = c;
308 	a[0].v = v;
309 	a[0].err = nil;
310 	a[1].op = CHANEND;
311 	if(nb)
312 		a[1].op = CHANNOBLK;
313 	switch(r=alt(a)){
314 	case -1:	/* interrupted */
315 		return -1;
316 	case 1:	/* nonblocking, didn't accomplish anything */
317 		assert(nb);
318 		return 0;
319 	case 0:
320 		/*
321 		 * Okay, but return -1 if the op is done because of a close.
322 		 */
323 		if(a[0].err != nil)
324 			return -1;
325 		return 1;
326 	default:
327 		fprint(2, "ERROR: channel alt returned %d\n", r);
328 		abort();
329 		return -1;
330 	}
331 }
332 
333 int
recv(Channel * c,void * v)334 recv(Channel *c, void *v)
335 {
336 	return runop(CHANRCV, c, v, 0);
337 }
338 
339 int
nbrecv(Channel * c,void * v)340 nbrecv(Channel *c, void *v)
341 {
342 	return runop(CHANRCV, c, v, 1);
343 }
344 
345 int
send(Channel * c,void * v)346 send(Channel *c, void *v)
347 {
348 	return runop(CHANSND, c, v, 0);
349 }
350 
351 int
nbsend(Channel * c,void * v)352 nbsend(Channel *c, void *v)
353 {
354 	return runop(CHANSND, c, v, 1);
355 }
356 
357 static void
channelsize(Channel * c,int sz)358 channelsize(Channel *c, int sz)
359 {
360 	if(c->e != sz){
361 		fprint(2, "expected channel with elements of size %d, got size %d\n",
362 			sz, c->e);
363 		abort();
364 	}
365 }
366 
367 int
sendul(Channel * c,ulong v)368 sendul(Channel *c, ulong v)
369 {
370 	channelsize(c, sizeof(ulong));
371 	return send(c, &v);
372 }
373 
374 ulong
recvul(Channel * c)375 recvul(Channel *c)
376 {
377 	ulong v;
378 
379 	channelsize(c, sizeof(ulong));
380 	if(recv(c, &v) < 0)
381 		return ~0;
382 	return v;
383 }
384 
385 int
sendp(Channel * c,void * v)386 sendp(Channel *c, void *v)
387 {
388 	channelsize(c, sizeof(void*));
389 	return send(c, &v);
390 }
391 
392 void*
recvp(Channel * c)393 recvp(Channel *c)
394 {
395 	void *v;
396 
397 	channelsize(c, sizeof(void*));
398 	if(recv(c, &v) < 0)
399 		return nil;
400 	return v;
401 }
402 
403 int
nbsendul(Channel * c,ulong v)404 nbsendul(Channel *c, ulong v)
405 {
406 	channelsize(c, sizeof(ulong));
407 	return nbsend(c, &v);
408 }
409 
410 ulong
nbrecvul(Channel * c)411 nbrecvul(Channel *c)
412 {
413 	ulong v;
414 
415 	channelsize(c, sizeof(ulong));
416 	if(nbrecv(c, &v) == 0)
417 		return 0;
418 	return v;
419 }
420 
421 int
nbsendp(Channel * c,void * v)422 nbsendp(Channel *c, void *v)
423 {
424 	channelsize(c, sizeof(void*));
425 	return nbsend(c, &v);
426 }
427 
428 void*
nbrecvp(Channel * c)429 nbrecvp(Channel *c)
430 {
431 	void *v;
432 
433 	channelsize(c, sizeof(void*));
434 	if(nbrecv(c, &v) == 0)
435 		return nil;
436 	return v;
437 }
438 
439 static int
emptyentry(Channel * c)440 emptyentry(Channel *c)
441 {
442 	int i, extra;
443 
444 	assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
445 
446 	for(i=0; i<c->nentry; i++)
447 		if(c->qentry[i]==nil)
448 			return i;
449 
450 	extra = 16;
451 	c->nentry += extra;
452 	c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
453 	if(c->qentry == nil)
454 		sysfatal("realloc channel entries: %r");
455 	memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
456 	return i;
457 }
458 
459 static void
enqueue(Alt * a,Channel ** c)460 enqueue(Alt *a, Channel **c)
461 {
462 	int i;
463 
464 	_threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
465 	a->tag = c;
466 	i = emptyentry(a->c);
467 	a->c->qentry[i] = a;
468 }
469 
470 static void
dequeue(Alt * a)471 dequeue(Alt *a)
472 {
473 	int i;
474 	Channel *c;
475 
476 	c = a->c;
477 	for(i=0; i<c->nentry; i++)
478 		if(c->qentry[i]==a){
479 			_threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
480 			c->qentry[i] = nil;
481 			/* release if freed and not closing */
482 			if(c->freed && c->closed != 1)
483 				_chanfree(c);
484 			return;
485 		}
486 }
487 
488 static int
canexec(Alt * a)489 canexec(Alt *a)
490 {
491 	int i, otherop;
492 	Channel *c;
493 
494 	c = a->c;
495 	/* are there senders or receivers blocked? */
496 	otherop = (CHANSND+CHANRCV) - a->op;
497 	for(i=0; i<c->nentry; i++)
498 		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
499 			_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
500 			return 1;
501 		}
502 
503 	/* is there room in the channel? */
504 	if((a->op==CHANSND && c->n < c->s)
505 	|| (a->op==CHANRCV && c->n > 0)){
506 		_threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
507 		return 1;
508 	}
509 
510 	return 0;
511 }
512 
513 static void*
altexecbuffered(Alt * a,int willreplace)514 altexecbuffered(Alt *a, int willreplace)
515 {
516 	uchar *v;
517 	Channel *c;
518 
519 	c = a->c;
520 	/* use buffered channel queue */
521 	if(a->op==CHANRCV && c->n > 0){
522 		_threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
523 		v = c->v + c->e*(c->f%c->s);
524 		if(!willreplace)
525 			c->n--;
526 		c->f++;
527 		return v;
528 	}
529 	if(a->op==CHANSND && c->n < c->s){
530 		_threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
531 		v = c->v + c->e*((c->f+c->n)%c->s);
532 		if(!willreplace)
533 			c->n++;
534 		return v;
535 	}
536 	abort();
537 	return nil;
538 }
539 
540 static void
altcopy(void * dst,void * src,int sz)541 altcopy(void *dst, void *src, int sz)
542 {
543 	if(dst){
544 		if(src)
545 			memmove(dst, src, sz);
546 		else
547 			memset(dst, 0, sz);
548 	}
549 }
550 
551 static int
altexec(Alt * a,int spl)552 altexec(Alt *a, int spl)
553 {
554 	volatile Alt *b;
555 	int i, n, otherop;
556 	Channel *c;
557 	void *me, *waiter, *buf;
558 
559 	c = a->c;
560 
561 	/* rendezvous with others */
562 	otherop = (CHANSND+CHANRCV) - a->op;
563 	n = 0;
564 	b = nil;
565 	me = a->v;
566 	for(i=0; i<c->nentry; i++)
567 		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
568 			if(nrand(++n) == 0)
569 				b = c->qentry[i];
570 	if(b != nil){
571 		_threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
572 		waiter = b->v;
573 		if(c->s && c->n){
574 			/*
575 			 * if buffer is full and there are waiters
576 			 * and we're meeting a waiter,
577 			 * we must be receiving.
578 			 *
579 			 * we use the value in the channel buffer,
580 			 * copy the waiter's value into the channel buffer
581 			 * on behalf of the waiter, and then wake the waiter.
582 			 */
583 			if(a->op!=CHANRCV)
584 				abort();
585 			buf = altexecbuffered(a, 1);
586 			altcopy(me, buf, c->e);
587 			altcopy(buf, waiter, c->e);
588 		}else{
589 			if(a->op==CHANRCV)
590 				altcopy(me, waiter, c->e);
591 			else
592 				altcopy(waiter, me, c->e);
593 		}
594 		*b->tag = c;	/* commits us to rendezvous */
595 		_threaddebug(DBGCHAN, "unlocking the chanlock");
596 		unlock(&chanlock);
597 		_procsplx(spl);
598 		_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
599 		while(_threadrendezvous(b->tag, 0) == Intred)
600 			;
601 		return 1;
602 	}
603 
604 	buf = altexecbuffered(a, 0);
605 	if(a->op==CHANRCV)
606 		altcopy(me, buf, c->e);
607 	else
608 		altcopy(buf, me, c->e);
609 
610 	unlock(&chanlock);
611 	_procsplx(spl);
612 	return 1;
613 }
614