/*********************************************************************** * * * This software is part of the ast package * * Copyright (c) 1990-2011 AT&T Intellectual Property * * and is licensed under the * * Common Public License, Version 1.0 * * by AT&T Intellectual Property * * * * A copy of the License is available at * * http://www.opensource.org/licenses/cpl1.0.txt * * (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9) * * * * Information and Software Systems Research * * AT&T Research * * Florham Park NJ * * * * Glenn Fowler * * * ***********************************************************************/ #pragma prototyped /* * Glenn Fowler * AT&T Research * * send an action to the coshell for execution */ #include "colib.h" #include #include static Cojob_t* service(register Coshell_t* co, Coservice_t* cs, Cojob_t* cj, int flags, Sfio_t* sp) { Proc_t* proc; size_t n; int i; int j; int fds[2]; long ops[4]; char* s; char** a; if (flags & CO_DEBUG) { for (a = cs->argv; *a; a++) sfprintf(sp, " %s", *a); if (!(s = costash(sp))) goto nospace; errormsg(state.lib, ERROR_LIBRARY|2, "service %s:%s", cs->path, s); } if (pipe(fds) < 0) { errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: cannot allocate service pipe", cs->name); return 0; } if (co->flags & CO_SHELL) for (i = 0; i < elementsof(fds); i++) if (fds[i] < 10 && (j = fcntl(fds[i], F_DUPFD, 10)) >= 0) { close(fds[i]); fds[i] = j; } cs->fd = fds[1]; ops[0] = PROC_FD_DUP(fds[0], 0, PROC_FD_PARENT); ops[1] = PROC_FD_CLOSE(fds[1], PROC_FD_CHILD); ops[2] = PROC_FD_DUP(co->gsmfd, 1, 0); ops[3] = 0; if (!(proc = procopen(cs->path, cs->argv, NiL, ops, PROC_DAEMON|PROC_IGNORE))) { errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: cannot connect to %s service", cs->path, cs->name); close(fds[0]); close(fds[1]); return 0; } fcntl(cs->fd, F_SETFD, FD_CLOEXEC); cs->pid = proc->pid; procfree(proc); sfprintf(sp, "id=%d info\n", cj->id); n = sfstrtell(sp); if (!(s = costash(sp))) goto bad; if (write(cs->fd, s, n) != n || sfpoll(&co->msgfp, 1, 5 * 1000) <= 0) goto bad; cj->pid = 0; cj->status = 0; cj->local = 0; cj->service = cs; co->svc_outstanding++; co->svc_running++; if (!cowait(co, cj, -1)) goto bad; return cj; bad: errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: service not responding", cs->name); nospace: cj->pid = CO_PID_FREE; cs->pid = 0; close(cs->fd); cs->fd = -1; return 0; } static Cojob_t* request(register Coshell_t* co, Cojob_t* cj, Coservice_t* cs, const char* action, int flags) { ssize_t n; ssize_t i; Sfio_t* sp; if (!(sp = sfstropen())) { errormsg(state.lib, ERROR_LIBRARY|2, "out of space"); return 0; } if (!cs->fd && !service(co, cs, cj, flags, sp)) goto bad; if (!cs->pid) goto bad; if (flags & CO_DEBUG) errormsg(state.lib, ERROR_LIBRARY|2, "job %d commands:\n\n%s %s\n", cj->id, cs->name, action); if (!(flags & CO_SILENT)) sfprintf(sfstderr, "+ %s %s\n", cs->name, action); sfprintf(sp, "id=%d %s\n", cj->id, action); n = sfstrtell(sp); action = sfstrbase(sp); while ((i = write(cs->fd, action, n)) > 0 && (n -= i) > 0) action += i; sfstrclose(sp); if (n) goto bad; sfclose(sp); cj->pid = 0; cj->status = 0; cj->local = 0; cj->service = cs; co->svc_outstanding++; co->svc_running++; co->total++; return cj; bad: cj->pid = CO_PID_FREE; sfclose(sp); return 0; } Cojob_t* coexec(register Coshell_t* co, const char* action, int flags, const char* out, const char* err, const char* att) { register Cojob_t* cj; register Sfio_t* sp; register Coservice_t* cs; int n; int i; int og; int cg; char* s; char* t; char* env; char* red; char* sh[4]; struct stat sto; struct stat ste; /* * get a free job slot */ for (cj = co->jobs; cj; cj = cj->next) if (cj->pid == CO_PID_FREE) break; if (cj) cj->service = 0; else if (!(cj = vmnewof(co->vm, 0, Cojob_t, 1, 0))) return 0; else { cj->coshell = co; cj->pid = CO_PID_FREE; cj->id = ++co->slots; cj->next = co->jobs; co->jobs = cj; } /* * set the flags */ flags &= ~co->mask; flags |= co->flags; cj->flags = flags; /* * check service intercepts */ for (cs = co->service; cs; cs = cs->next) { for (s = cs->name, t = (char*)action; *s && *s == *t; s++, t++); if (!*s && *t == ' ') return request(co, cj, cs, t + 1, flags); } cj->flags &= ~CO_SERVICE; red = (cj->flags & CO_APPEND) ? ">>" : ">"; /* * package the action */ if (!(env = coinitialize(co, co->flags))) return 0; if (!(sp = sfstropen())) return 0; n = strlen(action); if (co->flags & CO_SERVER) { /* * leave it to server */ sfprintf(sp, "#%05d\ne %d %d %s %s %s", 0, cj->id, cj->flags, state.pwd, out, err); if (att) sfprintf(sp, " (%d:%s)", strlen(att), att); else sfprintf(sp, " %s", att); sfprintf(sp, " (%d:%s) (%d:%s)\n", strlen(env), env, n, action); } else if (co->flags & CO_INIT) { if (flags & CO_DEBUG) sfprintf(sp, "set -x\n"); sfprintf(sp, "%s%s\necho x %d $? >&$%s\n", env, action, cj->id, CO_ENV_MSGFD); } else if (flags & CO_KSH) { #if !_lib_fork && defined(_map_spawnve) Sfio_t* tp; tp = sp; if (!(sp = sfstropen())) sp = tp; #endif sfprintf(sp, "{\ntrap 'set %s$?; trap \"\" 0; IFS=\"\n\"; print -u$%s x %d $1 $(times); exit $1' 0 HUP INT QUIT TERM%s\n%s%s%s", (flags & CO_SILENT) ? "" : "+x ", CO_ENV_MSGFD, cj->id, (flags & CO_IGNORE) ? "" : " ERR", env, n > CO_MAXEVAL ? "" : "eval '", (flags & CO_SILENT) ? "" : "set -x\n"); if (n > CO_MAXEVAL) sfputr(sp, action, -1); else { coquote(sp, action, 0); sfprintf(sp, "\n'"); } sfprintf(sp, "\n} out = pathtemp(NiL, 64, NiL, "coo", NiL))) sfprintf(sp, " >%s", cj->out); if (err) { if (out && streq(out, err)) sfprintf(sp, " 2>&1"); else if (*err == '/') sfprintf(sp, " 2%s%s", red, err); else sfprintf(sp, " 2%s%s/%s", red, state.pwd, err); } else if (flags & CO_SERIALIZE) { if (!out && !fstat(1, &sto) && !fstat(2, &ste) && sto.st_dev == ste.st_dev && sto.st_ino == ste.st_ino) sfprintf(sp, " 2>&1"); else if (cj->err = pathtemp(NiL, 64, NiL, "coe", NiL)) sfprintf(sp, " 2>%s", cj->err); } #if !_lib_fork && defined(_map_spawnve) if (sp != tp) { sfprintf(tp, "%s -c '", state.sh); if (!(s = costash(sp))) return 0; coquote(tp, s, 0); sfprintf(tp, "'"); sfstrclose(sp); sp = tp; } #endif sfprintf(sp, " &\nprint -u$%s j %d $!\n", CO_ENV_MSGFD, cj->id); } else { #if !_lib_fork && defined(_map_spawnve) Sfio_t* tp; tp = sp; if (!(sp = sfstropen())) sp = tp; #endif flags |= CO_IGNORE; if (co->mode & CO_MODE_SEPARATE) { flags &= ~CO_SERIALIZE; og = '{'; cg = '}'; } else { og = '('; cg = ')'; } sfprintf(sp, "%c\n%s%sset -%s%s\n", og, env, n > CO_MAXEVAL ? "" : "eval '", (flags & CO_IGNORE) ? "" : "e", (flags & CO_SILENT) ? "" : "x"); if (n > CO_MAXEVAL) sfprintf(sp, "%s", action); else { coquote(sp, action, 0); sfprintf(sp, "\n'"); } sfprintf(sp, "\n%c out = pathtemp(NiL, 64, NiL, "coo", NiL))) sfprintf(sp, " >%s", cj->out); if (err) { if (out && streq(out, err)) sfprintf(sp, " 2>&1"); else if (*err == '/') sfprintf(sp, " 2%s%s", red, err); else sfprintf(sp, " 2%s%s/%s", red, state.pwd, err); } else if (flags & CO_SERIALIZE) { if (out) sfprintf(sp, " 2>&1"); else if (cj->err = pathtemp(NiL, 64, NiL, "coe", NiL)) sfprintf(sp, " 2>%s", cj->err); } if (!(co->mode & CO_MODE_SEPARATE)) { if (flags & CO_OSH) sfprintf(sp, " && echo x %d 0 >&$%s || echo x %d $? >&$%s", cj->id, CO_ENV_MSGFD, cj->id, CO_ENV_MSGFD); else sfprintf(sp, " && echo x %d 0 `times` >&$%s || echo x %d $? `times` >&$%s", cj->id, CO_ENV_MSGFD, cj->id, CO_ENV_MSGFD); } #if !_lib_fork && defined(_map_spawnve) if (sp != tp) { sfprintf(tp, "%s -c '", state.sh); if (!(s = costash(sp))) return 0; coquote(tp, s, 0); sfprintf(tp, "'"); sfstrclose(sp); sp = tp; } #endif if (!(co->mode & CO_MODE_SEPARATE)) sfprintf(sp, " &\necho j %d $! >&$%s\n", cj->id, CO_ENV_MSGFD); } n = sfstrtell(sp); if (!costash(sp)) return 0; if (flags & CO_SERVER) sfprintf(sp, "#%05d\n", n - 7); s = sfstrseek(sp, 0, SEEK_SET); if (flags & CO_DEBUG) errormsg(state.lib, ERROR_LIBRARY|2, "job %d commands:\n\n%s\n", cj->id, s); if (co->mode & CO_MODE_SEPARATE) { sh[0] = state.sh; sh[1] = "-c"; sh[2] = s; sh[3] = 0; cj->status = procrun(state.sh, sh, 0); sfstrclose(sp); cj->pid = CO_PID_ZOMBIE; cj->local = 0; co->outstanding++; co->total++; } else { /* * send it off */ while ((i = write(co->cmdfd, s, n)) > 0 && (n -= i) > 0) s += i; sfstrclose(sp); if (n) return 0; /* * it's a job */ cj->pid = 0; cj->status = 0; cj->local = 0; co->outstanding++; co->running++; co->total++; if (co->mode & CO_MODE_ACK) cj = cowait(co, cj, -1); } return cj; }