coexec.c   [plain text]


/***********************************************************************
*                                                                      *
*               This software is part of the ast package               *
*          Copyright (c) 1990-2011 AT&T Intellectual Property          *
*                      and is licensed under the                       *
*                  Common Public License, Version 1.0                  *
*                    by AT&T Intellectual Property                     *
*                                                                      *
*                A copy of the License is available at                 *
*            http://www.opensource.org/licenses/cpl1.0.txt             *
*         (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9)         *
*                                                                      *
*              Information and Software Systems Research               *
*                            AT&T Research                             *
*                           Florham Park NJ                            *
*                                                                      *
*                 Glenn Fowler <gsf@research.att.com>                  *
*                                                                      *
***********************************************************************/
#pragma prototyped
/*
 * Glenn Fowler
 * AT&T Research
 *
 * send an action to the coshell for execution
 */

#include "colib.h"

#include <proc.h>
#include <ls.h>

static Cojob_t*
service(register Coshell_t* co, Coservice_t* cs, Cojob_t* cj, int flags, Sfio_t* sp)
{
	Proc_t*		proc;
	size_t		n;
	int		i;
	int		j;
	int		fds[2];
	long		ops[4];
	char*		s;
	char**		a;

	if (flags & CO_DEBUG)
	{
		for (a = cs->argv; *a; a++)
			sfprintf(sp, " %s", *a);
		if (!(s = costash(sp)))
			goto nospace;
		errormsg(state.lib, ERROR_LIBRARY|2, "service %s:%s", cs->path, s);
	}
	if (pipe(fds) < 0)
	{
		errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: cannot allocate service pipe", cs->name);
		return 0;
	}
	if (co->flags & CO_SHELL)
		for (i = 0; i < elementsof(fds); i++)
			if (fds[i] < 10 && (j = fcntl(fds[i], F_DUPFD, 10)) >= 0)
			{
				close(fds[i]);
				fds[i] = j;
			}
	cs->fd = fds[1];
	ops[0] = PROC_FD_DUP(fds[0], 0, PROC_FD_PARENT);
	ops[1] = PROC_FD_CLOSE(fds[1], PROC_FD_CHILD);
	ops[2] = PROC_FD_DUP(co->gsmfd, 1, 0);
	ops[3] = 0;
	if (!(proc = procopen(cs->path, cs->argv, NiL, ops, PROC_DAEMON|PROC_IGNORE)))
	{
		errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: cannot connect to %s service", cs->path, cs->name);
		close(fds[0]);	
		close(fds[1]);	
		return 0;
	}
	fcntl(cs->fd, F_SETFD, FD_CLOEXEC);
	cs->pid = proc->pid;
	procfree(proc);
	sfprintf(sp, "id=%d info\n", cj->id);
	n = sfstrtell(sp);
	if (!(s = costash(sp)))
		goto bad;
	if (write(cs->fd, s, n) != n || sfpoll(&co->msgfp, 1, 5 * 1000) <= 0)
		goto bad;
	cj->pid = 0;
	cj->status = 0;
	cj->local = 0;
	cj->service = cs;
	co->svc_outstanding++;
	co->svc_running++;
	if (!cowait(co, cj, -1))
		goto bad;
	return cj;
 bad:
	errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: service not responding", cs->name);
 nospace:
	cj->pid = CO_PID_FREE;
	cs->pid = 0;
	close(cs->fd);
	cs->fd = -1;
	return 0;
}

static Cojob_t*
request(register Coshell_t* co, Cojob_t* cj, Coservice_t* cs, const char* action, int flags)
{
	ssize_t		n;
	ssize_t		i;
	Sfio_t*		sp;

	if (!(sp = sfstropen()))
	{
		errormsg(state.lib, ERROR_LIBRARY|2, "out of space");
		return 0;
	}
	if (!cs->fd && !service(co, cs, cj, flags, sp))
		goto bad;
	if (!cs->pid)
		goto bad;
	if (flags & CO_DEBUG)
		errormsg(state.lib, ERROR_LIBRARY|2, "job %d commands:\n\n%s %s\n", cj->id, cs->name, action);
	if (!(flags & CO_SILENT))
		sfprintf(sfstderr, "+ %s %s\n", cs->name, action);
	sfprintf(sp, "id=%d %s\n", cj->id, action);
	n = sfstrtell(sp);
	action = sfstrbase(sp);
	while ((i = write(cs->fd, action, n)) > 0 && (n -= i) > 0)
		action += i;
	sfstrclose(sp);
	if (n)
		goto bad;
	sfclose(sp);
	cj->pid = 0;
	cj->status = 0;
	cj->local = 0;
	cj->service = cs;
	co->svc_outstanding++;
	co->svc_running++;
	co->total++;
	return cj;
 bad:
	cj->pid = CO_PID_FREE;
	sfclose(sp);
	return 0;
}

Cojob_t*
coexec(register Coshell_t* co, const char* action, int flags, const char* out, const char* err, const char* att)
{
	register Cojob_t*	cj;
	register Sfio_t*	sp;
	register Coservice_t*	cs;
	int			n;
	int			i;
	int			og;
	int			cg;
	char*			s;
	char*			t;
	char*			env;
	char*			red;
	char*			sh[4];
	struct stat		sto;
	struct stat		ste;

	/*
	 * get a free job slot
	 */

	for (cj = co->jobs; cj; cj = cj->next)
		if (cj->pid == CO_PID_FREE)
			break;
	if (cj)
		cj->service = 0;
	else if (!(cj = vmnewof(co->vm, 0, Cojob_t, 1, 0)))
		return 0;
	else
	{
		cj->coshell = co;
		cj->pid = CO_PID_FREE;
		cj->id = ++co->slots;
		cj->next = co->jobs;
		co->jobs = cj;
	}

	/*
	 * set the flags
	 */

	flags &= ~co->mask;
	flags |= co->flags;
	cj->flags = flags;

	/*
	 * check service intercepts
	 */

	for (cs = co->service; cs; cs = cs->next)
	{
		for (s = cs->name, t = (char*)action; *s && *s == *t; s++, t++);
		if (!*s && *t == ' ')
			return request(co, cj, cs, t + 1, flags);
	}
	cj->flags &= ~CO_SERVICE;
	red = (cj->flags & CO_APPEND) ? ">>" : ">";

	/*
	 * package the action
	 */

	if (!(env = coinitialize(co, co->flags)))
		return 0;
	if (!(sp = sfstropen()))
		return 0;
	n = strlen(action);
	if (co->flags & CO_SERVER)
	{
		/*
		 * leave it to server
		 */

		sfprintf(sp, "#%05d\ne %d %d %s %s %s",
			0,
			cj->id,
			cj->flags,
			state.pwd,
			out,
			err);
		if (att)
			sfprintf(sp, " (%d:%s)", strlen(att), att);
		else
			sfprintf(sp, " %s", att);
		sfprintf(sp, " (%d:%s) (%d:%s)\n", strlen(env), env, n, action);
	}
	else if (co->flags & CO_INIT)
	{
		if (flags & CO_DEBUG)
			sfprintf(sp, "set -x\n");
		sfprintf(sp, "%s%s\necho x %d $? >&$%s\n",
			env,
			action,
			cj->id,
			CO_ENV_MSGFD);
	}
	else if (flags & CO_KSH)
	{
#if !_lib_fork && defined(_map_spawnve)
		Sfio_t*	tp;

		tp = sp;
		if (!(sp = sfstropen()))
			sp = tp;
#endif
		sfprintf(sp, "{\ntrap 'set %s$?; trap \"\" 0; IFS=\"\n\"; print -u$%s x %d $1 $(times); exit $1' 0 HUP INT QUIT TERM%s\n%s%s%s",
			(flags & CO_SILENT) ? "" : "+x ",
			CO_ENV_MSGFD,
			cj->id,
			(flags & CO_IGNORE) ? "" : " ERR",
			env,
			n > CO_MAXEVAL ? "" : "eval '",
			(flags & CO_SILENT) ? "" : "set -x\n");
		if (n > CO_MAXEVAL)
			sfputr(sp, action, -1);
		else
		{
			coquote(sp, action, 0);
			sfprintf(sp, "\n'");
		}
		sfprintf(sp, "\n} </dev/null");
		if (out)
		{
			if (*out == '/')
				sfprintf(sp, " %s%s", red, out);
			else
				sfprintf(sp, " %s%s/%s", red, state.pwd, out);
		}
		else if ((flags & CO_SERIALIZE) && (cj->out = pathtemp(NiL, 64, NiL, "coo", NiL)))
			sfprintf(sp, " >%s", cj->out);
		if (err)
		{
			if (out && streq(out, err))
				sfprintf(sp, " 2>&1");
			else if (*err == '/')
				sfprintf(sp, " 2%s%s", red, err);
			else
				sfprintf(sp, " 2%s%s/%s", red, state.pwd, err);
		}
		else if (flags & CO_SERIALIZE)
		{
			if (!out && !fstat(1, &sto) && !fstat(2, &ste) && sto.st_dev == ste.st_dev && sto.st_ino == ste.st_ino)
				sfprintf(sp, " 2>&1");
			else if (cj->err = pathtemp(NiL, 64, NiL, "coe", NiL))
				sfprintf(sp, " 2>%s", cj->err);
		}
#if !_lib_fork && defined(_map_spawnve)
		if (sp != tp)
		{
			sfprintf(tp, "%s -c '", state.sh);
			if (!(s = costash(sp)))
				return 0;
			coquote(tp, s, 0);
			sfprintf(tp, "'");
			sfstrclose(sp);
			sp = tp;
		}
#endif
		sfprintf(sp, " &\nprint -u$%s j %d $!\n",
			CO_ENV_MSGFD,
			cj->id);
	}
	else
	{
#if !_lib_fork && defined(_map_spawnve)
		Sfio_t*	tp;

		tp = sp;
		if (!(sp = sfstropen())) sp = tp;
#endif
		flags |= CO_IGNORE;
		if (co->mode & CO_MODE_SEPARATE)
		{
			flags &= ~CO_SERIALIZE;
			og = '{';
			cg = '}';
		}
		else
		{
			og = '(';
			cg = ')';
		}
		sfprintf(sp, "%c\n%s%sset -%s%s\n",
			og,
			env,
			n > CO_MAXEVAL ? "" : "eval '",
			(flags & CO_IGNORE) ? "" : "e",
			(flags & CO_SILENT) ? "" : "x");
		if (n > CO_MAXEVAL)
			sfprintf(sp, "%s", action);
		else
		{
			coquote(sp, action, 0);
			sfprintf(sp, "\n'");
		}
		sfprintf(sp, "\n%c </dev/null", cg);
		if (out)
		{
			if (*out == '/')
				sfprintf(sp, " %s%s", red, out);
			else
				sfprintf(sp, " %s%s/%s", red, state.pwd, out);
		}
		else if ((flags & CO_SERIALIZE) && (cj->out = pathtemp(NiL, 64, NiL, "coo", NiL)))
			sfprintf(sp, " >%s", cj->out);
		if (err)
		{
			if (out && streq(out, err))
				sfprintf(sp, " 2>&1");
			else if (*err == '/')
				sfprintf(sp, " 2%s%s", red, err);
			else
				sfprintf(sp, " 2%s%s/%s", red, state.pwd, err);
		}
		else if (flags & CO_SERIALIZE)
		{
			if (out)
				sfprintf(sp, " 2>&1");
			else if (cj->err = pathtemp(NiL, 64, NiL, "coe", NiL))
				sfprintf(sp, " 2>%s", cj->err);
		}
		if (!(co->mode & CO_MODE_SEPARATE))
		{
			if (flags & CO_OSH)
				sfprintf(sp, " && echo x %d 0 >&$%s || echo x %d $? >&$%s",
					cj->id,
					CO_ENV_MSGFD,
					cj->id,
					CO_ENV_MSGFD);
			else
				sfprintf(sp, " && echo x %d 0 `times` >&$%s || echo x %d $? `times` >&$%s",
					cj->id,
					CO_ENV_MSGFD,
					cj->id,
					CO_ENV_MSGFD);
		}
#if !_lib_fork && defined(_map_spawnve)
		if (sp != tp)
		{
			sfprintf(tp, "%s -c '", state.sh);
			if (!(s = costash(sp)))
				return 0;
			coquote(tp, s, 0);
			sfprintf(tp, "'");
			sfstrclose(sp);
			sp = tp;
		}
#endif
		if (!(co->mode & CO_MODE_SEPARATE))
			sfprintf(sp, " &\necho j %d $! >&$%s\n",
				cj->id,
				CO_ENV_MSGFD);
	}
	n = sfstrtell(sp);
	if (!costash(sp))
		return 0;
	if (flags & CO_SERVER)
		sfprintf(sp, "#%05d\n", n - 7);
	s = sfstrseek(sp, 0, SEEK_SET);
	if (flags & CO_DEBUG)
		errormsg(state.lib, ERROR_LIBRARY|2, "job %d commands:\n\n%s\n", cj->id, s);
	if (co->mode & CO_MODE_SEPARATE)
	{
		sh[0] = state.sh;
		sh[1] = "-c";
		sh[2] = s;
		sh[3] = 0;
		cj->status = procrun(state.sh, sh, 0);
		sfstrclose(sp);
		cj->pid = CO_PID_ZOMBIE;
		cj->local = 0;
		co->outstanding++;
		co->total++;
	}
	else
	{
		/*
		 * send it off
		 */

		while ((i = write(co->cmdfd, s, n)) > 0 && (n -= i) > 0)
			s += i;
		sfstrclose(sp);
		if (n)
			return 0;

		/*
		 * it's a job
		 */

		cj->pid = 0;
		cj->status = 0;
		cj->local = 0;
		co->outstanding++;
		co->running++;
		co->total++;
		if (co->mode & CO_MODE_ACK)
			cj = cowait(co, cj, -1);
	}
	return cj;
}