1 #include "stdinc.h"
2 #include "dat.h"
3 #include "fns.h"
4 #include "error.h"
5
/* per-disk I/O slave; started by diskAlloc, stopped by diskFree */
static void diskThread(void *a);
7
enum {
	/*
	 * disable measurement since it gets alignment faults on BG
	 * and the guts used to be commented out.
	 * (NOTE(review): "BG" is presumably the IBM Blue Gene port — confirm.)
	 */
	Timing = 0,	/* flag: nonzero makes diskThread print periodic I/O timing */
	QueueSize = 100,	/* maximum number of blocks queued on one disk */
};
16
struct Disk {
	VtLock *lk;	/* guards ref, nqueue, cur, next, die; every rendezvous below uses it */
	int ref;	/* set to 2 by diskAlloc; diskThread drops one when it exits */

	int fd;	/* underlying file; closed by diskFree */
	Header h;	/* unpacked on-disk header: partition bounds and block size */

	VtRendez *flow;	/* diskQueue sleeps here while nqueue >= QueueSize */
	VtRendez *starve;	/* diskThread sleeps here while the queue is empty */
	VtRendez *flush;	/* diskFlush sleeps here until nqueue reaches 0 */
	VtRendez *die;	/* nil until diskFree asks diskThread to exit */

	int nqueue;	/* blocks queued or being processed, not yet completed */

	Block *cur;	/* block to do on current scan (sorted by addr, linked via ionext) */
	Block *next;	/* blocks to do next scan (sorted by addr, linked via ionext) */
};
34
/*
 * Printable partition names indexed by Block.part, used in
 * diskThread's error messages.
 * Keep in sync with Part* enum in dat.h.
 * (Uses the Plan 9 designated-initializer form without '='.)
 */
static char *partname[] = {
	[PartError] "error",
	[PartSuper] "super",
	[PartLabel] "label",
	[PartData] "data",
	[PartVenti] "venti",
};
43
44 Disk *
diskAlloc(int fd)45 diskAlloc(int fd)
46 {
47 u8int buf[HeaderSize];
48 Header h;
49 Disk *disk;
50
51 if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){
52 vtSetError("short read: %r");
53 vtOSError();
54 return nil;
55 }
56
57 if(!headerUnpack(&h, buf)){
58 vtSetError("bad disk header");
59 return nil;
60 }
61 disk = vtMemAllocZ(sizeof(Disk));
62 disk->lk = vtLockAlloc();
63 disk->starve = vtRendezAlloc(disk->lk);
64 disk->flow = vtRendezAlloc(disk->lk);
65 disk->flush = vtRendezAlloc(disk->lk);
66 disk->fd = fd;
67 disk->h = h;
68
69 disk->ref = 2;
70 vtThread(diskThread, disk);
71
72 return disk;
73 }
74
75 void
diskFree(Disk * disk)76 diskFree(Disk *disk)
77 {
78 diskFlush(disk);
79
80 /* kill slave */
81 vtLock(disk->lk);
82 disk->die = vtRendezAlloc(disk->lk);
83 vtWakeup(disk->starve);
84 while(disk->ref > 1)
85 vtSleep(disk->die);
86 vtUnlock(disk->lk);
87 vtRendezFree(disk->flow);
88 vtRendezFree(disk->starve);
89 vtRendezFree(disk->die);
90 vtLockFree(disk->lk);
91 close(disk->fd);
92 vtMemFree(disk);
93 }
94
95 static u32int
partStart(Disk * disk,int part)96 partStart(Disk *disk, int part)
97 {
98 switch(part){
99 default:
100 assert(0);
101 case PartSuper:
102 return disk->h.super;
103 case PartLabel:
104 return disk->h.label;
105 case PartData:
106 return disk->h.data;
107 }
108 }
109
110
111 static u32int
partEnd(Disk * disk,int part)112 partEnd(Disk *disk, int part)
113 {
114 switch(part){
115 default:
116 assert(0);
117 case PartSuper:
118 return disk->h.super+1;
119 case PartLabel:
120 return disk->h.data;
121 case PartData:
122 return disk->h.end;
123 }
124 }
125
126 int
diskReadRaw(Disk * disk,int part,u32int addr,uchar * buf)127 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf)
128 {
129 ulong start, end;
130 u64int offset;
131 int n, nn;
132
133 start = partStart(disk, part);
134 end = partEnd(disk, part);
135
136 if(addr >= end-start){
137 vtSetError(EBadAddr);
138 return 0;
139 }
140
141 offset = ((u64int)(addr + start))*disk->h.blockSize;
142 n = disk->h.blockSize;
143 while(n > 0){
144 nn = pread(disk->fd, buf, n, offset);
145 if(nn < 0){
146 vtOSError();
147 return 0;
148 }
149 if(nn == 0){
150 vtSetError("eof reading disk");
151 return 0;
152 }
153 n -= nn;
154 offset += nn;
155 buf += nn;
156 }
157 return 1;
158 }
159
160 int
diskWriteRaw(Disk * disk,int part,u32int addr,uchar * buf)161 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf)
162 {
163 ulong start, end;
164 u64int offset;
165 int n;
166
167 start = partStart(disk, part);
168 end = partEnd(disk, part);
169
170 if(addr >= end - start){
171 vtSetError(EBadAddr);
172 return 0;
173 }
174
175 offset = ((u64int)(addr + start))*disk->h.blockSize;
176 n = pwrite(disk->fd, buf, disk->h.blockSize, offset);
177 if(n < 0){
178 vtOSError();
179 return 0;
180 }
181 if(n < disk->h.blockSize) {
182 vtSetError("short write");
183 return 0;
184 }
185
186 return 1;
187 }
188
189 static void
diskQueue(Disk * disk,Block * b)190 diskQueue(Disk *disk, Block *b)
191 {
192 Block **bp, *bb;
193
194 vtLock(disk->lk);
195 while(disk->nqueue >= QueueSize)
196 vtSleep(disk->flow);
197 if(disk->cur == nil || b->addr > disk->cur->addr)
198 bp = &disk->cur;
199 else
200 bp = &disk->next;
201
202 for(bb=*bp; bb; bb=*bp){
203 if(b->addr < bb->addr)
204 break;
205 bp = &bb->ionext;
206 }
207 b->ionext = bb;
208 *bp = b;
209 if(disk->nqueue == 0)
210 vtWakeup(disk->starve);
211 disk->nqueue++;
212 vtUnlock(disk->lk);
213 }
214
215
/*
 * diskRead queues an asynchronous read of b.
 * b must be in state BioEmpty or BioLabel.  It is moved to BioReading
 * here, and diskThread later sets BioClean or BioReadError.
 * May block in diskQueue while the queue is full.
 */
void
diskRead(Disk *disk, Block *b)
{
	assert(b->iostate == BioEmpty || b->iostate == BioLabel);
	blockSetIOState(b, BioReading);
	diskQueue(disk, b);
}
223
/*
 * diskWrite queues an asynchronous write of the dirty block b.
 * The caller must hold b locked exactly once (nlock == 1).  b is moved
 * to BioWriting here.  diskThread later sets BioClean, or BioDirty when
 * blockRollback handed back the scratch buffer; on a write error it
 * leaves the state unchanged.
 * May block in diskQueue while the queue is full.
 */
void
diskWrite(Disk *disk, Block *b)
{
	assert(b->nlock == 1);
	assert(b->iostate == BioDirty);
	blockSetIOState(b, BioWriting);
	diskQueue(disk, b);
}
232
233 void
diskWriteAndWait(Disk * disk,Block * b)234 diskWriteAndWait(Disk *disk, Block *b)
235 {
236 int nlock;
237
238 /*
239 * If b->nlock > 1, the block is aliased within
240 * a single thread. That thread is us.
241 * DiskWrite does some funny stuff with VtLock
242 * and blockPut that basically assumes b->nlock==1.
243 * We humor diskWrite by temporarily setting
244 * nlock to 1. This needs to be revisited.
245 */
246 nlock = b->nlock;
247 if(nlock > 1)
248 b->nlock = 1;
249 diskWrite(disk, b);
250 while(b->iostate != BioClean)
251 vtSleep(b->ioready);
252 b->nlock = nlock;
253 }
254
255 int
diskBlockSize(Disk * disk)256 diskBlockSize(Disk *disk)
257 {
258 return disk->h.blockSize; /* immuttable */
259 }
260
261 int
diskFlush(Disk * disk)262 diskFlush(Disk *disk)
263 {
264 Dir dir;
265
266 vtLock(disk->lk);
267 while(disk->nqueue > 0)
268 vtSleep(disk->flush);
269 vtUnlock(disk->lk);
270
271 /* there really should be a cleaner interface to flush an fd */
272 nulldir(&dir);
273 if(dirfwstat(disk->fd, &dir) < 0){
274 vtOSError();
275 return 0;
276 }
277 return 1;
278 }
279
280 u32int
diskSize(Disk * disk,int part)281 diskSize(Disk *disk, int part)
282 {
283 return partEnd(disk, part) - partStart(disk, part);
284 }
285
286 static uintptr
mypc(int x)287 mypc(int x)
288 {
289 return getcallerpc(&x);
290 }
291
292 static char *
disk2file(Disk * disk)293 disk2file(Disk *disk)
294 {
295 static char buf[256];
296
297 if (fd2path(disk->fd, buf, sizeof buf) < 0)
298 strncpy(buf, "GOK", sizeof buf);
299 return buf;
300 }
301
302 static void
diskThread(void * a)303 diskThread(void *a)
304 {
305 Disk *disk = a;
306 Block *b;
307 uchar *buf, *p;
308 double t;
309 int nio;
310
311 vtThreadSetName("disk");
312
313 //fprint(2, "diskThread %d\n", getpid());
314
315 buf = vtMemAlloc(disk->h.blockSize);
316
317 vtLock(disk->lk);
318 if (Timing) {
319 nio = 0;
320 t = -nsec();
321 }
322 for(;;){
323 while(disk->nqueue == 0){
324 if (Timing) {
325 t += nsec();
326 if(nio >= 10000){
327 fprint(2, "disk: io=%d at %.3fms\n",
328 nio, t*1e-6/nio);
329 nio = 0;
330 t = 0;
331 }
332 }
333 if(disk->die != nil)
334 goto Done;
335 vtSleep(disk->starve);
336 if (Timing)
337 t -= nsec();
338 }
339 assert(disk->cur != nil || disk->next != nil);
340
341 if(disk->cur == nil){
342 disk->cur = disk->next;
343 disk->next = nil;
344 }
345 b = disk->cur;
346 disk->cur = b->ionext;
347 vtUnlock(disk->lk);
348
349 /*
350 * no one should hold onto blocking in the
351 * reading or writing state, so this lock should
352 * not cause deadlock.
353 */
354 if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr);
355 bwatchLock(b);
356 vtLock(b->lk);
357 b->pc = mypc(0);
358 assert(b->nlock == 1);
359 switch(b->iostate){
360 default:
361 abort();
362 case BioReading:
363 if(!diskReadRaw(disk, b->part, b->addr, b->data)){
364 fprint(2, "fossil: diskReadRaw failed: %s: "
365 "score %V: part=%s block %ud: %r\n",
366 disk2file(disk), b->score,
367 partname[b->part], b->addr);
368 blockSetIOState(b, BioReadError);
369 }else
370 blockSetIOState(b, BioClean);
371 break;
372 case BioWriting:
373 p = blockRollback(b, buf);
374 /* NB: ctime result ends with a newline */
375 if(!diskWriteRaw(disk, b->part, b->addr, p)){
376 fprint(2, "fossil: diskWriteRaw failed: %s: "
377 "score %V: date %s part=%s block %ud: %r\n",
378 disk2file(disk), b->score,
379 ctime(time(0)),
380 partname[b->part], b->addr);
381 break;
382 }
383 if(p != buf)
384 blockSetIOState(b, BioClean);
385 else
386 blockSetIOState(b, BioDirty);
387 break;
388 }
389
390 blockPut(b); /* remove extra reference, unlock */
391 vtLock(disk->lk);
392 disk->nqueue--;
393 if(disk->nqueue == QueueSize-1)
394 vtWakeup(disk->flow);
395 if(disk->nqueue == 0)
396 vtWakeup(disk->flush);
397 if(Timing)
398 nio++;
399 }
400 Done:
401 //fprint(2, "diskThread done\n");
402 disk->ref--;
403 vtWakeup(disk->die);
404 vtUnlock(disk->lk);
405 vtMemFree(buf);
406 }
407