xref: /plan9-contrib/sys/src/cmd/fossil/9proc.c (revision 9b943567965ba040fd275927fbe088656eb8ce4f)
1 #include "stdinc.h"
2 
3 #include "9.h"
4 #include "dat.h"
5 #include "fns.h"
6 
7 enum {
8 	NConInit	= 128,
9 	NMsgInit	= 384,
10 	NMsgProcInit	= 64,
11 	NMsizeInit	= 8192+IOHDRSZ,
12 };
13 
14 static struct {
15 	VtLock*	alock;			/* alloc */
16 	Msg*	ahead;
17 	VtRendez* arendez;
18 
19 	int	maxmsg;
20 	int	nmsg;
21 	int	nmsgstarve;
22 
23 	VtLock*	rlock;			/* read */
24 	Msg*	rhead;
25 	Msg*	rtail;
26 	VtRendez* rrendez;
27 
28 	int	maxproc;
29 	int	nproc;
30 	int	nprocstarve;
31 
32 	u32int	msize;			/* immutable */
33 } mbox;
34 
35 static struct {
36 	VtLock*	alock;			/* alloc */
37 	Con*	ahead;
38 	VtRendez* arendez;
39 
40 	VtLock*	clock;
41 	Con*	chead;
42 	Con*	ctail;
43 
44 	int	maxcon;
45 	int	ncon;
46 	int	nconstarve;
47 
48 	u32int	msize;
49 } cbox;
50 
51 static void
52 conFree(Con* con)
53 {
54 	assert(con->version == nil);
55 	assert(con->mhead == nil);
56 	assert(con->whead == nil);
57 	assert(con->nfid == 0);
58 	assert(con->state == ConMoribund);
59 
60 	if(con->fd >= 0){
61 		close(con->fd);
62 		con->fd = -1;
63 	}
64 	con->state = ConDead;
65 
66 	vtLock(cbox.alock);
67 	if(con->cprev != nil)
68 		con->cprev->cnext = con->cnext;
69 	else
70 		cbox.chead = con->cnext;
71 	if(con->cnext != nil)
72 		con->cnext->cprev = con->cprev;
73 	else
74 		cbox.ctail = con->cprev;
75 	con->cprev = con->cnext = nil;
76 
77 	if(cbox.ncon > cbox.maxcon){
78 		if(con->name != nil)
79 			vtMemFree(con->name);
80 		vtLockFree(con->fidlock);
81 		vtMemFree(con->data);
82 		vtRendezFree(con->wrendez);
83 		vtLockFree(con->wlock);
84 		vtRendezFree(con->mrendez);
85 		vtLockFree(con->mlock);
86 		vtRendezFree(con->rendez);
87 		vtLockFree(con->lock);
88 		vtMemFree(con);
89 		cbox.ncon--;
90 		vtUnlock(cbox.alock);
91 		return;
92 	}
93 	con->anext = cbox.ahead;
94 	cbox.ahead = con;
95 	if(con->anext == nil)
96 		vtWakeup(cbox.arendez);
97 	vtUnlock(cbox.alock);
98 }
99 
100 static void
101 msgFree(Msg* m)
102 {
103 	assert(m->rwnext == nil);
104 	assert(m->flush == nil);
105 
106 	vtLock(mbox.alock);
107 	if(mbox.nmsg > mbox.maxmsg){
108 		vtMemFree(m->data);
109 		vtMemFree(m);
110 		mbox.nmsg--;
111 		vtUnlock(mbox.alock);
112 		return;
113 	}
114 	m->anext = mbox.ahead;
115 	mbox.ahead = m;
116 	if(m->anext == nil)
117 		vtWakeup(mbox.arendez);
118 	vtUnlock(mbox.alock);
119 }
120 
121 static Msg*
122 msgAlloc(Con* con)
123 {
124 	Msg *m;
125 
126 	vtLock(mbox.alock);
127 	while(mbox.ahead == nil){
128 		if(mbox.nmsg >= mbox.maxmsg){
129 			mbox.nmsgstarve++;
130 			vtSleep(mbox.arendez);
131 			continue;
132 		}
133 		m = vtMemAllocZ(sizeof(Msg));
134 		m->data = vtMemAlloc(mbox.msize);
135 		m->msize = mbox.msize;
136 		mbox.nmsg++;
137 		mbox.ahead = m;
138 		break;
139 	}
140 	m = mbox.ahead;
141 	mbox.ahead = m->anext;
142 	m->anext = nil;
143 	vtUnlock(mbox.alock);
144 
145 	m->con = con;
146 	m->state = MsgR;
147 	m->nowq = 0;
148 
149 	return m;
150 }
151 
152 static void
153 msgMunlink(Msg* m)
154 {
155 	Con *con;
156 
157 	con = m->con;
158 
159 	if(m->mprev != nil)
160 		m->mprev->mnext = m->mnext;
161 	else
162 		con->mhead = m->mnext;
163 	if(m->mnext != nil)
164 		m->mnext->mprev = m->mprev;
165 	else
166 		con->mtail = m->mprev;
167 	m->mprev = m->mnext = nil;
168 }
169 
170 void
171 msgFlush(Msg* m)
172 {
173 	Con *con;
174 	Msg *flush, *old;
175 
176 	con = m->con;
177 
178 	if(Dflag)
179 		fprint(2, "msgFlush %F\n", &m->t);
180 
181 	/*
182 	 * If this Tflush has been flushed, nothing to do.
183 	 * Look for the message to be flushed in the
184 	 * queue of all messages still on this connection.
185 	 * If it's not found must assume Elvis has already
186 	 * left the building and reply normally.
187 	 */
188 	vtLock(con->mlock);
189 	if(m->state == MsgF){
190 		vtUnlock(con->mlock);
191 		return;
192 	}
193 	for(old = con->mhead; old != nil; old = old->mnext)
194 		if(old->t.tag == m->t.oldtag)
195 			break;
196 	if(old == nil){
197 		if(Dflag)
198 			fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
199 		vtUnlock(con->mlock);
200 		return;
201 	}
202 
203 	if(Dflag)
204 		fprint(2, "\tmsgFlush found %F\n", &old->t);
205 
206 	/*
207 	 * Found it.
208 	 * There are two cases where the old message can be
209 	 * truly flushed and no reply to the original message given.
210 	 * The first is when the old message is in MsgR state; no
211 	 * processing has been done yet and it is still on the read
212 	 * queue. The second is if old is a Tflush, which doesn't
213 	 * affect the server state. In both cases, put the old
214 	 * message into MsgF state and let MsgWrite toss it after
215 	 * pulling it off the queue.
216 	 */
217 	if(old->state == MsgR || old->t.type == Tflush){
218 		old->state = MsgF;
219 		if(Dflag)
220 			fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
221 				m->t.oldtag);
222 	}
223 
224 	/*
225 	 * Link this flush message and the old message
226 	 * so multiple flushes can be coalesced (if there are
227 	 * multiple Tflush messages for a particular pending
228 	 * request, it is only necessary to respond to the last
229 	 * one, so any previous can be removed) and to be
230 	 * sure flushes wait for their corresponding old
231 	 * message to go out first.
232 	 * Waiting flush messages do not go on the write queue,
233 	 * they are processed after the old message is dealt
234 	 * with. There's no real need to protect the setting of
235 	 * Msg.nowq, the only code to check it runs in this
236 	 * process after this routine returns.
237 	 */
238 	if((flush = old->flush) != nil){
239 		if(Dflag)
240 			fprint(2, "msgFlush: remove %d from %d list\n",
241 				old->flush->t.tag, old->t.tag);
242 		m->flush = flush->flush;
243 		flush->flush = nil;
244 		msgMunlink(flush);
245 		msgFree(flush);
246 	}
247 	old->flush = m;
248 	m->nowq = 1;
249 
250 	if(Dflag)
251 		fprint(2, "msgFlush: add %d to %d queue\n",
252 			m->t.tag, old->t.tag);
253 	vtUnlock(con->mlock);
254 }
255 
256 static void
257 msgProc(void*)
258 {
259 	Msg *m;
260 	char *e;
261 	Con *con;
262 
263 	vtThreadSetName("msgProc");
264 
265 	for(;;){
266 		/*
267 		 * If surplus to requirements, exit.
268 		 * If not, wait for and pull a message off
269 		 * the read queue.
270 		 */
271 		vtLock(mbox.rlock);
272 		if(mbox.nproc > mbox.maxproc){
273 			mbox.nproc--;
274 			vtUnlock(mbox.rlock);
275 			break;
276 		}
277 		while(mbox.rhead == nil)
278 			vtSleep(mbox.rrendez);
279 		m = mbox.rhead;
280 		mbox.rhead = m->rwnext;
281 		m->rwnext = nil;
282 		vtUnlock(mbox.rlock);
283 
284 		con = m->con;
285 		e = nil;
286 
287 		/*
288 		 * If the message has been flushed before
289 		 * any 9P processing has started, mark it so
290 		 * none will be attempted.
291 		 */
292 		vtLock(con->mlock);
293 		if(m->state == MsgF)
294 			e = "flushed";
295 		else
296 			m->state = Msg9;
297 		vtUnlock(con->mlock);
298 
299 		if(e == nil){
300 			/*
301 			 * explain this
302 			 */
303 			vtLock(con->lock);
304 			if(m->t.type == Tversion){
305 				con->version = m;
306 				con->state = ConDown;
307 				while(con->mhead != m)
308 					vtSleep(con->rendez);
309 				assert(con->state == ConDown);
310 				if(con->version == m){
311 					con->version = nil;
312 					con->state = ConInit;
313 				}
314 				else
315 					e = "Tversion aborted";
316 			}
317 			else if(con->state != ConUp)
318 				e = "connection not ready";
319 			vtUnlock(con->lock);
320 		}
321 
322 		/*
323 		 * Dispatch if not error already.
324 		 */
325 		m->r.tag = m->t.tag;
326 		if(e == nil && !(*rFcall[m->t.type])(m))
327 			e = vtGetError();
328 		if(e != nil){
329 			m->r.type = Rerror;
330 			m->r.ename = e;
331 		}
332 		else
333 			m->r.type = m->t.type+1;
334 
335 		/*
336 		 * Put the message (with reply) on the
337 		 * write queue and wakeup the write process.
338 		 */
339 		if(!m->nowq){
340 			vtLock(con->wlock);
341 			if(con->whead == nil)
342 				con->whead = m;
343 			else
344 				con->wtail->rwnext = m;
345 			con->wtail = m;
346 			vtWakeup(con->wrendez);
347 			vtUnlock(con->wlock);
348 		}
349 	}
350 }
351 
352 static void
353 msgRead(void* v)
354 {
355 	Msg *m;
356 	Con *con;
357 	int eof, fd, n;
358 
359 	vtThreadSetName("msgRead");
360 
361 	con = v;
362 	fd = con->fd;
363 	eof = 0;
364 
365 	while(!eof){
366 		m = msgAlloc(con);
367 
368 		while((n = read9pmsg(fd, m->data, con->msize)) == 0)
369 			;
370 		if(n < 0){
371 			m->t.type = Tversion;
372 			m->t.fid = NOFID;
373 			m->t.tag = NOTAG;
374 			m->t.msize = con->msize;
375 			m->t.version = "9PEoF";
376 			eof = 1;
377 		}
378 		else if(convM2S(m->data, n, &m->t) != n){
379 			if(Dflag)
380 				fprint(2, "msgRead: convM2S error: %s\n",
381 					con->name);
382 			msgFree(m);
383 			continue;
384 		}
385 		if(Dflag)
386 			fprint(2, "msgRead %p: t %F\n", con, &m->t);
387 
388 		vtLock(con->mlock);
389 		if(con->mtail != nil){
390 			m->mprev = con->mtail;
391 			con->mtail->mnext = m;
392 		}
393 		else{
394 			con->mhead = m;
395 			m->mprev = nil;
396 		}
397 		con->mtail = m;
398 		vtUnlock(con->mlock);
399 
400 		vtLock(mbox.rlock);
401 		if(mbox.rhead == nil){
402 			mbox.rhead = m;
403 			if(!vtWakeup(mbox.rrendez)){
404 				if(mbox.nproc < mbox.maxproc){
405 					if(vtThread(msgProc, nil) > 0)
406 						mbox.nproc++;
407 				}
408 				else
409 					mbox.nprocstarve++;
410 			}
411 			/*
412 			 * don't need this surely?
413 			vtWakeup(mbox.rrendez);
414 			 */
415 		}
416 		else
417 			mbox.rtail->rwnext = m;
418 		mbox.rtail = m;
419 		vtUnlock(mbox.rlock);
420 	}
421 }
422 
423 static void
424 msgWrite(void* v)
425 {
426 	Con *con;
427 	int eof, n;
428 	Msg *flush, *m;
429 
430 	vtThreadSetName("msgWrite");
431 
432 	con = v;
433 	if(vtThread(msgRead, con) < 0){
434 		conFree(con);
435 		return;
436 	}
437 
438 	for(;;){
439 		/*
440 		 * Wait for and pull a message off the write queue.
441 		 */
442 		vtLock(con->wlock);
443 		while(con->whead == nil)
444 			vtSleep(con->wrendez);
445 		m = con->whead;
446 		con->whead = m->rwnext;
447 		m->rwnext = nil;
448 		assert(!m->nowq);
449 		vtUnlock(con->wlock);
450 
451 		eof = 0;
452 
453 		/*
454 		 * Write each message (if it hasn't been flushed)
455 		 * followed by any messages waiting for it to complete.
456 		 */
457 		vtLock(con->mlock);
458 		while(m != nil){
459 			msgMunlink(m);
460 
461 			if(Dflag)
462 				fprint(2, "msgWrite %d: r %F\n",
463 					m->state, &m->r);
464 
465 			if(m->state != MsgF){
466 				m->state = MsgW;
467 				vtUnlock(con->mlock);
468 
469 				n = convS2M(&m->r, con->data, con->msize);
470 				if(write(con->fd, con->data, n) != n)
471 					eof = 1;
472 
473 				vtLock(con->mlock);
474 			}
475 
476 			if((flush = m->flush) != nil){
477 				assert(flush->nowq);
478 				m->flush = nil;
479 			}
480 			msgFree(m);
481 			m = flush;
482 		}
483 		vtUnlock(con->mlock);
484 
485 		vtLock(con->lock);
486 		if(eof && con->fd >= 0){
487 			close(con->fd);
488 			con->fd = -1;
489 		}
490 		if(con->state == ConDown)
491 			vtWakeup(con->rendez);
492 		if(con->state == ConMoribund && con->mhead == nil){
493 			vtUnlock(con->lock);
494 			conFree(con);
495 			break;
496 		}
497 		vtUnlock(con->lock);
498 	}
499 }
500 
501 Con*
502 conAlloc(int fd, char* name)
503 {
504 	Con *con;
505 
506 	vtLock(cbox.alock);
507 	while(cbox.ahead == nil){
508 		if(cbox.ncon >= cbox.maxcon){
509 			cbox.nconstarve++;
510 			vtSleep(cbox.arendez);
511 			continue;
512 		}
513 		con = vtMemAllocZ(sizeof(Con));
514 		con->lock = vtLockAlloc();
515 		con->rendez = vtRendezAlloc(con->lock);
516 		con->data = vtMemAlloc(cbox.msize);
517 		con->msize = cbox.msize;
518 		con->alock = vtLockAlloc();
519 		con->mlock = vtLockAlloc();
520 		con->mrendez = vtRendezAlloc(con->mlock);
521 		con->wlock = vtLockAlloc();
522 		con->wrendez = vtRendezAlloc(con->wlock);
523 		con->fidlock = vtLockAlloc();
524 
525 		cbox.ncon++;
526 		cbox.ahead = con;
527 		break;
528 	}
529 	con = cbox.ahead;
530 	cbox.ahead = con->anext;
531 	con->anext = nil;
532 
533 	if(cbox.ctail != nil){
534 		con->cprev = cbox.ctail;
535 		cbox.ctail->cnext = con;
536 	}
537 	else{
538 		cbox.chead = con;
539 		con->cprev = nil;
540 	}
541 	cbox.ctail = con;
542 
543 	assert(con->mhead == nil);
544 	assert(con->whead == nil);
545 	assert(con->fhead == nil);
546 	assert(con->nfid == 0);
547 
548 	con->state = ConNew;
549 	con->fd = fd;
550 	if(con->name != nil){
551 		vtMemFree(con->name);
552 		con->name = nil;
553 	}
554 	if(name != nil)
555 		con->name = vtStrDup(name);
556 	else
557 		con->name = vtStrDup("unknown");
558 	con->aok = 0;
559 	vtUnlock(cbox.alock);
560 
561 	if(vtThread(msgWrite, con) < 0){
562 		conFree(con);
563 		return nil;
564 	}
565 
566 	return con;
567 }
568 
569 static int
570 cmdMsg(int argc, char* argv[])
571 {
572 	char *p;
573 	char *usage = "usage: msg [-m nmsg] [-p nproc]";
574 	int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
575 
576 	maxmsg = maxproc = 0;
577 
578 	ARGBEGIN{
579 	default:
580 		return cliError(usage);
581 	case 'm':
582 		p = ARGF();
583 		if(p == nil)
584 			return cliError(usage);
585 		maxmsg = strtol(argv[0], &p, 0);
586 		if(maxmsg <= 0 || p == argv[0] || *p != '\0')
587 			return cliError(usage);
588 		break;
589 	case 'p':
590 		p = ARGF();
591 		if(p == nil)
592 			return cliError(usage);
593 		maxproc = strtol(argv[0], &p, 0);
594 		if(maxproc <= 0 || p == argv[0] || *p != '\0')
595 			return cliError(usage);
596 		break;
597 	}ARGEND
598 	if(argc)
599 		return cliError(usage);
600 
601 	vtLock(mbox.alock);
602 	if(maxmsg)
603 		mbox.maxmsg = maxmsg;
604 	maxmsg = mbox.maxmsg;
605 	nmsg = mbox.nmsg;
606 	nmsgstarve = mbox.nmsgstarve;
607 	vtUnlock(mbox.alock);
608 
609 	vtLock(mbox.rlock);
610 	if(maxproc)
611 		mbox.maxproc = maxproc;
612 	maxproc = mbox.maxproc;
613 	nproc = mbox.nproc;
614 	nprocstarve = mbox.nprocstarve;
615 	vtUnlock(mbox.rlock);
616 
617 	consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
618 	consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
619 		nmsg, nmsgstarve, nproc, nprocstarve);
620 
621 	return 1;
622 }
623 
624 static int
625 scmp(Fid *a, Fid *b)
626 {
627 	if(a == 0)
628 		return 1;
629 	if(b == 0)
630 		return -1;
631 	return strcmp(a->uname, b->uname);
632 }
633 
634 static Fid*
635 fidMerge(Fid *a, Fid *b)
636 {
637 	Fid *s, **l;
638 
639 	l = &s;
640 	while(a || b){
641 		if(scmp(a, b) < 0){
642 			*l = a;
643 			l = &a->sort;
644 			a = a->sort;
645 		}else{
646 			*l = b;
647 			l = &b->sort;
648 			b = b->sort;
649 		}
650 	}
651 	*l = 0;
652 	return s;
653 }
654 
655 static Fid*
656 fidMergeSort(Fid *f)
657 {
658 	int delay;
659 	Fid *a, *b;
660 
661 	if(f == nil)
662 		return nil;
663 	if(f->sort == nil)
664 		return f;
665 
666 	a = b = f;
667 	delay = 1;
668 	while(a && b){
669 		if(delay)	/* easy way to handle 2-element list */
670 			delay = 0;
671 		else
672 			a = a->sort;
673 		if(b = b->sort)
674 			b = b->sort;
675 	}
676 
677 	b = a->sort;
678 	a->sort = nil;
679 
680 	a = fidMergeSort(f);
681 	b = fidMergeSort(b);
682 
683 	return fidMerge(a, b);
684 }
685 
686 static int
687 cmdWho(int argc, char* argv[])
688 {
689 	char *usage = "usage: who";
690 	int i;
691 	Con *con;
692 	Fid *fid, *last;
693 
694 	ARGBEGIN{
695 	default:
696 		return cliError(usage);
697 	}ARGEND
698 
699 	if(argc > 0)
700 		return cliError(usage);
701 
702 	vtRLock(cbox.clock);
703 	for(con=cbox.chead; con; con=con->cnext){
704 		consPrint("\t%q:", con->name);
705 		vtLock(con->fidlock);
706 		last = nil;
707 		for(i=0; i<NFidHash; i++)
708 			for(fid=con->fidhash[i]; fid; fid=fid->hash)
709 				if(fid->fidno != NOFID && fid->uname){
710 					fid->sort = last;
711 					last = fid;
712 				}
713 		fid = fidMergeSort(last);
714 		last = nil;
715 		for(; fid; last=fid, fid=fid->sort)
716 			if(last==nil || strcmp(fid->uname, last->uname) != 0)
717 				consPrint(" %q", fid->uname);
718 		vtUnlock(con->fidlock);
719 		consPrint("\n");
720 	}
721 	vtRUnlock(cbox.clock);
722 	return 1;
723 }
724 
725 void
726 msgInit(void)
727 {
728 	mbox.alock = vtLockAlloc();
729 	mbox.arendez = vtRendezAlloc(mbox.alock);
730 
731 	mbox.rlock = vtLockAlloc();
732 	mbox.rrendez = vtRendezAlloc(mbox.rlock);
733 
734 	mbox.maxmsg = NMsgInit;
735 	mbox.maxproc = NMsgProcInit;
736 	mbox.msize = NMsizeInit;
737 
738 	cliAddCmd("msg", cmdMsg);
739 }
740 
741 static int
742 cmdCon(int argc, char* argv[])
743 {
744 	char *p;
745 	Con *con;
746 	char *usage = "usage: con [-m ncon]";
747 	int maxcon, ncon, nconstarve;
748 
749 	maxcon = 0;
750 
751 	ARGBEGIN{
752 	default:
753 		return cliError(usage);
754 	case 'm':
755 		p = ARGF();
756 		if(p == nil)
757 			return cliError(usage);
758 		maxcon = strtol(argv[0], &p, 0);
759 		if(maxcon <= 0 || p == argv[0] || *p != '\0')
760 			return cliError(usage);
761 		break;
762 	}ARGEND
763 	if(argc)
764 		return cliError(usage);
765 
766 	vtLock(cbox.clock);
767 	if(maxcon)
768 		cbox.maxcon = maxcon;
769 	maxcon = cbox.maxcon;
770 	ncon = cbox.ncon;
771 	nconstarve = cbox.nconstarve;
772 	vtUnlock(cbox.clock);
773 
774 	consPrint("\tcon -m %d\n", maxcon);
775 	consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
776 
777 	vtRLock(cbox.clock);
778 	for(con = cbox.chead; con != nil; con = con->cnext){
779 		consPrint("\t%s\n", con->name);
780 	}
781 	vtRUnlock(cbox.clock);
782 
783 	return 1;
784 }
785 
786 void
787 conInit(void)
788 {
789 	cbox.alock = vtLockAlloc();
790 	cbox.arendez = vtRendezAlloc(cbox.alock);
791 
792 	cbox.clock = vtLockAlloc();
793 
794 	cbox.maxcon = NConInit;
795 	cbox.msize = NMsizeInit;
796 
797 	cliAddCmd("con", cmdCon);
798 	cliAddCmd("who", cmdWho);
799 }
800