#include "pth_p.h"
intern pth_t pth_main;
intern pth_t pth_sched;
intern pth_t pth_current;
intern pth_pqueue_t pth_NQ;
intern pth_pqueue_t pth_RQ;
intern pth_pqueue_t pth_WQ;
intern pth_pqueue_t pth_SQ;
intern pth_pqueue_t pth_DQ;
intern float pth_loadval;
static int pth_sigpipe[2];
static sigset_t pth_sigpending;
static sigset_t pth_sigblock;
static sigset_t pth_sigcatch;
static sigset_t pth_sigraised;
static pth_time_t pth_loadticknext;
static pth_time_t pth_loadtickgap = PTH_TIME(1,0);
intern void pth_scheduler_init(void)
{
if (pipe(pth_sigpipe) == -1) {
fprintf(stderr, "**Pth** INIT: Cannot create internal pipe: %s\n",
strerror(errno));
abort();
}
pth_fdmode(pth_sigpipe[0], PTH_FDMODE_NONBLOCK);
pth_fdmode(pth_sigpipe[1], PTH_FDMODE_NONBLOCK);
pth_sched = NULL;
pth_current = NULL;
pth_pqueue_init(&pth_NQ);
pth_pqueue_init(&pth_RQ);
pth_pqueue_init(&pth_WQ);
pth_pqueue_init(&pth_SQ);
pth_pqueue_init(&pth_DQ);
pth_loadval = 1.0;
pth_time_set(&pth_loadticknext, PTH_TIME_NOW);
return;
}
intern void pth_scheduler_drop(void)
{
pth_t t;
while ((t = pth_pqueue_delmax(&pth_NQ)) != NULL);
pth_tcb_free(t);
pth_pqueue_init(&pth_NQ);
while ((t = pth_pqueue_delmax(&pth_RQ)) != NULL);
pth_tcb_free(t);
pth_pqueue_init(&pth_RQ);
while ((t = pth_pqueue_delmax(&pth_WQ)) != NULL);
pth_tcb_free(t);
pth_pqueue_init(&pth_WQ);
while ((t = pth_pqueue_delmax(&pth_SQ)) != NULL);
pth_tcb_free(t);
pth_pqueue_init(&pth_SQ);
while ((t = pth_pqueue_delmax(&pth_DQ)) != NULL);
pth_tcb_free(t);
pth_pqueue_init(&pth_DQ);
return;
}
intern void pth_scheduler_kill(void)
{
pth_scheduler_drop();
close(pth_sigpipe[0]);
close(pth_sigpipe[1]);
return;
}
#define pth_scheduler_load(now) \
if (pth_time_cmp((now), &pth_loadticknext) >= 0) { \
pth_time_t ttmp; \
int numready; \
numready = pth_pqueue_elements(&pth_RQ); \
pth_time_set(&ttmp, (now)); \
do { \
pth_loadval = (numready*0.25) + (pth_loadval*0.75); \
pth_time_sub(&ttmp, &pth_loadtickgap); \
} while (pth_time_cmp(&ttmp, &pth_loadticknext) >= 0); \
pth_time_set(&pth_loadticknext, (now)); \
pth_time_add(&pth_loadticknext, &pth_loadtickgap); \
}
intern void *pth_scheduler(void *dummy)
{
sigset_t sigs;
pth_time_t running;
pth_time_t snapshot;
struct sigaction sa;
sigset_t ss;
int sig;
pth_t t;
pth_debug1("pth_scheduler: bootstrapping");
pth_sched->state = PTH_STATE_SCHEDULER;
sigfillset(&sigs);
pth_sc(sigprocmask)(SIG_SETMASK, &sigs, NULL);
pth_time_set(&snapshot, PTH_TIME_NOW);
for (;;) {
while ((t = pth_pqueue_tail(&pth_NQ)) != NULL) {
pth_pqueue_delete(&pth_NQ, t);
t->state = PTH_STATE_READY;
pth_pqueue_insert(&pth_RQ, pth_pqueue_favorite_prio(&pth_RQ), t);
pth_debug2("pth_scheduler: new thread \"%s\" moved to top of ready queue", t->name);
}
pth_scheduler_load(&snapshot);
pth_current = pth_pqueue_delmax(&pth_RQ);
if (pth_current == NULL) {
fprintf(stderr, "**Pth** SCHEDULER INTERNAL ERROR: "
"no more thread(s) available to schedule!?!?\n");
abort();
}
pth_debug4("pth_scheduler: thread \"%s\" selected (prio=%d, qprio=%d)",
pth_current->name, pth_current->prio, pth_current->q_prio);
if (pth_current->sigpendcnt > 0) {
sigpending(&pth_sigpending);
for (sig = 1; sig < PTH_NSIG; sig++)
if (sigismember(&pth_current->sigpending, sig))
if (!sigismember(&pth_sigpending, sig))
kill(getpid(), sig);
}
pth_debug3("pth_scheduler: switching to thread 0x%lx (\"%s\")",
(unsigned long)pth_current, pth_current->name);
pth_time_set(&pth_current->lastran, PTH_TIME_NOW);
pth_time_set(&running, &pth_current->lastran);
pth_time_sub(&running, &snapshot);
pth_time_add(&pth_sched->running, &running);
pth_mctx_switch(&pth_sched->mctx, &pth_current->mctx);
pth_time_set(&snapshot, PTH_TIME_NOW);
pth_debug3("pth_scheduler: cameback from thread 0x%lx (\"%s\")",
(unsigned long)pth_current, pth_current->name);
pth_time_set(&running, &snapshot);
pth_time_sub(&running, &pth_current->lastran);
pth_time_add(&pth_current->running, &running);
pth_debug3("pth_scheduler: thread \"%s\" ran %.6f",
pth_current->name, pth_time_t2d(&running));
if (pth_current->sigpendcnt > 0) {
sigset_t sigstillpending;
sigpending(&sigstillpending);
for (sig = 1; sig < PTH_NSIG; sig++) {
if (sigismember(&pth_current->sigpending, sig)) {
if (!sigismember(&sigstillpending, sig)) {
sigdelset(&pth_current->sigpending, sig);
pth_current->sigpendcnt--;
}
else if (!sigismember(&pth_sigpending, sig)) {
pth_util_sigdelete(sig);
}
}
}
}
if (pth_current->stackguard != NULL) {
if (*pth_current->stackguard != 0xDEAD) {
pth_debug3("pth_scheduler: stack overflow detected for thread 0x%lx (\"%s\")",
(unsigned long)pth_current, pth_current->name);
if (sigaction(SIGSEGV, NULL, &sa) == 0) {
if (sa.sa_handler == SIG_DFL) {
fprintf(stderr, "**Pth** STACK OVERFLOW: thread pid_t=0x%lx, name=\"%s\"\n",
(unsigned long)pth_current, pth_current->name);
kill(getpid(), SIGSEGV);
sigfillset(&ss);
sigdelset(&ss, SIGSEGV);
sigsuspend(&ss);
abort();
}
}
pth_current->join_arg = (void *)0xDEAD;
pth_current->state = PTH_STATE_DEAD;
kill(getpid(), SIGSEGV);
}
}
if (pth_current->state == PTH_STATE_DEAD) {
pth_debug2("pth_scheduler: marking thread \"%s\" as dead", pth_current->name);
if (!pth_current->joinable)
pth_tcb_free(pth_current);
else
pth_pqueue_insert(&pth_DQ, PTH_PRIO_STD, pth_current);
pth_current = NULL;
}
if (pth_current != NULL && pth_current->state == PTH_STATE_WAITING) {
pth_debug2("pth_scheduler: moving thread \"%s\" to waiting queue",
pth_current->name);
pth_pqueue_insert(&pth_WQ, pth_current->prio, pth_current);
pth_current = NULL;
}
pth_pqueue_increase(&pth_RQ);
if (pth_current != NULL)
pth_pqueue_insert(&pth_RQ, pth_current->prio, pth_current);
if ( pth_pqueue_elements(&pth_RQ) == 0
&& pth_pqueue_elements(&pth_NQ) == 0)
pth_sched_eventmanager(&snapshot, FALSE );
else
pth_sched_eventmanager(&snapshot, TRUE );
}
return NULL;
}
intern void pth_sched_eventmanager(pth_time_t *now, int dopoll)
{
pth_t nexttimer_thread;
pth_event_t nexttimer_ev;
pth_time_t nexttimer_value;
pth_event_t evh;
pth_event_t ev;
pth_t t;
pth_t tlast;
int this_occurred;
int any_occurred;
fd_set rfds;
fd_set wfds;
fd_set efds;
struct timeval delay;
struct timeval *pdelay;
sigset_t oss;
struct sigaction sa;
struct sigaction osa[1+PTH_NSIG];
char minibuf[128];
int loop_repeat;
int fdmax;
int rc;
int sig;
int n;
pth_debug2("pth_sched_eventmanager: enter in %s mode",
dopoll ? "polling" : "waiting");
loop_entry:
loop_repeat = FALSE;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
fdmax = -1;
sigpending(&pth_sigpending);
sigfillset(&pth_sigblock);
sigemptyset(&pth_sigcatch);
sigemptyset(&pth_sigraised);
pth_time_set(&nexttimer_value, PTH_TIME_ZERO);
nexttimer_thread = NULL;
nexttimer_ev = NULL;
any_occurred = FALSE;
for (t = pth_pqueue_head(&pth_WQ); t != NULL;
t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT)) {
for (sig = 1; sig < PTH_NSIG; sig++)
if (!sigismember(&(t->mctx.sigs), sig))
sigdelset(&pth_sigblock, sig);
if (t->cancelreq == TRUE)
any_occurred = TRUE;
if (t->events == NULL)
continue;
ev = evh = t->events;
do {
if (!ev->ev_occurred) {
this_occurred = FALSE;
if (ev->ev_type == PTH_EVENT_FD) {
if (ev->ev_goal & PTH_UNTIL_FD_READABLE)
FD_SET(ev->ev_args.FD.fd, &rfds);
if (ev->ev_goal & PTH_UNTIL_FD_WRITEABLE)
FD_SET(ev->ev_args.FD.fd, &wfds);
if (ev->ev_goal & PTH_UNTIL_FD_EXCEPTION)
FD_SET(ev->ev_args.FD.fd, &efds);
if (fdmax < ev->ev_args.FD.fd)
fdmax = ev->ev_args.FD.fd;
}
else if (ev->ev_type == PTH_EVENT_SELECT) {
pth_util_fds_merge(ev->ev_args.SELECT.nfd,
ev->ev_args.SELECT.rfds, &rfds,
ev->ev_args.SELECT.wfds, &wfds,
ev->ev_args.SELECT.efds, &efds);
if (fdmax < ev->ev_args.SELECT.nfd-1)
fdmax = ev->ev_args.SELECT.nfd-1;
}
else if (ev->ev_type == PTH_EVENT_SIGS) {
for (sig = 1; sig < PTH_NSIG; sig++) {
if (sigismember(ev->ev_args.SIGS.sigs, sig)) {
if (sigismember(&t->sigpending, sig)) {
*(ev->ev_args.SIGS.sig) = sig;
sigdelset(&t->sigpending, sig);
t->sigpendcnt--;
this_occurred = TRUE;
}
if (sigismember(&pth_sigpending, sig)) {
if (ev->ev_args.SIGS.sig != NULL)
*(ev->ev_args.SIGS.sig) = sig;
pth_util_sigdelete(sig);
sigdelset(&pth_sigpending, sig);
this_occurred = TRUE;
}
else {
sigdelset(&pth_sigblock, sig);
sigaddset(&pth_sigcatch, sig);
}
}
}
}
else if (ev->ev_type == PTH_EVENT_TIME) {
if (pth_time_cmp(&(ev->ev_args.TIME.tv), now) < 0)
this_occurred = TRUE;
else {
if ((nexttimer_thread == NULL && nexttimer_ev == NULL) ||
pth_time_cmp(&(ev->ev_args.TIME.tv), &nexttimer_value) < 0) {
nexttimer_thread = t;
nexttimer_ev = ev;
pth_time_set(&nexttimer_value, &(ev->ev_args.TIME.tv));
}
}
}
else if (ev->ev_type == PTH_EVENT_MSG) {
if (pth_ring_elements(&(ev->ev_args.MSG.mp->mp_queue)) > 0)
this_occurred = TRUE;
}
else if (ev->ev_type == PTH_EVENT_MUTEX) {
if (!(ev->ev_args.MUTEX.mutex->mx_state & PTH_MUTEX_LOCKED))
this_occurred = TRUE;
}
else if (ev->ev_type == PTH_EVENT_COND) {
if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) {
if (ev->ev_args.COND.cond->cn_state & PTH_COND_BROADCAST)
this_occurred = TRUE;
else {
if (!(ev->ev_args.COND.cond->cn_state & PTH_COND_HANDLED)) {
ev->ev_args.COND.cond->cn_state |= PTH_COND_HANDLED;
this_occurred = TRUE;
}
}
}
}
else if (ev->ev_type == PTH_EVENT_TID) {
if ( ( ev->ev_args.TID.tid == NULL
&& pth_pqueue_elements(&pth_DQ) > 0)
|| ( ev->ev_args.TID.tid != NULL
&& ev->ev_args.TID.tid->state == ev->ev_goal))
this_occurred = TRUE;
}
else if (ev->ev_type == PTH_EVENT_FUNC) {
if (ev->ev_args.FUNC.func(ev->ev_args.FUNC.arg))
this_occurred = TRUE;
else {
pth_time_t tv;
pth_time_set(&tv, now);
pth_time_add(&tv, &(ev->ev_args.FUNC.tv));
if ((nexttimer_thread == NULL && nexttimer_ev == NULL) ||
pth_time_cmp(&tv, &nexttimer_value) < 0) {
nexttimer_thread = t;
nexttimer_ev = ev;
pth_time_set(&nexttimer_value, &tv);
}
}
}
if (this_occurred) {
pth_debug2("pth_sched_eventmanager: [non-I/O] event occurred for thread \"%s\"", t->name);
ev->ev_occurred = TRUE;
any_occurred = TRUE;
}
}
} while ((ev = ev->ev_next) != evh);
}
if (any_occurred)
dopoll = TRUE;
if (dopoll) {
pth_time_set(&delay, PTH_TIME_ZERO);
pdelay = &delay;
}
else if (nexttimer_ev != NULL) {
pth_time_set(&delay, &nexttimer_value);
pth_time_sub(&delay, now);
pdelay = &delay;
}
else {
pdelay = NULL;
}
while (pth_sc(read)(pth_sigpipe[0], minibuf, sizeof(minibuf)) > 0) ;
FD_SET(pth_sigpipe[0], &rfds);
if (fdmax < pth_sigpipe[0])
fdmax = pth_sigpipe[0];
for (sig = 1; sig < PTH_NSIG; sig++) {
if (sigismember(&pth_sigcatch, sig)) {
sa.sa_handler = pth_sched_eventmanager_sighandler;
sigfillset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(sig, &sa, &osa[sig]);
}
}
pth_sc(sigprocmask)(SIG_SETMASK, &pth_sigblock, &oss);
rc = -1;
if (!(dopoll && fdmax == -1))
while ((rc = pth_sc(select)(fdmax+1, &rfds, &wfds, &efds, pdelay)) < 0
&& errno == EINTR) ;
pth_sc(sigprocmask)(SIG_SETMASK, &oss, NULL);
for (sig = 1; sig < PTH_NSIG; sig++)
if (sigismember(&pth_sigcatch, sig))
sigaction(sig, &osa[sig], NULL);
if (!dopoll && rc == 0 && nexttimer_ev != NULL) {
if (nexttimer_ev->ev_type == PTH_EVENT_FUNC) {
loop_repeat = TRUE;
}
else {
pth_debug2("pth_sched_eventmanager: [timeout] event occurred for thread \"%s\"",
nexttimer_thread->name);
nexttimer_ev->ev_occurred = TRUE;
}
}
if (!dopoll && rc > 0 && FD_ISSET(pth_sigpipe[0], &rfds)) {
FD_CLR(pth_sigpipe[0], &rfds);
rc--;
}
if (rc <= 0) {
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
}
t = pth_pqueue_head(&pth_WQ);
while (t != NULL) {
any_occurred = FALSE;
if (t->events != NULL) {
ev = evh = t->events;
do {
if (!ev->ev_occurred) {
if (ev->ev_type == PTH_EVENT_FD) {
if ( ( ev->ev_goal & PTH_UNTIL_FD_READABLE
&& FD_ISSET(ev->ev_args.FD.fd, &rfds))
|| ( ev->ev_goal & PTH_UNTIL_FD_WRITEABLE
&& FD_ISSET(ev->ev_args.FD.fd, &wfds))
|| ( ev->ev_goal & PTH_UNTIL_FD_EXCEPTION
&& FD_ISSET(ev->ev_args.FD.fd, &efds)) ) {
pth_debug2("pth_sched_eventmanager: "
"[I/O] event occurred for thread \"%s\"", t->name);
ev->ev_occurred = TRUE;
}
}
else if (ev->ev_type == PTH_EVENT_SELECT) {
if (pth_util_fds_test(ev->ev_args.SELECT.nfd,
ev->ev_args.SELECT.rfds, &rfds,
ev->ev_args.SELECT.wfds, &wfds,
ev->ev_args.SELECT.efds, &efds)) {
n = pth_util_fds_select(ev->ev_args.SELECT.nfd,
ev->ev_args.SELECT.rfds, &rfds,
ev->ev_args.SELECT.wfds, &wfds,
ev->ev_args.SELECT.efds, &efds);
if (ev->ev_args.SELECT.n != NULL)
*(ev->ev_args.SELECT.n) = n;
ev->ev_occurred = TRUE;
pth_debug2("pth_sched_eventmanager: "
"[I/O] event occurred for thread \"%s\"", t->name);
}
}
else if (ev->ev_type == PTH_EVENT_SIGS) {
for (sig = 1; sig < PTH_NSIG; sig++) {
if (sigismember(ev->ev_args.SIGS.sigs, sig)) {
if (sigismember(&pth_sigraised, sig)) {
if (ev->ev_args.SIGS.sig != NULL)
*(ev->ev_args.SIGS.sig) = sig;
pth_debug2("pth_sched_eventmanager: "
"[signal] event occurred for thread \"%s\"", t->name);
sigdelset(&pth_sigraised, sig);
ev->ev_occurred = TRUE;
}
}
}
}
}
else {
if (ev->ev_type == PTH_EVENT_COND) {
if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) {
ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_SIGNALED);
ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_BROADCAST);
ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_HANDLED);
}
}
}
if (ev->ev_occurred)
any_occurred = TRUE;
} while ((ev = ev->ev_next) != evh);
}
if (t->cancelreq == TRUE) {
pth_debug2("pth_sched_eventmanager: cancellation request pending for thread \"%s\"", t->name);
any_occurred = TRUE;
}
tlast = t;
t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT);
if (any_occurred) {
pth_pqueue_delete(&pth_WQ, tlast);
tlast->state = PTH_STATE_READY;
pth_pqueue_insert(&pth_RQ, tlast->prio+1, tlast);
pth_debug2("pth_sched_eventmanager: thread \"%s\" moved from waiting "
"to ready queue", tlast->name);
}
}
if (loop_repeat) {
pth_time_set(now, PTH_TIME_NOW);
goto loop_entry;
}
pth_debug1("pth_sched_eventmanager: leaving");
return;
}
intern void pth_sched_eventmanager_sighandler(int sig)
{
char c;
sigaddset(&pth_sigraised, sig);
c = (int)sig;
pth_sc(write)(pth_sigpipe[1], &c, sizeof(char));
return;
}