1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 #include "error.h" 5 6 static void diskThread(void *a); 7 8 enum { 9 QueueSize = 100, /* maximum block to queue */ 10 }; 11 12 struct Disk { 13 VtLock *lk; 14 int ref; 15 16 int fd; 17 Header h; 18 19 VtRendez *flow; 20 VtRendez *starve; 21 VtRendez *flush; 22 VtRendez *die; 23 24 int nqueue; 25 26 Block *cur; /* block to do on current scan */ 27 Block *next; /* blocks to do next scan */ 28 }; 29 30 31 Disk * 32 diskAlloc(int fd) 33 { 34 u8int buf[HeaderSize]; 35 Header h; 36 Disk *disk; 37 38 if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){ 39 vtSetError("short read: %r"); 40 vtOSError(); 41 return nil; 42 } 43 44 if(!headerUnpack(&h, buf)){ 45 vtSetError("bad disk header"); 46 return nil; 47 } 48 disk = vtMemAllocZ(sizeof(Disk)); 49 disk->lk = vtLockAlloc(); 50 disk->starve = vtRendezAlloc(disk->lk); 51 disk->flow = vtRendezAlloc(disk->lk); 52 disk->flush = vtRendezAlloc(disk->lk); 53 disk->fd = fd; 54 disk->h = h; 55 56 disk->ref = 2; 57 vtThread(diskThread, disk); 58 59 return disk; 60 } 61 62 void 63 diskFree(Disk *disk) 64 { 65 diskFlush(disk); 66 67 /* kill slave */ 68 vtLock(disk->lk); 69 disk->die = vtRendezAlloc(disk->lk); 70 vtWakeup(disk->starve); 71 while(disk->ref > 1) 72 vtSleep(disk->die); 73 vtUnlock(disk->lk); 74 vtRendezFree(disk->flow); 75 vtRendezFree(disk->starve); 76 vtRendezFree(disk->die); 77 vtLockFree(disk->lk); 78 close(disk->fd); 79 vtMemFree(disk); 80 } 81 82 static u32int 83 partStart(Disk *disk, int part) 84 { 85 switch(part){ 86 default: 87 assert(0); 88 case PartSuper: 89 return disk->h.super; 90 case PartLabel: 91 return disk->h.label; 92 case PartData: 93 return disk->h.data; 94 } 95 } 96 97 98 static u32int 99 partEnd(Disk *disk, int part) 100 { 101 switch(part){ 102 default: 103 assert(0); 104 case PartSuper: 105 return disk->h.super+1; 106 case PartLabel: 107 return disk->h.data; 108 case PartData: 109 return disk->h.end; 110 } 111 } 112 113 int 114 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf) 115 { 116 ulong start, end; 117 u64int offset; 118 int n, nn; 119 120 start = partStart(disk, part); 121 end = partEnd(disk, part); 122 123 if(addr >= end-start){ 124 vtSetError(EBadAddr); 125 return 0; 126 } 127 128 offset = ((u64int)(addr + start))*disk->h.blockSize; 129 n = disk->h.blockSize; 130 while(n > 0){ 131 nn = pread(disk->fd, buf, n, offset); 132 if(nn < 0){ 133 vtOSError(); 134 return 0; 135 } 136 if(nn == 0){ 137 vtSetError(EIO); 138 return 0; 139 } 140 n -= nn; 141 offset += nn; 142 buf += nn; 143 } 144 return 1; 145 } 146 147 int 148 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf) 149 { 150 ulong start, end; 151 u64int offset; 152 int n; 153 154 start = partStart(disk, part); 155 end = partEnd(disk, part); 156 157 if(addr >= end-start){ 158 vtSetError(EBadAddr); 159 return 0; 160 } 161 162 offset = ((u64int)(addr + start))*disk->h.blockSize; 163 n = pwrite(disk->fd, buf, disk->h.blockSize, offset); 164 if(n < 0){ 165 vtOSError(); 166 return 0; 167 } 168 if(n < disk->h.blockSize) { 169 vtSetError("short write"); 170 return 0; 171 } 172 173 return 1; 174 } 175 176 static void 177 diskQueue(Disk *disk, Block *b) 178 { 179 Block **bp, *bb; 180 181 vtLock(disk->lk); 182 while(disk->nqueue >= QueueSize) 183 vtSleep(disk->flow); 184 if(disk->cur == nil || b->addr > disk->cur->addr) 185 bp = &disk->cur; 186 else 187 bp = &disk->next; 188 189 for(bb=*bp; bb; bb=*bp){ 190 if(b->addr < bb->addr) 191 break; 192 bp = &bb->ionext; 193 } 194 b->ionext = bb; 195 *bp = b; 196 if(disk->nqueue == 0) 197 vtWakeup(disk->starve); 198 disk->nqueue++; 199 vtUnlock(disk->lk); 200 } 201 202 203 void 204 diskRead(Disk *disk, Block *b) 205 { 206 assert(b->iostate == BioEmpty || b->iostate == BioLabel); 207 blockSetIOState(b, BioReading); 208 diskQueue(disk, b); 209 } 210 211 void 212 diskWrite(Disk *disk, Block *b) 213 { 214 assert(b->iostate == BioDirty); 215 blockSetIOState(b, BioWriting); 216 diskQueue(disk, b); 217 } 218 219 int 220 diskBlockSize(Disk *disk) 221 { 222 return disk->h.blockSize; /* immuttable */ 223 } 224 225 int 226 diskFlush(Disk *disk) 227 { 228 Dir dir; 229 230 vtLock(disk->lk); 231 while(disk->nqueue > 0) 232 vtSleep(disk->flush); 233 vtUnlock(disk->lk); 234 235 /* there really should be a cleaner interface to flush an fd */ 236 nulldir(&dir); 237 if(dirfwstat(disk->fd, &dir) < 0){ 238 vtOSError(); 239 return 0; 240 } 241 return 1; 242 } 243 244 u32int 245 diskSize(Disk *disk, int part) 246 { 247 return partEnd(disk, part) - partStart(disk, part); 248 } 249 250 static void 251 diskThread(void *a) 252 { 253 Disk *disk = a; 254 Block *b; 255 uchar *buf, *p; 256 double t; 257 int nio; 258 259 vtThreadSetName("disk"); 260 261 //fprint(2, "diskThread %d\n", getpid()); 262 263 buf = vtMemAlloc(disk->h.blockSize); 264 265 vtLock(disk->lk); 266 nio = 0; 267 t = -nsec(); 268 for(;;){ 269 while(disk->nqueue == 0){ 270 t += nsec(); 271 if(nio >= 10000){ 272 fprint(2, "disk: io=%d at %.3fms\n", nio, t*1e-6/nio); 273 nio = 0; 274 t = 0.; 275 } 276 if(disk->die != nil) 277 goto Done; 278 vtSleep(disk->starve); 279 t -= nsec(); 280 } 281 assert(disk->cur != nil || disk->next != nil); 282 283 if(disk->cur == nil){ 284 disk->cur = disk->next; 285 disk->next = nil; 286 } 287 b = disk->cur; 288 disk->cur = b->ionext; 289 vtUnlock(disk->lk); 290 291 /* 292 * no one should hold onto blocking in the 293 * reading or writing state, so this lock should 294 * not cause deadlock. 295 */ 296 if(0)fprint(2, "diskThread: %d:%d %x\n", getpid(), b->part, b->addr); 297 bwatchLock(b); 298 vtLock(b->lk); 299 assert(b->nlock == 1); 300 301 switch(b->iostate){ 302 default: 303 abort(); 304 case BioReading: 305 if(!diskReadRaw(disk, b->part, b->addr, b->data)){ 306 fprint(2, "diskReadRaw failed: part=%d addr=%ux: %r\n", b->part, b->addr); 307 blockSetIOState(b, BioReadError); 308 }else 309 blockSetIOState(b, BioClean); 310 break; 311 case BioWriting: 312 p = blockRollback(b, buf); 313 if(!diskWriteRaw(disk, b->part, b->addr, p)){ 314 fprint(2, "diskWriteRaw failed: date=%s part=%d addr=%ux: %r\n", ctime(times(0)), b->part, b->addr); 315 break; 316 } 317 if(p != buf) 318 blockSetIOState(b, BioClean); 319 else 320 blockSetIOState(b, BioDirty); 321 break; 322 } 323 324 blockPut(b); /* remove extra reference, unlock */ 325 vtLock(disk->lk); 326 disk->nqueue--; 327 if(disk->nqueue == QueueSize-1) 328 vtWakeup(disk->flow); 329 if(disk->nqueue == 0) 330 vtWakeup(disk->flush); 331 nio++; 332 } 333 Done: 334 //fprint(2, "diskThread done\n"); 335 disk->ref--; 336 vtWakeup(disk->die); 337 vtUnlock(disk->lk); 338 vtMemFree(buf); 339 } 340