xref: /plan9/sys/src/libsunrpc/udp.c (revision f27a9a5a0b699d2f44893d9491ecc2336a1fbc19)
1 #include <u.h>
2 #include <libc.h>
3 #include <ip.h>
4 #include <thread.h>
5 #include <sunrpc.h>
6 
7 typedef struct SunMsgUdp SunMsgUdp;
8 struct SunMsgUdp
9 {
10 	SunMsg	msg;
11 	Udphdr	udp;
12 };
13 
14 typedef struct Arg Arg;
15 struct Arg
16 {
17 	SunSrv	*srv;
18 	Channel	*creply;
19 	Channel	*csync;
20 	int 	fd;
21 };
22 
23 enum
24 {
25 	UdpMaxRead = 65536+Udphdrsize
26 };
27 
28 static void
sunUdpRead(void * v)29 sunUdpRead(void *v)
30 {
31 	int n;
32 	uchar *buf;
33 	Arg arg = *(Arg*)v;
34 	SunMsgUdp *msg;
35 
36 	sendp(arg.csync, 0);
37 
38 	buf = emalloc(UdpMaxRead);
39 	while((n = read(arg.fd, buf, UdpMaxRead)) > 0){
40 		if(arg.srv->chatty)
41 			fprint(2, "udp got %d (%d)\n", n, Udphdrsize);
42 		msg = emalloc(sizeof(SunMsgUdp));
43 		memmove(&msg->udp, buf, Udphdrsize);
44 		msg->msg.data = emalloc(n);
45 		msg->msg.count = n-Udphdrsize;
46 		memmove(msg->msg.data, buf+Udphdrsize, n-Udphdrsize);
47 		memmove(&msg->udp, buf, Udphdrsize);
48 		msg->msg.creply = arg.creply;
49 		if(arg.srv->chatty)
50 			fprint(2, "message %p count %d\n", msg, msg->msg.count);
51 		sendp(arg.srv->crequest, msg);
52 	}
53 }
54 
55 static void
sunUdpWrite(void * v)56 sunUdpWrite(void *v)
57 {
58 	uchar *buf;
59 	Arg arg = *(Arg*)v;
60 	SunMsgUdp *msg;
61 
62 	sendp(arg.csync, 0);
63 
64 	buf = emalloc(UdpMaxRead);
65 	while((msg = recvp(arg.creply)) != nil){
66 		memmove(buf+Udphdrsize, msg->msg.data, msg->msg.count);
67 		memmove(buf, &msg->udp, Udphdrsize);
68 		msg->msg.count += Udphdrsize;
69 		if(write(arg.fd, buf, msg->msg.count) != msg->msg.count)
70 			fprint(2, "udpWrite: %r\n");
71 		free(msg->msg.data);
72 		free(msg);
73 	}
74 }
75 
76 int
sunSrvUdp(SunSrv * srv,char * address)77 sunSrvUdp(SunSrv *srv, char *address)
78 {
79 	int acfd, fd;
80 	char adir[40], data[60];
81 	Arg *arg;
82 
83 	acfd = announce(address, adir);
84 	if(acfd < 0)
85 		return -1;
86 	if(write(acfd, "headers", 7) < 0){
87 		werrstr("setting headers: %r");
88 		close(acfd);
89 		return -1;
90 	}
91 	snprint(data, sizeof data, "%s/data", adir);
92 	if((fd = open(data, ORDWR)) < 0){
93 		werrstr("open %s: %r", data);
94 		close(acfd);
95 		return -1;
96 	}
97 	close(acfd);
98 
99 	arg = emalloc(sizeof(Arg));
100 	arg->fd = fd;
101 	arg->srv = srv;
102 	arg->creply = chancreate(sizeof(SunMsg*), 10);
103 	arg->csync =  chancreate(sizeof(void*), 10);
104 
105 	proccreate(sunUdpRead, arg, SunStackSize);
106 	proccreate(sunUdpWrite, arg, SunStackSize);
107 	recvp(arg->csync);
108 	recvp(arg->csync);
109 	chanfree(arg->csync);
110 	free(arg);
111 
112 	return 0;
113 }
114