#include "internal.h"
#include "protocol.h"
#include "protocolServer.h"
#include <sys/mount.h>
#define DISPATCH_EVFILT_TIMER (-EVFILT_SYSCOUNT - 1)
#define DISPATCH_EVFILT_CUSTOM_ADD (-EVFILT_SYSCOUNT - 2)
#define DISPATCH_EVFILT_CUSTOM_OR (-EVFILT_SYSCOUNT - 3)
#define DISPATCH_EVFILT_SYSCOUNT (EVFILT_SYSCOUNT + 3)
#define DISPATCH_TIMER_INDEX_WALL 0
#define DISPATCH_TIMER_INDEX_MACH 1
static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
{
.dk_kevent = {
.ident = DISPATCH_TIMER_INDEX_WALL,
.filter = DISPATCH_EVFILT_TIMER,
.udata = &_dispatch_kevent_timer[0],
},
.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
},
{
.dk_kevent = {
.ident = DISPATCH_TIMER_INDEX_MACH,
.filter = DISPATCH_EVFILT_TIMER,
.udata = &_dispatch_kevent_timer[1],
},
.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
},
};
#define DISPATCH_TIMER_COUNT (sizeof _dispatch_kevent_timer / sizeof _dispatch_kevent_timer[0])
static struct dispatch_kevent_s _dispatch_kevent_data_or = {
.dk_kevent = {
.filter = DISPATCH_EVFILT_CUSTOM_OR,
.flags = EV_CLEAR,
.udata = &_dispatch_kevent_data_or,
},
.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
.dk_kevent = {
.filter = DISPATCH_EVFILT_CUSTOM_ADD,
.udata = &_dispatch_kevent_data_add,
},
.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};
#ifndef DISPATCH_NO_LEGACY
struct dispatch_source_attr_vtable_s {
DISPATCH_VTABLE_HEADER(dispatch_source_attr_s);
};
struct dispatch_source_attr_s {
DISPATCH_STRUCT_HEADER(dispatch_source_attr_s, dispatch_source_attr_vtable_s);
void* finalizer_ctxt;
dispatch_source_finalizer_function_t finalizer_func;
void* context;
};
#endif
#define _dispatch_source_call_block ((void *)-1)
static void _dispatch_source_latch_and_call(dispatch_source_t ds);
static void _dispatch_source_cancel_callout(dispatch_source_t ds);
static bool _dispatch_source_probe(dispatch_source_t ds);
static void _dispatch_source_dispose(dispatch_source_t ds);
static void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke);
static size_t _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz);
static size_t dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz);
static dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds);
static void _dispatch_kevent_merge(dispatch_source_t ds);
static void _dispatch_kevent_release(dispatch_source_t ds);
static void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_enable(dispatch_kevent_t dk);
static void _dispatch_kevent_machport_disable(dispatch_kevent_t dk);
static void _dispatch_drain_mach_messages(struct kevent *ke);
static void _dispatch_timer_list_update(dispatch_source_t ds);
static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)));
static const char *
_evfiltstr(short filt)
{
switch (filt) {
#define _evfilt2(f) case (f): return #f
_evfilt2(EVFILT_READ);
_evfilt2(EVFILT_WRITE);
_evfilt2(EVFILT_AIO);
_evfilt2(EVFILT_VNODE);
_evfilt2(EVFILT_PROC);
_evfilt2(EVFILT_SIGNAL);
_evfilt2(EVFILT_TIMER);
_evfilt2(EVFILT_MACHPORT);
_evfilt2(EVFILT_FS);
_evfilt2(EVFILT_USER);
_evfilt2(EVFILT_SESSION);
_evfilt2(DISPATCH_EVFILT_TIMER);
_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
default:
return "EVFILT_missing";
}
}
#define DSL_HASH_SIZE 256u // must be a power of two
#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
static dispatch_kevent_t
_dispatch_kevent_find(uintptr_t ident, short filter)
{
uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
dispatch_kevent_t dki;
TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
break;
}
}
return dki;
}
static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
uintptr_t ident = dk->dk_kevent.ident;
uintptr_t hash = DSL_HASH(dk->dk_kevent.filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}
void
dispatch_source_cancel(dispatch_source_t ds)
{
#if DISPATCH_DEBUG
dispatch_debug(ds, __FUNCTION__);
#endif
_dispatch_retain(ds);
dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);
_dispatch_wakeup(ds);
_dispatch_release(ds);
}
#ifndef DISPATCH_NO_LEGACY
void
_dispatch_source_legacy_xref_release(dispatch_source_t ds)
{
if (ds->ds_is_legacy) {
if (!(ds->ds_timer.flags & DISPATCH_TIMER_ONESHOT)) {
dispatch_source_cancel(ds);
}
dispatch_atomic_and(&ds->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK);
} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
DISPATCH_CLIENT_CRASH("Release of a suspended object");
}
_dispatch_wakeup(ds);
_dispatch_release(ds);
}
#endif
long
dispatch_source_testcancel(dispatch_source_t ds)
{
return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
}
unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
return ds->ds_pending_data_mask;
}
uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
return (int)ds->ds_ident_hack;
}
unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
return ds->ds_data;
}
#if DISPATCH_DEBUG
void
dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
{
size_t i;
for (i = 0; i < count; ++i) {
_dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, fflags = 0x%x, data = %p, udata = %p }: %s",
i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags, kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
}
}
#endif
static size_t
_dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
return offset;
}
static void
_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
{
unsigned int i;
for (i = 0; i < DSL_HASH_SIZE; i++) {
TAILQ_INIT(&_dispatch_sources[i]);
}
TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
}
void
_dispatch_kevent_merge(dispatch_source_t ds)
{
static dispatch_once_t pred;
dispatch_kevent_t dk;
typeof(dk->dk_kevent.fflags) new_flags;
bool do_resume = false;
if (ds->ds_is_installed) {
return;
}
ds->ds_is_installed = true;
dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array);
dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);
if (dk) {
new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
free(ds->ds_dkev);
ds->ds_dkev = dk;
do_resume = new_flags;
} else {
dk = ds->ds_dkev;
_dispatch_kevent_insert(dk);
new_flags = dk->dk_kevent.fflags;
do_resume = true;
}
TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);
if (do_resume) {
dk->dk_kevent.flags |= EV_ADD;
_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
ds->ds_is_armed = true;
}
}
void
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
switch (dk->dk_kevent.filter) {
case DISPATCH_EVFILT_TIMER:
case DISPATCH_EVFILT_CUSTOM_ADD:
case DISPATCH_EVFILT_CUSTOM_OR:
return;
case EVFILT_MACHPORT:
_dispatch_kevent_machport_resume(dk, new_flags, del_flags);
break;
case EVFILT_PROC:
if (dk->dk_kevent.flags & EV_ONESHOT) {
return;
}
default:
_dispatch_update_kq(&dk->dk_kevent);
if (dk->dk_kevent.flags & EV_DISPATCH) {
dk->dk_kevent.flags &= ~EV_ADD;
}
break;
}
}
dispatch_queue_t
_dispatch_source_invoke(dispatch_source_t ds)
{
dispatch_queue_t dq = _dispatch_queue_get_current();
if (!ds->ds_is_installed) {
if (dq != &_dispatch_mgr_q) {
return &_dispatch_mgr_q;
}
_dispatch_kevent_merge(ds);
} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
if (ds->ds_dkev) {
if (dq != &_dispatch_mgr_q) {
return &_dispatch_mgr_q;
}
_dispatch_kevent_release(ds);
return ds->do_targetq;
} else if (ds->ds_cancel_handler) {
if (dq != ds->do_targetq) {
return ds->do_targetq;
}
}
_dispatch_source_cancel_callout(ds);
} else if (ds->ds_pending_data) {
if (dq != ds->do_targetq) {
return ds->do_targetq;
}
_dispatch_source_latch_and_call(ds);
if (ds->ds_needs_rearm) {
return &_dispatch_mgr_q;
}
} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
if (dq != &_dispatch_mgr_q) {
return &_dispatch_mgr_q;
}
_dispatch_kevent_resume(ds->ds_dkev, 0, 0);
ds->ds_is_armed = true;
}
return NULL;
}
bool
_dispatch_source_probe(dispatch_source_t ds)
{
if (!ds->ds_is_installed) {
return true;
} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
if (ds->ds_dkev || ds->ds_cancel_handler) {
return true;
}
} else if (ds->ds_pending_data) {
return true;
} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
return true;
}
return false;
}
void
_dispatch_source_dispose(dispatch_source_t ds)
{
_dispatch_queue_dispose((dispatch_queue_t)ds);
}
static void
_dispatch_kevent_debugger2(void *context, dispatch_source_t unused __attribute__((unused)))
{
struct sockaddr sa;
socklen_t sa_len = sizeof(sa);
int c, fd = (int)(long)context;
unsigned int i;
dispatch_kevent_t dk;
dispatch_source_t ds;
FILE *debug_stream;
c = accept(fd, &sa, &sa_len);
if (c == -1) {
if (errno != EAGAIN) {
dispatch_assume_zero(errno);
}
return;
}
#if 0
int r = fcntl(c, F_SETFL, 0); if (r == -1) {
dispatch_assume_zero(errno);
}
#endif
debug_stream = fdopen(c, "a");
if (!dispatch_assume(debug_stream)) {
close(c);
return;
}
fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
fprintf(debug_stream, "Content-type: text/html\r\n");
fprintf(debug_stream, "Pragma: nocache\r\n");
fprintf(debug_stream, "\r\n");
fprintf(debug_stream, "<html>\n<head><title>PID %u</title></head>\n<body>\n<ul>\n", getpid());
for (i = 0; i < DSL_HASH_SIZE; i++) {
if (TAILQ_EMPTY(&_dispatch_sources[i])) {
continue;
}
TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags 0x%hx fflags 0x%x data 0x%lx udata %p\n",
dk, dk->dk_kevent.ident, _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
dk->dk_kevent.fflags, dk->dk_kevent.data, dk->dk_kevent.udata);
fprintf(debug_stream, "\t\t<ul>\n");
TAILQ_FOREACH(ds, &dk->dk_sources, ds_list) {
fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend 0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
ds, ds->do_ref_cnt, ds->do_suspend_cnt, ds->ds_pending_data, ds->ds_pending_data_mask,
ds->ds_atomic_flags);
if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
dispatch_queue_t dq = ds->do_targetq;
fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend 0x%x label: %s\n", dq, dq->do_ref_cnt, dq->do_suspend_cnt, dq->dq_label);
}
}
fprintf(debug_stream, "\t\t</ul>\n");
fprintf(debug_stream, "\t</li>\n");
}
}
fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
fflush(debug_stream);
fclose(debug_stream);
}
static void
_dispatch_kevent_debugger(void *context __attribute__((unused)))
{
union {
struct sockaddr_in sa_in;
struct sockaddr sa;
} sa_u = {
.sa_in = {
.sin_family = AF_INET,
.sin_addr = { htonl(INADDR_LOOPBACK), },
},
};
dispatch_source_t ds;
const char *valstr;
int val, r, fd, sock_opt = 1;
socklen_t slen = sizeof(sa_u);
if (issetugid()) {
return;
}
valstr = getenv("LIBDISPATCH_DEBUGGER");
if (!valstr) {
return;
}
val = atoi(valstr);
if (val == 2) {
sa_u.sa_in.sin_addr.s_addr = 0;
}
fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd == -1) {
dispatch_assume_zero(errno);
return;
}
r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, (socklen_t) sizeof sock_opt);
if (r == -1) {
dispatch_assume_zero(errno);
goto out_bad;
}
#if 0
r = fcntl(fd, F_SETFL, O_NONBLOCK);
if (r == -1) {
dispatch_assume_zero(errno);
goto out_bad;
}
#endif
r = bind(fd, &sa_u.sa, sizeof(sa_u));
if (r == -1) {
dispatch_assume_zero(errno);
goto out_bad;
}
r = listen(fd, SOMAXCONN);
if (r == -1) {
dispatch_assume_zero(errno);
goto out_bad;
}
r = getsockname(fd, &sa_u.sa, &slen);
if (r == -1) {
dispatch_assume_zero(errno);
goto out_bad;
}
ds = dispatch_source_read_create_f(fd, NULL, &_dispatch_mgr_q, (void *)(long)fd, _dispatch_kevent_debugger2);
if (dispatch_assume(ds)) {
_dispatch_log("LIBDISPATCH: debug port: %hu", ntohs(sa_u.sa_in.sin_port));
return;
}
out_bad:
close(fd);
}
void
_dispatch_source_drain_kevent(struct kevent *ke)
{
static dispatch_once_t pred;
dispatch_kevent_t dk = ke->udata;
dispatch_source_t dsi;
dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
dispatch_debug_kevents(ke, 1, __func__);
if (ke->filter == EVFILT_MACHPORT) {
return _dispatch_drain_mach_messages(ke);
}
dispatch_assert(dk);
if (ke->flags & EV_ONESHOT) {
dk->dk_kevent.flags |= EV_ONESHOT;
}
TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
_dispatch_source_merge_kevent(dsi, ke);
}
}
static void
_dispatch_kevent_dispose(dispatch_kevent_t dk)
{
uintptr_t key;
switch (dk->dk_kevent.filter) {
case DISPATCH_EVFILT_TIMER:
case DISPATCH_EVFILT_CUSTOM_ADD:
case DISPATCH_EVFILT_CUSTOM_OR:
return;
case EVFILT_MACHPORT:
_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
break;
case EVFILT_PROC:
if (dk->dk_kevent.flags & EV_ONESHOT) {
break; }
default:
if (~dk->dk_kevent.flags & EV_DELETE) {
dk->dk_kevent.flags |= EV_DELETE;
_dispatch_update_kq(&dk->dk_kevent);
}
break;
}
if (dk->dk_kevent.filter == EVFILT_MACHPORT) {
key = MACH_PORT_INDEX(dk->dk_kevent.ident);
} else {
key = dk->dk_kevent.ident;
}
TAILQ_REMOVE(&_dispatch_sources[DSL_HASH(key)], dk, dk_list);
free(dk);
}
void
_dispatch_kevent_release(dispatch_source_t ds)
{
dispatch_kevent_t dk = ds->ds_dkev;
dispatch_source_t dsi;
uint32_t del_flags, fflags = 0;
ds->ds_dkev = NULL;
TAILQ_REMOVE(&dk->dk_sources, ds, ds_list);
if (TAILQ_EMPTY(&dk->dk_sources)) {
_dispatch_kevent_dispose(dk);
} else {
TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
fflags |= (uint32_t)dsi->ds_pending_data_mask;
}
del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
if (del_flags) {
dk->dk_kevent.flags |= EV_ADD;
dk->dk_kevent.fflags = fflags;
_dispatch_kevent_resume(dk, 0, del_flags);
}
}
ds->ds_is_armed = false;
ds->ds_needs_rearm = false; _dispatch_release(ds); }
void
_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
{
struct kevent fake;
if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
return;
}
if (ke->flags & EV_ERROR) {
if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
fake = *ke;
fake.flags &= ~EV_ERROR;
fake.fflags = NOTE_EXIT;
fake.data = 0;
ke = &fake;
} else {
dispatch_assume_zero(ke->data);
return;
}
}
if (ds->ds_is_level) {
dispatch_assert(ke->data >= 0l);
ds->ds_pending_data = ~ke->data;
} else if (ds->ds_is_adder) {
dispatch_atomic_add(&ds->ds_pending_data, ke->data);
} else {
dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
}
if (ds->ds_needs_rearm) {
ds->ds_is_armed = false;
}
_dispatch_wakeup(ds);
}
void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
unsigned long prev;
if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
return;
}
prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
if (ds->ds_is_level) {
ds->ds_data = ~prev;
} else {
ds->ds_data = prev;
}
if (dispatch_assume(prev)) {
if (ds->ds_handler_func) {
ds->ds_handler_func(ds->ds_handler_ctxt, ds);
}
}
}
void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
ds->ds_pending_data_mask = 0;
ds->ds_pending_data = 0;
ds->ds_data = 0;
#ifdef __BLOCKS__
if (ds->ds_handler_is_block) {
Block_release(ds->ds_handler_ctxt);
ds->ds_handler_is_block = false;
ds->ds_handler_func = NULL;
ds->ds_handler_ctxt = NULL;
}
#endif
if (!ds->ds_cancel_handler) {
return;
}
if (ds->ds_cancel_is_block) {
#ifdef __BLOCKS__
dispatch_block_t b = ds->ds_cancel_handler;
if (ds->ds_atomic_flags & DSF_CANCELED) {
b();
}
Block_release(ds->ds_cancel_handler);
ds->ds_cancel_is_block = false;
#endif
} else {
dispatch_function_t f = ds->ds_cancel_handler;
if (ds->ds_atomic_flags & DSF_CANCELED) {
f(ds->do_ctxt);
}
}
ds->ds_cancel_handler = NULL;
}
const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
.do_kind = "kevent-source",
.do_invoke = _dispatch_source_invoke,
.do_dispose = _dispatch_source_dispose,
.do_probe = _dispatch_source_probe,
.do_debug = _dispatch_source_kevent_debug,
};
void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
struct kevent kev = {
.fflags = (typeof(kev.fflags))val,
.data = val,
};
dispatch_assert(ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
_dispatch_source_merge_kevent(ds, &kev);
}
size_t
dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
dispatch_queue_t target = ds->do_targetq;
return snprintf(buf, bufsiz,
"target = %s[%p], pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
target ? target->dq_label : "", target,
ds->ds_pending_data, ds->ds_pending_data_mask);
}
size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
size_t offset = 0;
offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dx_kind(ds), ds);
offset += dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
offset += dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
return offset;
}
#ifndef DISPATCH_NO_LEGACY
static void
dispatch_source_attr_dispose(dispatch_source_attr_t attr)
{
dispatch_source_attr_set_finalizer(attr, NULL);
_dispatch_dispose(attr);
}
static const struct dispatch_source_attr_vtable_s dispatch_source_attr_vtable = {
.do_type = DISPATCH_SOURCE_ATTR_TYPE,
.do_kind = "source-attr",
.do_dispose = dispatch_source_attr_dispose,
};
dispatch_source_attr_t
dispatch_source_attr_create(void)
{
dispatch_source_attr_t rval = calloc(1, sizeof(struct dispatch_source_attr_s));
if (rval) {
rval->do_vtable = &dispatch_source_attr_vtable;
rval->do_next = DISPATCH_OBJECT_LISTLESS;
rval->do_targetq = dispatch_get_global_queue(0, 0);
rval->do_ref_cnt = 1;
rval->do_xref_cnt = 1;
}
return rval;
}
void
dispatch_source_attr_set_finalizer_f(dispatch_source_attr_t attr,
void *context, dispatch_source_finalizer_function_t finalizer)
{
#ifdef __BLOCKS__
if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
Block_release(attr->finalizer_ctxt);
}
#endif
attr->finalizer_ctxt = context;
attr->finalizer_func = finalizer;
}
#ifdef __BLOCKS__
long
dispatch_source_attr_set_finalizer(dispatch_source_attr_t attr,
dispatch_source_finalizer_t finalizer)
{
void *ctxt;
dispatch_source_finalizer_function_t func;
if (finalizer) {
if (!(ctxt = Block_copy(finalizer))) {
return 1;
}
func = (void *)_dispatch_call_block_and_release2;
} else {
ctxt = NULL;
func = NULL;
}
dispatch_source_attr_set_finalizer_f(attr, ctxt, func);
return 0;
}
dispatch_source_finalizer_t
dispatch_source_attr_get_finalizer(dispatch_source_attr_t attr)
{
if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
return (dispatch_source_finalizer_t)attr->finalizer_ctxt;
} else if (attr->finalizer_func == NULL) {
return NULL;
} else {
abort(); }
}
#endif
void
dispatch_source_attr_set_context(dispatch_source_attr_t attr, void *context)
{
attr->context = context;
}
dispatch_source_attr_t
dispatch_source_attr_copy(dispatch_source_attr_t proto)
{
dispatch_source_attr_t rval = NULL;
if (proto && (rval = malloc(sizeof(struct dispatch_source_attr_s)))) {
memcpy(rval, proto, sizeof(struct dispatch_source_attr_s));
#ifdef __BLOCKS__
if (rval->finalizer_func == (void*)_dispatch_call_block_and_release2) {
rval->finalizer_ctxt = Block_copy(rval->finalizer_ctxt);
}
#endif
} else if (!proto) {
rval = dispatch_source_attr_create();
}
return rval;
}
#endif
struct dispatch_source_type_s {
struct kevent ke;
uint64_t mask;
};
const struct dispatch_source_type_s _dispatch_source_type_timer = {
.ke = {
.filter = DISPATCH_EVFILT_TIMER,
},
.mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
};
const struct dispatch_source_type_s _dispatch_source_type_read = {
.ke = {
.filter = EVFILT_READ,
.flags = EV_DISPATCH,
},
};
const struct dispatch_source_type_s _dispatch_source_type_write = {
.ke = {
.filter = EVFILT_WRITE,
.flags = EV_DISPATCH,
},
};
const struct dispatch_source_type_s _dispatch_source_type_proc = {
.ke = {
.filter = EVFILT_PROC,
.flags = EV_CLEAR,
},
.mask = NOTE_EXIT|NOTE_FORK|NOTE_EXEC|NOTE_SIGNAL|NOTE_REAP,
};
const struct dispatch_source_type_s _dispatch_source_type_signal = {
.ke = {
.filter = EVFILT_SIGNAL,
},
};
const struct dispatch_source_type_s _dispatch_source_type_vnode = {
.ke = {
.filter = EVFILT_VNODE,
.flags = EV_CLEAR,
},
.mask = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND|NOTE_ATTRIB|NOTE_LINK|NOTE_RENAME|NOTE_REVOKE|NOTE_NONE,
};
const struct dispatch_source_type_s _dispatch_source_type_vfs = {
.ke = {
.filter = EVFILT_FS,
.flags = EV_CLEAR,
},
.mask = VQ_NOTRESP|VQ_NEEDAUTH|VQ_LOWDISK|VQ_MOUNT|VQ_UNMOUNT|VQ_DEAD|VQ_ASSIST|VQ_NOTRESPLOCK|VQ_UPDATE|VQ_VERYLOWDISK,
};
const struct dispatch_source_type_s _dispatch_source_type_mach_send = {
.ke = {
.filter = EVFILT_MACHPORT,
.flags = EV_DISPATCH,
.fflags = DISPATCH_MACHPORT_DEAD,
},
.mask = DISPATCH_MACH_SEND_DEAD,
};
const struct dispatch_source_type_s _dispatch_source_type_mach_recv = {
.ke = {
.filter = EVFILT_MACHPORT,
.flags = EV_DISPATCH,
.fflags = DISPATCH_MACHPORT_RECV,
},
};
const struct dispatch_source_type_s _dispatch_source_type_data_add = {
.ke = {
.filter = DISPATCH_EVFILT_CUSTOM_ADD,
},
};
const struct dispatch_source_type_s _dispatch_source_type_data_or = {
.ke = {
.filter = DISPATCH_EVFILT_CUSTOM_OR,
.flags = EV_CLEAR,
.fflags = ~0,
},
};
dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
unsigned long mask,
dispatch_queue_t q)
{
const struct kevent *proto_kev = &type->ke;
dispatch_source_t ds = NULL;
dispatch_kevent_t dk = NULL;
if (type == NULL || (mask & ~type->mask)) {
goto out_bad;
}
switch (type->ke.filter) {
case EVFILT_SIGNAL:
if (handle >= NSIG) {
goto out_bad;
}
break;
case EVFILT_FS:
case DISPATCH_EVFILT_CUSTOM_ADD:
case DISPATCH_EVFILT_CUSTOM_OR:
case DISPATCH_EVFILT_TIMER:
if (handle) {
goto out_bad;
}
break;
default:
break;
}
ds = calloc(1ul, sizeof(struct dispatch_source_s));
if (slowpath(!ds)) {
goto out_bad;
}
dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
if (slowpath(!dk)) {
goto out_bad;
}
dk->dk_kevent = *proto_kev;
dk->dk_kevent.ident = handle;
dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
dk->dk_kevent.fflags |= (uint32_t)mask;
dk->dk_kevent.udata = dk;
TAILQ_INIT(&dk->dk_sources);
_dispatch_queue_init((dispatch_queue_t)ds);
strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));
ds->do_vtable = &_dispatch_source_kevent_vtable;
ds->do_ref_cnt++; ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
ds->do_targetq = q;
ds->ds_ident_hack = dk->dk_kevent.ident;
ds->ds_dkev = dk;
ds->ds_pending_data_mask = dk->dk_kevent.fflags;
if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
if (proto_kev->filter != EVFILT_MACHPORT) {
ds->ds_is_level = true;
}
ds->ds_needs_rearm = true;
} else if (!(EV_CLEAR & proto_kev->flags)) {
ds->ds_is_adder = true;
}
if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
ds->ds_needs_rearm = true;
}
dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
#if DISPATCH_DEBUG
dispatch_debug(ds, __FUNCTION__);
#endif
if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
static dispatch_once_t pred;
dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
} else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
ds->ds_timer.flags = mask;
}
_dispatch_retain(ds->do_targetq);
return ds;
out_bad:
free(ds);
free(dk);
return NULL;
}
static void
_dispatch_source_set_event_handler2(void *context)
{
struct Block_layout *bl = context;
dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
Block_release(ds->ds_handler_ctxt);
}
ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
ds->ds_handler_ctxt = bl;
ds->ds_handler_is_block = true;
}
void
dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
{
dispatch_assert(!ds->ds_is_legacy);
handler = _dispatch_Block_copy(handler);
dispatch_barrier_async_f((dispatch_queue_t)ds,
handler, _dispatch_source_set_event_handler2);
}
static void
_dispatch_source_set_event_handler_f(void *context)
{
dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
Block_release(ds->ds_handler_ctxt);
}
ds->ds_handler_func = context;
ds->ds_handler_ctxt = ds->do_ctxt;
ds->ds_handler_is_block = false;
}
void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
dispatch_function_t handler)
{
dispatch_assert(!ds->ds_is_legacy);
dispatch_barrier_async_f((dispatch_queue_t)ds,
handler, _dispatch_source_set_event_handler_f);
}
static void
_dispatch_source_set_cancel_handler2(void *context)
{
dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
Block_release(ds->ds_cancel_handler);
}
ds->ds_cancel_handler = context;
ds->ds_cancel_is_block = true;
}
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
dispatch_assert(!ds->ds_is_legacy);
handler = _dispatch_Block_copy(handler);
dispatch_barrier_async_f((dispatch_queue_t)ds,
handler, _dispatch_source_set_cancel_handler2);
}
static void
_dispatch_source_set_cancel_handler_f(void *context)
{
dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
Block_release(ds->ds_cancel_handler);
}
ds->ds_cancel_handler = context;
ds->ds_cancel_is_block = false;
}
void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
dispatch_function_t handler)
{
dispatch_assert(!ds->ds_is_legacy);
dispatch_barrier_async_f((dispatch_queue_t)ds,
handler, _dispatch_source_set_cancel_handler_f);
}
#ifndef DISPATCH_NO_LEGACY
dispatch_source_t
_dispatch_source_create2(dispatch_source_t ds,
dispatch_source_attr_t attr,
void *context,
dispatch_source_handler_function_t handler)
{
if (ds == NULL || handler == NULL) {
return NULL;
}
ds->ds_is_legacy = true;
ds->ds_handler_func = handler;
ds->ds_handler_ctxt = context;
if (attr && attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
ds->dq_finalizer_ctxt = attr->finalizer_ctxt;
ds->dq_finalizer_func = (typeof(ds->dq_finalizer_func))attr->finalizer_func;
ds->do_ctxt = attr->context;
}
#ifdef __BLOCKS__
if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
ds->dq_finalizer_ctxt = Block_copy(ds->dq_finalizer_ctxt);
if (!ds->dq_finalizer_ctxt) {
goto out_bad;
}
}
if (handler == _dispatch_source_call_block) {
struct Block_layout *bl = ds->ds_handler_ctxt = Block_copy(context);
if (!ds->ds_handler_ctxt) {
if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
Block_release(ds->dq_finalizer_ctxt);
}
goto out_bad;
}
ds->ds_handler_func = (void *)bl->invoke;
ds->ds_handler_is_block = true;
}
dispatch_source_handler_function_t func = ds->ds_handler_func;
dispatch_source_handler_t block = ds->ds_handler_ctxt;
void *ctxt = ds->ds_handler_ctxt;
bool handler_is_block = ds->ds_handler_is_block;
ds->ds_cancel_is_block = true;
if (handler_is_block) {
ds->ds_cancel_handler = _dispatch_Block_copy(^{
block(ds);
});
} else {
ds->ds_cancel_handler = _dispatch_Block_copy(^{
func(ctxt, ds);
});
}
#endif
if (attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
dispatch_resume(ds);
}
return ds;
out_bad:
free(ds);
return NULL;
}
long
dispatch_source_get_error(dispatch_source_t ds, long *err_out)
{
if ((ds->ds_atomic_flags & DSF_CANCELED) && !ds->ds_dkev) {
if (err_out) {
*err_out = ECANCELED;
}
return DISPATCH_ERROR_DOMAIN_POSIX;
} else {
return DISPATCH_ERROR_DOMAIN_NO_ERROR;
}
}
#endif
void
_dispatch_timer_list_update(dispatch_source_t ds)
{
dispatch_source_t dsi = NULL;
int idx;
dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);
if (!ds->ds_dkev) {
return;
}
_dispatch_kevent_merge(ds);
TAILQ_REMOVE(&ds->ds_dkev->dk_sources, ds, ds_list);
if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
idx = DISPATCH_TIMER_INDEX_WALL;
} else {
idx = DISPATCH_TIMER_INDEX_MACH;
}
ds->ds_dkev = &_dispatch_kevent_timer[idx];
if (ds->ds_timer.target) {
TAILQ_FOREACH(dsi, &ds->ds_dkev->dk_sources, ds_list) {
if (dsi->ds_timer.target == 0 || ds->ds_timer.target < dsi->ds_timer.target) {
break;
}
}
}
if (dsi) {
TAILQ_INSERT_BEFORE(dsi, ds, ds_list);
} else {
TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds, ds_list);
}
}
static void
_dispatch_run_timers2(unsigned int timer)
{
dispatch_source_t ds;
uint64_t now, missed;
if (timer == DISPATCH_TIMER_INDEX_MACH) {
now = mach_absolute_time();
} else {
now = _dispatch_get_nanoseconds();
}
while ((ds = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
if (timer != ds->ds_ident_hack) {
_dispatch_timer_list_update(ds);
continue;
}
if (!ds->ds_timer.target) {
break;
}
if (ds->ds_timer.target > now) {
break;
}
if (ds->ds_timer.flags & (DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE)) {
dispatch_atomic_inc(&ds->ds_pending_data);
ds->ds_timer.target = 0;
} else {
missed = (now - ds->ds_timer.target) / ds->ds_timer.interval;
dispatch_atomic_add(&ds->ds_pending_data, missed + 1);
ds->ds_timer.target += (missed + 1) * ds->ds_timer.interval;
}
_dispatch_timer_list_update(ds);
_dispatch_wakeup(ds);
}
}
void
_dispatch_run_timers(void)
{
unsigned int i;
for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
_dispatch_run_timers2(i);
}
}
#if defined(__i386__) || defined(__x86_64__)
#define _dispatch_convert_mach2nano(x) (x)
#define _dispatch_convert_nano2mach(x) (x)
#else
static mach_timebase_info_data_t tbi;
static dispatch_once_t tbi_pred;
static void
_dispatch_convert_init(void *context __attribute__((unused)))
{
dispatch_assume_zero(mach_timebase_info(&tbi));
}
static uint64_t
_dispatch_convert_mach2nano(uint64_t val)
{
#ifdef __LP64__
__uint128_t tmp;
#else
long double tmp;
#endif
dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);
tmp = val;
tmp *= tbi.numer;
tmp /= tbi.denom;
return tmp;
}
static uint64_t
_dispatch_convert_nano2mach(uint64_t val)
{
#ifdef __LP64__
__uint128_t tmp;
#else
long double tmp;
#endif
dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);
tmp = val;
tmp *= tbi.denom;
tmp /= tbi.numer;
return tmp;
}
#endif
#define FOREVER_SEC 3153600l
#define FOREVER_NSEC 31536000000000000ull
struct timespec *
_dispatch_get_next_timer_fire(struct timespec *howsoon)
{
dispatch_source_t ds = NULL;
unsigned int timer;
uint64_t now, delta_tmp, delta = UINT64_MAX;
for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
TAILQ_FOREACH(ds, &_dispatch_kevent_timer[timer].dk_sources, ds_list) {
if (!ds->ds_timer.target) {
break;
}
if (DISPATCH_OBJECT_SUSPENDED(ds)) {
ds->ds_is_armed = false;
} else {
break;
}
}
if (!ds || !ds->ds_timer.target) {
continue;
}
if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
now = _dispatch_get_nanoseconds();
} else {
now = mach_absolute_time();
}
if (ds->ds_timer.target <= now) {
howsoon->tv_sec = 0;
howsoon->tv_nsec = 0;
return howsoon;
}
delta_tmp = ds->ds_timer.target - now;
if (!(ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK)) {
delta_tmp = _dispatch_convert_mach2nano(delta_tmp);
}
if (delta_tmp < delta) {
delta = delta_tmp;
}
}
if (slowpath(delta > FOREVER_NSEC)) {
return NULL;
} else {
howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
}
return howsoon;
}
struct dispatch_set_timer_params {
dispatch_source_t ds;
uintptr_t ident;
struct dispatch_timer_source_s values;
};
static void
_dispatch_source_set_timer2(void *context)
{
struct dispatch_set_timer_params *params = context;
dispatch_source_t ds = params->ds;
ds->ds_ident_hack = params->ident;
ds->ds_timer = params->values;
_dispatch_timer_list_update(ds);
dispatch_resume(ds);
dispatch_release(ds);
free(params);
}
void
dispatch_source_set_timer(dispatch_source_t ds,
dispatch_time_t start,
uint64_t interval,
uint64_t leeway)
{
struct dispatch_set_timer_params *params;
if (interval == 0) {
interval = 1;
} else if ((int64_t)interval < 0) {
interval = INT64_MAX;
}
dispatch_retain(ds);
dispatch_suspend(ds);
if (start == DISPATCH_TIME_NOW) {
start = mach_absolute_time();
} else if (start == DISPATCH_TIME_FOREVER) {
start = INT64_MAX;
}
while (!(params = malloc(sizeof(struct dispatch_set_timer_params)))) {
sleep(1);
}
params->ds = ds;
params->values.flags = ds->ds_timer.flags;
if ((int64_t)start < 0) {
params->ident = DISPATCH_TIMER_INDEX_WALL;
params->values.start = -((int64_t)start);
params->values.target = -((int64_t)start);
params->values.interval = interval;
params->values.leeway = leeway;
params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
} else {
params->ident = DISPATCH_TIMER_INDEX_MACH;
params->values.start = start;
params->values.target = start;
params->values.interval = _dispatch_convert_nano2mach(interval);
params->values.leeway = _dispatch_convert_nano2mach(leeway);
params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
}
dispatch_barrier_async_f(&_dispatch_mgr_q, params, _dispatch_source_set_timer2);
}
#ifndef DISPATCH_NO_LEGACY
long
dispatch_source_timer_set_time(dispatch_source_t ds, uint64_t nanoseconds, uint64_t leeway)
{
dispatch_time_t start;
if (nanoseconds == 0) {
nanoseconds = 1;
}
if (ds->ds_timer.flags == (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK)) {
static const struct timespec t0;
start = dispatch_walltime(&t0, nanoseconds);
} else if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
start = dispatch_walltime(DISPATCH_TIME_NOW, nanoseconds);
} else {
start = dispatch_time(DISPATCH_TIME_NOW, nanoseconds);
}
if (ds->ds_timer.flags & (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_ONESHOT)) {
nanoseconds = INT64_MAX; }
dispatch_source_set_timer(ds, start, nanoseconds, leeway);
return 0;
}
uint64_t
dispatch_event_get_nanoseconds(dispatch_source_t ds)
{
if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
return ds->ds_timer.interval;
} else {
return _dispatch_convert_mach2nano(ds->ds_timer.interval);
}
}
#endif
static dispatch_source_t _dispatch_mach_notify_source;
static mach_port_t _dispatch_port_set;
static mach_port_t _dispatch_event_port;
#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
static void _dispatch_port_set_init(void *);
static mach_port_t _dispatch_get_port_set(void);
void
_dispatch_drain_mach_messages(struct kevent *ke)
{
dispatch_source_t dsi;
dispatch_kevent_t dk;
struct kevent ke2;
if (!dispatch_assume(ke->data)) {
return;
}
dk = _dispatch_kevent_find(ke->data, EVFILT_MACHPORT);
if (!dispatch_assume(dk)) {
return;
}
_dispatch_kevent_machport_disable(dk);
EV_SET(&ke2, ke->data, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, DISPATCH_MACHPORT_RECV, 0, dk);
TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
_dispatch_source_merge_kevent(dsi, &ke2);
}
}
void
_dispatch_port_set_init(void *context __attribute__((unused)))
{
struct kevent kev = {
.filter = EVFILT_MACHPORT,
.flags = EV_ADD,
};
kern_return_t kr;
kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &_dispatch_port_set);
DISPATCH_VERIFY_MIG(kr);
dispatch_assume_zero(kr);
kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &_dispatch_event_port);
DISPATCH_VERIFY_MIG(kr);
dispatch_assume_zero(kr);
kr = mach_port_move_member(mach_task_self(), _dispatch_event_port, _dispatch_port_set);
DISPATCH_VERIFY_MIG(kr);
dispatch_assume_zero(kr);
kev.ident = _dispatch_port_set;
_dispatch_update_kq(&kev);
}
mach_port_t
_dispatch_get_port_set(void)
{
static dispatch_once_t pred;
dispatch_once_f(&pred, NULL, _dispatch_port_set_init);
return _dispatch_port_set;
}
void
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
kern_return_t kr;
if ((new_flags & DISPATCH_MACHPORT_RECV) || (!new_flags && !del_flags && dk->dk_kevent.fflags & DISPATCH_MACHPORT_RECV)) {
_dispatch_kevent_machport_enable(dk);
}
if (new_flags & DISPATCH_MACHPORT_DEAD) {
kr = mach_port_request_notification(mach_task_self(), port, MACH_NOTIFY_DEAD_NAME, 1,
_dispatch_event_port, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
DISPATCH_VERIFY_MIG(kr);
switch(kr) {
case KERN_INVALID_NAME:
case KERN_INVALID_RIGHT:
break;
default:
if (dispatch_assume_zero(kr)) {
} else if (dispatch_assume_zero(previous)) {
kr = mach_port_deallocate(mach_task_self(), previous);
DISPATCH_VERIFY_MIG(kr);
dispatch_assume_zero(kr);
}
}
}
if (del_flags & DISPATCH_MACHPORT_RECV) {
_dispatch_kevent_machport_disable(dk);
}
if (del_flags & DISPATCH_MACHPORT_DEAD) {
kr = mach_port_request_notification(mach_task_self(), (mach_port_t)dk->dk_kevent.ident,
MACH_NOTIFY_DEAD_NAME, 1, MACH_PORT_NULL, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
DISPATCH_VERIFY_MIG(kr);
switch (kr) {
case KERN_INVALID_NAME:
case KERN_INVALID_RIGHT:
case KERN_INVALID_ARGUMENT:
break;
default:
if (dispatch_assume_zero(kr)) {
} else if (previous) {
dispatch_assume_zero(_dispatch_send_consume_send_once_right(previous));
}
}
}
}
void
_dispatch_kevent_machport_enable(dispatch_kevent_t dk)
{
mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
kern_return_t kr;
kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
DISPATCH_VERIFY_MIG(kr);
switch (kr) {
case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
break;
default:
dispatch_assume_zero(kr);
}
}
void
_dispatch_kevent_machport_disable(dispatch_kevent_t dk)
{
mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
kern_return_t kr;
kr = mach_port_move_member(mach_task_self(), mp, 0);
DISPATCH_VERIFY_MIG(kr);
switch (kr) {
case KERN_INVALID_RIGHT:
case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
break;
case 0:
break;
default:
dispatch_assume_zero(kr);
break;
}
}
#define _DISPATCH_MIN_MSG_SZ (8ul * 1024ul - MAX_TRAILER_SIZE)
#ifndef DISPATCH_NO_LEGACY
dispatch_source_t
dispatch_source_mig_create(mach_port_t mport, size_t max_msg_size, dispatch_source_attr_t attr,
dispatch_queue_t dq, dispatch_mig_callback_t mig_callback)
{
if (max_msg_size < _DISPATCH_MIN_MSG_SZ) {
max_msg_size = _DISPATCH_MIN_MSG_SZ;
}
return dispatch_source_machport_create(mport, DISPATCH_MACHPORT_RECV, attr, dq,
^(dispatch_source_t ds) {
if (!dispatch_source_get_error(ds, NULL)) {
if (dq->dq_width != 1) {
dispatch_retain(ds); dispatch_async(dq, ^{
dispatch_mig_server(ds, max_msg_size, mig_callback);
dispatch_release(ds); });
} else {
dispatch_mig_server(ds, max_msg_size, mig_callback);
}
}
});
}
#endif
static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)))
{
size_t maxsz = sizeof(union __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem);
if (sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem) > maxsz) {
maxsz = sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem);
}
_dispatch_get_port_set();
_dispatch_mach_notify_source = dispatch_source_mig_create(_dispatch_event_port,
maxsz, NULL, &_dispatch_mgr_q, libdispatch_internal_protocol_server);
dispatch_assert(_dispatch_mach_notify_source);
}
kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
dispatch_source_t dsi;
dispatch_kevent_t dk;
struct kevent kev;
#if DISPATCH_DEBUG
_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x deleted prematurely", name);
#endif
dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
if (!dk) {
goto out;
}
EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DELETED, 0, dk);
TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
_dispatch_source_merge_kevent(dsi, &kev);
dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DELETED;
}
dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DELETED;
out:
return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_port_destroyed(mach_port_t notify __attribute__((unused)), mach_port_t name)
{
kern_return_t kr;
dispatch_assume_zero(name);
kr = mach_port_mod_refs(mach_task_self(), name, MACH_PORT_RIGHT_RECEIVE, -1);
DISPATCH_VERIFY_MIG(kr);
dispatch_assume_zero(kr);
return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_no_senders(mach_port_t notify, mach_port_mscount_t mscnt __attribute__((unused)))
{
dispatch_assume_zero(notify);
return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_send_once(mach_port_t notify __attribute__((unused)))
{
#if DISPATCH_DEBUG
_dispatch_log("Corruption: An app/library deleted a libdispatch dead-name notification");
#endif
return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
dispatch_source_t dsi;
dispatch_kevent_t dk;
struct kevent kev;
kern_return_t kr;
dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
if (!dk) {
goto out;
}
EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DEAD, 0, dk);
TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
_dispatch_source_merge_kevent(dsi, &kev);
dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DEAD;
}
dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DEAD;
out:
kr = mach_port_deallocate(mach_task_self(), name);
DISPATCH_VERIFY_MIG(kr);
return KERN_SUCCESS;
}
kern_return_t
_dispatch_wakeup_main_thread(mach_port_t mp __attribute__((unused)))
{
return 0;
}
kern_return_t
_dispatch_consume_send_once_right(mach_port_t mp __attribute__((unused)))
{
return 0;
}
mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, dispatch_mig_callback_t callback)
{
mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
mach_msg_options_t tmp_options = options;
mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
mach_msg_return_t kr = 0;
unsigned int cnt = 1000; int demux_success;
maxmsgsz += MAX_TRAILER_SIZE;
bufRequest = alloca(maxmsgsz);
bufReply = alloca(maxmsgsz);
bufReply->Head.msgh_size = 0;
for (;;) {
if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
options &= ~MACH_RCV_MSG;
tmp_options &= ~MACH_RCV_MSG;
if (!(tmp_options & MACH_SEND_MSG)) {
break;
}
}
kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
(mach_msg_size_t)maxmsgsz, (mach_port_t)ds->ds_ident_hack, 0, 0);
tmp_options = options;
if (slowpath(kr)) {
switch (kr) {
case MACH_SEND_INVALID_DEST:
case MACH_SEND_TIMED_OUT:
if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
mach_msg_destroy(&bufReply->Head);
}
break;
case MACH_RCV_TIMED_OUT:
case MACH_RCV_INVALID_NAME:
break;
default:
dispatch_assume_zero(kr);
break;
}
break;
}
if (!(tmp_options & MACH_RCV_MSG)) {
break;
}
bufTemp = bufRequest;
bufRequest = bufReply;
bufReply = bufTemp;
demux_success = callback(&bufRequest->Head, &bufReply->Head);
if (!demux_success) {
bufRequest->Head.msgh_remote_port = 0;
mach_msg_destroy(&bufRequest->Head);
} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
if (slowpath(bufReply->RetCode)) {
if (bufReply->RetCode == MIG_NO_REPLY) {
continue;
}
bufRequest->Head.msgh_remote_port = 0;
mach_msg_destroy(&bufRequest->Head);
}
}
if (bufReply->Head.msgh_remote_port) {
tmp_options |= MACH_SEND_MSG;
if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != MACH_MSG_TYPE_MOVE_SEND_ONCE) {
tmp_options |= MACH_SEND_TIMEOUT;
}
}
}
return kr;
}