#include "internal.h"
// Block type handed to _dispatch_fd_entry_init_async(); it is invoked with the
// fd_entry that was looked up or created for the requested descriptor.
typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
// Exported tuning hook: stores value into the dispatch_io_defaults field
// selected by param (one of the DISPATCH_IOCNTL_* constants).
DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);
// Forward declarations of file-local helpers.
// Channel and operation object lifecycle
static void _dispatch_io_dispose(dispatch_io_t channel);
static dispatch_operation_t _dispatch_operation_create(
dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
size_t length, dispatch_data_t data, dispatch_queue_t queue,
dispatch_io_handler_t handler);
static void _dispatch_operation_dispose(dispatch_operation_t operation);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
dispatch_operation_t op);
// fd_entry bookkeeping
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
dispatch_io_t channel);
// Stream and disk back ends
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_disk_dispose(dispatch_disk_t disk);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
dispatch_io_t channel);
static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
// Performing an operation and delivering its data
static void _dispatch_operation_advise(dispatch_operation_t op,
size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
dispatch_op_flags_t flags);
// Evaluate _syscall, store its errno (or 0 when the result is not -1) in _err
// and switch on that value using the case labels supplied in __VA_ARGS__.
// A call interrupted by a signal (EINTR) is retried: `continue` re-runs the
// loop body, and the trailing `break` leaves the loop after any other outcome
// has been handled. A `do { } while (0)` wrapper would not retry, because
// `continue` there only evaluates the (false) loop condition and exits.
// Note that _syscall is evaluated again on every retry.
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
		break; \
	} while (1)
// Same as _dispatch_io_syscall_switch_noerr, but supplies a `case 0: break;`
// label so callers only have to list the error cases they care about.
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
_dispatch_io_syscall_switch_noerr(__err, __syscall, \
case 0: break; \
__VA_ARGS__ \
); \
} while (0)
// Run a syscall through _dispatch_io_syscall_switch with no extra cases; the
// resulting error code is stored in a scratch variable and discarded.
#define _dispatch_io_syscall(__syscall) do { int __err; \
_dispatch_io_syscall_switch(__err, __syscall); \
} while (0)
// Operation status codes, numbered from 1.
// NOTE(review): presumably the results returned by
// _dispatch_operation_perform(); the users of these values are not visible in
// this part of the file -- confirm.
enum {
DISPATCH_OP_COMPLETE = 1,
DISPATCH_OP_DELIVER,
DISPATCH_OP_DELIVER_AND_COMPLETE,
DISPATCH_OP_COMPLETE_RESUME,
DISPATCH_OP_RESUME,
DISPATCH_OP_ERR,
DISPATCH_OP_FD_ERR,
};
#pragma mark -
#pragma mark dispatch_io_vtable
// Object vtable for I/O channels: only disposal is customized; there is no
// invoke function and probe/debug are routed to dummy_function_r0.
static const struct dispatch_io_vtable_s _dispatch_io_vtable = {
.do_type = DISPATCH_IO_TYPE,
.do_kind = "channel",
.do_dispose = _dispatch_io_dispose,
.do_invoke = NULL,
.do_probe = (void *)dummy_function_r0,
.do_debug = (void *)dummy_function_r0,
};
// Object vtable for I/O operations: only disposal is customized; there is no
// invoke function and probe/debug are routed to dummy_function_r0.
static const struct dispatch_operation_vtable_s _dispatch_operation_vtable = {
.do_type = DISPATCH_OPERATION_TYPE,
.do_kind = "operation",
.do_dispose = _dispatch_operation_dispose,
.do_invoke = NULL,
.do_probe = (void *)dummy_function_r0,
.do_debug = (void *)dummy_function_r0,
};
// Object vtable for disk objects: only disposal is customized; there is no
// invoke function and probe/debug are routed to dummy_function_r0.
static const struct dispatch_disk_vtable_s _dispatch_disk_vtable = {
.do_type = DISPATCH_DISK_TYPE,
.do_kind = "disk",
.do_dispose = _dispatch_disk_dispose,
.do_invoke = NULL,
.do_probe = (void *)dummy_function_r0,
.do_debug = (void *)dummy_function_r0,
};
#pragma mark -
#pragma mark dispatch_io_hashtables
// Number of buckets in the fd and device hash tables below. DIO_HASH() masks
// with (DIO_HASH_SIZE - 1), which is why the size has to be a power of two.
#if TARGET_OS_EMBEDDED
#define DIO_HASH_SIZE 64u // must be a power of two
#else
#define DIO_HASH_SIZE 256u // must be a power of two
#endif
// Map an integer key (a file descriptor or device number) to a bucket index
#define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1)))
// Hash table of disk objects, one TAILQ per bucket; initialized together with
// _dispatch_io_devs_lockq by _dispatch_io_devs_lockq_init()
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Hash table of fd entries, one TAILQ per bucket, indexed by DIO_HASH(fd);
// initialized together with _dispatch_io_fds_lockq by
// _dispatch_io_fds_lockq_init()
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
// Once-predicate guarding creation of _dispatch_io_devs_lockq
static dispatch_once_t _dispatch_io_devs_lockq_pred;
// Queues on which the code in this file looks up and modifies the tables above
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;
// dispatch_once_f() initializer: creates the fd lock queue and resets every
// bucket of the fd hash table to an empty list.
static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int bucket = 0;
	while (bucket < DIO_HASH_SIZE) {
		TAILQ_INIT(&_dispatch_io_fds[bucket]);
		bucket++;
	}
}
// dispatch_once_f() initializer: creates the device lock queue and resets
// every bucket of the device hash table to an empty list.
static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int bucket = 0;
	while (bucket < DIO_HASH_SIZE) {
		TAILQ_INIT(&_dispatch_io_devs[bucket]);
		bucket++;
	}
}
#pragma mark -
#pragma mark dispatch_io_defaults
// Parameter selectors accepted by _dispatch_iocntl(); each one names the
// dispatch_io_defaults field it updates.
enum {
DISPATCH_IOCNTL_CHUNK_PAGES = 1,
DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
DISPATCH_IOCNTL_INITIAL_DELIVERY,
DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};
// Process-wide tunables. chunk_pages and low_water_chunks feed the default
// low-water mark of new channels (see _dispatch_io_create()).
// initial_delivery has no explicit initializer and therefore starts as false.
static struct dispatch_io_defaults_s {
size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
bool initial_delivery;
} dispatch_io_defaults = {
.chunk_pages = DIO_MAX_CHUNK_PAGES,
.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};
// Assign v to field p of dispatch_io_defaults, converting to the field's type
#define _dispatch_iocntl_set_default(p, v) do { \
dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
} while (0)
// Override one of the process-wide dispatch I/O defaults.
//   param: a DISPATCH_IOCNTL_* selector naming the field to change
//   value: new value, converted to the type of the selected field
// Unknown selectors are ignored.
void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_pages, value);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
		// Must not fall through: doing so would also overwrite
		// max_pending_io_reqs with the initial-delivery toggle value
		break;
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	default:
		break;
	}
}
#pragma mark -
#pragma mark dispatch_io_t
// Allocate a zeroed channel object of the given type and fill in its object
// header, default water marks and private serial channel queue. The fd,
// fd_entry and barrier queue/group are left for the caller to set up.
static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t io = calloc(1ul, sizeof(struct dispatch_io_s));

	// Object header: one internal and one external reference
	io->do_vtable = &_dispatch_io_vtable;
	io->do_next = DISPATCH_OBJECT_LISTLESS;
	io->do_ref_cnt = 1;
	io->do_xref_cnt = 1;
	io->do_targetq = _dispatch_get_root_queue(0, true);

	// Channel parameters: no upper bound, low-water mark from the defaults
	io->params.type = type;
	io->params.high = SIZE_MAX;
	io->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_pages * PAGE_SIZE;

	io->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return io;
}
// Finish setting up a channel once its fd_entry is known (or known to be
// unavailable).
//   channel:         channel being initialized
//   fd_entry:        entry to attach, or NULL when creation failed; it is
//                    dereferenced when err is 0 and a cleanup handler is given
//   queue:           queue on which the cleanup handler is to be called
//   err:             error code passed to the cleanup handler
//   cleanup_handler: optional block called with err
static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
if (cleanup_handler) {
// Keep queue alive until the outer block has forwarded the handler
_dispatch_retain(queue);
// Without an error the handler is parked on the fd_entry's close queue
// (which _dispatch_fd_entry_retain() keeps suspended); with an error it
// goes through the channel queue instead
dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
dispatch_async(queue, ^{
_dispatch_io_debug("cleanup handler invoke", -1);
cleanup_handler(err);
});
_dispatch_release(queue);
});
}
if (fd_entry) {
// Share the fd_entry's barrier queue and group with the channel
channel->fd_entry = fd_entry;
dispatch_retain(fd_entry->barrier_queue);
dispatch_retain(fd_entry->barrier_group);
channel->barrier_queue = fd_entry->barrier_queue;
channel->barrier_group = fd_entry->barrier_group;
} else {
// No fd_entry: give the channel its own barrier queue and group, since
// the channel entry points dispatch onto channel->barrier_queue
channel->barrier_queue = dispatch_queue_create(
"com.apple.libdispatch-io.barrierq", NULL);
channel->barrier_group = dispatch_group_create();
}
}
// vtable dispose function for channels: drops the channel's hold on its
// fd_entry (if it still has one) and releases the queues and group it owns.
static void
_dispatch_io_dispose(dispatch_io_t channel)
{
// A closed or stopped channel is skipped here; the close/stop paths do their
// own fd_entry release
if (channel->fd_entry && !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
if (channel->fd_entry->path_data) {
// Detach the back pointer so the fd_entry's close-queue cleanup does not
// touch this channel after it is gone
channel->fd_entry->path_data->channel = NULL;
}
_dispatch_fd_entry_release(channel->fd_entry);
}
if (channel->queue) {
dispatch_release(channel->queue);
}
if (channel->barrier_queue) {
dispatch_release(channel->barrier_queue);
}
if (channel->barrier_group) {
dispatch_release(channel->barrier_group);
}
_dispatch_dispose(channel);
}
// Check that a file of the given mode can back this channel.
// Returns EISDIR for directories, ESPIPE when a random-access channel is
// requested on a FIFO or socket, and 0 otherwise.
static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	if (S_ISDIR(mode)) {
		return EISDIR;
	}
	if (channel->params.type != DISPATCH_IO_RANDOM) {
		return 0;
	}
	if (S_ISFIFO(mode) || S_ISSOCK(mode)) {
		return ESPIPE;
	}
	return 0;
}
// Compute the error to report for an operation or channel.
//   op:            if non-NULL, its channel is used and channel is ignored
//   channel:       channel to inspect when op is NULL
//   ignore_closed: when true, a channel that is closed but not stopped yields 0
// Returns ECANCELED for a stopped channel (or a closed one unless
// ignore_closed is set); otherwise the fd_entry error of op, or the channel
// error when there is no op.
static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	if (op) {
		channel = op->channel;
	}
	if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		return op ? op->fd_entry->err : channel->err;
	}
	if (ignore_closed && !(channel->atomic_flags & DIO_STOPPED)) {
		return 0;
	}
	return ECANCELED;
}
#pragma mark -
#pragma mark dispatch_io_channels
// Create a channel of the given type on an existing file descriptor.
// Returns NULL for an unknown type. Otherwise the channel is returned right
// away with its queue suspended; initialization finishes asynchronously in
// the fd_entry callback, which resumes the queue.
//   queue / cleanup_handler: passed on to _dispatch_io_init()
dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
dispatch_queue_t queue, void (^cleanup_handler)(int))
{
if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
return NULL;
}
_dispatch_io_debug("io create", fd);
dispatch_io_t channel = _dispatch_io_create(type);
channel->fd = fd;
channel->fd_actual = fd;
// Hold back anything submitted to the channel until init has completed
dispatch_suspend(channel->queue);
// Keep queue and channel alive until the callback below has run
_dispatch_retain(queue);
_dispatch_retain(channel);
_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
// Start from any error recorded on the fd_entry, then verify that the
// file type is usable with the requested channel type
int err = fd_entry->err;
if (!err) {
err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
}
if (!err && type == DISPATCH_IO_RANDOM) {
// Record the descriptor's current position; operation offsets are later
// added to channel->f_ptr (see _dispatch_operation_create())
off_t f_ptr;
_dispatch_io_syscall_switch_noerr(err,
f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
case 0: channel->f_ptr = f_ptr; break;
default: (void)dispatch_assume_zero(err); break;
);
}
channel->err = err;
// The channel takes its own hold on the fd_entry
_dispatch_fd_entry_retain(fd_entry);
_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
dispatch_resume(channel->queue);
_dispatch_release(channel);
_dispatch_release(queue);
});
return channel;
}
// Create a channel for the file at an absolute path.
// Returns NULL for an unknown type, a NULL or non-absolute path, or when the
// path data cannot be allocated. Otherwise the channel is returned right away
// and the path is examined asynchronously on the channel queue.
//   oflag / mode: stored in the path data alongside a copy of path
//   queue / cleanup_handler: passed on to _dispatch_io_init()
dispatch_io_t
dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
int oflag, mode_t mode, dispatch_queue_t queue,
void (^cleanup_handler)(int error))
{
if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
!(path && *path == '/')) {
return NULL;
}
size_t pathlen = strlen(path);
// Header plus a private NUL-terminated copy of the path
dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
if (!path_data) {
return NULL;
}
_dispatch_io_debug("io create with path %s", -1, path);
dispatch_io_t channel = _dispatch_io_create(type);
// No descriptor yet
channel->fd = -1;
channel->fd_actual = -1;
path_data->channel = channel;
path_data->oflag = oflag;
path_data->mode = mode;
path_data->pathlen = pathlen;
memcpy(path_data->path, path, pathlen + 1);
// Keep queue and channel alive until initialization has finished
_dispatch_retain(queue);
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
int err = 0;
struct stat st;
// lstat() when the caller asked not to follow symlinks, stat() otherwise
_dispatch_io_syscall_switch_noerr(err,
(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
lstat(path_data->path, &st) : stat(path_data->path, &st),
case 0:
err = _dispatch_io_validate_type(channel, st.st_mode);
break;
default:
if ((path_data->oflag & O_CREAT) &&
(*(path_data->path + path_data->pathlen - 1) != '/')) {
char *c = strrchr(path_data->path, '/');
dispatch_assert(c);
*c = 0;
int perr;
_dispatch_io_syscall_switch_noerr(perr,
stat(path_data->path, &st),
case 0:
st.st_mode = S_IFREG;
err = 0;
break;
);
*c = '/';
}
break;
);
// In the default case above: the path could not be stat'ed, but with
// O_CREAT and no trailing slash the parent directory is stat'ed instead
// (the path is temporarily cut at its last '/'); if that succeeds the
// target is treated as a regular file and the error is cleared
channel->err = err;
if (err) {
// Failure: the path data is not handed to an fd_entry, free it here
free(path_data);
_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
_dispatch_release(channel);
_dispatch_release(queue);
return;
}
// Hold back the channel queue until the fd_entry has been created
dispatch_suspend(channel->queue);
dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
_dispatch_io_devs_lockq_init);
dispatch_async(_dispatch_io_devs_lockq, ^{
dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
path_data, st.st_dev, st.st_mode);
_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
dispatch_resume(channel->queue);
_dispatch_release(channel);
_dispatch_release(queue);
});
});
return channel;
}
// Create a new channel of the given type that refers to the same descriptor
// or path as an existing channel.
// Returns NULL for an unknown type. Otherwise the new channel is returned
// right away with its queue suspended; it is initialized asynchronously on
// in_channel's queue and barrier queue, and its queue is resumed once
// _dispatch_io_init() has been called.
dispatch_io_t
dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
dispatch_queue_t queue, void (^cleanup_handler)(int error))
{
if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
return NULL;
}
_dispatch_io_debug("io create with io %p", -1, in_channel);
dispatch_io_t channel = _dispatch_io_create(type);
dispatch_suspend(channel->queue);
// Keep all three objects alive across the asynchronous setup
_dispatch_retain(queue);
_dispatch_retain(channel);
_dispatch_retain(in_channel);
dispatch_async(in_channel->queue, ^{
// First check on in_channel's queue: bail out early on an error
int err0 = _dispatch_io_get_error(NULL, in_channel, false);
if (err0) {
channel->err = err0;
_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
dispatch_resume(channel->queue);
_dispatch_release(channel);
_dispatch_release(in_channel);
_dispatch_release(queue);
return;
}
dispatch_async(in_channel->barrier_queue, ^{
// Re-check on the barrier queue, then fall back to the errors stored on
// the channel and on its fd_entry
int err = _dispatch_io_get_error(NULL, in_channel, false);
if (!err) {
err = in_channel->err;
if (!err) {
err = in_channel->fd_entry->err;
}
}
if (!err) {
err = _dispatch_io_validate_type(channel,
in_channel->fd_entry->stat.mode);
}
// Only fd-based channels have a descriptor to query for its position
if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
off_t f_ptr;
_dispatch_io_syscall_switch_noerr(err,
f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
case 0: channel->f_ptr = f_ptr; break;
default: (void)dispatch_assume_zero(err); break;
);
}
channel->err = err;
if (err) {
_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
dispatch_resume(channel->queue);
_dispatch_release(channel);
_dispatch_release(in_channel);
_dispatch_release(queue);
return;
}
if (in_channel->fd == -1) {
// in_channel has no descriptor (path-based): duplicate its path data
// and build a separate fd_entry for the new channel
channel->fd = -1;
channel->fd_actual = -1;
mode_t mode = in_channel->fd_entry->stat.mode;
dev_t dev = in_channel->fd_entry->stat.dev;
size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
in_channel->fd_entry->path_data->pathlen + 1;
// NOTE(review): the malloc() result is used without a NULL check
dispatch_io_path_data_t path_data = malloc(path_data_len);
memcpy(path_data, in_channel->fd_entry->path_data,
path_data_len);
path_data->channel = channel;
dispatch_async(_dispatch_io_devs_lockq, ^{
dispatch_fd_entry_t fd_entry;
fd_entry = _dispatch_fd_entry_create_with_path(path_data,
dev, mode);
_dispatch_io_init(channel, fd_entry, queue, 0,
cleanup_handler);
dispatch_resume(channel->queue);
_dispatch_release(channel);
_dispatch_release(queue);
});
} else {
// fd-based: share in_channel's fd_entry, taking an extra hold on it
dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
channel->fd = in_channel->fd;
channel->fd_actual = in_channel->fd_actual;
_dispatch_fd_entry_retain(fd_entry);
_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
dispatch_resume(channel->queue);
_dispatch_release(channel);
_dispatch_release(queue);
}
_dispatch_release(in_channel);
});
});
return channel;
}
#pragma mark -
#pragma mark dispatch_io_accessors
// Asynchronously set the channel's high-water mark on the channel queue.
// The low-water mark is lowered to match if it would exceed the new value,
// and a high-water mark of 0 is stored as 1.
void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set high water", channel->fd);
		if (high_water < channel->params.low) {
			channel->params.low = high_water;
		}
		if (high_water) {
			channel->params.high = high_water;
		} else {
			channel->params.high = 1;
		}
		_dispatch_release(channel);
	});
}
// Asynchronously set the channel's low-water mark on the channel queue.
// The high-water mark is raised to match if it would fall below the new
// value (never to 0; 1 is used instead).
void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set low water", channel->fd);
		if (low_water > channel->params.high) {
			if (low_water) {
				channel->params.high = low_water;
			} else {
				channel->params.high = 1;
			}
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}
// Asynchronously store the delivery interval and its flags in the channel's
// parameters, on the channel queue.
void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set interval", channel->fd);
		channel->params.interval_flags = flags;
		channel->params.interval = interval;
		_dispatch_release(channel);
	});
}
// Asynchronously replace the channel's target queue with dq. The swap runs on
// the channel queue; the previous target queue is released afterwards.
void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
// The reference taken on dq here becomes the channel's reference to its new
// target queue
_dispatch_retain(dq);
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
dispatch_queue_t prev_dq = channel->do_targetq;
channel->do_targetq = dq;
_dispatch_release(prev_dq);
_dispatch_release(channel);
});
}
// Return the file descriptor behind the channel, or -1 if the channel is
// closed or stopped. When no descriptor is recorded yet and the calling
// thread has this channel set under dispatch_io_key (as dispatch_io_barrier()
// does around its block), an open of the fd_entry is attempted first.
dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	if (channel->fd_actual == -1 &&
			_dispatch_thread_getspecific(dispatch_io_key) == channel) {
		(void)_dispatch_fd_entry_open(channel->fd_entry, channel);
	}
	return channel->fd_actual;
}
#pragma mark -
#pragma mark dispatch_io_operations
// Stop a channel: mark it DIO_STOPPED immediately, then asynchronously (via
// the channel queue and the barrier queue) clean up its operations on the
// fd_entry.
static void
_dispatch_io_stop(dispatch_io_t channel)
{
_dispatch_io_debug("io stop", channel->fd);
(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED);
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
dispatch_async(channel->barrier_queue, ^{
dispatch_fd_entry_t fd_entry = channel->fd_entry;
if (fd_entry) {
_dispatch_io_debug("io stop cleanup", channel->fd);
_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
// If the channel was not closed before, drop its hold on the fd_entry
// here (the close path releases it otherwise)
if (!(channel->atomic_flags & DIO_CLOSED)) {
channel->fd_entry = NULL;
_dispatch_fd_entry_release(fd_entry);
}
} else if (channel->fd != -1) {
// The channel no longer points at an fd_entry; look one up by fd in the
// global table, on the fds lock queue, and clean up there if found
_dispatch_retain(channel);
dispatch_async(_dispatch_io_fds_lockq, ^{
_dispatch_io_debug("io stop after close cleanup",
channel->fd);
dispatch_fd_entry_t fdi;
uintptr_t hash = DIO_HASH(channel->fd);
TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
if (fdi->fd == channel->fd) {
_dispatch_fd_entry_cleanup_operations(fdi, channel);
break;
}
}
_dispatch_release(channel);
});
}
_dispatch_release(channel);
});
});
}
// Close a channel.
//   flags: with DISPATCH_IO_STOP the channel is stopped via
//          _dispatch_io_stop() (unless it is already stopped); otherwise the
//          channel is marked DIO_CLOSED asynchronously and its hold on the
//          fd_entry is released.
// Closing an already closed or stopped channel is a no-op.
void
dispatch_io_close(dispatch_io_t channel, unsigned long flags)
{
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
			return;
		}
		return _dispatch_io_stop(channel);
	}
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return;
	}
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_io_debug("io close", channel->fd);
			// Re-check on the barrier queue: the state may have changed since
			// the call was made
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				// A channel whose creation failed was initialized without an
				// fd_entry (see _dispatch_io_init()), so there may be nothing
				// to release
				if (fd_entry) {
					// Path-based channels keep the pointer; it is cleared by
					// the fd_entry's own close-queue cleanup
					if (!fd_entry->path_data) {
						channel->fd_entry = NULL;
					}
					_dispatch_fd_entry_release(fd_entry);
				}
			}
			_dispatch_release(channel);
		});
	});
}
// Schedule a barrier block on the channel. The block runs on the channel's
// target queue once the barrier group notifies, while the barrier queue is
// suspended so that nothing else submitted to it runs until the block returns.
void
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
{
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
// Snapshot the queues/group on the channel queue
dispatch_queue_t io_q = channel->do_targetq;
dispatch_queue_t barrier_queue = channel->barrier_queue;
dispatch_group_t barrier_group = channel->barrier_group;
dispatch_async(barrier_queue, ^{
// Block later barrier-queue work until the barrier block has finished
dispatch_suspend(barrier_queue);
dispatch_group_notify(barrier_group, io_q, ^{
// Mark this thread as running the channel's barrier, which
// dispatch_io_get_descriptor() checks for
_dispatch_thread_setspecific(dispatch_io_key, channel);
barrier();
_dispatch_thread_setspecific(dispatch_io_key, NULL);
dispatch_resume(barrier_queue);
_dispatch_release(channel);
});
});
});
}
// Schedule an asynchronous read of length bytes at offset on the channel.
// The operation is created on the channel queue and enqueued from the barrier
// queue; handler is passed to _dispatch_operation_create() together with
// queue.
void
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
dispatch_queue_t queue, dispatch_io_handler_t handler)
{
// Keep channel and queue alive until the block below has run
_dispatch_retain(channel);
_dispatch_retain(queue);
dispatch_async(channel->queue, ^{
dispatch_operation_t op;
op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
length, dispatch_data_empty, queue, handler);
// A NULL op means _dispatch_operation_create() already arranged for the
// handler to be called (error or zero length)
if (op) {
dispatch_queue_t barrier_q = channel->barrier_queue;
dispatch_async(barrier_q, ^{
_dispatch_operation_enqueue(op, DOP_DIR_READ,
dispatch_data_empty);
});
}
_dispatch_release(channel);
_dispatch_release(queue);
});
}
// Schedule an asynchronous write of data at offset on the channel.
// The operation is created on the channel queue and enqueued from the barrier
// queue; handler is passed to _dispatch_operation_create() together with
// queue.
void
dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
dispatch_queue_t queue, dispatch_io_handler_t handler)
{
// Hold data, channel and queue until the asynchronous setup is done
_dispatch_io_data_retain(data);
_dispatch_retain(channel);
_dispatch_retain(queue);
dispatch_async(channel->queue, ^{
dispatch_operation_t op;
op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
dispatch_data_get_size(data), data, queue, handler);
if (op) {
dispatch_queue_t barrier_q = channel->barrier_queue;
dispatch_async(barrier_q, ^{
_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
// Drop the reference taken at the top of this function
_dispatch_io_data_release(data);
});
} else {
// No operation was created; drop the reference taken at the top
_dispatch_io_data_release(data);
}
_dispatch_release(channel);
_dispatch_release(queue);
});
}
// Convenience read: read up to length bytes from fd and call handler once on
// queue with the accumulated data and an error code. Uses the fd_entry's
// shared "convenience" stream channel, creating it on first use.
void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
void (^handler)(dispatch_data_t, int))
{
_dispatch_retain(queue);
_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
if (fd_entry->err) {
// The fd_entry carries an error: report it with empty data
int err = fd_entry->err;
dispatch_async(queue, ^{
_dispatch_io_debug("convenience handler invoke", fd);
handler(dispatch_data_empty, err);
});
_dispatch_release(queue);
return;
}
dispatch_io_t channel = fd_entry->convenience_channel;
if (!channel) {
// Lazily create the per-fd convenience channel, sharing the fd_entry's
// barrier queue and group
channel = _dispatch_io_create(DISPATCH_IO_STREAM);
channel->fd = fd;
channel->fd_actual = fd;
channel->fd_entry = fd_entry;
dispatch_retain(fd_entry->barrier_queue);
dispatch_retain(fd_entry->barrier_group);
channel->barrier_queue = fd_entry->barrier_queue;
channel->barrier_group = fd_entry->barrier_group;
fd_entry->convenience_channel = channel;
}
// Results accumulated by the operation handler below and consumed by the
// close-queue block
__block dispatch_data_t deliver_data = dispatch_data_empty;
__block int err = 0;
// The user's handler is parked on the close queue, which stays suspended
// while the fd_entry is retained
dispatch_async(fd_entry->close_queue, ^{
dispatch_async(queue, ^{
_dispatch_io_debug("convenience handler invoke", fd);
handler(deliver_data, err);
_dispatch_io_data_release(deliver_data);
});
_dispatch_release(queue);
});
dispatch_operation_t op =
_dispatch_operation_create(DOP_DIR_READ, channel, 0,
length, dispatch_data_empty,
_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
false), ^(bool done, dispatch_data_t data, int error) {
// Append each delivered chunk to deliver_data
if (data) {
data = dispatch_data_create_concat(deliver_data, data);
_dispatch_io_data_release(deliver_data);
deliver_data = data;
}
if (done) {
err = error;
}
});
if (op) {
_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
}
});
}
// Convenience write: write data to fd and call handler once on queue with
// the data passed to the final operation callback (or NULL) and an error
// code. Uses the fd_entry's shared "convenience" stream channel, creating it
// on first use.
void
dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
void (^handler)(dispatch_data_t, int))
{
_dispatch_io_data_retain(data);
_dispatch_retain(queue);
_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
if (fd_entry->err) {
// The fd_entry carries an error: report it without any data.
// NOTE(review): this early return does not release the reference on
// data taken at the top of the function -- confirm whether that is
// intended.
int err = fd_entry->err;
dispatch_async(queue, ^{
_dispatch_io_debug("convenience handler invoke", fd);
handler(NULL, err);
});
_dispatch_release(queue);
return;
}
dispatch_io_t channel = fd_entry->convenience_channel;
if (!channel) {
// Lazily create the per-fd convenience channel, sharing the fd_entry's
// barrier queue and group
channel = _dispatch_io_create(DISPATCH_IO_STREAM);
channel->fd = fd;
channel->fd_actual = fd;
channel->fd_entry = fd_entry;
dispatch_retain(fd_entry->barrier_queue);
dispatch_retain(fd_entry->barrier_group);
channel->barrier_queue = fd_entry->barrier_queue;
channel->barrier_group = fd_entry->barrier_group;
fd_entry->convenience_channel = channel;
}
// Results recorded by the operation handler below and consumed by the
// close-queue block
__block dispatch_data_t deliver_data = NULL;
__block int err = 0;
// The user's handler is parked on the close queue, which stays suspended
// while the fd_entry is retained
dispatch_async(fd_entry->close_queue, ^{
dispatch_async(queue, ^{
_dispatch_io_debug("convenience handler invoke", fd);
handler(deliver_data, err);
if (deliver_data) {
_dispatch_io_data_release(deliver_data);
}
});
_dispatch_release(queue);
});
dispatch_operation_t op =
_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
dispatch_data_get_size(data), data,
_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
false), ^(bool done, dispatch_data_t d, int error) {
// Only the final callback matters: keep its data (if any) and error
if (done) {
if (d) {
_dispatch_io_data_retain(d);
deliver_data = d;
}
err = error;
}
});
if (op) {
_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
}
// Drop the reference taken at the top of the function
_dispatch_io_data_release(data);
});
}
#pragma mark -
#pragma mark dispatch_operation_t
// Create an operation object for a read or write on channel.
// Returns NULL when the channel reports an error or length is 0; in that case
// the handler is scheduled (via the barrier queue, then queue) with done set
// to true. Otherwise returns a new operation holding references on queue and
// channel and a copy of handler.
static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
dispatch_io_t channel, off_t offset, size_t length,
dispatch_data_t data, dispatch_queue_t queue,
dispatch_io_handler_t handler)
{
dispatch_assert(direction < DOP_DIR_MAX);
_dispatch_io_debug("operation create", channel->fd);
#if DISPATCH_IO_DEBUG
int fd = channel->fd;
#endif
int err = _dispatch_io_get_error(NULL, channel, false);
if (err || !length) {
_dispatch_io_data_retain(data);
_dispatch_retain(queue);
dispatch_async(channel->barrier_queue, ^{
dispatch_async(queue, ^{
// Data handed to the handler: NULL for a failed read and for a
// successful (zero-length) write; the original data otherwise
dispatch_data_t d = data;
if (direction == DOP_DIR_READ && err) {
d = NULL;
} else if (direction == DOP_DIR_WRITE && !err) {
d = NULL;
}
_dispatch_io_debug("IO handler invoke", fd);
handler(true, d, err);
_dispatch_io_data_release(data);
});
_dispatch_release(queue);
});
return NULL;
}
dispatch_operation_t op;
op = calloc(1ul, sizeof(struct dispatch_operation_s));
op->do_vtable = &_dispatch_operation_vtable;
op->do_next = DISPATCH_OBJECT_LISTLESS;
op->do_ref_cnt = 1;
// No external references; the operation also gets a private queue (op_q)
// that targets the caller's queue
op->do_xref_cnt = 0; op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
op->op_q->do_targetq = queue;
_dispatch_retain(queue);
op->active = false;
op->direction = direction;
// Offsets are relative to the file position recorded on the channel
op->offset = offset + channel->f_ptr;
op->length = length;
op->handler = Block_copy(handler);
_dispatch_retain(channel);
op->channel = channel;
// Snapshot of the channel parameters at creation time
op->params = channel->params;
// Walk the channel's target queue chain to its end and use that queue as
// the operation's target queue
dispatch_queue_t targetq = op->channel->do_targetq;
while (fastpath(targetq->do_targetq)) {
targetq = targetq->do_targetq;
}
op->do_targetq = targetq;
return op;
}
// vtable dispose function for operations: delivers any remaining data with
// DOP_DONE, leaves the barrier group, and releases everything the operation
// holds.
static void
_dispatch_operation_dispose(dispatch_operation_t op)
{
// fd_entry is only set once the operation was enqueued; this undoes the
// group enter and fd_entry retain done in _dispatch_operation_enqueue()
if (op->fd_entry) {
_dispatch_operation_deliver_data(op, DOP_DONE);
dispatch_group_leave(op->fd_entry->barrier_group);
_dispatch_fd_entry_release(op->fd_entry);
}
if (op->channel) {
_dispatch_release(op->channel);
}
if (op->timer) {
dispatch_release(op->timer);
}
// The buffer is only freed here for reads
// NOTE(review): for writes op->buf is presumably owned by op->buf_data --
// confirm
if (op->buf && op->direction == DOP_DIR_READ) {
free(op->buf);
}
if (op->buf_data) {
_dispatch_io_data_release(op->buf_data);
}
if (op->data) {
_dispatch_io_data_release(op->data);
}
if (op->op_q) {
dispatch_release(op->op_q);
}
Block_release(op->handler);
_dispatch_dispose(op);
}
// Hand a freshly created operation to its back end (stream or disk).
// If the channel reports an error, the handler is called with that error on
// the operation's queue and the operation is released instead.
static void
_dispatch_operation_enqueue(dispatch_operation_t op,
dispatch_op_direction_t direction, dispatch_data_t data)
{
// Balanced by the release in whichever block below ends up running
_dispatch_io_data_retain(data);
int err = _dispatch_io_get_error(NULL, op->channel, false);
if (err) {
dispatch_io_handler_t handler = op->handler;
dispatch_async(op->op_q, ^{
// err is non-zero here, so a read reports NULL data and a write reports
// the original data
dispatch_data_t d = data;
if (direction == DOP_DIR_READ && err) {
d = NULL;
} else if (direction == DOP_DIR_WRITE && !err) {
d = NULL;
}
handler(true, d, err);
_dispatch_io_data_release(data);
});
_dispatch_release(op);
return;
}
// Attach the operation to the channel's fd_entry; the retain and group enter
// are undone in _dispatch_operation_dispose()
op->fd_entry = op->channel->fd_entry;
_dispatch_fd_entry_retain(op->fd_entry);
dispatch_group_enter(op->fd_entry->barrier_group);
dispatch_disk_t disk = op->fd_entry->disk;
if (!disk) {
// No disk object: use the stream for this direction
dispatch_stream_t stream = op->fd_entry->streams[direction];
dispatch_async(stream->dq, ^{
_dispatch_stream_enqueue_operation(stream, op, data);
_dispatch_io_data_release(data);
});
} else {
dispatch_async(disk->pick_queue, ^{
_dispatch_disk_enqueue_operation(disk, op, data);
_dispatch_io_data_release(data);
});
}
}
// Decide whether an operation should still be queued on its back end.
// Stores data on the operation, then returns false (after recording the error
// and releasing the operation) if an error is reported; otherwise starts the
// interval timer when an interval is configured and returns true.
static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,
dispatch_queue_t tq, dispatch_data_t data)
{
_dispatch_io_debug("enqueue operation", op->fd_entry->fd);
// The operation keeps its own reference (dropped in
// _dispatch_operation_dispose())
_dispatch_io_data_retain(data);
op->data = data;
// ignore_closed is true: a merely closed (not stopped) channel is not an
// error at this point
int err = _dispatch_io_get_error(op, NULL, true);
if (err) {
op->err = err;
_dispatch_release(op);
return false;
}
if (op->params.interval) {
dispatch_resume(_dispatch_operation_timer(tq, op));
}
return true;
}
// Return the operation's interval timer source, creating it on first use.
// The timer targets tq and fires every op->params.interval, starting one
// interval from now. The caller is expected to resume a newly created timer
// (as _dispatch_operation_should_enqueue() does).
static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
{
if (op->timer) {
return op->timer;
}
dispatch_source_t timer = dispatch_source_create(
DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
op->params.interval), op->params.interval, 0);
dispatch_source_set_event_handler(timer, ^{
// Nothing to do once the timer has been cancelled
if (dispatch_source_testcancel(timer)) {
return;
}
dispatch_op_flags_t flags = DOP_DEFAULT;
// Strict interval: request delivery on every tick
if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
flags |= DOP_DELIVER;
}
// While the operation is active, only record the flags on it instead of
// delivering from here
if ((op->active) && (flags & DOP_DELIVER)) {
op->flags = flags;
} else {
_dispatch_operation_deliver_data(op, flags);
}
});
op->timer = timer;
return op->timer;
}
#pragma mark -
#pragma mark dispatch_fd_entry_t
// Take a hold on an fd_entry by suspending its close queue; the cleanup
// blocks queued there cannot run until every hold has been released.
static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_queue_t closeq = fd_entry->close_queue;
	dispatch_suspend(closeq);
}
// Drop a hold on an fd_entry by resuming its close queue (the counterpart of
// _dispatch_fd_entry_retain()).
static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_queue_t closeq = fd_entry->close_queue;
	dispatch_resume(closeq);
}
// Look up (or create) the fd_entry for fd on the fds lock queue, then invoke
// completion_callback with it on the entry's barrier queue.
static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
dispatch_fd_entry_init_callback_t completion_callback)
{
// Create the fds lock queue and hash table on first use
static dispatch_once_t _dispatch_io_fds_lockq_pred;
dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
_dispatch_io_fds_lockq_init);
dispatch_async(_dispatch_io_fds_lockq, ^{
_dispatch_io_debug("fd entry init", fd);
dispatch_fd_entry_t fd_entry = NULL;
uintptr_t hash = DIO_HASH(fd);
// Search the bucket for an existing entry; if found, take a hold on it so
// it stays alive until the callback has run
TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
if (fd_entry->fd == fd) {
_dispatch_fd_entry_retain(fd_entry);
break;
}
}
// Not found: create a new entry (which starts out with a hold taken in
// _dispatch_fd_entry_create())
if (!fd_entry) {
fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
}
dispatch_async(fd_entry->barrier_queue, ^{
_dispatch_io_debug("fd entry init completion", fd);
completion_callback(fd_entry);
_dispatch_fd_entry_release(fd_entry);
});
});
}
// Allocate a zeroed fd_entry with a close queue that targets q. The entry is
// returned with one hold already taken, i.e. with its close queue suspended.
static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
{
	dispatch_fd_entry_t entry = calloc(1ul,
			sizeof(struct dispatch_fd_entry_s));
	entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// The close queue keeps a reference on its new target queue
	entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Keep the close queue suspended until the entry is released
	_dispatch_fd_entry_retain(entry);
	return entry;
}
// Create the fd_entry for an existing descriptor and insert it into bucket
// hash of the global fd table. Three blocks are queued:
//   1. on the barrier queue: fstat the fd, adjust its flags and set up the
//      disk or stream back end;
//   2. on the (suspended) close queue: tear down the back end and remove the
//      entry from the table;
//   3. on the close queue: release the entry's queues/group, restore the
//      descriptor flags and free the entry.
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
{
_dispatch_io_debug("fd entry create", fd);
// The close queue targets the fds lock queue
dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
_dispatch_io_fds_lockq);
fd_entry->fd = fd;
TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
fd_entry->barrier_queue = dispatch_queue_create(
"com.apple.libdispatch-io.barrierq", NULL);
fd_entry->barrier_group = dispatch_group_create();
dispatch_async(fd_entry->barrier_queue, ^{
_dispatch_io_debug("fd entry stat", fd);
int err, orig_flags, orig_nosigpipe = -1;
struct stat st;
// An fstat failure is recorded on the entry and ends this block early
_dispatch_io_syscall_switch(err,
fstat(fd, &st),
default: fd_entry->err = err; return;
);
fd_entry->stat.dev = st.st_dev;
fd_entry->stat.mode = st.st_mode;
// Remember the current file status flags so they can be restored later
_dispatch_io_syscall_switch(err,
orig_flags = fcntl(fd, F_GETFL),
default: (void)dispatch_assume_zero(err); break;
);
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
// For FIFOs, read the NOSIGPIPE setting and then set it to 1
if (S_ISFIFO(st.st_mode)) {
_dispatch_io_syscall_switch(err,
orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
default: (void)dispatch_assume_zero(err); break;
);
if (orig_nosigpipe != -1) {
_dispatch_io_syscall_switch(err,
orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
default:
orig_nosigpipe = -1;
(void)dispatch_assume_zero(err);
break;
);
}
}
#endif
if (S_ISREG(st.st_mode)) {
// Regular file: clear O_NONBLOCK and use the disk back end
if (orig_flags != -1) {
_dispatch_io_syscall_switch(err,
fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
default:
orig_flags = -1;
(void)dispatch_assume_zero(err);
break;
);
}
int32_t dev = major(st.st_dev);
// The disk is set up on the devs lock queue; keep the barrier queue
// suspended until that has happened
dispatch_suspend(fd_entry->barrier_queue);
dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
_dispatch_io_devs_lockq_init);
dispatch_async(_dispatch_io_devs_lockq, ^{
_dispatch_disk_init(fd_entry, dev);
dispatch_resume(fd_entry->barrier_queue);
});
} else {
// Anything else: set O_NONBLOCK and use the stream back end
if (orig_flags != -1) {
_dispatch_io_syscall_switch(err,
fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
default:
orig_flags = -1;
(void)dispatch_assume_zero(err);
break;
);
}
_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
}
// -1 in either field means "nothing to restore"
fd_entry->orig_flags = orig_flags;
fd_entry->orig_nosigpipe = orig_nosigpipe;
});
// First close-queue block: runs once the close queue is no longer suspended,
// i.e. after every hold on the entry has been released
dispatch_async(fd_entry->close_queue, ^{
if (!fd_entry->disk) {
_dispatch_io_debug("close queue fd_entry cleanup", fd);
dispatch_op_direction_t dir;
for (dir = 0; dir < DOP_DIR_MAX; dir++) {
_dispatch_stream_dispose(fd_entry, dir);
}
} else {
// Release the disk object on the devs lock queue
dispatch_disk_t disk = fd_entry->disk;
dispatch_async(_dispatch_io_devs_lockq, ^{
_dispatch_release(disk);
});
}
TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
});
// Second close-queue block: final teardown of the entry itself
dispatch_async(fd_entry->close_queue, ^{
_dispatch_io_debug("close queue release", fd);
dispatch_release(fd_entry->close_queue);
_dispatch_io_debug("barrier queue release", fd);
dispatch_release(fd_entry->barrier_queue);
_dispatch_io_debug("barrier group release", fd);
dispatch_release(fd_entry->barrier_group);
// Restore the descriptor's original flags where they were saved
if (fd_entry->orig_flags != -1) {
_dispatch_io_syscall(
fcntl(fd, F_SETFL, fd_entry->orig_flags)
);
}
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
if (fd_entry->orig_nosigpipe != -1) {
_dispatch_io_syscall(
fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
);
}
#endif
// Detach and release the convenience channel created by dispatch_read()
// or dispatch_write(), if any
if (fd_entry->convenience_channel) {
fd_entry->convenience_channel->fd_entry = NULL;
dispatch_release(fd_entry->convenience_channel);
}
free(fd_entry);
});
return fd_entry;
}
// Creates the fd_entry for a channel that was created from a path. The file
// itself is not opened here: fd is left at -1 and path_data is kept on the
// entry so that _dispatch_fd_entry_open() can open() it later.
//   path_data - path, open flags/mode and owning channel; stored on the entry
//               and freed by the final close_queue block below
//   dev, mode - recorded in fd_entry->stat; S_ISREG(mode) selects a
//               disk-backed entry, anything else a stream-backed one
// Returns the new entry, with its two cleanup blocks already enqueued on
// fd_entry->close_queue.
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
dev_t dev, mode_t mode)
{
_dispatch_io_debug("fd entry create with path %s", -1, path_data->path);
dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
path_data->channel->queue);
// Regular files are scheduled per device (keyed by the major number),
// everything else gets a pair of per-direction streams.
if (S_ISREG(mode)) {
_dispatch_disk_init(fd_entry, major(dev));
} else {
_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
}
// No descriptor yet and therefore no original fcntl flags to restore.
fd_entry->fd = -1;
fd_entry->orig_flags = -1;
fd_entry->path_data = path_data;
fd_entry->stat.dev = dev;
fd_entry->stat.mode = mode;
fd_entry->barrier_queue = dispatch_queue_create(
"com.apple.libdispatch-io.barrierq", NULL);
fd_entry->barrier_group = dispatch_group_create();
// First close_queue block: dispose the streams of a stream-backed entry,
// close the descriptor if one was ever opened, and clear the back pointer
// of the channel still referenced by path_data.
// NOTE(review): a disk-backed entry does not release fd_entry->disk here,
// whereas the fd-based creation path releases it on _dispatch_io_devs_lockq;
// confirm the reference taken in _dispatch_disk_init is dropped elsewhere.
dispatch_async(fd_entry->close_queue, ^{
_dispatch_io_debug("close queue fd_entry cleanup", -1);
if (!fd_entry->disk) {
dispatch_op_direction_t dir;
for (dir = 0; dir < DOP_DIR_MAX; dir++) {
_dispatch_stream_dispose(fd_entry, dir);
}
}
if (fd_entry->fd != -1) {
close(fd_entry->fd);
}
if (fd_entry->path_data->channel) {
fd_entry->path_data->channel->fd_entry = NULL;
}
});
// Second close_queue block, enqueued after the one above (which still
// dereferences fd_entry and path_data): drops the queue/group references
// and frees path_data and the entry itself.
dispatch_async(fd_entry->close_queue, ^{
_dispatch_io_debug("close queue release", -1);
dispatch_release(fd_entry->close_queue);
dispatch_release(fd_entry->barrier_queue);
dispatch_release(fd_entry->barrier_group);
free(fd_entry->path_data);
free(fd_entry);
});
return fd_entry;
}
// Opens the file behind a path-based fd_entry on first use.
// Returns 0 when nothing needs opening (a descriptor is already set or the
// entry has no path_data) and after a successful open(); otherwise returns
// the errno value already recorded on the entry or produced by open().
// On success the descriptor is also stored in channel->fd_actual, unless
// another caller installed a descriptor first, in which case ours is closed.
static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
{
	if (fd_entry->fd != -1 || !fd_entry->path_data) {
		return 0;
	}
	if (fd_entry->err) {
		return fd_entry->err;
	}
	// O_NONBLOCK is cleared for disk-backed entries and forced on otherwise
	int oflag = fd_entry->path_data->oflag;
	if (fd_entry->disk) {
		oflag &= ~O_NONBLOCK;
	} else {
		oflag |= O_NONBLOCK;
	}
	// Retry for as long as open() is interrupted by a signal
	int fd;
	do {
		fd = open(fd_entry->path_data->path, oflag,
				fd_entry->path_data->mode);
	} while (fd == -1 && errno == EINTR);
	if (fd == -1) {
		int err = errno;
		// Record the failure on the entry unless an error is already set
		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err);
		return err;
	}
	if (dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) {
		channel->fd_actual = fd;
	} else {
		// Lost the race to install a descriptor; discard this one
		close(fd);
	}
	return 0;
}
// Asynchronously completes the operations queued on fd_entry. With a
// non-NULL channel only that channel's operations are affected, with NULL
// all of them are. The work is submitted to the disk's pick_queue for
// disk-backed entries, or to the queue of each existing stream otherwise.
// The fd_entry (and the channel, if given) is retained until the submitted
// block has run.
static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
{
	if (!fd_entry->disk) {
		dispatch_op_direction_t dir = 0;
		while (dir < DOP_DIR_MAX) {
			dispatch_stream_t stream = fd_entry->streams[dir];
			dir++;
			if (stream) {
				if (channel) {
					_dispatch_retain(channel);
				}
				_dispatch_fd_entry_retain(fd_entry);
				dispatch_async(stream->dq, ^{
					_dispatch_stream_cleanup_operations(stream, channel);
					_dispatch_fd_entry_release(fd_entry);
					if (channel) {
						_dispatch_release(channel);
					}
				});
			}
		}
		return;
	}
	if (channel) {
		_dispatch_retain(channel);
	}
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_async(fd_entry->disk->pick_queue, ^{
		_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
		_dispatch_fd_entry_release(fd_entry);
		if (channel) {
			_dispatch_release(channel);
		}
	});
}
#pragma mark -
#pragma mark dispatch_stream_t/dispatch_disk_t
// Allocates one zeroed stream per I/O direction and stores it in
// fd_entry->streams[]. Each stream gets its own queue targeting tq (tq is
// retained once per stream) and two empty operation lists.
static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
{
	dispatch_op_direction_t dir = 0;
	while (dir < DOP_DIR_MAX) {
		dispatch_stream_t s = calloc(1ul, sizeof(struct dispatch_stream_s));
		TAILQ_INIT(&s->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&s->operations[DISPATCH_IO_STREAM]);
		s->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
				NULL);
		_dispatch_retain(tq);
		s->dq->do_targetq = tq;
		fd_entry->streams[dir] = s;
		dir++;
	}
}
// Destroys the stream of fd_entry for the given direction, if it exists.
// Both operation lists must already be empty. A stream that has a source
// gets it cancelled, resumed and released; the stream queue is released and
// the stream memory freed.
static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	dispatch_stream_t s = fd_entry->streams[direction];
	if (s) {
		dispatch_assert(TAILQ_EMPTY(&s->operations[DISPATCH_IO_STREAM]));
		dispatch_assert(TAILQ_EMPTY(&s->operations[DISPATCH_IO_RANDOM]));
		dispatch_source_t src = s->source;
		if (src) {
			// NOTE(review): this retain is presumably balanced by the
			// dispatch_resume(close_queue) in the source's cancel handler
			// set up in _dispatch_stream_source; confirm against
			// _dispatch_fd_entry_retain.
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_source_cancel(src);
			dispatch_resume(src);
			dispatch_release(src);
		}
		dispatch_release(s->dq);
		free(s);
	}
}
// Attaches a disk object for device `dev` to fd_entry and initializes the
// entry's stream_ops list.
// An existing disk with the same dev in the _dispatch_io_devs hash bucket is
// reused (and retained); otherwise a new one is allocated with room for
// dispatch_io_defaults.max_pending_io_reqs advise_list slots, given its own
// pick queue and inserted into the bucket.
// NOTE(review): the bucket is read and modified without locking here; one
// caller in this file runs this on _dispatch_io_devs_lockq, so callers are
// presumably expected to serialize on that queue. Confirm.
static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
{
	dispatch_disk_t disk;
	char label_name[256];
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
			goto out;
		}
	}
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	// The advise_list slots are allocated inline after the struct
	disk = calloc(1ul, sizeof(struct dispatch_disk_s) + (pending_reqs_depth *
			sizeof(dispatch_operation_t)));
	disk->do_vtable = &_dispatch_disk_vtable;
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_ref_cnt = 1;
	disk->do_xref_cnt = 0;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
			false);
	disk->dev = dev;
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	// Bounded formatting; the cast makes the argument match %d whatever the
	// platform's dev_t width is.
	snprintf(label_name, sizeof(label_name),
			"com.apple.libdispatch-io.deviceq.%d", (int)dev);
	disk->pick_queue = dispatch_queue_create(label_name, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
out:
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);
}
// Final teardown of a disk: unlinks it from its _dispatch_io_devs hash
// bucket, checks that no operation is still queued or parked in an
// advise_list slot, releases the pick queue and frees the object.
static void
_dispatch_disk_dispose(dispatch_disk_t disk)
{
	uintptr_t bucket = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[bucket], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	for (size_t slot = 0; slot < disk->advise_list_depth; slot++) {
		dispatch_assert(!disk->advise_list[slot]);
	}
	dispatch_release(disk->pick_queue);
	free(disk);
}
#pragma mark -
#pragma mark dispatch_stream_operations/dispatch_disk_operations
// Returns true when the stream has at least one queued operation of either
// type (random or stream).
static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)
{
	bool nothing_queued =
			TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]) &&
			TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]);
	return !nothing_queued;
}
// Appends op to the stream's list for its I/O type, provided
// _dispatch_operation_should_enqueue() allows it. If the stream had no
// queued operation before, the stream handler is scheduled on the stream
// queue.
static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
{
	if (_dispatch_operation_should_enqueue(op, stream->dq, data)) {
		bool was_idle = !_dispatch_stream_operation_avail(stream);
		TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op,
				operation_list);
		if (was_idle) {
			dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
		}
	}
}
// Queues op on the disk, provided _dispatch_operation_should_enqueue()
// allows it, and then runs the disk handler.
// Stream-type operations are always appended to their fd_entry's stream_ops
// list, but join the disk's operation list only when stream_ops was empty
// beforehand. All other operations go directly onto the disk's list.
static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
		return;
	}
	bool is_stream_op = (op->params.type == DISPATCH_IO_STREAM);
	if (!is_stream_op || TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	}
	if (is_stream_op) {
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
	}
	_dispatch_disk_handler(disk);
}
// Retires op from the stream: unlinks it from the list for its I/O type,
// clears stream->op if it pointed at op, cancels the operation's timer if
// it has one and drops one reference on the operation.
static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	if (stream->op == op) {
		stream->op = NULL;
	}
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	dispatch_source_t timer = op->timer;
	if (timer) {
		dispatch_source_cancel(timer);
	}
	_dispatch_release(op);
}
// Retires op from the disk: moves the cur_rq cursor to the previous list
// element if it pointed at op, unlinks op from the disk's operation list,
// cancels the operation's timer if it has one and drops one reference.
// For a stream-type operation, op is also unlinked from its fd_entry's
// stream_ops list and its successor there (if any) is appended to the disk's
// operation list.
static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	if (op == disk->cur_rq) {
		// Step the cursor back before op leaves the list
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		dispatch_operation_t successor = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (successor) {
			TAILQ_INSERT_TAIL(&disk->operations, successor, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	dispatch_source_t timer = op->timer;
	if (timer) {
		dispatch_source_cancel(timer);
	}
	_dispatch_release(op);
}
// Chooses the operation the stream should work on next, given the current
// one (`op`, may be NULL).
//  - No current operation: the first stream-type operation if there is one,
//    else the first random-type operation, else NULL.
//  - Current operation is stream-type: it stays current.
//  - Current operation is random-type: the next entry of its list, wrapping
//    around to the first random-type operation at the end.
//  - Any other type: NULL.
static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	if (op) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			return op;
		}
		if (op->params.type != DISPATCH_IO_RANDOM) {
			return NULL;
		}
		dispatch_operation_t following = TAILQ_NEXT(op, operation_list);
		if (following) {
			return following;
		}
		return TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
	}
	if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
		return TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
	}
	if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
		return TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
	}
	return NULL;
}
// Round-robin selection of the next inactive operation on the disk.
// Starts after disk->cur_rq (or at the list head when cur_rq is NULL),
// wraps around at the end of the list and stops at the first operation that
// is not active, or once it is back at cur_rq. The chosen operation becomes
// the new cur_rq. Returns NULL when the list is empty or the candidate
// reached is still active.
static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
{
	if (TAILQ_EMPTY(&disk->operations)) {
		return NULL;
	}
	dispatch_operation_t candidate = disk->cur_rq;
	if (!candidate) {
		candidate = TAILQ_FIRST(&disk->operations);
	} else {
		do {
			candidate = TAILQ_NEXT(candidate, operation_list);
			if (!candidate) {
				candidate = TAILQ_FIRST(&disk->operations);
			}
		} while (candidate->active && candidate != disk->cur_rq);
	}
	if (candidate->active) {
		return NULL;
	}
	disk->cur_rq = candidate;
	return candidate;
}
// Completes the stream's queued operations that belong to `channel`, or all
// of them when channel is NULL. The random-type list is processed before
// the stream-type list. If the stream's source is marked running and no
// operation is left afterwards, the source is suspended.
static void
_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
{
	const unsigned long order[2] = { DISPATCH_IO_RANDOM, DISPATCH_IO_STREAM };
	for (int k = 0; k < 2; k++) {
		typeof(*stream->operations) *list = &stream->operations[order[k]];
		dispatch_operation_t cur, next;
		TAILQ_FOREACH_SAFE(cur, list, operation_list, next) {
			if (channel && cur->channel != channel) {
				continue;
			}
			_dispatch_stream_complete_operation(stream, cur);
		}
	}
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;
	}
}
// Completes the operations on the disk's list that belong to `channel`, or
// all of them when channel is NULL.
static void
_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
{
	dispatch_operation_t cur, next;
	TAILQ_FOREACH_SAFE(cur, &disk->operations, operation_list, next) {
		if (channel && cur->channel != channel) {
			continue;
		}
		_dispatch_disk_complete_operation(disk, cur);
	}
}
#pragma mark -
#pragma mark dispatch_stream_handler/dispatch_disk_handler
// Returns the stream's dispatch source, creating it on first use.
// The new source is a READ or WRITE source (chosen by op->direction) on the
// operation's file descriptor, targets the stream queue, carries the stream
// as context and uses _dispatch_stream_source_handler as event handler. Its
// cancel handler resumes the fd_entry's close queue.
// Returns NULL, without creating anything, for any other direction.
static dispatch_source_t
_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
{
	dispatch_source_t existing = stream->source;
	if (existing) {
		return existing;
	}
	dispatch_fd_t fd = op->fd_entry->fd;
	_dispatch_io_debug("stream source create", fd);
	if (op->direction != DOP_DIR_READ && op->direction != DOP_DIR_WRITE) {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return NULL;
	}
	dispatch_source_t src = dispatch_source_create(
			op->direction == DOP_DIR_READ ? DISPATCH_SOURCE_TYPE_READ :
			DISPATCH_SOURCE_TYPE_WRITE, fd, 0, stream->dq);
	dispatch_set_context(src, stream);
	dispatch_source_set_event_handler_f(src, _dispatch_stream_source_handler);
	dispatch_queue_t close_queue = op->fd_entry->close_queue;
	dispatch_source_set_cancel_handler(src, ^{
		_dispatch_io_debug("stream source cancel", fd);
		dispatch_resume(close_queue);
	});
	stream->source = src;
	return src;
}
// Event handler of the stream's dispatch source (installed by
// _dispatch_stream_source). `ctx` is the dispatch_stream_t.
// Suspends the source, clears the source_running flag and runs the stream
// handler.
static void
_dispatch_stream_source_handler(void *ctx)
{
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_suspend(stream->source);
	stream->source_running = false;
	// ISO C forbids `return expr;` in a void function, even when expr is
	// itself void, so call first and return separately.
	_dispatch_stream_handler(stream);
	return;
}
// Drives one step of a stream: picks the next operation, performs it once
// with _dispatch_operation_perform() and reacts to the DISPATCH_OP_* result.
// `ctx` is the dispatch_stream_t. The function is submitted to stream->dq
// with dispatch_async_f and is also called from
// _dispatch_stream_source_handler.
static void
_dispatch_stream_handler(void *ctx)
{
dispatch_stream_t stream = (dispatch_stream_t)ctx;
dispatch_operation_t op;
pick:
op = _dispatch_stream_pick_next_operation(stream, stream->op);
if (!op) {
_dispatch_debug("no operation found: stream %p", stream);
return;
}
// If _dispatch_io_get_error reports an error for the operation, record it,
// complete the operation without performing any I/O and pick again.
int err = _dispatch_io_get_error(op, NULL, true);
if (err) {
op->err = err;
_dispatch_stream_complete_operation(stream, op);
goto pick;
}
stream->op = op;
_dispatch_io_debug("stream handler", op->fd_entry->fd);
// Keep the fd_entry in a local and retained until the end of the handler;
// the completion paths below drop a reference on op.
dispatch_fd_entry_t fd_entry = op->fd_entry;
_dispatch_fd_entry_retain(fd_entry);
// Nothing transferred yet: optionally deliver once before the first I/O.
if (!op->total && dispatch_io_defaults.initial_delivery) {
_dispatch_io_debug("initial delivery", op->fd_entry->fd);
_dispatch_operation_deliver_data(op, DOP_DELIVER);
}
// flags starts at -1 and records which case label the switch was entered
// through; the first three cases deliberately fall through.
int result = _dispatch_operation_perform(op), flags = -1;
switch (result) {
case DISPATCH_OP_DELIVER:
flags = DOP_DEFAULT;
// fall through
case DISPATCH_OP_DELIVER_AND_COMPLETE:
// Entered directly: flags is still -1, so force delivery but skip empty
// data. Entered from DISPATCH_OP_DELIVER: flags stays DOP_DEFAULT.
flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
DOP_DEFAULT;
_dispatch_operation_deliver_data(op, flags);
// fall through
case DISPATCH_OP_COMPLETE:
// Complete the operation unless we arrived via DISPATCH_OP_DELIVER.
if (flags != DOP_DEFAULT) {
_dispatch_stream_complete_operation(stream, op);
}
// More work queued: run the handler again on the stream queue.
if (_dispatch_stream_operation_avail(stream)) {
dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
}
break;
case DISPATCH_OP_COMPLETE_RESUME:
_dispatch_stream_complete_operation(stream, op);
// fall through
case DISPATCH_OP_RESUME:
// Resume the stream's source (created on demand) while operations remain;
// its event handler re-enters this function.
if (_dispatch_stream_operation_avail(stream)) {
stream->source_running = true;
dispatch_resume(_dispatch_stream_source(stream, op));
}
break;
case DISPATCH_OP_ERR:
// Complete every operation of this operation's channel on this stream.
_dispatch_stream_cleanup_operations(stream, op->channel);
break;
case DISPATCH_OP_FD_ERR:
// Clean up the operations of all channels on this fd_entry, from the
// entry's barrier queue; the extra retain is released inside the block.
_dispatch_fd_entry_retain(fd_entry);
dispatch_async(fd_entry->barrier_queue, ^{
_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
_dispatch_fd_entry_release(fd_entry);
});
break;
default:
break;
}
_dispatch_fd_entry_release(fd_entry);
return;
}
// Schedules disk I/O. `ctx` is the dispatch_disk_t.
// Does nothing while disk->io_active is set. Otherwise it fills empty
// advise_list slots, starting at free_idx, with operations returned by
// _dispatch_disk_pick_next_operation(). If the slot at req_idx then holds an
// operation, it marks I/O as active and submits _dispatch_disk_perform to
// that operation's target queue.
static void
_dispatch_disk_handler(void *ctx)
{
dispatch_disk_t disk = (dispatch_disk_t)ctx;
if (disk->io_active) {
return;
}
_dispatch_io_debug("disk handler", -1);
dispatch_operation_t op;
// advise_list is indexed modulo advise_list_depth; j is req_idx, moved one
// full lap ahead when it does not lie after free_idx.
size_t i = disk->free_idx, j = disk->req_idx;
if (j <= i) {
j += disk->advise_list_depth;
}
while (i <= j) {
// Stop at the first occupied slot or when no operation can be picked.
if ((!disk->advise_list[i%disk->advise_list_depth]) &&
(op = _dispatch_disk_pick_next_operation(disk))) {
int err = _dispatch_io_get_error(op, NULL, true);
if (err) {
// Complete the failed operation without using up the slot; `continue`
// retries the same index i.
op->err = err;
_dispatch_disk_complete_operation(disk, op);
continue;
}
// The slot holds its own reference on the operation.
_dispatch_retain(op);
disk->advise_list[i%disk->advise_list_depth] = op;
op->active = true;
} else {
break;
}
i++;
}
disk->free_idx = (i%disk->advise_list_depth);
op = disk->advise_list[disk->req_idx];
if (op) {
disk->io_active = true;
dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
}
}
// Performs one I/O request for a disk. `ctxt` is the dispatch_disk_t.
// Submitted by _dispatch_disk_handler to the target queue of the operation
// in advise_list[req_idx].
//  1. Walks the advise_list slots from advise_idx up to free_idx and issues
//     a read-ahead advisory for each read operation found there (opening
//     the file first if needed, and doing the optional initial delivery).
//  2. Performs the operation in slot req_idx, clears that slot and advances
//     req_idx.
//  3. Hands the result to the disk's pick_queue, where data is delivered
//     and/or operations are completed, the disk is marked idle again, the
//     disk handler is re-run and the slot's reference on the operation is
//     released.
static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	_dispatch_io_debug("disk perform", -1);
	dispatch_operation_t op;
	// Slots are indexed modulo advise_list_depth; j is free_idx, moved one
	// full lap ahead when it does not lie after advise_idx.
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing to advise
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		// `continue` jumps to the loop condition, so i still advances
		if (op->direction == DOP_DIR_WRITE) {
			// Writes get no read-ahead advisory
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			// The file could not be opened
			continue;
		}
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			_dispatch_io_debug("initial delivery", op->fd_entry->fd);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// A single pending request that has not been advised yet gets a
		// doubled advisory size
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	// Written as (x + 1) % n instead of x = (++x) % n: the latter modifies
	// req_idx twice without sequencing, which is undefined behavior in C.
	disk->req_idx = (disk->req_idx + 1) % disk->advise_list_depth;
	dispatch_async(disk->pick_queue, ^{
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			// Complete the operations of this operation's channel
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			// Complete the operations of all channels on this disk
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balances the retain taken when the operation was placed in its
		// advise_list slot by _dispatch_disk_handler
		_dispatch_release(op);
	});
}
#pragma mark -
#pragma mark dispatch_operation_perform
// Issues an F_RDADVISE fcntl for the next chunk_size bytes of op's file and
// advances op->advise_offset past the advised range.
// Nothing is done when advise_offset is already more than chunk_size +
// PAGE_SIZE beyond the current position (op->offset + op->total).
// The first advisory starts at op->offset and is lengthened so that
// op->offset + chunk_size is rounded up to a multiple of PAGE_SIZE.
static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	int err;
	struct radvisory advise;
	off_t limit = (off_t)((op->offset+op->total) + chunk_size + PAGE_SIZE);
	if (op->advise_offset > limit) {
		return;
	}
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		size_t remainder = (size_t)((op->offset + chunk_size) % PAGE_SIZE);
		if (remainder) {
			advise.ra_count += (int)(PAGE_SIZE - remainder);
		}
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		default: (void)dispatch_assume_zero(err); break;
	);
}
// Performs a single read()/pread()/write()/pwrite() for `op`, setting up the
// operation's buffer first if it has none and opening the file first if the
// fd_entry has no descriptor yet.
// Returns one of:
//   DISPATCH_OP_DELIVER              - bytes were transferred, more remain
//   DISPATCH_OP_COMPLETE             - op->total reached op->length, or an
//                                      error other than those listed below
//                                      was stored in op->err
//   DISPATCH_OP_DELIVER_AND_COMPLETE - the syscall transferred 0 bytes (EOF)
//   DISPATCH_OP_RESUME               - EAGAIN
//   DISPATCH_OP_COMPLETE_RESUME      - EAGAIN on a convenience-channel read
//                                      that has already transferred data
//   DISPATCH_OP_ERR                  - ECANCELED
//   DISPATCH_OP_FD_ERR               - EBADF (also recorded on the fd_entry)
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
int err = _dispatch_io_get_error(op, NULL, true);
if (err) {
goto error;
}
if (!op->buf) {
size_t max_buf_siz = op->params.high;
size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
if (op->direction == DOP_DIR_READ) {
// Read: the buffer size is the smallest of params.high minus the data
// already held in op->data, the chunk size, and (for a bounded length)
// the number of bytes still to be read.
size_t data_siz = dispatch_data_get_size(op->data);
if (data_siz) {
dispatch_assert(data_siz < max_buf_siz);
max_buf_siz -= data_siz;
}
if (max_buf_siz > chunk_siz) {
max_buf_siz = chunk_siz;
}
// A length of SIZE_MAX is treated as unbounded.
if (op->length < SIZE_MAX) {
op->buf_siz = op->length - op->total;
if (op->buf_siz > max_buf_siz) {
op->buf_siz = max_buf_siz;
}
} else {
op->buf_siz = max_buf_siz;
}
op->buf = valloc(op->buf_siz);
_dispatch_io_debug("buffer allocated", op->fd_entry->fd);
} else if (op->direction == DOP_DIR_WRITE) {
// Write: add up the lengths of the leading regions of op->data until the
// chunk size (itself capped at params.high) is reached. The first region is
// always taken, even when it alone exceeds the chunk size.
if (chunk_siz > max_buf_siz) {
chunk_siz = max_buf_siz;
}
op->buf_siz = 0;
dispatch_data_apply(op->data,
^(dispatch_data_t region DISPATCH_UNUSED,
size_t offset DISPATCH_UNUSED,
const void* buf DISPATCH_UNUSED, size_t len) {
size_t siz = op->buf_siz + len;
if (!op->buf_siz || siz <= chunk_siz) {
op->buf_siz = siz;
}
// Returning false presumably ends the traversal (dispatch_data_apply
// contract).
return (bool)(siz < chunk_siz);
});
if (op->buf_siz > max_buf_siz) {
op->buf_siz = max_buf_siz;
}
// Map the chosen prefix of op->data; op->buf points into op->buf_data,
// which is kept on the operation.
dispatch_data_t d;
d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
NULL);
_dispatch_io_data_release(d);
_dispatch_io_debug("buffer mapped", op->fd_entry->fd);
}
}
// Path-based entries are opened on first use.
if (op->fd_entry->fd == -1) {
err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
if (err) {
goto error;
}
}
// Continue after the bytes already processed in this buffer; random-access
// I/O uses the operation's offset plus the total transferred so far.
void *buf = op->buf + op->buf_len;
size_t len = op->buf_siz - op->buf_len;
off_t off = op->offset + op->total;
ssize_t processed = -1;
syscall:
if (op->direction == DOP_DIR_READ) {
if (op->params.type == DISPATCH_IO_STREAM) {
processed = read(op->fd_entry->fd, buf, len);
} else if (op->params.type == DISPATCH_IO_RANDOM) {
processed = pread(op->fd_entry->fd, buf, len, off);
}
} else if (op->direction == DOP_DIR_WRITE) {
if (op->params.type == DISPATCH_IO_STREAM) {
processed = write(op->fd_entry->fd, buf, len);
} else if (op->params.type == DISPATCH_IO_RANDOM) {
processed = pwrite(op->fd_entry->fd, buf, len, off);
}
}
if (processed == -1) {
err = errno;
// Interrupted by a signal: retry the same syscall.
if (err == EINTR) {
goto syscall;
}
goto error;
}
if (processed == 0) {
_dispatch_io_debug("EOF", op->fd_entry->fd);
return DISPATCH_OP_DELIVER_AND_COMPLETE;
}
op->buf_len += processed;
op->total += processed;
if (op->total == op->length) {
return DISPATCH_OP_COMPLETE;
} else {
return DISPATCH_OP_DELIVER;
}
error:
if (err == EAGAIN) {
// EAGAIN is only expected on stream-backed entries. It is not stored in
// op->err; the caller is told to resume waiting instead.
dispatch_assert(!op->fd_entry->disk);
_dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
if (op->direction == DOP_DIR_READ && op->total &&
op->channel == op->fd_entry->convenience_channel) {
return DISPATCH_OP_COMPLETE_RESUME;
}
return DISPATCH_OP_RESUME;
}
op->err = err;
switch (err) {
case ECANCELED:
return DISPATCH_OP_ERR;
case EBADF:
// Record the error on the fd_entry unless one is already set there.
(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
return DISPATCH_OP_FD_ERR;
default:
return DISPATCH_OP_COMPLETE;
}
}
// Hands the data accumulated by `op` to the operation's handler,
// asynchronously on op->op_q, or keeps buffering when delivery is not due.
//   flags - DOP_DELIVER or DOP_DONE force delivery, and DOP_DONE also makes
//           the handler call report done; DOP_NO_EMPTY suppresses delivery
//           of zero-length data.
// Without forced delivery, data is delivered once the undelivered byte count
// reaches op->params.low.
static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
dispatch_op_flags_t flags)
{
dispatch_data_t data = NULL;
int err = 0;
size_t undelivered = op->undelivered + op->buf_len;
// Delivery is forced by the caller's flags or by a DOP_DELIVER flag left on
// the operation; the operation's flags are reset either way.
bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
(op->flags & DOP_DELIVER);
op->flags = DOP_DEFAULT;
if (!deliver) {
if (undelivered >= op->params.low) {
deliver = true;
} else if (op->buf_len < op->buf_siz) {
// Below the low-water mark and the buffer is not full yet: keep filling.
_dispatch_io_debug("buffer data", op->fd_entry->fd);
return;
}
} else {
// Forced delivery reports the operation's error, or ECANCELED when the
// channel has been stopped.
err = op->err;
if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
err = ECANCELED;
op->err = err;
}
}
if (op->direction == DOP_DIR_READ) {
if (op->buf_len) {
// Wrap the read buffer in a data object that frees it on destruction and
// append it to the data accumulated so far.
void *buf = op->buf;
data = dispatch_data_create(buf, op->buf_len, NULL,
DISPATCH_DATA_DESTRUCTOR_FREE);
op->buf = NULL;
op->buf_len = 0;
dispatch_data_t d = dispatch_data_create_concat(op->data, data);
_dispatch_io_data_release(op->data);
_dispatch_io_data_release(data);
data = d;
} else {
data = op->data;
}
// When delivering, the operation starts over with empty data; otherwise it
// keeps accumulating.
op->data = deliver ? dispatch_data_empty : data;
} else if (op->direction == DOP_DIR_WRITE) {
if (deliver) {
// The handler receives the part of op->data after the first buf_len bytes.
data = dispatch_data_create_subrange(op->data, op->buf_len,
op->length);
}
if (op->buf_data && op->buf_len == op->buf_siz) {
// The mapped buffer has been fully processed: release it and make op->data
// refer to what follows it.
_dispatch_io_data_release(op->buf_data);
op->buf_data = NULL;
op->buf = NULL;
op->buf_len = 0;
dispatch_data_t d;
if (deliver) {
_dispatch_io_data_retain(data);
d = data;
} else {
d = dispatch_data_create_subrange(op->data, op->buf_siz,
op->length);
}
_dispatch_io_data_release(op->data);
op->data = d;
}
} else {
dispatch_assert(op->direction < DOP_DIR_MAX);
return;
}
// Not delivering (or only empty data with DOP_NO_EMPTY): just remember how
// much is pending.
if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
op->undelivered = undelivered;
_dispatch_io_debug("buffer data", op->fd_entry->fd);
return;
}
op->undelivered = 0;
_dispatch_io_debug("deliver data", op->fd_entry->fd);
// Copy what the block needs into locals instead of capturing op, and keep
// the fd_entry and channel retained until the handler has run.
dispatch_op_direction_t direction = op->direction;
dispatch_io_handler_t handler = op->handler;
#if DISPATCH_IO_DEBUG
int fd = op->fd_entry->fd;
#endif
dispatch_fd_entry_t fd_entry = op->fd_entry;
_dispatch_fd_entry_retain(fd_entry);
dispatch_io_t channel = op->channel;
_dispatch_retain(channel);
dispatch_async(op->op_q, ^{
bool done = (flags & DOP_DONE);
dispatch_data_t d = data;
if (done) {
if (direction == DOP_DIR_READ && err) {
// Failed read: pass on any partial data in a separate not-done call, then
// report the error with NULL data.
if (dispatch_data_get_size(d)) {
_dispatch_io_debug("IO handler invoke", fd);
handler(false, d, 0);
}
d = NULL;
} else if (direction == DOP_DIR_WRITE && !err) {
// Successful write: the final call carries NULL data.
d = NULL;
}
}
_dispatch_io_debug("IO handler invoke", fd);
handler(done, d, err);
_dispatch_release(channel);
_dispatch_fd_entry_release(fd_entry);
// Release the captured data object, whether or not it was passed on.
_dispatch_io_data_release(data);
});
}