/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2007 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #pragma ident "%Z%%M% %I% %E% SMI" #include "config.h" #include #ifdef HAVE_SYS_ASYNCH_H #include #endif #include #include #include #include #include #include #ifdef HAVE_UTILITY_H #include #endif /* HAVE_UTILITY_H */ #ifdef HAVE_AIO #include #endif /* HAVE_AIO */ #ifdef HAVE_LIBAIO_H #include #endif /* HAVE_LIBAIO_H */ #ifdef HAVE_SYS_ASYNC_H #include #endif /* HAVE_SYS_ASYNC_H */ #ifdef HAVE_AIO_H #include #endif /* HAVE_AIO_H */ #ifndef HAVE_UINT_T #define uint_t unsigned int #endif /* HAVE_UINT_T */ #ifndef HAVE_AIOCB64_T #define aiocb64 aiocb #endif /* HAVE_AIOCB64_T */ #ifndef HAVE_SYSV_SEM #include #endif /* HAVE_SYSV_SEM */ #include "filebench.h" #include "flowop.h" #include "fileset.h" /* * These routines implement the flowops from the f language. Each * flowop has has a name such as "read", and a set of function pointers * to call for initialization, execution and destruction of the flowop. * The table flowoplib_funcs[] contains a flowoplib struct for each * implemented flowop. 
Most flowops use a generic initialization function
 * and all currently use a generic destruction function. All flowop
 * functions referenced from the table are in this file, though, of
 * course, they often call functions from other files.
 *
 * The flowoplib_init() routine uses the flowoplib_funcs[] table to
 * create an initial set of "instance 0" flowops, one for each type of
 * flowop, from which all other flowops are derived. These "instance 0"
 * flowops are initialized with information from the table including
 * pointers for their fo_init, fo_func and fo_destruct functions. When
 * a flowop definition is encountered in an f language script, the
 * "type" of flowop, such as "read" is used to search for the
 * "instance 0" flowop named "read", then a new flowop is allocated
 * which inherits its function pointers and other initial properties
 * from the instance 0 flowop, and is given a new name as specified
 * by the "name=" attribute.
 */

/* Forward declarations for every routine referenced by flowoplib_funcs[] */
static int flowoplib_init_generic(flowop_t *flowop);
static void flowoplib_destruct_generic(flowop_t *flowop);
static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
#ifdef HAVE_AIO
static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
#endif
static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_block_init(flowop_t *flowop);
static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_sempost_init(flowop_t *flowop);
static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_semblock_init(flowop_t *flowop);
static void flowoplib_semblock_destruct(flowop_t *flowop);
static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);

/*
 * One row of the flowop definition table. flowoplib_init() passes
 * fl_name and fl_type to flowop_define() and copies the remaining
 * members into the flowop it gets back.
 */
typedef struct flowoplib {
	int fl_type;		/* flowop type handed to flowop_define() */
	int fl_attrs;		/* copied into the flowop's fo_attrs */
	char *fl_name;		/* flowop name handed to flowop_define() */
	int (*fl_init)();	/* copied into the flowop's fo_init */
	int (*fl_func)();	/* copied into the flowop's fo_func */
	void (*fl_destruct)();	/* copied into the flowop's fo_destruct */
} flowoplib_t;

/*
 * Table of all flowops implemented in this file. The initializer is
 * flat (no inner braces): every six consecutive values form one
 * flowoplib_t in member order type, attrs, name, init, func, destruct.
 */
static flowoplib_t flowoplib_funcs[] = {
	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
	flowoplib_write, flowoplib_destruct_generic,
	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
	flowoplib_read, flowoplib_destruct_generic,
#ifdef HAVE_AIO
	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
	flowoplib_aiowrite, flowoplib_destruct_generic,
	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
	flowoplib_aiowait, flowoplib_destruct_generic,
#endif
	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
	flowoplib_block, flowoplib_destruct_generic,
	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
	flowoplib_wakeup, flowoplib_destruct_generic,
	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
	flowoplib_semblock, flowoplib_semblock_destruct,
	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
	flowoplib_sempost, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
	flowoplib_hog, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
	flowoplib_delay, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
	flowoplib_eventlimit, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
	flowoplib_bwlimit, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
	flowoplib_iopslimit, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
	flowoplib_opslimit, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
	flowoplib_finishoncount, flowoplib_destruct_generic,
	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
	flowoplib_finishonbytes, flowoplib_destruct_generic,
	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
	flowoplib_openfile, flowoplib_destruct_generic,
	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
	flowoplib_createfile, flowoplib_destruct_generic,
	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
	flowoplib_closefile, flowoplib_destruct_generic,
	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
	flowoplib_fsync, flowoplib_destruct_generic,
	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
	flowoplib_fsyncset, flowoplib_destruct_generic,
	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
	flowoplib_statfile, flowoplib_destruct_generic,
	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
	flowoplib_readwholefile,
flowoplib_destruct_generic, FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic, flowoplib_appendfile, flowoplib_destruct_generic, FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic, flowoplib_appendfilerand, flowoplib_destruct_generic, FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic, flowoplib_deletefile, flowoplib_destruct_generic, FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic, flowoplib_writewholefile, flowoplib_destruct_generic }; /* * Loops through the master list of flowops defined in this * module, and creates and initializes a flowop for each one * by calling flowop_define. As a side effect of calling * flowop define, the created flowops are placed on the * master flowop list. All created flowops are set to * instance "0". */ void flowoplib_init() { int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t); int i; for (i = 0; i < nops; i++) { flowop_t *flowop; flowoplib_t *fl; fl = &flowoplib_funcs[i]; if ((flowop = flowop_define(NULL, fl->fl_name, NULL, 0, fl->fl_type)) == 0) { filebench_log(LOG_ERROR, "failed to create flowop %s\n", fl->fl_name); filebench_shutdown(1); } flowop->fo_func = fl->fl_func; flowop->fo_init = fl->fl_init; flowop->fo_destruct = fl->fl_destruct; flowop->fo_attrs = fl->fl_attrs; } } static int flowoplib_init_generic(flowop_t *flowop) { (void) ipc_mutex_unlock(&flowop->fo_lock); return (0); } /* ARGSUSED */ static void flowoplib_destruct_generic(flowop_t *flowop) { } /* * Generates a file attribute from flags in the supplied flowop. * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed. */ static int flowoplib_fileattrs(flowop_t *flowop) { int attrs = 0; if (*flowop->fo_directio) attrs |= FLOW_ATTR_DIRECTIO; if (*flowop->fo_dsync) attrs |= FLOW_ATTR_DSYNC; return (attrs); } /* * Searches for a file descriptor. Tries the flowop's * fo_fdnumber first and returns with it if it has been * explicitly set (greater than 0). 
It next checks to * see if a rotating file descriptor policy is in effect, * and if not returns the fdnumber regardless of what * it is. (note that if it is 0, it just selects to the * default file descriptor in the threadflow's tf_fd * array). If the rotating fd policy is in effect, it * cycles from the end of the tf_fd array to one location * beyond the maximum needed by the number of entries in * the associated fileset on each invocation, then starts * over from the end. * * The routine returns an index into the threadflow's * tf_fd table where the actual file descriptor will be * found. Note: the calling routine must not call this * routine if the flowop does not have a fileset, and the * flowop's fo_fdnumber is zero and fo_rotatefd is * asserted, or an addressing fault may occur. */ int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop) { /* If the script sets the fd explicitly */ if (flowop->fo_fdnumber > 0) return (flowop->fo_fdnumber); /* If the flowop defaults to persistent fd */ if (!integer_isset(flowop->fo_rotatefd)) return (flowop->fo_fdnumber); /* Rotate the fd on each flowop invocation */ if (*(flowop->fo_fileset->fs_entries) > (THREADFLOW_MAXFD / 2)) { filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s" " (too many files : %d", flowop->fo_name, *(flowop->fo_fileset->fs_entries)); return (-1); } /* First time around */ if (threadflow->tf_fdrotor == 0) threadflow->tf_fdrotor = THREADFLOW_MAXFD; /* One fd for every file in the set */ if (*(flowop->fo_fileset->fs_entries) == (THREADFLOW_MAXFD - threadflow->tf_fdrotor)) threadflow->tf_fdrotor = THREADFLOW_MAXFD; threadflow->tf_fdrotor--; filebench_log(LOG_DEBUG_IMPL, "selected fd = %d", threadflow->tf_fdrotor); return (threadflow->tf_fdrotor); } /* * Emulate posix read / pread. If the flowop has a fileset, * a file descriptor number index is fetched, otherwise a * supplied fileobj file is used. In either case the specified * file will be opened if not already open. 
If the flowop has * neither a fileset or fileobj, an error is logged and -1 * returned. * * The actual read is done to a random offset in the * threadflow's thread memory (tf_mem), with a size set by * fo_iosize and at either a random disk offset within the * working set size, or at the next sequential location. If * any errors are encountered, -1 is returned, if successful, * 0 is returned. */ static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop) { size_t memoffset; vinteger_t wss; long memsize, round; int filedesc; int ret; if (flowop->fo_fileset || flowop->fo_fdnumber) { int fd = flowoplib_fdnum(threadflow, flowop); if (fd == -1) return (-1); if (threadflow->tf_fd[fd] == 0) { (void) flowoplib_openfile_common(threadflow, flowop, fd); filebench_log(LOG_DEBUG_IMPL, "read opened file %s", threadflow->tf_fse[fd]->fse_path); } filedesc = threadflow->tf_fd[fd]; if (*flowop->fo_wss == 0) wss = threadflow->tf_fse[fd]->fse_size; else wss = *flowop->fo_wss; } else { if (flowop->fo_file == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } if (flowop->fo_fd < 0) flowop->fo_fd = fileobj_open(flowop->fo_file, flowoplib_fileattrs(flowop)); if (flowop->fo_fd < 0) { filebench_log(LOG_ERROR, "failed to open file %s", flowop->fo_file->fo_name); return (-1); } filedesc = flowop->fo_fd; if (*flowop->fo_wss == 0) wss = *flowop->fo_file->fo_size; else wss = *flowop->fo_wss; } if (*flowop->fo_iosize == 0) { filebench_log(LOG_ERROR, "zero iosize for thread %s", flowop->fo_name); return (-1); } memsize = *threadflow->tf_memsize; round = *flowop->fo_iosize; if (filebench_randomno(&memoffset, memsize, round) == -1) { filebench_log(LOG_ERROR, "tf_memsize smaller than IO size for thread %s", flowop->fo_name); return (-1); } if (*flowop->fo_random) { uint64_t fileoffset; if (filebench_randomno64(&fileoffset, wss, *flowop->fo_iosize) == -1) { filebench_log(LOG_ERROR, "file size smaller than IO size for thread %s", flowop->fo_name); return (-1); } (void) 
flowop_beginop(threadflow, flowop); if ((ret = pread64(filedesc, threadflow->tf_mem + memoffset, *flowop->fo_iosize, (off64_t)fileoffset)) == -1) { (void) flowop_endop(threadflow, flowop); filebench_log(LOG_ERROR, "read file %s failed, offset %lld " "memoffset %zd: %s", flowop->fo_file->fo_name, fileoffset, memoffset, strerror(errno)); flowop_endop(threadflow, flowop); return (-1); } (void) flowop_endop(threadflow, flowop); if ((ret == 0)) (void) lseek64(filedesc, 0, SEEK_SET); } else { (void) flowop_beginop(threadflow, flowop); if ((ret = read(filedesc, threadflow->tf_mem + memoffset, *flowop->fo_iosize)) == -1) { filebench_log(LOG_ERROR, "read file %s failed, memoffset %zd: %s", flowop->fo_file->fo_name, memoffset, strerror(errno)); (void) flowop_endop(threadflow, flowop); return (-1); } (void) flowop_endop(threadflow, flowop); if ((ret == 0)) (void) lseek64(filedesc, 0, SEEK_SET); } return (0); } #ifdef HAVE_AIO /* * Asynchronous write section. An Asynchronous IO element * (aiolist_t) is used to associate the asynchronous write request with * its subsequent completion. This element includes a aiocb64 struct * that is used by posix aio_xxx calls to track the asynchronous writes. * The flowops aiowrite and aiowait result in calls to these posix * aio_xxx system routines to do the actual asynchronous write IO * operations. */ /* * Allocates an asynchronous I/O list (aio, of type * aiolist_t) element. Adds it to the flowop thread's * threadflow aio list. Returns a pointer to the element. 
*/
static aiolist_t *
aio_allocate(flowop_t *flowop)
{
	aiolist_t *aiolist;

	/*
	 * calloc() rather than malloc(): callers fill in only some of
	 * the members of the embedded aiocb64 before submitting it, so
	 * the remainder must not hold stale heap contents.
	 */
	if ((aiolist = calloc(1, sizeof (aiolist_t))) == NULL) {
		filebench_log(LOG_ERROR, "malloc aiolist failed");
		filebench_shutdown(1);
	}

	/* Push onto the head of the thread's list (head may be NULL) */
	aiolist->al_next = flowop->fo_thread->tf_aiolist;
	flowop->fo_thread->tf_aiolist = aiolist;

	return (aiolist);
}

/*
 * Searches the flowop thread's aio list for the element whose
 * embedded control block is aiocb. If none is found returns -1.
 * If found, unlinks the element from the list, frees it and
 * returns 0; the caller must not touch aiocb (or the element)
 * afterwards. A NULL aiocb is logged as an error and 0 is
 * returned without changing the list.
 */
static int
aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
{
	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
	aiolist_t *previous = NULL;
	aiolist_t *match = NULL;

	if (aiocb == NULL) {
		filebench_log(LOG_ERROR, "null aiocb deallocate");
		return (0);
	}

	while (aiolist) {
		if (aiocb == &(aiolist->al_aiocb)) {
			match = aiolist;
			break;
		}
		previous = aiolist;
		aiolist = aiolist->al_next;
	}

	if (match == NULL)
		return (-1);

	/* Remove from the list */
	if (previous)
		previous->al_next = match->al_next;
	else
		flowop->fo_thread->tf_aiolist = match->al_next;

	/*
	 * Once unlinked nothing else references the element, so it
	 * has to be released here or it is leaked.
	 */
	free(match);

	return (0);
}

/*
 * Emulate posix aiowrite(). Determines which file to use,
 * either one file of a fileset, or the file associated
 * with a fileobj, allocates and fills an aiolist_t element
 * for the write, and issues the asynchronous write. This
 * operation is only valid for random IO, and returns an
 * error if the flowop is set for sequential IO. Returns 0
 * on success, -1 on any encountered error.
*/ static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop) { size_t memoffset; vinteger_t wss; long memsize, round; int filedesc; if (flowop->fo_fileset || flowop->fo_fdnumber) { int fd = flowoplib_fdnum(threadflow, flowop); if (fd == -1) return (-1); if (threadflow->tf_fd[fd] == 0) { (void) flowoplib_openfile_common(threadflow, flowop, fd); filebench_log(LOG_DEBUG_IMPL, "writefile opened file %s", threadflow->tf_fse[fd]->fse_path); } filedesc = threadflow->tf_fd[fd]; if (*flowop->fo_wss == 0) wss = threadflow->tf_fse[fd]->fse_size; else wss = *flowop->fo_wss; } else { if (flowop->fo_file == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } if (flowop->fo_fd < 0) flowop->fo_fd = fileobj_open(flowop->fo_file, flowoplib_fileattrs(flowop)); if (flowop->fo_fd < 0) { filebench_log(LOG_ERROR, "failed to open file %s", flowop->fo_file->fo_name); return (-1); } filedesc = flowop->fo_fd; if (*flowop->fo_wss == 0) wss = *flowop->fo_file->fo_size; else wss = *flowop->fo_wss; } if (*flowop->fo_iosize == 0) { filebench_log(LOG_ERROR, "zero iosize for thread %s", flowop->fo_name); return (-1); } memsize = *threadflow->tf_memsize; round = *flowop->fo_iosize; /* Select memory offset for IO */ if (filebench_randomno(&memoffset, memsize, round) == -1) { filebench_log(LOG_ERROR, "tf_memsize smaller than IO size for thread %s", flowop->fo_name); return (-1); } if (*flowop->fo_random) { uint64_t fileoffset; struct aiocb64 *aiocb; aiolist_t *aiolist; if (filebench_randomno64(&fileoffset, wss, *flowop->fo_iosize) == -1) { filebench_log(LOG_ERROR, "file size smaller than IO size for thread %s", flowop->fo_name); return (-1); } aiolist = aio_allocate(flowop); aiolist->al_type = AL_WRITE; aiocb = &aiolist->al_aiocb; aiocb->aio_fildes = filedesc; aiocb->aio_buf = threadflow->tf_mem + memoffset; aiocb->aio_nbytes = *flowop->fo_iosize; aiocb->aio_offset = (off64_t)fileoffset; aiocb->aio_reqprio = 0; filebench_log(LOG_DEBUG_IMPL, "aio fd=%d, bytes=%lld, 
offset=%lld", filedesc, *flowop->fo_iosize, fileoffset); flowop_beginop(threadflow, flowop); if (aio_write64(aiocb) < 0) { filebench_log(LOG_ERROR, "aiowrite failed: %s", strerror(errno)); filebench_shutdown(1); } flowop_endop(threadflow, flowop); } else { return (-1); } return (0); } #define MAXREAP 4096 /* * Emulate posix aiowait(). Waits for the completion of half the * outstanding asynchronous IOs, or a single IO, which ever is * larger. The routine will return after a sufficient number of * completed calls issued by any thread in the procflow have * completed, or a 1 second timout elapses. All completed * IO operations are deleted from the thread's aiolist. */ static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop) { struct aiocb64 **worklist; aiolist_t *aio = flowop->fo_thread->tf_aiolist; int uncompleted = 0; worklist = calloc(MAXREAP, sizeof (struct aiocb64 *)); /* Count the list of pending aios */ while (aio) { uncompleted++; aio = aio->al_next; } do { uint_t ncompleted = 0; uint_t todo; struct timespec timeout; int inprogress; int i; /* Wait for half of the outstanding requests */ timeout.tv_sec = 1; timeout.tv_nsec = 0; if (uncompleted > MAXREAP) todo = MAXREAP; else todo = uncompleted / 2; if (todo == 0) todo = 1; flowop_beginop(threadflow, flowop); #ifdef HAVE_AIOWAITN if ((aio_waitn64((struct aiocb64 **)worklist, MAXREAP, &todo, &timeout) == -1) && errno && (errno != ETIME)) { filebench_log(LOG_ERROR, "aiowait failed: %s, outstanding = %d, " "ncompleted = %d ", strerror(errno), uncompleted, todo); } ncompleted = todo; /* Take the completed I/Os from the list */ inprogress = 0; for (i = 0; i < ncompleted; i++) { if ((aio_return64(worklist[i]) == -1) && (errno == EINPROGRESS)) { inprogress++; continue; } if (aio_deallocate(flowop, worklist[i]) < 0) { filebench_log(LOG_ERROR, "Could not remove " "aio from list "); flowop_endop(threadflow, flowop); return (-1); } } uncompleted -= ncompleted; uncompleted += inprogress; #else for 
(ncompleted = 0, inprogress = 0, aio = flowop->fo_thread->tf_aiolist; ncompleted < todo, aio != NULL; aio = aio->al_next) { result = aio_error64(&aio->al_aiocb); if (result == EINPROGRESS) { inprogress++; continue; } if ((aio_return64(&aio->al_aiocb) == -1) || result) { filebench_log(LOG_ERROR, "aio failed: %s", strerror(result)); continue; } ncompleted++; if (aio_deallocate(flowop, &aio->al_aiocb) < 0) { filebench_log(LOG_ERROR, "Could not remove aio " "from list "); flowop_endop(threadflow, flowop); return (-1); } } uncompleted -= ncompleted; #endif filebench_log(LOG_DEBUG_SCRIPT, "aio2 completed %d ios, uncompleted = %d, inprogress = %d", ncompleted, uncompleted, inprogress); } while (uncompleted > MAXREAP); flowop_endop(threadflow, flowop); free(worklist); return (0); } #endif /* HAVE_AIO */ /* * Initializes a "flowop_block" flowop. Specifically, it * initializes the flowop's fo_cv and unlocks the fo_lock. */ static int flowoplib_block_init(flowop_t *flowop) { filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx", flowop->fo_name, flowop->fo_instance, &flowop->fo_cv); (void) pthread_cond_init(&flowop->fo_cv, ipc_condattr()); (void) ipc_mutex_unlock(&flowop->fo_lock); return (0); } /* * Blocks the threadflow until woken up by flowoplib_wakeup. * The routine blocks on the flowop's fo_cv condition variable. */ static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop) { filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx", flowop->fo_name, flowop->fo_instance, &flowop->fo_cv); (void) ipc_mutex_lock(&flowop->fo_lock); flowop_beginop(threadflow, flowop); (void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock); flowop_endop(threadflow, flowop); filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking", flowop->fo_name, flowop->fo_instance); (void) ipc_mutex_unlock(&flowop->fo_lock); return (0); } /* * Wakes up one or more target blocking flowops. 
* Sends broadcasts on the fo_cv condition variables of all * flowops on the target list, except those that are * FLOW_MASTER flowops. The target list consists of all * flowops whose name matches this flowop's "fo_targetname" * attribute. The target list is generated on the first * invocation, and the run will be shutdown if no targets * are found. Otherwise the routine always returns 0. */ static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop) { flowop_t *target; /* if this is the first wakeup, create the wakeup list */ if (flowop->fo_targets == NULL) { flowop_t *result = flowop_find(flowop->fo_targetname); flowop->fo_targets = result; if (result == NULL) { filebench_log(LOG_ERROR, "wakeup: could not find op %s for thread %s", flowop->fo_targetname, threadflow->tf_name); filebench_shutdown(1); } while (result) { result->fo_targetnext = result->fo_resultnext; result = result->fo_resultnext; } } target = flowop->fo_targets; /* wakeup the targets */ while (target) { if (target->fo_instance == FLOW_MASTER) { target = target->fo_targetnext; continue; } filebench_log(LOG_DEBUG_IMPL, "wakeup flow %s-%d at address %zx", target->fo_name, target->fo_instance, &target->fo_cv); flowop_beginop(threadflow, flowop); (void) ipc_mutex_lock(&target->fo_lock); (void) pthread_cond_broadcast(&target->fo_cv); (void) ipc_mutex_unlock(&target->fo_lock); flowop_endop(threadflow, flowop); target = target->fo_targetnext; } return (0); } /* * "think time" routines. the "hog" routine consumes cpu cycles as * it "thinks", while the "delay" flowop simply calls sleep() to delay * for a given number of seconds without consuming cpu cycles. */ /* * Consumes CPU cycles and memory bandwidth by looping for * flowop->fo_value times. With each loop sets memory location * threadflow->tf_mem to 1. 
*/
static int
flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
{
	uint64_t value = *flowop->fo_value;
	uint64_t i;	/* same width as value, so large counts cannot overflow */

	flowop_beginop(threadflow, flowop);
	filebench_log(LOG_DEBUG_IMPL, "hog enter");
	for (i = 0; i < value; i++)
		*(threadflow->tf_mem) = 1;
	flowop_endop(threadflow, flowop);
	filebench_log(LOG_DEBUG_IMPL, "hog exit");
	return (0);
}

/*
 * Delays for fo_value seconds by calling sleep(). Always returns 0.
 */
static int
flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
{
	int value = *flowop->fo_value;

	flowop_beginop(threadflow, flowop);
	(void) sleep(value);
	flowop_endop(threadflow, flowop);
	return (0);
}

/*
 * Rate limiting routines. This is the event consuming half of the
 * event system. Each of the four following routines will limit the rate
 * to one unit of either calls, issued I/O operations, issued filebench
 * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
 * consumer, and further divided among different consumers if more than
 * one has been defined. There is no mechanism to enforce equal sharing
 * of events.
 */

/*
 * Completes one invocation per posted event. If eventgen_q
 * has an event count greater than zero, one will be removed
 * (count decremented), otherwise the calling thread will
 * block until another event has been posted.
Always returns 0
 */
static int
flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
{
	int consumed = 0;

	/* Nothing to limit when no event rate has been configured */
	if (filebench_shm->eventgen_hz == 0)
		return (0);

	/* Log once per flowop, on its first invocation */
	if (flowop->fo_initted == 0) {
		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
		    flowop, threadflow->tf_name, threadflow->tf_instance);
		flowop->fo_initted = 1;
	}

	flowop_beginop(threadflow, flowop);

	/*
	 * Take one event from the queue, waiting on eventgen_cv while
	 * the queue is empty. The loop also ends if eventgen_hz is
	 * found to be zero between attempts.
	 */
	while (!consumed && filebench_shm->eventgen_hz) {
		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
		if (filebench_shm->eventgen_q > 0) {
			filebench_shm->eventgen_q--;
			consumed = 1;
		} else {
			(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
			    &filebench_shm->eventgen_lock);
		}
		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
	}

	flowop_endop(threadflow, flowop);
	return (0);
}

/*
 * Blocks the calling thread if the number of issued I/O
 * operations exceeds the number of posted events, thus
 * limiting the average I/O operation rate to the rate
 * specified by eventgen_hz. Always returns 0.
*/
static int
flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
{
	uint64_t total;
	uint64_t issued;
	int needed;
	int granted = 0;

	/* Nothing to limit when no event rate has been configured */
	if (filebench_shm->eventgen_hz == 0)
		return (0);

	/* Log once per flowop, on its first invocation */
	if (flowop->fo_initted == 0) {
		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
		    flowop, threadflow->tf_name, threadflow->tf_instance);
		flowop->fo_initted = 1;
	}

	total = (controlstats.fs_rcount + controlstats.fs_wcount);

	/* A zero fo_tputlast means no baseline yet: record it and leave */
	if (flowop->fo_tputlast == 0) {
		flowop->fo_tputlast = total;
		return (0);
	}

	/* Charge the I/Os issued since the previous call to the bucket */
	issued = total - flowop->fo_tputlast;
	flowop->fo_tputbucket -= issued;
	flowop->fo_tputlast = total;

	/*
	 * Bucket still has credit, no need to block.
	 * NOTE(review): flowop_endop() is called here without a
	 * preceding flowop_beginop(); kept as in the original.
	 */
	if (flowop->fo_tputbucket >= 0LL) {
		flowop_endop(threadflow, flowop);
		return (0);
	}

	/* The deficit is the number of events that must be consumed */
	total = flowop->fo_tputbucket * -1;
	needed = total;

	flowop_beginop(threadflow, flowop);
	while (!granted && filebench_shm->eventgen_hz) {
		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
		if (filebench_shm->eventgen_q >= needed) {
			filebench_shm->eventgen_q -= needed;
			granted = 1;
		} else {
			(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
			    &filebench_shm->eventgen_lock);
		}
		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
	}

	/* Credit the bucket only if the events were actually taken */
	if (granted)
		flowop->fo_tputbucket += needed;

	flowop_endop(threadflow, flowop);

	return (0);
}

/*
 * Blocks the calling thread if the number of issued filebench
 * operations exceeds the number of posted events, thus limiting
 * the average filebench operation rate to the rate specified by
 * eventgen_hz. Always returns 0.
*/
static int
flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
{
	uint64_t total;
	uint64_t issued;
	int needed;
	int granted = 0;

	/* Nothing to limit when no event rate has been configured */
	if (filebench_shm->eventgen_hz == 0)
		return (0);

	/* Log once per flowop, on its first invocation */
	if (flowop->fo_initted == 0) {
		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
		    flowop, threadflow->tf_name, threadflow->tf_instance);
		flowop->fo_initted = 1;
	}

	total = controlstats.fs_count;

	/* A zero fo_tputlast means no baseline yet: record it and leave */
	if (flowop->fo_tputlast == 0) {
		flowop->fo_tputlast = total;
		return (0);
	}

	/* Charge the operations issued since the previous call */
	issued = total - flowop->fo_tputlast;
	flowop->fo_tputbucket -= issued;
	flowop->fo_tputlast = total;

	/*
	 * Bucket still has credit, no need to block.
	 * NOTE(review): flowop_endop() is called here without a
	 * preceding flowop_beginop(); kept as in the original.
	 */
	if (flowop->fo_tputbucket >= 0LL) {
		flowop_endop(threadflow, flowop);
		return (0);
	}

	/* The deficit is the number of events that must be consumed */
	total = flowop->fo_tputbucket * -1;
	needed = total;

	flowop_beginop(threadflow, flowop);
	while (!granted && filebench_shm->eventgen_hz) {
		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
		if (filebench_shm->eventgen_q >= needed) {
			filebench_shm->eventgen_q -= needed;
			granted = 1;
		} else {
			(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
			    &filebench_shm->eventgen_lock);
		}
		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
	}

	/* Credit the bucket only if the events were actually taken */
	if (granted)
		flowop->fo_tputbucket += needed;

	flowop_endop(threadflow, flowop);

	return (0);
}

/*
 * Blocks the calling thread if the number of bytes of I/O
 * issued exceeds one megabyte times the number of posted
 * events, thus limiting the average I/O byte rate to one
 * megabyte times the event rate as set by eventgen_hz.
 * Always returns 0.
*/
static int
flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
{
	uint64_t bytes;
	uint64_t delta;
	int events;

	/* Immediately bail if not set/enabled */
	if (filebench_shm->eventgen_hz == 0)
		return (0);

	if (flowop->fo_initted == 0) {
		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
		    flowop, threadflow->tf_name, threadflow->tf_instance);
		flowop->fo_initted = 1;
	}

	bytes = (controlstats.fs_rbytes + controlstats.fs_wbytes);

	/* Is this the first time around */
	if (flowop->fo_tputlast == 0) {
		flowop->fo_tputlast = bytes;
		return (0);
	}

	/* Charge the bytes moved since the previous call to the bucket */
	delta = bytes - flowop->fo_tputlast;
	flowop->fo_tputbucket -= delta;
	flowop->fo_tputlast = bytes;

	/*
	 * No need to block if the q isn't empty.
	 * NOTE(review): flowop_endop() is called here without a
	 * preceding flowop_beginop(); kept as in the original.
	 */
	if (flowop->fo_tputbucket >= 0LL) {
		flowop_endop(threadflow, flowop);
		return (0);
	}

	/* One event pays for one MB; round the deficit up */
	bytes = flowop->fo_tputbucket * -1;
	events = (bytes / MB) + 1;

	/* Arguments are cast so they match the %lld conversions */
	filebench_log(LOG_DEBUG_IMPL, "%lld bytes, %lld events",
	    (long long)bytes, (long long)events);

	flowop_beginop(threadflow, flowop);
	while (filebench_shm->eventgen_hz) {
		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
		if (filebench_shm->eventgen_q >= events) {
			filebench_shm->eventgen_q -= events;
			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
			flowop->fo_tputbucket += (events * MB);
			break;
		}
		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
		    &filebench_shm->eventgen_lock);
		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
	}
	flowop_endop(threadflow, flowop);

	return (0);
}

/*
 * These flowops terminate a benchmark run when either the specified
 * number of bytes of I/O (flowoplib_finishonbytes) or the specified
 * number of I/O operations (flowoplib_finishoncount) have been generated.
 */

/*
 * Stop filebench run when specified number of I/O bytes have been
 * transferred. Compares controlstats.fs_bytes with *flowop->value,
 * and if greater returns 1, stopping the run, if not, returns 0
 * to continue running.
*/
static int
flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
{
	uint64_t limit = *flowop->fo_value;
	uint64_t moved = controlstats.fs_bytes;

	/* The op is recorded whether or not the limit has been passed */
	flowop_beginop(threadflow, flowop);
	flowop_endop(threadflow, flowop);

	return ((moved > limit) ? 1 : 0);
}

/*
 * Stop filebench run when specified number of I/O operations have
 * been performed. Compares controlstats.fs_count with *flowop->value,
 * and if greater returns 1, stopping the run, if not, returns 0 to
 * continue running.
 */
static int
flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
{
	uint64_t limit = *flowop->fo_value;
	uint64_t done = controlstats.fs_count;

	/* The op is recorded whether or not the limit has been passed */
	flowop_beginop(threadflow, flowop);
	flowop_endop(threadflow, flowop);

	return ((done > limit) ? 1 : 0);
}

/*
 * Semaphore synchronization using either System V semaphores or
 * posix semaphores. If System V semaphores are available, they will be
 * used, otherwise posix semaphores will be used.
 */

/*
 * Initializes the filebench "block on semaphore" flowop.
 * If System V semaphores are implemented, the routine
 * initializes the System V semaphore subsystem if it hasn't
 * already been initialized, also allocates a pair of semids
 * and initializes the highwater System V semaphore.
 * If no System V semaphores, then does nothing special.
 * Returns -1 if it cannot acquire a set of System V semaphores
 * or if the initial post to the semaphore set fails. Returns 0
 * on success.
*/ static int flowoplib_semblock_init(flowop_t *flowop) { #ifdef HAVE_SYSV_SEM int semid; struct sembuf sbuf[2]; int highwater; ipc_seminit(); flowop->fo_semid_lw = ipc_semidalloc(); flowop->fo_semid_hw = ipc_semidalloc(); filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x", flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw); /* * Raise the number of the hw queue, causing the posting side to * block if queue is > 2 x blocking value */ if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) { filebench_log(LOG_ERROR, "semblock init lookup %x failed: %s", filebench_shm->semkey, strerror(errno)); return (-1); } if ((highwater = flowop->fo_semid_hw) == 0) highwater = *flowop->fo_value; filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater); sbuf[0].sem_num = highwater; sbuf[0].sem_op = *flowop->fo_highwater; sbuf[0].sem_flg = 0; if ((semop(semid, &sbuf[0], 1) == -1) && errno) { filebench_log(LOG_ERROR, "semblock init post failed: %s (%d," "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op); return (-1); } #else filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init with posix semaphore", flowop->fo_name, flowop->fo_instance); sem_init(&flowop->fo_sem, 1, 0); #endif /* HAVE_SYSV_SEM */ if (!(*flowop->fo_blocking)) (void) ipc_mutex_unlock(&flowop->fo_lock); return (0); } /* * Releases the semids for the System V semaphore allocated * to this flowop. If not using System V semaphores, then * it is effectively just a no-op. Always returns 0. */ static void flowoplib_semblock_destruct(flowop_t *flowop) { #ifdef HAVE_SYSV_SEM ipc_semidfree(flowop->fo_semid_lw); ipc_semidfree(flowop->fo_semid_hw); #else sem_destroy(&flowop->fo_sem); #endif /* HAVE_SYSV_SEM */ } /* * Attempts to pass a System V or posix semaphore as appropriate, * and blocks if necessary. Returns -1 if a set of System V * semphores is not available or cannot be acquired, or if the initial * post to the semaphore set fails. Returns 0 on success. 
*/ static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop) { #ifdef HAVE_SYSV_SEM struct sembuf sbuf[2]; int value = *flowop->fo_value; int semid; struct timespec timeout; if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) { filebench_log(LOG_ERROR, "lookup semop %x failed: %s", filebench_shm->semkey, strerror(errno)); return (-1); } filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem blocking on id %x num %x value %d", flowop->fo_name, flowop->fo_instance, semid, flowop->fo_semid_hw, value); /* Post, decrement the increment the hw queue */ sbuf[0].sem_num = flowop->fo_semid_hw; sbuf[0].sem_op = value; sbuf[0].sem_flg = 0; sbuf[1].sem_num = flowop->fo_semid_lw; sbuf[1].sem_op = value * -1; sbuf[1].sem_flg = 0; timeout.tv_sec = 600; timeout.tv_nsec = 0; if (*flowop->fo_blocking) (void) ipc_mutex_unlock(&flowop->fo_lock); flowop_beginop(threadflow, flowop); #ifdef HAVE_SEMTIMEDOP (void) semtimedop(semid, &sbuf[0], 1, &timeout); (void) semtimedop(semid, &sbuf[1], 1, &timeout); #else (void) semop(semid, &sbuf[0], 1); (void) semop(semid, &sbuf[1], 1); #endif /* HAVE_SEMTIMEDOP */ if (*flowop->fo_blocking) (void) ipc_mutex_lock(&flowop->fo_lock); flowop_endop(threadflow, flowop); #else int value = *flowop->fo_value; int i; filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem blocking on posix semaphore", flowop->fo_name, flowop->fo_instance); /* Decrement sem by value */ for (i = 0; i < value; i++) { if (sem_wait(&flowop->fo_sem) == -1) { filebench_log(LOG_ERROR, "semop wait failed"); return (-1); } } filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking", flowop->fo_name, flowop->fo_instance); #endif /* HAVE_SYSV_SEM */ return (0); } /* * Calls ipc_seminit(), and does so whether System V semaphores * are available or not. Hence it will cause ipc_seminit to log errors * if they are not. Always returns 0. 
*/ /* ARGSUSED */ static int flowoplib_sempost_init(flowop_t *flowop) { #ifdef HAVE_SYSV_SEM ipc_seminit(); #endif /* HAVE_SYSV_SEM */ return (0); } /* * Post to a System V or posix semaphore as appropriate. * On the first call for a given flowop instance, this routine * will use the fo_targetname attribute to locate all semblock * flowops that are expecting posts from this flowop. All * target flowops on this list will have a post operation done * to their semaphores on each call. */ static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop) { flowop_t *target; filebench_log(LOG_DEBUG_IMPL, "sempost flow %s-%d", flowop->fo_name, flowop->fo_instance); /* if this is the first post, create the post list */ if (flowop->fo_targets == NULL) { flowop_t *result = flowop_find(flowop->fo_targetname); flowop->fo_targets = result; if (result == NULL) { filebench_log(LOG_ERROR, "sempost: could not find op %s for thread %s", flowop->fo_targetname, threadflow->tf_name); filebench_shutdown(1); } while (result) { result->fo_targetnext = result->fo_resultnext; result = result->fo_resultnext; } } target = flowop->fo_targets; flowop_beginop(threadflow, flowop); /* post to the targets */ while (target) { #ifdef HAVE_SYSV_SEM struct sembuf sbuf[2]; int semid; int blocking; #else int i; #endif /* HAVE_SYSV_SEM */ int value = *flowop->fo_value; struct timespec timeout; if (target->fo_instance == FLOW_MASTER) { target = target->fo_targetnext; continue; } #ifdef HAVE_SYSV_SEM filebench_log(LOG_DEBUG_IMPL, "sempost flow %s-%d num %x", target->fo_name, target->fo_instance, target->fo_semid_lw); /* ipc_mutex_lock(&target->fo_lock); */ if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) { filebench_log(LOG_ERROR, "lookup semop %x failed: %s", filebench_shm->semkey, strerror(errno)); /* ipc_mutex_unlock(&target->fo_lock); */ return (-1); } sbuf[0].sem_num = target->fo_semid_lw; sbuf[0].sem_op = value; sbuf[0].sem_flg = 0; sbuf[1].sem_num = target->fo_semid_hw; 
sbuf[1].sem_op = value * -1; sbuf[1].sem_flg = 0; timeout.tv_sec = 600; timeout.tv_nsec = 0; if (*flowop->fo_blocking) blocking = 1; else blocking = 0; #ifdef HAVE_SEMTIMEDOP if ((semtimedop(semid, &sbuf[0], blocking + 1, &timeout) == -1) && (errno && (errno != EAGAIN))) { #else if ((semop(semid, &sbuf[0], blocking + 1) == -1) && (errno && (errno != EAGAIN))) { #endif /* HAVE_SEMTIMEDOP */ filebench_log(LOG_ERROR, "semop post failed: %s", strerror(errno)); /* ipc_mutex_unlock(&target->fo_lock); */ return (-1); } filebench_log(LOG_DEBUG_IMPL, "flow %s-%d finished posting", target->fo_name, target->fo_instance); #else filebench_log(LOG_DEBUG_IMPL, "sempost flow %s-%d to posix semaphore", target->fo_name, target->fo_instance); /* Increment sem by value */ for (i = 0; i < value; i++) { if (sem_post(&target->fo_sem) == -1) { filebench_log(LOG_ERROR, "semop post failed"); /* ipc_mutex_unlock(&target->fo_lock); */ return (-1); } } filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking", target->fo_name, target->fo_instance); #endif /* HAVE_SYSV_SEM */ target = target->fo_targetnext; } flowop_endop(threadflow, flowop); return (0); } /* * Section for exercising create / open / close / delete operations * on files within a fileset. For proper operation, the flowop attribute * "fd", which sets the fo_fdnumber field in the flowop, must be used * so that the same file is opened and later closed. "fd" is an index * into a pair of arrays maintained by threadflows, one of which * contains the operating system assigned file descriptors and the other * a pointer to the filesetentry whose file the file descriptor * references. An openfile flowop defined without fd being set will use * the default (0) fd or, if specified, rotate through fd indices, but * createfile and closefile must use the default or a specified fd. * Meanwhile deletefile picks and arbitrary file to delete, regardless * of fd attribute. 
*/ /* * XXX Making file selection more consistent among the flowops might good */ /* * Emulates (and actually does) file open. Obtains a file descriptor * index, then calls flowoplib_openfile_common() to open. Returns -1 * if not file descriptor is found or flowoplib_openfile_common * encounters an error, otherwise 0. */ static int flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop) { int fd = flowoplib_fdnum(threadflow, flowop); if (fd == -1) return (-1); return (flowoplib_openfile_common(threadflow, flowop, fd)); } /* * Common file opening code for filesets. Uses the supplied * file descriptor index to determine the tf_fd entry to use. * If the entry is empty (0) and the fileset exists, fileset * pick is called to select a fileset entry to use. The file * specified in the filesetentry is opened, and the returned * operating system file descriptor and a pointer to the * filesetentry are stored in tf_fd[fd] and tf_fse[fd], * respectively. Returns -1 on error, 0 on success. */ static int flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd) { filesetentry_t *file; int tid = 0; /* * If the flowop doesn't default to persistent fd * then get unique thread ID for use by fileset_pick */ if (integer_isset(flowop->fo_rotatefd)) tid = threadflow->tf_utid; if (threadflow->tf_fd[fd] != 0) { filebench_log(LOG_ERROR, "flowop %s attempted to open without closing on fd %d", flowop->fo_name, fd); return (-1); } if (flowop->fo_fileset == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } if ((file = fileset_pick(flowop->fo_fileset, FILESET_PICKEXISTS, tid)) == NULL) { filebench_log(LOG_ERROR, "flowop %s failed to pick file from %s on fd %d", flowop->fo_name, flowop->fo_fileset->fs_name, fd); return (-1); } threadflow->tf_fse[fd] = file; flowop_beginop(threadflow, flowop); threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset, file, O_RDWR, 0666, flowoplib_fileattrs(flowop)); flowop_endop(threadflow, flowop); if 
(threadflow->tf_fd[fd] < 0) { filebench_log(LOG_ERROR, "failed to open file %s", flowop->fo_name); return (-1); } filebench_log(LOG_DEBUG_SCRIPT, "flowop %s: opened %s fd[%d] = %d", flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]); return (0); } /* * Emulate create of a file. Uses the flowop's fdnumber to select * tf_fd and tf_fse array locations to put the created file's file * descriptor and filesetentry respectively. Uses fileset_pick() * to select a specific filesetentry whose file does not currently * exist for the file create operation. Then calls * fileset_openfile() with the O_CREATE flag set to create the * file. Returns -1 if the array index specified by fdnumber is * already in use, the flowop has no associated fileset, or * the create call fails. Returns 1 if a filesetentry with a * nonexistent file cannot be found. Returns 0 on success. */ static int flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop) { filesetentry_t *file; int fd = flowop->fo_fdnumber; if (threadflow->tf_fd[fd] != 0) { filebench_log(LOG_ERROR, "flowop %s attempted to create without closing on fd %d", flowop->fo_name, fd); return (-1); } if (flowop->fo_fileset == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } if ((file = fileset_pick(flowop->fo_fileset, FILESET_PICKNOEXIST, 0)) == NULL) { filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file", flowop->fo_name); return (1); } threadflow->tf_fse[fd] = file; flowop_beginop(threadflow, flowop); threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset, file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop)); flowop_endop(threadflow, flowop); if (threadflow->tf_fd[fd] < 0) { filebench_log(LOG_ERROR, "failed to create file %s", flowop->fo_name); return (-1); } filebench_log(LOG_DEBUG_SCRIPT, "flowop %s: created %s fd[%d] = %d", flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]); return (0); } /* * Emulates delete of a file. 
Picks an arbitrary filesetentry * whose file exists and uses unlink() to delete it. Clears * the FSE_EXISTS flag for the filesetentry. Returns -1 if the * flowop has no associated fileset. Returns 1 if an appropriate * filesetentry cannot be found, and 0 on success. */ static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop) { filesetentry_t *file; fileset_t *fileset; char path[MAXPATHLEN]; char *pathtmp; if (flowop->fo_fileset == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } fileset = flowop->fo_fileset; if ((file = fileset_pick(flowop->fo_fileset, FILESET_PICKEXISTS, 0)) == NULL) { filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file", flowop->fo_name); return (1); } *path = 0; (void) strcpy(path, *fileset->fs_path); (void) strcat(path, "/"); (void) strcat(path, fileset->fs_name); pathtmp = fileset_resolvepath(file); (void) strcat(path, pathtmp); free(pathtmp); flowop_beginop(threadflow, flowop); (void) unlink(path); flowop_endop(threadflow, flowop); file->fse_flags &= ~FSE_EXISTS; (void) ipc_mutex_unlock(&file->fse_lock); filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path); return (0); } /* * Emulates fsync of a file. Obtains the file descriptor index * from the flowop, obtains the actual file descriptor from * the threadflow's table, checks to be sure it is still an * open file, then does an fsync operation on it. Returns -1 * if the file no longer is open, 0 otherwise. 
*/
static int
flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
{
	int slot = flowop->fo_fdnumber;
	int osfd = threadflow->tf_fd[slot];

	/* A zero table entry means nothing is open at this index */
	if (osfd == 0) {
		filebench_log(LOG_ERROR,
		    "flowop %s attempted to fsync a closed fd %d",
		    flowop->fo_name, slot);
		return (-1);
	}

	/* Measure time to fsync */
	flowop_beginop(threadflow, flowop);
	(void) fsync(osfd);
	flowop_endop(threadflow, flowop);

	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
	    threadflow->tf_fse[slot]->fse_path);

	return (0);
}

/*
 * Emulate fsync of an entire fileset. Walks the threadflow's
 * filesetentry table and calls fsync() on the descriptor of every
 * entry that belongs to the flowop's fileset, timing each fsync
 * separately. Always returns 0.
 */
static int
flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
{
	int slot;

	for (slot = 0; slot < THREADFLOW_MAXFD; slot++) {
		filesetentry_t *entry = threadflow->tf_fse[slot];

		/* Only entries of the file set being synced are handled */
		if ((entry != NULL) &&
		    (entry->fse_fileset == flowop->fo_fileset)) {
			/* Measure time to fsync */
			flowop_beginop(threadflow, flowop);
			(void) fsync(threadflow->tf_fd[slot]);
			flowop_endop(threadflow, flowop);

			filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
			    entry->fse_path);
		}
	}

	return (0);
}

/*
 * Emulate close of a file. Obtains the file descriptor index
 * from the flowop, obtains the actual file descriptor from the
 * threadflow's table, checks to be sure it is still an open
 * file, then does a close operation on it. Then sets the
 * threadflow file descriptor table entry to 0, and the file set
 * entry pointer to NULL. Returns -1 if the file was not open,
 * 0 otherwise.
*/ static int flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop) { filesetentry_t *file; int fd = flowop->fo_fdnumber; if (threadflow->tf_fd[fd] == 0) { filebench_log(LOG_ERROR, "flowop %s attempted to close an already closed fd %d", flowop->fo_name, fd); return (-1); } /* Measure time to close */ flowop_beginop(threadflow, flowop); (void) close(threadflow->tf_fd[fd]); flowop_endop(threadflow, flowop); file = threadflow->tf_fse[fd]; threadflow->tf_fd[fd] = 0; threadflow->tf_fse[fd] = NULL; filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path); return (0); } /* * Emulate stat of a file. Picks an arbitrary filesetentry with * an existing file from the flowop's fileset, then performs a * stat() operation on it. Returns -1 if the flowop has no * associated fileset. Returns 1 if an appropriate filesetentry * cannot be found, and 0 on success. */ static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop) { filesetentry_t *file; fileset_t *fileset; char path[MAXPATHLEN]; char *pathtmp; if (flowop->fo_fileset == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } fileset = flowop->fo_fileset; if ((file = fileset_pick(flowop->fo_fileset, FILESET_PICKEXISTS, 0)) == NULL) { filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file", flowop->fo_name); return (1); } *path = 0; (void) strcpy(path, *fileset->fs_path); (void) strcat(path, "/"); (void) strcat(path, fileset->fs_name); pathtmp = fileset_resolvepath(file); (void) strcat(path, pathtmp); free(pathtmp); flowop_beginop(threadflow, flowop); flowop_endop(threadflow, flowop); (void) ipc_mutex_unlock(&file->fse_lock); return (0); } /* * Additional reads and writes. Read and write whole files, write * and append to files. Some of these work with both fileobjs and * filesets, others only with filesets. The flowoplib_write routine * writes from thread memory, while the others read or write using * fo_buf memory. 
Note that both flowoplib_read() and
 * flowoplib_aiowrite() use thread memory as well.
 */


/*
 * Emulate a read of a whole file. The file must be open
 * with file descriptor and filesetentry stored at the
 * locations indexed by the flowop's fdnumber. It then seeks
 * to the beginning of the associated file, and reads
 * FILE_ALLOC_BLOCK bytes at a time until the end of the
 * file. On success fo_iosize is set to the number of bytes
 * read. Returns -1 on error, 0 on success.
 */
static int
flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
{
	size_t memoffset;
	long memsize, round;
	off64_t bytes = 0;
	int fd = flowop->fo_fdnumber;
	int ret;

	if (threadflow->tf_fd[fd] == 0) {
		filebench_log(LOG_ERROR,
		    "flowop %s attempted to read a closed fd %d",
		    flowop->fo_name, fd);
		return (-1);
	}

	/* fo_buf is allocated once and reused by later calls */
	if ((flowop->fo_buf == NULL) &&
	    ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL))
		return (-1);

	if (threadflow->tf_fse[fd] == NULL) {
		filebench_log(LOG_ERROR, "flowop %s: NULL file",
		    flowop->fo_name);
		return (-1);
	}

	/*
	 * fo_iosize may not have been allocated yet (it is allocated at
	 * the end of this routine when NULL), so it must not be
	 * dereferenced unconditionally here.
	 */
	if (flowop->fo_iosize != NULL) {
		memsize = *threadflow->tf_memsize;
		round = *flowop->fo_iosize;
		if (filebench_randomno(&memoffset, memsize, round) == -1) {
			filebench_log(LOG_ERROR,
			    "tf_memsize smaller than IO size for thread %s",
			    flowop->fo_name);
			return (-1);
		}
	}

	/* Measure time to read bytes */
	flowop_beginop(threadflow, flowop);
	(void) lseek64(threadflow->tf_fd[fd], 0, SEEK_SET);
	while ((ret = read(threadflow->tf_fd[fd], flowop->fo_buf,
	    FILE_ALLOC_BLOCK)) > 0)
		bytes += ret;
	flowop_endop(threadflow, flowop);

	if (ret < 0) {
		filebench_log(LOG_ERROR,
		    "Failed to read fd %d: %s",
		    fd, strerror(errno));
		return (-1);
	}

	/* Record how much was read; allocate the holder on first use */
	if (flowop->fo_iosize == NULL) {
		flowop->fo_iosize = integer_alloc(bytes);
		/* presumably integer_alloc() yields NULL on failure */
		if (flowop->fo_iosize == NULL) {
			filebench_log(LOG_ERROR,
			    "flowop %s: could not allocate iosize",
			    flowop->fo_name);
			return (-1);
		}
	}
	*(flowop->fo_iosize) = bytes;

	return (0);
}

/*
 * Emulate a write to a file of size fo_iosize. Will write
 * to a file from a fileset if the flowop's fo_fileset field
 * specifies one or its fdnumber is non zero. Otherwise it
 * will write to a fileobj file, if one exists. If the file
 * is not currently open, the routine will attempt to open
 * it.
The flowop's fo_wss parameter will be used to set the * maximum file size if it is non-zero, otherwise the * filesetentry's fse_size will be used. A random memory * buffer offset is calculated, and, if fo_random is TRUE, * a random file offset is used for the write. Otherwise the * write is to the next sequential location. Returns 1 on * errors, 0 on success. */ static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop) { size_t memoffset; vinteger_t wss; long memsize, round; int filedesc; if (flowop->fo_fileset || flowop->fo_fdnumber) { int fd = flowoplib_fdnum(threadflow, flowop); if (fd == -1) return (-1); if (threadflow->tf_fd[fd] == 0) { (void) flowoplib_openfile_common(threadflow, flowop, fd); filebench_log(LOG_DEBUG_IMPL, "read opened file %s", threadflow->tf_fse[fd]->fse_path); } filedesc = threadflow->tf_fd[fd]; if (*flowop->fo_wss == 0) wss = threadflow->tf_fse[fd]->fse_size; else wss = *flowop->fo_wss; } else { if (flowop->fo_file == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } if (flowop->fo_fd < 0) flowop->fo_fd = fileobj_open(flowop->fo_file, flowoplib_fileattrs(flowop)); if (flowop->fo_fd < 0) { filebench_log(LOG_ERROR, "failed to open file %s", flowop->fo_file->fo_name); return (-1); } filedesc = flowop->fo_fd; if (*flowop->fo_wss == 0) wss = *flowop->fo_file->fo_size; else wss = *flowop->fo_wss; } if ((flowop->fo_buf == NULL) && ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) { return (-1); } if (*flowop->fo_iosize == 0) { filebench_log(LOG_ERROR, "zero iosize for thread %s", flowop->fo_name); return (-1); } memsize = *threadflow->tf_memsize; round = *flowop->fo_iosize; /* Select memory offset for IO */ if (filebench_randomno(&memoffset, memsize, round) == -1) { filebench_log(LOG_ERROR, "tf_memsize smaller than IO size for thread %s", flowop->fo_name); return (-1); } if (*flowop->fo_random) { uint64_t fileoffset; if (filebench_randomno64(&fileoffset, wss, *flowop->fo_iosize) == -1) { 
filebench_log(LOG_ERROR, "file size smaller than IO size for thread %s", flowop->fo_name); return (-1); } flowop_beginop(threadflow, flowop); if (pwrite64(filedesc, threadflow->tf_mem + memoffset, *flowop->fo_iosize, (off64_t)fileoffset) == -1) { filebench_log(LOG_ERROR, "write failed, " "offset %lld memoffset %zd: %s", fileoffset, memoffset, strerror(errno)); flowop_endop(threadflow, flowop); return (-1); } flowop_endop(threadflow, flowop); } else { flowop_beginop(threadflow, flowop); if (write(filedesc, threadflow->tf_mem + memoffset, *flowop->fo_iosize) == -1) { filebench_log(LOG_ERROR, "write failed, memoffset %zd: %s", memoffset, strerror(errno)); flowop_endop(threadflow, flowop); return (-1); } flowop_endop(threadflow, flowop); } return (0); } /* * Emulate a write of a whole file. The size of the file * is taken from a filesetentry identified by fo_srcfdnumber, * while the file descriptor used is identified by * fo_fdnumber. Does multiple writes of FILE_ALLOC_BLOCK * length until full file has been written. Returns -1 on * error, 0 on success and sets flowop->fo_iosize to the * number of bytes actually written. 
*/
static int
flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
{
	size_t memoffset;
	filesetentry_t *file;
	int wsize;
	off64_t seek;
	off64_t bytes = 0;
	long memsize, round;
	int fd = flowop->fo_fdnumber;
	int srcfd = flowop->fo_srcfdnumber;
	int ret;

	if (threadflow->tf_fd[fd] == 0) {
		filebench_log(LOG_ERROR,
		    "flowop %s attempted to write a closed fd %d",
		    flowop->fo_name, fd);
		return (-1);
	}

	/* fo_buf is allocated once and reused by later calls */
	if ((flowop->fo_buf == NULL) &&
	    ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) {
		return (-1);
	}

	/*
	 * fo_iosize may not have been allocated yet (it is allocated at
	 * the end of this routine when NULL), so it must not be
	 * dereferenced unconditionally here. As before, a failure to pick
	 * a memory offset is only logged: the data is written from fo_buf.
	 */
	if (flowop->fo_iosize != NULL) {
		memsize = *threadflow->tf_memsize;
		round = *flowop->fo_iosize;
		if (filebench_randomno(&memoffset, memsize, round) == -1) {
			filebench_log(LOG_ERROR,
			    "tf_memsize smaller than IO size for thread %s",
			    flowop->fo_name);
		}
	}

	/*
	 * The size comes from the source entry when a source fd index is
	 * given, and from the destination entry otherwise. (The earlier
	 * combined test always ended up using the destination entry.)
	 */
	file = threadflow->tf_fse[(srcfd != 0) ? srcfd : fd];
	if (file == NULL) {
		filebench_log(LOG_ERROR, "flowop %s: NULL file",
		    flowop->fo_name);
		return (-1);
	}

	/* Measure time to write bytes */
	flowop_beginop(threadflow, flowop);
	for (seek = 0; seek < file->fse_size; seek += wsize) {
		/* Trim the last write so the total equals fse_size */
		wsize = (int)MIN(file->fse_size - seek, FILE_ALLOC_BLOCK);

		ret = write(threadflow->tf_fd[fd], flowop->fo_buf, wsize);
		if (ret != wsize) {
			filebench_log(LOG_ERROR,
			    "Failed to write %d bytes on fd %d: %s",
			    wsize, fd, strerror(errno));
			flowop_endop(threadflow, flowop);
			return (-1);
		}
		bytes += ret;
	}
	flowop_endop(threadflow, flowop);

	/* Record how much was written; allocate the holder on first use */
	if (flowop->fo_iosize == NULL) {
		flowop->fo_iosize = integer_alloc(bytes);
		/* presumably integer_alloc() yields NULL on failure */
		if (flowop->fo_iosize == NULL) {
			filebench_log(LOG_ERROR,
			    "flowop %s: could not allocate iosize",
			    flowop->fo_name);
			return (-1);
		}
	}
	*(flowop->fo_iosize) = bytes;

	return (0);
}


/*
 * Emulate a fixed size append to a file. Will append data to
 * a file chosen from a fileset if the flowop's fo_fileset
 * field specifies one or if its fdnumber is non zero.
 * Otherwise it will write to a fileobj file, if one exists.
 * The flowop's fo_wss parameter will be used to set the
 * maximum file size if it is non-zero, otherwise the
 * filesetentry's fse_size will be used.
A random memory * buffer offset is calculated, then a logical seek to the * end of file is done followed by a write of fo_iosize * bytes. Writes are actually done from fo_buf, rather than * tf_mem as is done with flowoplib_write(), and no check * is made to see if fo_iosize exceeds the size of fo_buf. * Returns -1 on error, 0 on success. */ static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop) { size_t memoffset; off64_t wsize; long memsize; int fd, filedesc; /* LINTED E_FUNC_SET_NOT_USED */ vinteger_t wss; int ret; if (flowop->fo_fileset || flowop->fo_fdnumber) { fd = flowoplib_fdnum(threadflow, flowop); if (fd == -1) return (-1); if (threadflow->tf_fd[fd] == 0) { (void) flowoplib_openfile_common(threadflow, flowop, fd); filebench_log(LOG_DEBUG_IMPL, "read opened file %s", threadflow->tf_fse[fd]->fse_path); } filedesc = threadflow->tf_fd[fd]; if (*flowop->fo_wss == 0) wss = threadflow->tf_fse[fd]->fse_size; else wss = *flowop->fo_wss; } else { if (flowop->fo_file == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } if (flowop->fo_fd < 0) flowop->fo_fd = fileobj_open(flowop->fo_file, flowoplib_fileattrs(flowop)); if (flowop->fo_fd < 0) { filebench_log(LOG_ERROR, "failed to open file %s", flowop->fo_file->fo_name); return (-1); } filedesc = flowop->fo_fd; if (*flowop->fo_wss == 0) wss = *flowop->fo_file->fo_size; else wss = *flowop->fo_wss; } /* XXX wss is not being used */ if ((flowop->fo_buf == NULL) && ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) { return (-1); } memsize = *threadflow->tf_memsize; wsize = *flowop->fo_iosize; if (filebench_randomno(&memoffset, memsize, wsize) == -1) { filebench_log(LOG_ERROR, "tf_memsize smaller than IO size for thread %s", flowop->fo_name); return (-1); } /* Measure time to write bytes */ flowop_beginop(threadflow, flowop); (void) lseek64(filedesc, 0, SEEK_END); ret = write(filedesc, flowop->fo_buf, wsize); if (ret != wsize) { filebench_log(LOG_ERROR, "Failed to write %d 
bytes on fd %d: %s", wsize, fd, strerror(errno)); flowop_endop(threadflow, flowop); return (-1); } flowop_endop(threadflow, flowop); return (0); } /* * Emulate a random size append to a file. Will append data * to a file chosen from a fileset if the flowop's fo_fileset * field specifies one or if its fdnumber is non zero. Otherwise * it will write to a fileobj file, if one exists. The flowop's * fo_wss parameter will be used to set the maximum file size * if it is non-zero, otherwise the filesetentry's fse_size * will be used. A random transfer size (but at most fo_iosize * bytes) and a random memory offset are calculated. A logical * seek to the end of file is done, then writes of up to * FILE_ALLOC_BLOCK in size are done until the full transfer * size has been written. Writes are actually done from fo_buf, * rather than tf_mem as is done with flowoplib_write(). * Returns -1 on error, 0 on success. */ static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop) { size_t memoffset; uint64_t appendsize; off64_t seek; long memsize, round; int fd, filedesc; /* LINTED E_FUNC_SET_NOT_USED */ vinteger_t wss; if (flowop->fo_fileset || flowop->fo_fdnumber) { fd = flowoplib_fdnum(threadflow, flowop); if (fd == -1) return (-1); if (threadflow->tf_fd[fd] == 0) { (void) flowoplib_openfile_common(threadflow, flowop, fd); filebench_log(LOG_DEBUG_IMPL, "append opened file %s", threadflow->tf_fse[fd]->fse_path); } filedesc = threadflow->tf_fd[fd]; if (*flowop->fo_wss == 0) wss = threadflow->tf_fse[fd]->fse_size; else wss = *flowop->fo_wss; } else { if (flowop->fo_file == NULL) { filebench_log(LOG_ERROR, "flowop NULL file"); return (-1); } if (flowop->fo_fd < 0) flowop->fo_fd = fileobj_open(flowop->fo_file, flowoplib_fileattrs(flowop)); if (flowop->fo_fd < 0) { filebench_log(LOG_ERROR, "failed to open file %s", flowop->fo_file->fo_name); return (-1); } filedesc = flowop->fo_fd; if (*flowop->fo_wss == 0) wss = *flowop->fo_file->fo_size; else wss = *flowop->fo_wss; 
} /* XXX wss is not being used */ if ((flowop->fo_buf == NULL) && ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) { return (-1); } memsize = *threadflow->tf_memsize; round = *flowop->fo_iosize; if (filebench_randomno(&memoffset, memsize, round) == -1) { filebench_log(LOG_ERROR, "tf_memsize smaller than IO size" "for thread %s", flowop->fo_name); return (-1); } if (filebench_randomno64(&appendsize, *flowop->fo_iosize, 1LL) == -1) { filebench_log(LOG_ERROR, "tf_memsize smaller than IO size" "for thread %s", flowop->fo_name); return (-1); } /* Measure time to write bytes */ flowop_beginop(threadflow, flowop); for (seek = 0; seek < appendsize; seek += FILE_ALLOC_BLOCK) { off64_t wsize; int ret = 0; (void) lseek64(filedesc, 0, SEEK_END); wsize = ((appendsize - seek) > FILE_ALLOC_BLOCK) ? FILE_ALLOC_BLOCK : (appendsize - seek); ret = write(filedesc, flowop->fo_buf, wsize); if (ret != wsize) { filebench_log(LOG_ERROR, "Failed to write %d bytes on fd %d: %s", wsize, fd, strerror(errno)); flowop_endop(threadflow, flowop); return (-1); } } flowop_endop(threadflow, flowop); return (0); } /* * Prints usage information for flowop operations. 
*/
void
flowoplib_usage()
{
	/* One entry per line of usage text, written to stderr in order */
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=,fileset=\n",
		" [,fd=]\n",
		"\n",
		"flowop closefile name=,fd=]\n",
		"\n",
		"flowop deletefile name=\n",
		" [,fileset=]\n",
		" [,fd=]\n",
		"\n",
		"flowop statfile name=\n",
		" [,fileset=]\n",
		" [,fd=]\n",
		"\n",
		"flowop fsync name=,fd=]\n",
		"\n",
		"flowop fsyncset name=,fileset=]\n",
		"\n",
		"flowop [write|read|aiowrite] name=, \n",
		" filename|fileset=,\n",
		" iosize=\n",
		" [,directio]\n",
		" [,dsync]\n",
		" [,iters=]\n",
		" [,random]\n",
		" [,opennext]\n",
		" [,workingset=]\n",
		"flowop [appendfile|appendfilerand] name=, \n",
		" filename|fileset=,\n",
		" iosize=\n",
		" [,dsync]\n",
		" [,iters=]\n",
		" [,workingset=]\n",
		"flowop [readwholefile|writewholefile] name=, \n",
		" filename|fileset=,\n",
		" iosize=\n",
		" [,dsync]\n",
		" [,iters=]\n",
		"\n",
		"flowop aiowait name=,target=" "\n",
		"\n",
		"flowop sempost name=," "target=,\n",
		" value=\n",
		"\n",
		"flowop semblock name=,value=" ",\n",
		" highwater=" "\n",
		"\n",
		"flowop block name=\n",
		"\n",
		"flowop wakeup name=,target=,\n",
		"\n",
		"flowop hog name=,value=\n",
		"flowop delay name=,value=\n",
		"\n",
		"flowop eventlimit name=\n",
		"flowop bwlimit name=,value=\n",
		"flowop iopslimit name=,value=\n",
		"flowop finishoncount name=,value=\n",
		"flowop finishonbytes name=,value=\n",
		"\n",
		"\n"
	};
	size_t nlines = sizeof (usage_text) / sizeof (usage_text[0]);
	size_t line;

	/* None of the strings contains a conversion, so fputs() suffices */
	for (line = 0; line < nlines; line++)
		(void) fputs(usage_text[line], stderr);
}