1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 #include "error.h" 5 6 static void diskThread(void *a); 7 8 enum { 9 QueueSize = 100, /* maximum block to queue */ 10 }; 11 12 struct Disk { 13 VtLock *lk; 14 int ref; 15 16 int fd; 17 Header h; 18 19 VtRendez *flow; 20 VtRendez *starve; 21 VtRendez *flush; 22 VtRendez *die; 23 24 int nqueue; 25 26 Block *cur; /* block to do on current scan */ 27 Block *next; /* blocks to do next scan */ 28 }; 29 30 /* keep in sync with Part* enum in dat.h */ 31 static char *partname[] = { 32 [PartError] "error", 33 [PartSuper] "super", 34 [PartLabel] "label", 35 [PartData] "data", 36 [PartVenti] "venti", 37 }; 38 39 Disk * 40 diskAlloc(int fd) 41 { 42 u8int buf[HeaderSize]; 43 Header h; 44 Disk *disk; 45 46 if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){ 47 vtSetError("short read: %r"); 48 vtOSError(); 49 return nil; 50 } 51 52 if(!headerUnpack(&h, buf)){ 53 vtSetError("bad disk header"); 54 return nil; 55 } 56 disk = vtMemAllocZ(sizeof(Disk)); 57 disk->lk = vtLockAlloc(); 58 disk->starve = vtRendezAlloc(disk->lk); 59 disk->flow = vtRendezAlloc(disk->lk); 60 disk->flush = vtRendezAlloc(disk->lk); 61 disk->fd = fd; 62 disk->h = h; 63 64 disk->ref = 2; 65 vtThread(diskThread, disk); 66 67 return disk; 68 } 69 70 void 71 diskFree(Disk *disk) 72 { 73 diskFlush(disk); 74 75 /* kill slave */ 76 vtLock(disk->lk); 77 disk->die = vtRendezAlloc(disk->lk); 78 vtWakeup(disk->starve); 79 while(disk->ref > 1) 80 vtSleep(disk->die); 81 vtUnlock(disk->lk); 82 vtRendezFree(disk->flow); 83 vtRendezFree(disk->starve); 84 vtRendezFree(disk->die); 85 vtLockFree(disk->lk); 86 close(disk->fd); 87 vtMemFree(disk); 88 } 89 90 static u32int 91 partStart(Disk *disk, int part) 92 { 93 switch(part){ 94 default: 95 assert(0); 96 case PartSuper: 97 return disk->h.super; 98 case PartLabel: 99 return disk->h.label; 100 case PartData: 101 return disk->h.data; 102 } 103 } 104 105 106 static u32int 107 partEnd(Disk *disk, int part) 108 { 109 switch(part){ 110 default: 111 assert(0); 112 case PartSuper: 113 return disk->h.super+1; 114 case PartLabel: 115 return disk->h.data; 116 case PartData: 117 return disk->h.end; 118 } 119 } 120 121 int 122 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf) 123 { 124 ulong start, end; 125 u64int offset; 126 int n, nn; 127 128 start = partStart(disk, part); 129 end = partEnd(disk, part); 130 131 if(addr >= end-start){ 132 vtSetError(EBadAddr); 133 return 0; 134 } 135 136 offset = ((u64int)(addr + start))*disk->h.blockSize; 137 n = disk->h.blockSize; 138 while(n > 0){ 139 nn = pread(disk->fd, buf, n, offset); 140 if(nn < 0){ 141 vtOSError(); 142 return 0; 143 } 144 if(nn == 0){ 145 vtSetError("eof reading disk"); 146 return 0; 147 } 148 n -= nn; 149 offset += nn; 150 buf += nn; 151 } 152 return 1; 153 } 154 155 int 156 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf) 157 { 158 ulong start, end; 159 u64int offset; 160 int n; 161 162 start = partStart(disk, part); 163 end = partEnd(disk, part); 164 165 if(addr >= end - start){ 166 vtSetError(EBadAddr); 167 return 0; 168 } 169 170 offset = ((u64int)(addr + start))*disk->h.blockSize; 171 n = pwrite(disk->fd, buf, disk->h.blockSize, offset); 172 if(n < 0){ 173 vtOSError(); 174 return 0; 175 } 176 if(n < disk->h.blockSize) { 177 vtSetError("short write"); 178 return 0; 179 } 180 181 return 1; 182 } 183 184 static void 185 diskQueue(Disk *disk, Block *b) 186 { 187 Block **bp, *bb; 188 189 vtLock(disk->lk); 190 while(disk->nqueue >= QueueSize) 191 vtSleep(disk->flow); 192 if(disk->cur == nil || b->addr > disk->cur->addr) 193 bp = &disk->cur; 194 else 195 bp = &disk->next; 196 197 for(bb=*bp; bb; bb=*bp){ 198 if(b->addr < bb->addr) 199 break; 200 bp = &bb->ionext; 201 } 202 b->ionext = bb; 203 *bp = b; 204 if(disk->nqueue == 0) 205 vtWakeup(disk->starve); 206 disk->nqueue++; 207 vtUnlock(disk->lk); 208 } 209 210 211 void 212 diskRead(Disk *disk, Block *b) 213 { 214 assert(b->iostate == BioEmpty || b->iostate == BioLabel); 215 blockSetIOState(b, BioReading); 216 diskQueue(disk, b); 217 } 218 219 void 220 diskWrite(Disk *disk, Block *b) 221 { 222 assert(b->nlock == 1); 223 assert(b->iostate == BioDirty); 224 blockSetIOState(b, BioWriting); 225 diskQueue(disk, b); 226 } 227 228 void 229 diskWriteAndWait(Disk *disk, Block *b) 230 { 231 int nlock; 232 233 /* 234 * If b->nlock > 1, the block is aliased within 235 * a single thread. That thread is us. 236 * DiskWrite does some funny stuff with VtLock 237 * and blockPut that basically assumes b->nlock==1. 238 * We humor diskWrite by temporarily setting 239 * nlock to 1. This needs to be revisited. 240 */ 241 nlock = b->nlock; 242 if(nlock > 1) 243 b->nlock = 1; 244 diskWrite(disk, b); 245 while(b->iostate != BioClean) 246 vtSleep(b->ioready); 247 b->nlock = nlock; 248 } 249 250 int 251 diskBlockSize(Disk *disk) 252 { 253 return disk->h.blockSize; /* immuttable */ 254 } 255 256 int 257 diskFlush(Disk *disk) 258 { 259 Dir dir; 260 261 vtLock(disk->lk); 262 while(disk->nqueue > 0) 263 vtSleep(disk->flush); 264 vtUnlock(disk->lk); 265 266 /* there really should be a cleaner interface to flush an fd */ 267 nulldir(&dir); 268 if(dirfwstat(disk->fd, &dir) < 0){ 269 vtOSError(); 270 return 0; 271 } 272 return 1; 273 } 274 275 u32int 276 diskSize(Disk *disk, int part) 277 { 278 return partEnd(disk, part) - partStart(disk, part); 279 } 280 281 static uintptr 282 mypc(int x) 283 { 284 return getcallerpc(&x); 285 } 286 287 static char * 288 disk2file(Disk *disk) 289 { 290 static char buf[256]; 291 292 if (fd2path(disk->fd, buf, sizeof buf) < 0) 293 strncpy(buf, "GOK", sizeof buf); 294 return buf; 295 } 296 297 static void 298 diskThread(void *a) 299 { 300 Disk *disk = a; 301 Block *b; 302 uchar *buf, *p; 303 double t; 304 int nio; 305 306 vtThreadSetName("disk"); 307 308 //fprint(2, "diskThread %d\n", getpid()); 309 310 buf = vtMemAlloc(disk->h.blockSize); 311 312 vtLock(disk->lk); 313 nio = 0; 314 t = -nsec(); 315 for(;;){ 316 while(disk->nqueue == 0){ 317 t += nsec(); 318 //if(nio >= 10000){ 319 //fprint(2, "disk: io=%d at %.3fms\n", nio, t*1e-6/nio); 320 //nio = 0; 321 //t = 0; 322 //} 323 if(disk->die != nil) 324 goto Done; 325 vtSleep(disk->starve); 326 t -= nsec(); 327 } 328 assert(disk->cur != nil || disk->next != nil); 329 330 if(disk->cur == nil){ 331 disk->cur = disk->next; 332 disk->next = nil; 333 } 334 b = disk->cur; 335 disk->cur = b->ionext; 336 vtUnlock(disk->lk); 337 338 /* 339 * no one should hold onto blocking in the 340 * reading or writing state, so this lock should 341 * not cause deadlock. 342 */ 343 if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr); 344 bwatchLock(b); 345 vtLock(b->lk); 346 b->pc = mypc(0); 347 assert(b->nlock == 1); 348 switch(b->iostate){ 349 default: 350 abort(); 351 case BioReading: 352 if(!diskReadRaw(disk, b->part, b->addr, b->data)){ 353 fprint(2, "fossil: diskReadRaw failed: %s: " 354 "score %V: part=%s block %ud: %r\n", 355 disk2file(disk), b->score, 356 partname[b->part], b->addr); 357 blockSetIOState(b, BioReadError); 358 }else 359 blockSetIOState(b, BioClean); 360 break; 361 case BioWriting: 362 p = blockRollback(b, buf); 363 /* NB: ctime result ends with a newline */ 364 if(!diskWriteRaw(disk, b->part, b->addr, p)){ 365 fprint(2, "fossil: diskWriteRaw failed: %s: " 366 "score %V: date %s part=%s block %ud: %r\n", 367 disk2file(disk), b->score, 368 ctime(time(0)), 369 partname[b->part], b->addr); 370 break; 371 } 372 if(p != buf) 373 blockSetIOState(b, BioClean); 374 else 375 blockSetIOState(b, BioDirty); 376 break; 377 } 378 379 blockPut(b); /* remove extra reference, unlock */ 380 vtLock(disk->lk); 381 disk->nqueue--; 382 if(disk->nqueue == QueueSize-1) 383 vtWakeup(disk->flow); 384 if(disk->nqueue == 0) 385 vtWakeup(disk->flush); 386 nio++; 387 } 388 Done: 389 //fprint(2, "diskThread done\n"); 390 disk->ref--; 391 vtWakeup(disk->die); 392 vtUnlock(disk->lk); 393 vtMemFree(buf); 394 } 395