xref: /plan9/sys/src/libthread/channel.c (revision 9a747e4fd48b9f4522c70c07e8f882a15030f964)
1 #include <u.h>
2 #include <libc.h>
3 #include <thread.h>
4 #include "threadimpl.h"
5 
6 static Lock chanlock;		/* central channel access lock */
7 
8 static void enqueue(Alt*, Channel**);
9 static void dequeue(Alt*);
10 static int canexec(Alt*);
11 static int altexec(Alt*, int);
12 
13 void
14 chanfree(Channel *c)
15 {
16 	int i, inuse;
17 
18 	lock(&chanlock);
19 	inuse = 0;
20 	for(i = 0; i < c->nentry; i++)
21 		if(c->qentry[i])
22 			inuse = 1;
23 	if(inuse)
24 		c->freed = 1;
25 	else{
26 		if(c->qentry)
27 			free((void*)c->qentry);
28 		free(c);
29 	}
30 	unlock(&chanlock);
31 }
32 
33 int
34 chaninit(Channel *c, int elemsize, int elemcnt)
35 {
36 	if(elemcnt < 0 || elemsize <= 0 || c == nil)
37 		return -1;
38 	c->f = 0;
39 	c->n = 0;
40 	c->freed = 0;
41 	c->e = elemsize;
42 	c->s = elemcnt;
43 	_threaddebug(DBGCHAN, "chaninit %p", c);
44 	return 1;
45 }
46 
47 Channel*
48 chancreate(int elemsize, int elemcnt)
49 {
50 	Channel *c;
51 
52 	if(elemcnt < 0 || elemsize <= 0)
53 		return nil;
54 	c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
55 	c->e = elemsize;
56 	c->s = elemcnt;
57 	_threaddebug(DBGCHAN, "chancreate %p", c);
58 	return c;
59 }
60 
61 int
62 alt(Alt *alts)
63 {
64 	Alt *a, *xa;
65 	Channel *c;
66 	int n, s;
67 	ulong r;
68 	Thread *t;
69 
70 	/*
71 	 * The point of going splhi here is that note handlers
72 	 * might reasonably want to use channel operations,
73 	 * but that will hang if the note comes while we hold the
74 	 * chanlock.  Instead, we delay the note until we've dropped
75 	 * the lock.
76 	 */
77 	t = _threadgetproc()->thread;
78 	if(t->moribund || _threadexitsallstatus)
79 		yield();	/* won't return */
80 	s = _procsplhi();
81 	lock(&chanlock);
82 	t->alt = alts;
83 	t->chan = Chanalt;
84 
85 	/* test whether any channels can proceed */
86 	n = 0;
87 	a = nil;
88 
89 	for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
90 		xa->entryno = -1;
91 		if(xa->op == CHANNOP)
92 			continue;
93 
94 		c = xa->c;
95 		if(c==nil)
96 			sysfatal("alt: nil channel in entry %ld", xa-alts);
97 		if(canexec(xa))
98 			if(nrand(++n) == 0)
99 				a = xa;
100 	}
101 
102 	if(a==nil){
103 		/* nothing can proceed */
104 		if(xa->op == CHANNOBLK){
105 			unlock(&chanlock);
106 			_procsplx(s);
107 			t->chan = Channone;
108 			return xa - alts;
109 		}
110 
111 		/* enqueue on all channels. */
112 		c = nil;
113 		for(xa=alts; xa->op!=CHANEND; xa++){
114 			if(xa->op==CHANNOP)
115 				continue;
116 			enqueue(xa, &c);
117 		}
118 
119 		/*
120 		 * wait for successful rendezvous.
121 		 * we can't just give up if the rendezvous
122 		 * is interrupted -- someone else might come
123 		 * along and try to rendezvous with us, so
124 		 * we need to be here.
125 		 */
126 	    Again:
127 		unlock(&chanlock);
128 		_procsplx(s);
129 		r = _threadrendezvous((ulong)&c, 0);
130 		s = _procsplhi();
131 		lock(&chanlock);
132 
133 		if(r==~0){		/* interrupted */
134 			if(c!=nil)		/* someone will meet us; go back */
135 				goto Again;
136 			c = (Channel*)~0;	/* so no one tries to meet us */
137 		}
138 
139 		/* dequeue from channels, find selected one */
140 		a = nil;
141 		for(xa=alts; xa->op!=CHANEND; xa++){
142 			if(xa->op==CHANNOP)
143 				continue;
144 			if(xa->c == c)
145 				a = xa;
146 			dequeue(xa);
147 		}
148 		unlock(&chanlock);
149 		_procsplx(s);
150 		if(a == nil){	/* we were interrupted */
151 			assert(c==(Channel*)~0);
152 			return -1;
153 		}
154 		if(c->freed)
155 			chanfree(c);
156 	}else{
157 		altexec(a, s);	/* unlocks chanlock, does splx */
158 	}
159 	_sched();
160 	t->chan = Channone;
161 	return a - alts;
162 }
163 
164 static int
165 runop(int op, Channel *c, void *v, int nb)
166 {
167 	int r;
168 	Alt a[2];
169 
170 	/*
171 	 * we could do this without calling alt,
172 	 * but the only reason would be performance,
173 	 * and i'm not convinced it matters.
174 	 */
175 	a[0].op = op;
176 	a[0].c = c;
177 	a[0].v = v;
178 	a[1].op = CHANEND;
179 	if(nb)
180 		a[1].op = CHANNOBLK;
181 	switch(r=alt(a)){
182 	case -1:	/* interrupted */
183 		return -1;
184 	case 1:	/* nonblocking, didn't accomplish anything */
185 		assert(nb);
186 		return 0;
187 	case 0:
188 		return 1;
189 	default:
190 		fprint(2, "ERROR: channel alt returned %d\n", r);
191 		abort();
192 		return -1;
193 	}
194 }
195 
196 int
197 recv(Channel *c, void *v)
198 {
199 	return runop(CHANRCV, c, v, 0);
200 }
201 
202 int
203 nbrecv(Channel *c, void *v)
204 {
205 	return runop(CHANRCV, c, v, 1);
206 }
207 
208 int
209 send(Channel *c, void *v)
210 {
211 	return runop(CHANSND, c, v, 0);
212 }
213 
214 int
215 nbsend(Channel *c, void *v)
216 {
217 	return runop(CHANSND, c, v, 1);
218 }
219 
220 static void
221 channelsize(Channel *c, int sz)
222 {
223 	if(c->e != sz){
224 		fprint(2, "expected channel with elements of size %d, got size %d",
225 			sz, c->e);
226 		abort();
227 	}
228 }
229 
230 int
231 sendul(Channel *c, ulong v)
232 {
233 	channelsize(c, sizeof(ulong));
234 	return send(c, &v);
235 }
236 
237 ulong
238 recvul(Channel *c)
239 {
240 	ulong v;
241 
242 	channelsize(c, sizeof(ulong));
243 	if(recv(c, &v) < 0)
244 		return ~0;
245 	return v;
246 }
247 
248 int
249 sendp(Channel *c, void *v)
250 {
251 	channelsize(c, sizeof(void*));
252 	return send(c, &v);
253 }
254 
255 void*
256 recvp(Channel *c)
257 {
258 	void *v;
259 
260 	channelsize(c, sizeof(void*));
261 	if(recv(c, &v) < 0)
262 		return nil;
263 	return v;
264 }
265 
266 int
267 nbsendul(Channel *c, ulong v)
268 {
269 	channelsize(c, sizeof(ulong));
270 	return nbsend(c, &v);
271 }
272 
273 ulong
274 nbrecvul(Channel *c)
275 {
276 	ulong v;
277 
278 	channelsize(c, sizeof(ulong));
279 	if(nbrecv(c, &v) == 0)
280 		return 0;
281 	return v;
282 }
283 
284 int
285 nbsendp(Channel *c, void *v)
286 {
287 	channelsize(c, sizeof(void*));
288 	return nbsend(c, &v);
289 }
290 
291 void*
292 nbrecvp(Channel *c)
293 {
294 	void *v;
295 
296 	channelsize(c, sizeof(void*));
297 	if(nbrecv(c, &v) == 0)
298 		return nil;
299 	return v;
300 }
301 
302 static int
303 emptyentry(Channel *c)
304 {
305 	int i, extra;
306 
307 	assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
308 
309 	for(i=0; i<c->nentry; i++)
310 		if(c->qentry[i]==nil)
311 			return i;
312 
313 	if(i==0)
314 		extra=1;
315 	else{
316 		extra=i;
317 		if(extra > 16)
318 			extra = 16;
319 	}
320 	c->nentry += extra;
321 	c->qentry = (volatile void*)realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
322 	if(c->qentry == nil)
323 		sysfatal("realloc channel entries: %r");
324 	memset((void*)&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
325 	return i;
326 }
327 
328 static void
329 enqueue(Alt *a, Channel **c)
330 {
331 	int i;
332 
333 	_threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
334 	a->tag = c;
335 	i = emptyentry(a->c);
336 	a->c->qentry[i] = a;
337 }
338 
339 static void
340 dequeue(Alt *a)
341 {
342 	int i;
343 	Channel *c;
344 
345 	c = a->c;
346 	for(i=0; i<c->nentry; i++)
347 		if(c->qentry[i]==a){
348 			_threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
349 			c->qentry[i] = nil;
350 			return;
351 		}
352 }
353 
354 static int
355 canexec(Alt *a)
356 {
357 	int i, otherop;
358 	Channel *c;
359 
360 	c = a->c;
361 	/* are there senders or receivers blocked? */
362 	otherop = (CHANSND+CHANRCV) - a->op;
363 	for(i=0; i<c->nentry; i++)
364 		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
365 			_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
366 			return 1;
367 		}
368 
369 	/* is there room in the channel? */
370 	if((a->op==CHANSND && c->n < c->s)
371 	|| (a->op==CHANRCV && c->n > 0)){
372 		_threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
373 		return 1;
374 	}
375 
376 	return 0;
377 }
378 
379 static void*
380 altexecbuffered(Alt *a, int willreplace)
381 {
382 	uchar *v;
383 	Channel *c;
384 
385 	c = a->c;
386 	/* use buffered channel queue */
387 	if(a->op==CHANRCV && c->n > 0){
388 		_threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
389 		v = c->v + c->e*(c->f%c->s);
390 		if(!willreplace)
391 			c->n--;
392 		c->f++;
393 		return v;
394 	}
395 	if(a->op==CHANSND && c->n < c->s){
396 		_threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
397 		v = c->v + c->e*((c->f+c->n)%c->s);
398 		if(!willreplace)
399 			c->n++;
400 		return v;
401 	}
402 	abort();
403 	return nil;
404 }
405 
/*
 * Copy sz bytes from src to dst.
 * A null dst means the value is not wanted: nothing happens.
 * A null src means there is no value to give: dst is zero-filled.
 */
static void
altcopy(void *dst, void *src, int sz)
{
	if(!dst)
		return;
	if(!src){
		memset(dst, 0, sz);
		return;
	}
	memmove(dst, src, sz);
}
416 
417 static int
418 altexec(Alt *a, int spl)
419 {
420 	volatile Alt *b;
421 	int i, n, otherop;
422 	Channel *c;
423 	void *me, *waiter, *buf;
424 
425 	c = a->c;
426 
427 	/* rendezvous with others */
428 	otherop = (CHANSND+CHANRCV) - a->op;
429 	n = 0;
430 	b = nil;
431 	me = a->v;
432 	for(i=0; i<c->nentry; i++)
433 		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
434 			if(nrand(++n) == 0)
435 				b = c->qentry[i];
436 	if(b != nil){
437 		_threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
438 		waiter = b->v;
439 		if(c->s && c->n){
440 			/*
441 			 * if buffer is full and there are waiters
442 			 * and we're meeting a waiter,
443 			 * we must be receiving.
444 			 *
445 			 * we use the value in the channel buffer,
446 			 * copy the waiter's value into the channel buffer
447 			 * on behalf of the waiter, and then wake the waiter.
448 			 */
449 			if(a->op!=CHANRCV)
450 				abort();
451 			buf = altexecbuffered(a, 1);
452 			altcopy(me, buf, c->e);
453 			altcopy(buf, waiter, c->e);
454 		}else{
455 			if(a->op==CHANRCV)
456 				altcopy(me, waiter, c->e);
457 			else
458 				altcopy(waiter, me, c->e);
459 		}
460 		*b->tag = c;	/* commits us to rendezvous */
461 		_threaddebug(DBGCHAN, "unlocking the chanlock");
462 		unlock(&chanlock);
463 		_procsplx(spl);
464 		_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
465 		while(_threadrendezvous((ulong)b->tag, 0) == ~0)
466 			;
467 		return 1;
468 	}
469 
470 	buf = altexecbuffered(a, 0);
471 	if(a->op==CHANRCV)
472 		altcopy(me, buf, c->e);
473 	else
474 		altcopy(buf, me, c->e);
475 
476 	unlock(&chanlock);
477 	_procsplx(spl);
478 	return 1;
479 }
480