#include "hi_locl.h"
#include <dispatch/dispatch.h>
#include "heap.h"
struct heim_event_data {
heap_ptr hptr;
dispatch_semaphore_t running;
int flags;
#define RUNNING 1
#define IN_FREE 2
heim_ipc_event_callback_t callback;
heim_ipc_event_final_t final;
void *ctx;
time_t t;
};
static dispatch_queue_t timer_sync_q;
static dispatch_queue_t timer_job_q;
static Heap *timer_heap;
static dispatch_source_t timer_source;
static int
event_cmp_fn(const void *aptr, const void *bptr)
{
const struct heim_event_data *a = aptr;
const struct heim_event_data *b = bptr;
return (int)(a->t - b->t);
}
static void
reschedule_timer(void)
{
const struct heim_event_data *e = heap_head(timer_heap);
if (e == NULL) {
dispatch_source_set_timer(timer_source,
DISPATCH_TIME_FOREVER, 0, 10ull * NSEC_PER_SEC);
} else {
struct timespec ts;
ts.tv_sec = e->t;
ts.tv_nsec = 0;
dispatch_source_set_timer(timer_source,
dispatch_walltime(&ts, 0),
0, 10ull * NSEC_PER_SEC);
}
}
static void
trigger_jobs(void)
{
time_t now = time(NULL);
while (1) {
struct heim_event_data *e = rk_UNCONST(heap_head(timer_heap));
if (e != NULL && e->t < now) {
heap_remove_head(timer_heap);
e->hptr = HEAP_INVALID_PTR;
if (e->flags & RUNNING) {
e->t = now + 10;
heap_insert(timer_heap, e, &e->hptr);
continue;
}
e->flags |= RUNNING;
_heim_ipc_suspend_timer();
dispatch_async(timer_job_q, ^{
e->callback(e, e->ctx);
dispatch_async(timer_sync_q, ^{
e->flags &= ~RUNNING;
if (e->running)
dispatch_semaphore_signal(e->running);
_heim_ipc_restart_timer();
});
});
} else
break;
}
reschedule_timer();
}
static void
timer_init(void)
{
static dispatch_once_t once;
dispatch_once(&once, ^{
timer_sync_q = dispatch_queue_create("hiem-timer-q", NULL);
timer_job_q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
timer_heap = heap_new(11, event_cmp_fn);
timer_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
0, 0, timer_sync_q);
dispatch_source_set_event_handler(timer_source, ^{ trigger_jobs(); });
dispatch_resume(timer_source);
});
}
heim_event_t
heim_ipc_event_create_f(heim_ipc_event_callback_t cb, void *ctx)
{
heim_event_t e;
timer_init();
e = malloc(sizeof(*e));
if (e == NULL)
return NULL;
e->hptr = HEAP_INVALID_PTR;
e->running = NULL;
e->flags = 0;
e->callback = cb;
e->ctx = ctx;
e->t = 0;
return e;
}
int
heim_ipc_event_set_time(heim_event_t e, time_t t)
{
dispatch_sync(timer_sync_q, ^{
time_t next;
if (e->flags & IN_FREE)
abort();
if (e->hptr != HEAP_INVALID_PTR)
heap_remove(timer_heap, e->hptr);
next = time(NULL);
if (t > next)
next = t;
e->t = next;
heap_insert(timer_heap, e, &e->hptr);
reschedule_timer();
});
return 0;
}
void
heim_ipc_event_cancel(heim_event_t e)
{
dispatch_async(timer_sync_q, ^{
if (e->hptr != HEAP_INVALID_PTR) {
heap_remove(timer_heap, e->hptr);
e->hptr = HEAP_INVALID_PTR;
}
e->t = 0;
reschedule_timer();
});
}
void
heim_ipc_event_free(heim_event_t e)
{
dispatch_async(timer_sync_q, ^{
e->flags |= IN_FREE;
if ((e->hptr != HEAP_INVALID_PTR))
abort();
if (e->final || (e->flags & RUNNING)) {
int wait_running = (e->flags & RUNNING);
if (wait_running)
e->running = dispatch_semaphore_create(0);
dispatch_async(timer_job_q, ^{
if (wait_running) {
dispatch_semaphore_wait(e->running,
DISPATCH_TIME_FOREVER);
dispatch_release(e->running);
}
if (e->final)
e->final(e->ctx);
free(e);
});
} else {
free(e);
}
});
}
void
heim_ipc_event_set_final_f(heim_event_t e, heim_ipc_event_final_t f)
{
e->final = f;
}