xref: /plan9-contrib/sys/src/libsunrpc/udp.c (revision ec59a3ddbfceee0efe34584c2c9981a5e5ff1ec4)
1 #include <u.h>
2 #include <libc.h>
3 #include <ip.h>
4 #include <thread.h>
5 #include <sunrpc.h>
6 
7 typedef struct SunMsgUdp SunMsgUdp;
8 struct SunMsgUdp
9 {
10 	SunMsg msg;
11 	Udphdr udp;
12 };
13 
14 typedef struct Arg Arg;
15 struct Arg
16 {
17 	SunSrv *srv;
18 	Channel *creply;
19 	Channel *csync;
20 	int fd;
21 };
22 
23 enum
24 {
25 	UdpMaxRead = 65536+Udphdrsize
26 };
27 static void
28 sunUdpRead(void *v)
29 {
30 	int n;
31 	uchar *buf;
32 	Arg arg = *(Arg*)v;
33 	SunMsgUdp *msg;
34 
35 	sendp(arg.csync, 0);
36 
37 	buf = emalloc(UdpMaxRead);
38 	while((n = read(arg.fd, buf, UdpMaxRead)) > 0){
39 		if(arg.srv->chatty)
40 			fprint(2, "udp got %d (%d)\n", n, Udphdrsize);
41 		msg = emalloc(sizeof(SunMsgUdp));
42 		memmove(&msg->udp, buf, Udphdrsize);
43 		msg->msg.data = emalloc(n);
44 		msg->msg.count = n-Udphdrsize;
45 		memmove(msg->msg.data, buf+Udphdrsize, n-Udphdrsize);
46 		memmove(&msg->udp, buf, Udphdrsize);
47 		msg->msg.creply = arg.creply;
48 		if(arg.srv->chatty)
49 			fprint(2, "message %p count %d\n", msg, msg->msg.count);
50 		sendp(arg.srv->crequest, msg);
51 	}
52 }
53 
54 static void
55 sunUdpWrite(void *v)
56 {
57 	uchar *buf;
58 	Arg arg = *(Arg*)v;
59 	SunMsgUdp *msg;
60 
61 	sendp(arg.csync, 0);
62 
63 	buf = emalloc(UdpMaxRead);
64 	while((msg = recvp(arg.creply)) != nil){
65 		memmove(buf+Udphdrsize, msg->msg.data, msg->msg.count);
66 		memmove(buf, &msg->udp, Udphdrsize);
67 		msg->msg.count += Udphdrsize;
68 		if(write(arg.fd, buf, msg->msg.count) != msg->msg.count)
69 			fprint(2, "udpWrite: %r\n");
70 		free(msg->msg.data);
71 		free(msg);
72 	}
73 }
74 
75 int
76 sunSrvUdp(SunSrv *srv, char *address)
77 {
78 	int acfd, fd;
79 	char adir[40], data[60];
80 	Arg *arg;
81 
82 	acfd = announce(address, adir);
83 	if(acfd < 0)
84 		return -1;
85 	if(write(acfd, "headers", 7) < 0){
86 		werrstr("setting headers: %r");
87 		close(acfd);
88 		return -1;
89 	}
90 	write(acfd, "oldheaders", 10);
91 	snprint(data, sizeof data, "%s/data", adir);
92 	if((fd = open(data, ORDWR)) < 0){
93 		werrstr("open %s: %r", data);
94 		close(acfd);
95 		return -1;
96 	}
97 	close(acfd);
98 
99 	arg = emalloc(sizeof(Arg));
100 	arg->fd = fd;
101 	arg->srv = srv;
102 	arg->creply = chancreate(sizeof(SunMsg*), 10);
103 	arg->csync = chancreate(sizeof(void*), 10);
104 
105 	proccreate(sunUdpRead, arg, SunStackSize);
106 	proccreate(sunUdpWrite, arg, SunStackSize);
107 	recvp(arg->csync);
108 	recvp(arg->csync);
109 	chanfree(arg->csync);
110 	free(arg);
111 
112 	return 0;
113 }
114