repmgr_posix.c   [plain text]


/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_posix.c,v 1.37 2008/03/13 17:31:28 mbrey Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#define	__INCLUDE_SELECT_H	1
#include "db_int.h"

/*
 * A very rough guess at the maximum stack space one of our threads could ever
 * need, which we hope is plenty conservative.  This can be patched in the field
 * if necessary.
 */
#ifdef _POSIX_THREAD_ATTR_STACKSIZE
size_t __repmgr_guesstimated_max = (128 * 1024);
#endif

static int __repmgr_conn_work __P((ENV *,
    REPMGR_CONNECTION *, fd_set *, fd_set *, int));
static int finish_connecting __P((ENV *, REPMGR_CONNECTION *));

/*
 * Starts the thread described in the argument, and stores the resulting thread
 * ID therein.
 *
 * PUBLIC: int __repmgr_thread_start __P((ENV *, REPMGR_RUNNABLE *));
 */
int
__repmgr_thread_start(env, runnable)
	ENV *env;
	REPMGR_RUNNABLE *runnable;
{
	pthread_attr_t *attrp;
#ifdef _POSIX_THREAD_ATTR_STACKSIZE
	pthread_attr_t attributes;
	size_t size;
	int ret;
#endif

	runnable->finished = FALSE;

#ifdef _POSIX_THREAD_ATTR_STACKSIZE
	attrp = &attributes;
	if ((ret = pthread_attr_init(&attributes)) != 0) {
		__db_err(env,
		    ret, "pthread_attr_init in repmgr_thread_start");
		return (ret);
	}

	/*
	 * On a 64-bit machine it seems reasonable that we could need twice as
	 * much stack space as we did on a 32-bit machine.
	 */
	size = __repmgr_guesstimated_max;
	if (sizeof(size_t) > 4)
		size *= 2;
#ifdef PTHREAD_STACK_MIN
	if (size < PTHREAD_STACK_MIN)
		size = PTHREAD_STACK_MIN;
#endif
	if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) {
		__db_err(env,
		    ret, "pthread_attr_setstacksize in repmgr_thread_start");
		return (ret);
	}
#else
	attrp = NULL;
#endif

	return (pthread_create(&runnable->thread_id, attrp,
		    runnable->run, env));
}

/*
 * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *));
 */
int
__repmgr_thread_join(thread)
	REPMGR_RUNNABLE *thread;
{
	return (pthread_join(thread->thread_id, NULL));
}

/*
 * PUBLIC: int __repmgr_set_nonblocking __P((socket_t));
 */
int
__repmgr_set_nonblocking(fd)
	socket_t fd;
{
	int flags;

	if ((flags = fcntl(fd, F_GETFL, 0)) < 0)
		return (errno);
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		return (errno);
	return (0);
}

/*
 * PUBLIC: int __repmgr_wake_waiting_senders __P((ENV *));
 *
 * Wake any send()-ing threads waiting for an acknowledgement.
 *
 * !!!
 * Caller must hold the db_rep->mutex, if this thread synchronization is to work
 * properly.
 */
int
__repmgr_wake_waiting_senders(env)
	ENV *env;
{
	return (pthread_cond_broadcast(&env->rep_handle->ack_condition));
}

/*
 * PUBLIC: int __repmgr_await_ack __P((ENV *, const DB_LSN *));
 *
 * Waits (a limited time) for configured number of remote sites to ack the given
 * LSN.
 *
 * !!!
 * Caller must hold repmgr->mutex.
 */
int
__repmgr_await_ack(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret, timed;

	db_rep = env->rep_handle;

	if ((timed = (db_rep->ack_timeout > 0)))
		__repmgr_compute_wait_deadline(env, &deadline,
		    db_rep->ack_timeout);
	else
		COMPQUIET(deadline.tv_sec, 0);

	while (!__repmgr_is_permanent(env, lsnp)) {
		if (timed)
			ret = pthread_cond_timedwait(&db_rep->ack_condition,
			    &db_rep->mutex, &deadline);
		else
			ret = pthread_cond_wait(&db_rep->ack_condition,
			    &db_rep->mutex);
		if (db_rep->finished)
			return (DB_REP_UNAVAIL);
		if (ret != 0)
			return (ret);
	}
	return (0);
}

/*
 * __repmgr_compute_wait_deadline --
 *	Computes a deadline time a certain distance into the future.
 *
 * PUBLIC: void __repmgr_compute_wait_deadline __P((ENV*,
 * PUBLIC:    struct timespec *, db_timeout_t));
 */
void
__repmgr_compute_wait_deadline(env, result, wait)
	ENV *env;
	struct timespec *result;
	db_timeout_t wait;
{
	/*
	 * The result is suitable for the pthread_cond_timewait call.  (That
	 * call uses nano-second resolution; elsewhere we use microseconds.)
	 *
	 * Start with "now"; then add the "wait" offset.
	 *
	 * A db_timespec is the same as a "struct timespec" so we can pass
	 * result directly to the underlying Berkeley DB OS routine.
	 *
	 * !!!
	 * We use the system clock for the pthread_cond_timedwait call, but
	 * that's not optimal on systems with monotonic timers.   Instead,
	 * we should call pthread_condattr_setclock on systems where it and
	 * monotonic timers are available, and then configure both this call
	 * and the subsequent pthread_cond_timewait call to use a monotonic
	 * timer.
	 */
	__os_gettime(env, (db_timespec *)result, 0);
	TIMESPEC_ADD_DB_TIMEOUT(result, wait);
}

/*
 * PUBLIC: int __repmgr_await_drain __P((ENV *,
 * PUBLIC:    REPMGR_CONNECTION *, db_timeout_t));
 *
 * Waits for space to become available on the connection's output queue.
 * Various ways we can exit:
 *
 * 1. queue becomes non-full
 * 2. exceed time limit
 * 3. connection becomes defunct (due to error in another thread)
 * 4. repmgr is shutting down
 * 5. any unexpected system resource failure
 *
 * In cases #3 and #5 we return an error code.  Caller is responsible for
 * distinguishing the remaining cases if desired.
 *
 * !!!
 * Caller must hold repmgr->mutex.
 */
int
__repmgr_await_drain(env, conn, timeout)
	ENV *env;
	REPMGR_CONNECTION *conn;
	db_timeout_t timeout;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret;

	db_rep = env->rep_handle;

	__repmgr_compute_wait_deadline(env, &deadline, timeout);

	ret = 0;
	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
		ret = pthread_cond_timedwait(&conn->drained,
		    &db_rep->mutex, &deadline);
		switch (ret) {
		case 0:
			if (db_rep->finished)
				goto out; /* #4. */
			/*
			 * Another thread could have stumbled into an error on
			 * the socket while we were waiting.
			 */
			if (conn->state == CONN_DEFUNCT) {
				ret = DB_REP_UNAVAIL; /* #3. */
				goto out;
			}
			break;
		case ETIMEDOUT:
			conn->state = CONN_CONGESTED;
			ret = 0;
			goto out; /* #2. */
		default:
			goto out; /* #5. */
		}
	}
	/* #1. */

out:
	return (ret);
}

/*
 * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
 *
 * Initialize a condition variable (in allocated space).
 */
int
__repmgr_alloc_cond(c)
	cond_var_t *c;
{
	return (pthread_cond_init(c, NULL));
}

/*
 * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
 *
 * Clean up a previously initialized condition variable.
 */
int
__repmgr_free_cond(c)
	cond_var_t *c;
{
	return (pthread_cond_destroy(c));
}

/*
 * PUBLIC: int __repmgr_init_sync __P((ENV *, DB_REP *));
 *
 * Allocate/initialize all data necessary for thread synchronization.  This
 * should be an all-or-nothing affair.  Other than here and in _close_sync there
 * should never be a time when these resources aren't either all allocated or
 * all freed.  If that's true, then we can safely use the values of the file
 * descriptor(s) to keep track of which it is.
 */
int
__repmgr_init_sync(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
	int ret, mutex_inited, ack_inited, elect_inited, queue_inited,
	    file_desc[2];

	COMPQUIET(env, NULL);

	mutex_inited = ack_inited = elect_inited = queue_inited = FALSE;

	if ((ret = pthread_mutex_init(&db_rep->mutex, NULL)) != 0)
		goto err;
	mutex_inited = TRUE;

	if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0)
		goto err;
	ack_inited = TRUE;

	if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0)
		goto err;
	elect_inited = TRUE;

	if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0)
		goto err;
	queue_inited = TRUE;

	if ((ret = pipe(file_desc)) == -1) {
		ret = errno;
		goto err;
	}

	db_rep->read_pipe = file_desc[0];
	db_rep->write_pipe = file_desc[1];
	return (0);
err:
	if (queue_inited)
		(void)pthread_cond_destroy(&db_rep->queue_nonempty);
	if (elect_inited)
		(void)pthread_cond_destroy(&db_rep->check_election);
	if (ack_inited)
		(void)pthread_cond_destroy(&db_rep->ack_condition);
	if (mutex_inited)
		(void)pthread_mutex_destroy(&db_rep->mutex);
	db_rep->read_pipe = db_rep->write_pipe = -1;

	return (ret);
}

/*
 * PUBLIC: int __repmgr_close_sync __P((ENV *));
 *
 * Frees the thread synchronization data within a repmgr struct, in a
 * platform-specific way.
 */
int
__repmgr_close_sync(env)
	ENV *env;
{
	DB_REP *db_rep;
	int ret, t_ret;

	db_rep = env->rep_handle;

	if (!(REPMGR_SYNC_INITED(db_rep)))
		return (0);

	ret = pthread_cond_destroy(&db_rep->queue_nonempty);

	if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 &&
	    ret == 0)
		ret = t_ret;

	if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 &&
	    ret == 0)
		ret = t_ret;

	if ((t_ret = pthread_mutex_destroy(&db_rep->mutex)) != 0 &&
	    ret == 0)
		ret = t_ret;

	if (close(db_rep->read_pipe) == -1 && ret == 0)
		ret = errno;
	if (close(db_rep->write_pipe) == -1 && ret == 0)
		ret = errno;

	db_rep->read_pipe = db_rep->write_pipe = -1;
	return (ret);
}

/*
 * Performs net-related resource initialization other than memory initialization
 * and allocation.  A valid db_rep->listen_fd acts as the "all-or-nothing"
 * sentinel signifying that these resources are allocated.
 *
 * PUBLIC: int __repmgr_net_init __P((ENV *, DB_REP *));
 */
int
__repmgr_net_init(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
	int ret;
	struct sigaction sigact;

	if ((ret = __repmgr_listen(env)) != 0)
		return (ret);

	/*
	 * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed
	 * just for trying to write onto a socket that had been reset.
	 */
	if (sigaction(SIGPIPE, NULL, &sigact) == -1) {
		ret = errno;
		__db_err(env, ret, "can't access signal handler");
		goto err;
	}
	/*
	 * If we need to change the sig handler, do so, and also set a flag so
	 * that we remember we did.
	 */
	if ((db_rep->chg_sig_handler = (sigact.sa_handler == SIG_DFL))) {
		sigact.sa_handler = SIG_IGN;
		sigact.sa_flags = 0;
		if (sigaction(SIGPIPE, &sigact, NULL) == -1) {
			ret = errno;
			__db_err(env, ret, "can't access signal handler");
			goto err;
		}
	}
	return (0);

err:
	(void)closesocket(db_rep->listen_fd);
	db_rep->listen_fd = INVALID_SOCKET;
	return (ret);
}

/*
 * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *));
 */
int
__repmgr_lock_mutex(mutex)
	mgr_mutex_t  *mutex;
{
	return (pthread_mutex_lock(mutex));
}

/*
 * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *));
 */
int
__repmgr_unlock_mutex(mutex)
	mgr_mutex_t  *mutex;
{
	return (pthread_mutex_unlock(mutex));
}

/*
 * Signals a condition variable.
 *
 * !!!
 * Caller must hold mutex.
 *
 * PUBLIC: int __repmgr_signal __P((cond_var_t *));
 */
int
__repmgr_signal(v)
	cond_var_t *v;
{
	return (pthread_cond_broadcast(v));
}

/*
 * PUBLIC: int __repmgr_wake_main_thread __P((ENV*));
 */
int
__repmgr_wake_main_thread(env)
	ENV *env;
{
	DB_REP *db_rep;
	u_int8_t any_value;

	COMPQUIET(any_value, 0);
	db_rep = env->rep_handle;

	/*
	 * It doesn't matter what byte value we write.  Just the appearance of a
	 * byte in the stream is enough to wake up the select() thread reading
	 * the pipe.
	 */
	if (write(db_rep->write_pipe, &any_value, 1) == -1)
		return (errno);
	return (0);
}

/*
 * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *));
 */
int
__repmgr_writev(fd, iovec, buf_count, byte_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *byte_count_p;
{
	int nw;

	if ((nw = writev(fd, iovec, buf_count)) == -1)
		return (errno);
	*byte_count_p = (size_t)nw;
	return (0);
}

/*
 * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *));
 */
int
__repmgr_readv(fd, iovec, buf_count, byte_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *byte_count_p;
{
	ssize_t nw;

	if ((nw = readv(fd, iovec, buf_count)) == -1)
		return (errno);
	*byte_count_p = (size_t)nw;
	return (0);
}

/*
 * PUBLIC: int __repmgr_select_loop __P((ENV *));
 */
int
__repmgr_select_loop(env)
	ENV *env;
{
	struct timeval select_timeout, *select_timeout_p;
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn, *next;
	db_timespec timeout;
	fd_set reads, writes;
	int ret, flow_control, maxfd;
	u_int8_t buf[10];	/* arbitrary size */

	flow_control = FALSE;

	db_rep = env->rep_handle;
	/*
	 * Almost this entire thread operates while holding the mutex.  But note
	 * that it never blocks, except in the call to select() (which is the
	 * one place we relinquish the mutex).
	 */
	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_first_try_connections(env)) != 0)
		goto out;
	for (;;) {
		FD_ZERO(&reads);
		FD_ZERO(&writes);

		/*
		 * Always ask for input on listening socket and signalling
		 * pipe.
		 */
		FD_SET((u_int)db_rep->listen_fd, &reads);
		maxfd = db_rep->listen_fd;

		FD_SET((u_int)db_rep->read_pipe, &reads);
		if (db_rep->read_pipe > maxfd)
			maxfd = db_rep->read_pipe;

		/*
		 * Examine all connections to see what sort of I/O to ask for on
		 * each one.  Clean up defunct connections; note that this is
		 * the only place where elements get deleted from this list.
		 *
		 * The TAILQ_FOREACH macro would be suitable here, except that
		 * it doesn't allow unlinking the current element., which is
		 * needed for cleanup_connection.
		 */
		for (conn = TAILQ_FIRST(&db_rep->connections);
		     conn != NULL;
		     conn = next) {
			next = TAILQ_NEXT(conn, entries);

			if (conn->state == CONN_DEFUNCT) {
				if ((ret = __repmgr_cleanup_connection(env,
				    conn)) != 0)
					goto out;
				continue;
			}

			if (conn->state == CONN_CONNECTING) {
				FD_SET((u_int)conn->fd, &reads);
				FD_SET((u_int)conn->fd, &writes);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
				continue;
			}

			if (!STAILQ_EMPTY(&conn->outbound_queue)) {
				FD_SET((u_int)conn->fd, &writes);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
			}
			/*
			 * If we haven't yet gotten site's handshake, then read
			 * from it even if we're flow-controlling.
			 */
			if (!flow_control || !IS_VALID_EID(conn->eid)) {
				FD_SET((u_int)conn->fd, &reads);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
			}
		}

		if (__repmgr_compute_timeout(env, &timeout)) {
			/* Convert the timespec to a timeval. */
			select_timeout.tv_sec = timeout.tv_sec;
			select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US;
			select_timeout_p = &select_timeout;
		} else {
			/* No time-based events, so wait only for I/O. */
			select_timeout_p = NULL;
		}

		UNLOCK_MUTEX(db_rep->mutex);

		if ((ret = select(maxfd + 1,
		    &reads, &writes, NULL, select_timeout_p)) == -1) {
			switch (ret = errno) {
			case EINTR:
			case EWOULDBLOCK:
				LOCK_MUTEX(db_rep->mutex);
				continue; /* simply retry */
			default:
				__db_err(env, ret, "select");
				return (ret);
			}
		}
		LOCK_MUTEX(db_rep->mutex);

		/*
		 * Timer expiration events include retrying of lost connections.
		 * Obviously elements can be added to the connection list there.
		 */
		if ((ret = __repmgr_check_timeouts(env)) != 0)
			goto out;

		/*
		 * Examine each connection, to see what work needs to be done.
		 * Except for one obscure case in finish_connecting, no
		 * structural change to the connections list happens here.
		 */
		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
			if (conn->state == CONN_DEFUNCT)
				continue;

			if ((ret = __repmgr_conn_work(env,
			    conn, &reads, &writes, flow_control)) != 0)
				goto out;
		}

		/*
		 * Read any bytes in the signalling pipe.  Note that we don't
		 * actually need to do anything with them; they're just there to
		 * wake us up when necessary.
		 */
		if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) {
			if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) {
				ret = errno;
				goto out;
			} else if (db_rep->finished) {
				ret = 0;
				goto out;
			}
		}
		/*
		 * Obviously elements can be added to the connection list here.
		 */
		if (FD_ISSET((u_int)db_rep->listen_fd, &reads) &&
		    (ret = __repmgr_accept(env)) != 0)
			goto out;
	}
out:
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}

static int
__repmgr_conn_work(env, conn, reads, writes, flow_control)
	ENV *env;
	REPMGR_CONNECTION *conn;
	fd_set *reads, *writes;
	int flow_control;
{
	int ret;
	u_int fd;

	ret = 0;
	fd = (u_int)conn->fd;

	if (conn->state == CONN_CONNECTING) {
		if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
			ret = finish_connecting(env, conn);
	} else {
		/*
		 * Here, the site is connected, and the FD_SET's are valid.
		 */
		if (FD_ISSET(fd, writes))
			ret = __repmgr_write_some(env, conn);

		if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
			ret = __repmgr_read_from_site(env, conn);
	}

	if (ret == DB_REP_UNAVAIL)
		ret = __repmgr_bust_connection(env, conn);
	return (ret);
}

static int
finish_connecting(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	socklen_t len;
	SITE_STRING_BUFFER buffer;
	u_int eid;
	int error, ret;

	len = sizeof(error);
	if (getsockopt(
	    conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0)
		goto err_rpt;
	if (error) {
		errno = error;
		goto err_rpt;
	}

	conn->state = CONN_CONNECTED;
	return (__repmgr_propose_version(env, conn));

err_rpt:
	db_rep = env->rep_handle;

	DB_ASSERT(env, IS_VALID_EID(conn->eid));
	eid = (u_int)conn->eid;

	site = SITE_FROM_EID(eid);
	__db_err(env, errno,
	    "connecting to %s", __repmgr_format_site_loc(site, buffer));

	/* If we've exhausted the list of possible addresses, give up. */
	if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (DB_REP_UNAVAIL);
	}

	/*
	 * Since we're immediately trying the next address in the list, simply
	 * disable the failed connection, without the usual recovery.
	 */
	DISABLE_CONNECTION(conn);

	ret = __repmgr_connect_site(env, eid);
	DB_ASSERT(env, ret != DB_REP_UNAVAIL);
	return (ret);
}