xref: /plan9-contrib/sys/src/cmd/fossil/9proc.c (revision a6a9e07217f318acf170f99684a55fba5200524f)
1 #include "stdinc.h"
2 
3 #include "9.h"
4 #include "dat.h"
5 #include "fns.h"
6 
7 enum {
8 	NConInit	= 128,
9 	NMsgInit	= 20,
10 	NMsgProcInit	= 4,
11 	NMsizeInit	= 8192+IOHDRSZ,
12 };
13 
14 static struct {
15 	VtLock*	alock;			/* alloc */
16 	Msg*	ahead;
17 	VtRendez* arendez;
18 
19 	int	maxmsg;
20 	int	nmsg;
21 	int	nmsgstarve;
22 
23 	VtLock*	rlock;			/* read */
24 	Msg*	rhead;
25 	Msg*	rtail;
26 	VtRendez* rrendez;
27 
28 	int	maxproc;
29 	int	nproc;
30 	int	nprocstarve;
31 
32 	u32int	msize;			/* immutable */
33 } mbox;
34 
35 static struct {
36 	VtLock*	alock;			/* alloc */
37 	Con*	ahead;
38 	VtRendez* arendez;
39 
40 	VtLock*	clock;
41 	Con*	chead;
42 	Con*	ctail;
43 
44 	int	maxcon;
45 	int	ncon;
46 	int	nconstarve;
47 
48 	u32int	msize;
49 } cbox;
50 
51 static void
52 conFree(Con* con)
53 {
54 	assert(con->version == nil);
55 	assert(con->mhead == nil);
56 	assert(con->whead == nil);
57 	assert(con->nfid == 0);
58 	assert(con->state == ConMoribund);
59 
60 	if(con->fd >= 0){
61 		close(con->fd);
62 		con->fd = -1;
63 	}
64 	con->state = ConDead;
65 
66 	vtLock(cbox.alock);
67 	if(con->cprev != nil)
68 		con->cprev->cnext = con->cnext;
69 	else
70 		cbox.chead = con->cnext;
71 	if(con->cnext != nil)
72 		con->cnext->cprev = con->cprev;
73 	else
74 		cbox.ctail = con->cprev;
75 	con->cprev = con->cnext = nil;
76 
77 	if(cbox.ncon > cbox.maxcon){
78 		if(con->name != nil)
79 			vtMemFree(con->name);
80 		vtLockFree(con->fidlock);
81 		vtMemFree(con->data);
82 		vtRendezAlloc(con->wrendez);
83 		vtLockFree(con->wlock);
84 		vtRendezAlloc(con->mrendez);
85 		vtLockFree(con->mlock);
86 		vtRendezAlloc(con->rendez);
87 		vtLockFree(con->lock);
88 		vtMemFree(con);
89 		cbox.ncon--;
90 		vtUnlock(cbox.alock);
91 		return;
92 	}
93 	con->anext = cbox.ahead;
94 	cbox.ahead = con;
95 	if(con->anext == nil)
96 		vtWakeup(cbox.arendez);
97 	vtUnlock(cbox.alock);
98 }
99 
100 static void
101 msgFree(Msg* m)
102 {
103 	assert(m->rwnext == nil);
104 	assert(m->fnext == nil && m->fprev == nil);
105 
106 	vtLock(mbox.alock);
107 	if(mbox.nmsg > mbox.maxmsg){
108 		vtMemFree(m->data);
109 		vtMemFree(m);
110 		mbox.nmsg--;
111 		vtUnlock(mbox.alock);
112 		return;
113 	}
114 	m->anext = mbox.ahead;
115 	mbox.ahead = m;
116 	if(m->anext == nil)
117 		vtWakeup(mbox.arendez);
118 	vtUnlock(mbox.alock);
119 }
120 
121 static Msg*
122 msgAlloc(Con* con)
123 {
124 	Msg *m;
125 
126 	vtLock(mbox.alock);
127 	while(mbox.ahead == nil){
128 		if(mbox.nmsg >= mbox.maxmsg){
129 			mbox.nmsgstarve++;
130 			vtSleep(mbox.arendez);
131 			continue;
132 		}
133 		m = vtMemAllocZ(sizeof(Msg));
134 		m->data = vtMemAlloc(mbox.msize);
135 		m->msize = mbox.msize;
136 		mbox.nmsg++;
137 		mbox.ahead = m;
138 		break;
139 	}
140 	m = mbox.ahead;
141 	mbox.ahead = m->anext;
142 	m->anext = nil;
143 	vtUnlock(mbox.alock);
144 
145 	m->con = con;
146 	m->state = MsgR;
147 
148 	return m;
149 }
150 
151 static void
152 msgMunlink(Msg* m)
153 {
154 	Con *con;
155 
156 	con = m->con;
157 
158 	if(m->mprev != nil)
159 		m->mprev->mnext = m->mnext;
160 	else
161 		con->mhead = m->mnext;
162 	if(m->mnext != nil)
163 		m->mnext->mprev = m->mprev;
164 	else
165 		con->mtail = m->mprev;
166 	m->mprev = m->mnext = nil;
167 }
168 
169 static void
170 msgUnlinkUnlockAndFree(Msg* m)
171 {
172 	/*
173 	 * Unlink the message from the flush and message queues,
174 	 * unlock the connection message lock and free the message.
175 	 * Called with con->mlock locked.
176 	 */
177 	if(m->fprev != nil)
178 		m->fprev->fnext = m->fnext;
179 	if(m->fnext != nil)
180 		m->fnext->fprev = m->fprev;
181 	m->fprev = m->fnext = nil;
182 
183 	msgMunlink(m);
184 	vtUnlock(m->con->mlock);
185 	msgFree(m);
186 }
187 
188 void
189 msgFlush(Msg* m)
190 {
191 	Msg *old;
192 	Con *con;
193 
194 	con = m->con;
195 
196 	/*
197 	 * Look for the message to be flushed in the
198 	 * queue of all messages still on this connection.
199 	 */
200 	vtLock(con->mlock);
201 	for(old = con->mhead; old != nil; old = old->mnext)
202 		if(old->t.tag == m->t.oldtag)
203 			break;
204 	if(old == nil){
205 		if(Dflag)
206 			fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
207 		vtUnlock(con->mlock);
208 		return;
209 	}
210 
211 	/*
212 	 * Found it.
213 	 *
214 	 * Easy case is no 9P processing done yet,
215 	 * message is on the read queue.
216 	 * Mark the message as flushed and let the read
217 	 * process throw it away after after pulling
218 	 * it off the read queue.
219 	 */
220 	if(old->state == MsgR){
221 		old->state = MsgF;
222 		if(Dflag)
223 			fprint(2, "msgFlush: change %d from MsgR to MsgF\n", m->t.oldtag);
224 		vtUnlock(con->mlock);
225 		return;
226 	}
227 
228 	/*
229 	 * Flushing flushes.
230 	 * Since they don't affect the server state, flushes
231 	 * can be deleted when in Msg9 or MsgW state.
232 	 */
233 	if(old->t.type == Tflush){
234 		/*
235 		 * For Msg9 state, the old message may
236 		 * or may not be on the write queue.
237 		 * Mark the message as flushed and let
238 		 * the write process throw it away after
239 		 * after pulling it off the write queue.
240 		 */
241 		if(old->state == Msg9){
242 			old->state = MsgF;
243 			if(Dflag)
244 				fprint(2, "msgFlush: change %d from Msg9 to MsgF\n", m->t.oldtag);
245 			vtUnlock(con->mlock);
246 			return;
247 		}
248 		assert(old->state == MsgW);
249 
250 		/*
251 		 * A flush in MsgW state implies it is waiting
252 		 * for its corresponding old message to be written,
253 		 * so it can be deleted right here, right now...
254 		 * right here, right now... right here, right now...
255 		 * right about now... the funk soul brother.
256 		 */
257 		if(Dflag)
258 			fprint(2, "msgFlush: delete pending flush %F\n", &old->t);
259 		msgUnlinkUnlockAndFree(old);
260 		return;
261 	}
262 
263 	/*
264 	 * Must wait for the old message to be written.
265 	 * Add m to old's flush queue.
266 	 * Old is the head of its own flush queue.
267 	 */
268 	m->fprev = old;
269 	m->fnext = old->fnext;
270 	if(m->fnext)
271 		m->fnext->fprev = m;
272 	old->fnext = m;
273 	if(Dflag)
274 		fprint(2, "msgFlush: add %d to %d queue\n", m->t.tag, old->t.tag);
275 	vtUnlock(con->mlock);
276 }
277 
278 static void
279 msgProc(void*)
280 {
281 	Msg *m;
282 	char *e;
283 	Con *con;
284 
285 	vtThreadSetName("msgProc");
286 
287 	for(;;){
288 		/*
289 		 * If surplus to requirements, exit.
290 		 * If not, wait for and pull a message off
291 		 * the read queue.
292 		 */
293 		vtLock(mbox.rlock);
294 		if(mbox.nproc > mbox.maxproc){
295 			mbox.nproc--;
296 			vtUnlock(mbox.rlock);
297 			break;
298 		}
299 		while(mbox.rhead == nil)
300 			vtSleep(mbox.rrendez);
301 		m = mbox.rhead;
302 		mbox.rhead = m->rwnext;
303 		m->rwnext = nil;
304 		vtUnlock(mbox.rlock);
305 
306 		con = m->con;
307 		e = nil;
308 
309 		/*
310 		 * If the message has been flushed before any
311 		 * 9P processing has started, just throw it away.
312 		 */
313 		vtLock(con->mlock);
314 		if(m->state == MsgF){
315 			msgUnlinkUnlockAndFree(m);
316 			continue;
317 		}
318 		m->state = Msg9;
319 		vtUnlock(con->mlock);
320 
321 		/*
322 		 * explain this
323 		 */
324 		vtLock(con->lock);
325 		if(m->t.type == Tversion){
326 			con->version = m;
327 			con->state = ConDown;
328 			while(con->mhead != m)
329 				vtSleep(con->rendez);
330 			assert(con->state == ConDown);
331 			if(con->version == m){
332 				con->version = nil;
333 				con->state = ConInit;
334 			}
335 			else
336 				e = "Tversion aborted";
337 		}
338 		else if(con->state != ConUp)
339 			e = "connection not ready";
340 		vtUnlock(con->lock);
341 
342 		/*
343 		 * Dispatch if not error already.
344 		 */
345 		m->r.tag = m->t.tag;
346 		if(e == nil && !(*rFcall[m->t.type])(m))
347 			e = vtGetError();
348 		if(e != nil){
349 			m->r.type = Rerror;
350 			m->r.ename = e;
351 		}
352 		else
353 			m->r.type = m->t.type+1;
354 
355 
356 		/*
357 		 * Put the message (with reply) on the
358 		 * write queue and wakeup the write process.
359 		 */
360 		vtLock(con->wlock);
361 		if(con->whead == nil)
362 			con->whead = m;
363 		else
364 			con->wtail->rwnext = m;
365 		con->wtail = m;
366 		vtWakeup(con->wrendez);
367 		vtUnlock(con->wlock);
368 	}
369 }
370 
371 static void
372 msgRead(void* v)
373 {
374 	Msg *m;
375 	Con *con;
376 	int eof, fd, n;
377 
378 	vtThreadSetName("msgRead");
379 
380 	con = v;
381 	fd = con->fd;
382 	eof = 0;
383 
384 	while(!eof){
385 		m = msgAlloc(con);
386 
387 		while((n = read9pmsg(fd, m->data, con->msize)) == 0)
388 			;
389 		if(n < 0){
390 			m->t.type = Tversion;
391 			m->t.fid = NOFID;
392 			m->t.tag = NOTAG;
393 			m->t.msize = con->msize;
394 			m->t.version = "9PEoF";
395 			eof = 1;
396 		}
397 		else if(convM2S(m->data, n, &m->t) != n){
398 			if(Dflag)
399 				fprint(2, "msgRead: convM2S error: %s\n",
400 					con->name);
401 			msgFree(m);
402 			continue;
403 		}
404 		if(Dflag)
405 			fprint(2, "msgRead: t %F\n", &m->t);
406 
407 		vtLock(con->mlock);
408 		if(con->mtail != nil){
409 			m->mprev = con->mtail;
410 			con->mtail->mnext = m;
411 		}
412 		else{
413 			con->mhead = m;
414 			m->mprev = nil;
415 		}
416 		con->mtail = m;
417 		vtUnlock(con->mlock);
418 
419 		vtLock(mbox.rlock);
420 		if(mbox.rhead == nil){
421 			mbox.rhead = m;
422 			if(!vtWakeup(mbox.rrendez)){
423 				if(mbox.nproc < mbox.maxproc){
424 					if(vtThread(msgProc, nil) > 0)
425 						mbox.nproc++;
426 				}
427 				else
428 					mbox.nprocstarve++;
429 			}
430 			/*
431 			 * don't need this surely?
432 			vtWakeup(mbox.rrendez);
433 			 */
434 		}
435 		else
436 			mbox.rtail->rwnext = m;
437 		mbox.rtail = m;
438 		vtUnlock(mbox.rlock);
439 	}
440 }
441 
442 static int
443 _msgWrite(Msg* m)
444 {
445 	Con *con;
446 	int eof, n;
447 
448 	con = m->con;
449 
450 	/*
451 	 * An Rflush with a .fprev implies it is on a flush queue waiting for
452 	 * its corresponding 'oldtag' message to go out first, so punt
453 	 * until the 'oldtag' message goes out (see below).
454 	 */
455 	if(m->r.type == Rflush && m->fprev != nil){
456 		fprint(2, "msgWrite: delay r %F\n", &m->r);
457 		return 0;
458 	}
459 
460 	msgMunlink(m);
461 	vtUnlock(con->mlock);
462 
463 	/*
464 	 * TODO: optimise this copy away somehow for
465 	 * read, stat, etc.
466 	 */
467 	assert(n = convS2M(&m->r, con->data, con->msize));
468 	if(write(con->fd, con->data, n) != n)
469 		eof = 1;
470 	else
471 		eof = 0;
472 
473 	if(Dflag)
474 		fprint(2, "msgWrite: r %F\n", &m->r);
475 
476 	/*
477 	 * Just wrote a reply. If it has any flushes waiting
478 	 * for it to have gone out, recurse down the list writing
479 	 * them out too.
480 	 */
481 	vtLock(con->mlock);
482 	if(m->fnext != nil){
483 		m->fnext->fprev = nil;
484 		eof += _msgWrite(m->fnext);
485 		m->fnext = nil;
486 	}
487 	msgFree(m);
488 
489 	return eof;
490 }
491 
492 static void
493 msgWrite(void* v)
494 {
495 	Msg *m;
496 	int eof;
497 	Con *con;
498 
499 	vtThreadSetName("msgWrite");
500 
501 	con = v;
502 	if(vtThread(msgRead, con) < 0){
503 		conFree(con);
504 		return;
505 	}
506 
507 	for(;;){
508 		/*
509 		 * Wait for and pull a message off the write queue.
510 		 */
511 		vtLock(con->wlock);
512 		while(con->whead == nil)
513 			vtSleep(con->wrendez);
514 		m = con->whead;
515 		con->whead = m->rwnext;
516 		m->rwnext = nil;
517 		vtUnlock(con->wlock);
518 
519 		/*
520 		 * Throw the message away if it's a flushed flush,
521 		 * otherwise change its state and try to write it out.
522 		 */
523 		vtLock(con->mlock);
524 		if(m->state == MsgF){
525 			assert(m->t.type == Tflush);
526 			msgUnlinkUnlockAndFree(m);
527 			continue;
528 		}
529 		m->state = MsgW;
530 		eof = _msgWrite(m);
531 		vtUnlock(con->mlock);
532 
533 		vtLock(con->lock);
534 		if(eof && con->fd >= 0){
535 			close(con->fd);
536 			con->fd = -1;
537 		}
538 		if(con->state == ConDown)
539 			vtWakeup(con->rendez);
540 		if(con->state == ConMoribund && con->mhead == nil){
541 			vtUnlock(con->lock);
542 			conFree(con);
543 			break;
544 		}
545 		vtUnlock(con->lock);
546 	}
547 }
548 
549 Con*
550 conAlloc(int fd, char* name)
551 {
552 	Con *con;
553 
554 	vtLock(cbox.alock);
555 	while(cbox.ahead == nil){
556 		if(cbox.ncon >= cbox.maxcon){
557 			cbox.nconstarve++;
558 			vtSleep(cbox.arendez);
559 			continue;
560 		}
561 		con = vtMemAllocZ(sizeof(Con));
562 		con->lock = vtLockAlloc();
563 		con->rendez = vtRendezAlloc(con->lock);
564 		con->data = vtMemAlloc(cbox.msize);
565 		con->msize = cbox.msize;
566 		con->alock = vtLockAlloc();
567 		con->mlock = vtLockAlloc();
568 		con->mrendez = vtRendezAlloc(con->mlock);
569 		con->wlock = vtLockAlloc();
570 		con->wrendez = vtRendezAlloc(con->wlock);
571 		con->fidlock = vtLockAlloc();
572 
573 		cbox.ncon++;
574 		cbox.ahead = con;
575 		break;
576 	}
577 	con = cbox.ahead;
578 	cbox.ahead = con->anext;
579 	con->anext = nil;
580 
581 	if(cbox.ctail != nil){
582 		con->cprev = cbox.ctail;
583 		cbox.ctail->cnext = con;
584 	}
585 	else{
586 		cbox.chead = con;
587 		con->cprev = nil;
588 	}
589 	cbox.ctail = con;
590 
591 	assert(con->mhead == nil);
592 	assert(con->whead == nil);
593 	assert(con->fhead == nil);
594 	assert(con->nfid == 0);
595 
596 	con->state = ConNew;
597 	con->fd = fd;
598 	if(con->name != nil){
599 		vtMemFree(con->name);
600 		con->name = nil;
601 	}
602 	if(name != nil)
603 		con->name = vtStrDup(name);
604 	else
605 		con->name = vtStrDup("unknown");
606 	con->aok = 0;
607 	vtUnlock(cbox.alock);
608 
609 	if(vtThread(msgWrite, con) < 0){
610 		conFree(con);
611 		return nil;
612 	}
613 
614 	return con;
615 }
616 
617 static int
618 cmdMsg(int argc, char* argv[])
619 {
620 	char *p;
621 	char *usage = "usage: msg [-m nmsg] [-p nproc]";
622 	int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
623 
624 	maxmsg = maxproc = 0;
625 
626 	ARGBEGIN{
627 	default:
628 		return cliError(usage);
629 	case 'm':
630 		p = ARGF();
631 		if(p == nil)
632 			return cliError(usage);
633 		maxmsg = strtol(argv[0], &p, 0);
634 		if(maxmsg <= 0 || p == argv[0] || *p != '\0')
635 			return cliError(usage);
636 		break;
637 	case 'p':
638 		p = ARGF();
639 		if(p == nil)
640 			return cliError(usage);
641 		maxproc = strtol(argv[0], &p, 0);
642 		if(maxproc <= 0 || p == argv[0] || *p != '\0')
643 			return cliError(usage);
644 		break;
645 	}ARGEND
646 	if(argc)
647 		return cliError(usage);
648 
649 	vtLock(mbox.alock);
650 	if(maxmsg)
651 		mbox.maxmsg = maxmsg;
652 	maxmsg = mbox.maxmsg;
653 	nmsg = mbox.nmsg;
654 	nmsgstarve = mbox.nmsgstarve;
655 	vtUnlock(mbox.alock);
656 
657 	vtLock(mbox.rlock);
658 	if(maxproc)
659 		mbox.maxproc = maxproc;
660 	maxproc = mbox.maxproc;
661 	nproc = mbox.nproc;
662 	nprocstarve = mbox.nprocstarve;
663 	vtUnlock(mbox.rlock);
664 
665 	consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
666 	consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
667 		nmsg, nmsgstarve, nproc, nprocstarve);
668 
669 	return 1;
670 }
671 
672 void
673 msgInit(void)
674 {
675 	mbox.alock = vtLockAlloc();
676 	mbox.arendez = vtRendezAlloc(mbox.alock);
677 
678 	mbox.rlock = vtLockAlloc();
679 	mbox.rrendez = vtRendezAlloc(mbox.rlock);
680 
681 	mbox.maxmsg = NMsgInit;
682 	mbox.maxproc = NMsgProcInit;
683 	mbox.msize = NMsizeInit;
684 
685 	cliAddCmd("msg", cmdMsg);
686 }
687 
688 static int
689 cmdCon(int argc, char* argv[])
690 {
691 	char *p;
692 	Con *con;
693 	char *usage = "usage: con [-m ncon]";
694 	int maxcon, ncon, nconstarve;
695 
696 	maxcon = 0;
697 
698 	ARGBEGIN{
699 	default:
700 		return cliError(usage);
701 	case 'm':
702 		p = ARGF();
703 		if(p == nil)
704 			return cliError(usage);
705 		maxcon = strtol(argv[0], &p, 0);
706 		if(maxcon <= 0 || p == argv[0] || *p != '\0')
707 			return cliError(usage);
708 		break;
709 	}ARGEND
710 	if(argc)
711 		return cliError(usage);
712 
713 	vtLock(cbox.clock);
714 	if(maxcon)
715 		cbox.maxcon = maxcon;
716 	maxcon = cbox.maxcon;
717 	ncon = cbox.ncon;
718 	nconstarve = cbox.nconstarve;
719 	vtUnlock(cbox.clock);
720 
721 	consPrint("\tcon -m %d\n", maxcon);
722 	consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
723 
724 	vtRLock(cbox.clock);
725 	for(con = cbox.chead; con != nil; con = con->cnext){
726 		consPrint("\t%s\n", con->name);
727 	}
728 	vtRUnlock(cbox.clock);
729 
730 	return 1;
731 }
732 
733 void
734 conInit(void)
735 {
736 	cbox.alock = vtLockAlloc();
737 	cbox.arendez = vtRendezAlloc(cbox.alock);
738 
739 	cbox.clock = vtLockAlloc();
740 
741 	cbox.maxcon = NConInit;
742 	cbox.msize = NMsizeInit;
743 
744 	cliAddCmd("con", cmdCon);
745 }
746