repmgr_sel.c   [plain text]


/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2006,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_sel.c,v 1.51 2008/04/30 02:33:34 alexg Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"

typedef int (*HEARTBEAT_ACTION) __P((ENV *));

static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static int __repmgr_call_election __P((ENV *));
static int __repmgr_connect __P((ENV*, socket_t *, REPMGR_SITE *));
static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
static int find_version_info __P((ENV *, REPMGR_CONNECTION *, DBT *));
static int __repmgr_next_timeout __P((ENV *,
    db_timespec *, HEARTBEAT_ACTION *));
static int dispatch_phase_completion __P((ENV *, REPMGR_CONNECTION *));
static REPMGR_CONNECTION *__repmgr_master_connection __P((ENV *));
static int process_parameters __P((ENV *,
    REPMGR_CONNECTION *, char *, u_int, u_int32_t));
static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
static int record_ack __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_retry_connections __P((ENV *));
static int send_handshake __P((ENV *, REPMGR_CONNECTION *, void *, size_t));
static int __repmgr_send_heartbeat __P((ENV *));
static int send_v1_handshake __P((ENV *,
    REPMGR_CONNECTION *, void *, size_t));
static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_try_one __P((ENV *, u_int));

#define	ONLY_HANDSHAKE(env, conn) do {				     \
	if (conn->msg_type != REPMGR_HANDSHAKE) {		     \
		__db_errx(env, "unexpected msg type %d in state %d", \
		    (int)conn->msg_type, conn->state);		     \
		return (DB_REP_UNAVAIL);			     \
	}							     \
} while (0)

/*
 * PUBLIC: void *__repmgr_select_thread __P((void *));
 */
void *
__repmgr_select_thread(args)
	void *args;
{
	ENV *env = args;
	int ret;

	if ((ret = __repmgr_select_loop(env)) != 0) {
		__db_err(env, ret, "select loop failed");
		__repmgr_thread_failure(env, ret);
	}
	return (NULL);
}

/*
 * PUBLIC: int __repmgr_accept __P((ENV *));
 */
int
__repmgr_accept(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	struct sockaddr_in siaddr;
	socklen_t addrlen;
	socket_t s;
	int ret;
#ifdef DB_WIN32
	WSAEVENT event_obj;
#endif

	db_rep = env->rep_handle;
	addrlen = sizeof(siaddr);
	if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
	    &addrlen)) == -1) {
		/*
		 * Some errors are innocuous and so should be ignored.  MSDN
		 * Library documents the Windows ones; the Unix ones are
		 * advocated in Stevens' UNPv1, section 16.6; and Linux
		 * Application Development, p. 416.
		 */
		switch (ret = net_errno) {
#ifdef DB_WIN32
		case WSAECONNRESET:
		case WSAEWOULDBLOCK:
#else
		case EINTR:
		case EWOULDBLOCK:
		case ECONNABORTED:
		case ENETDOWN:
#ifdef EPROTO
		case EPROTO:
#endif
		case ENOPROTOOPT:
		case EHOSTDOWN:
#ifdef ENONET
		case ENONET:
#endif
		case EHOSTUNREACH:
		case EOPNOTSUPP:
		case ENETUNREACH:
#endif
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "accept error %d considered innocuous", ret));
			return (0);
		default:
			__db_err(env, ret, "accept error");
			return (ret);
		}
	}
	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "accepted a new connection"));

	if ((ret = __repmgr_set_nonblocking(s)) != 0) {
		__db_err(env, ret, "can't set nonblock after accept");
		(void)closesocket(s);
		return (ret);
	}

#ifdef DB_WIN32
	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
		ret = net_errno;
		__db_err(env, ret, "can't create WSA event");
		(void)closesocket(s);
		return (ret);
	}
	if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't set desired event bits");
		(void)WSACloseEvent(event_obj);
		(void)closesocket(s);
		return (ret);
	}
#endif
	if ((ret =
	    __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) {
#ifdef DB_WIN32
		(void)WSACloseEvent(event_obj);
#endif
		(void)closesocket(s);
		return (ret);
	}
	F_SET(conn, CONN_INCOMING);
	conn->eid = -1;
#ifdef DB_WIN32
	conn->event_object = event_obj;
#endif
	return (0);
}

/*
 * Computes how long we should wait for input, in other words how long until we
 * have to wake up and do something.  Returns TRUE if timeout is set; FALSE if
 * there is nothing to wait for.
 *
 * Note that the resulting timeout could be zero; but it can't be negative.
 *
 * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *));
 */
int
__repmgr_compute_timeout(env, timeout)
	ENV *env;
	db_timespec *timeout;
{
	DB_REP *db_rep;
	REPMGR_RETRY *retry;
	db_timespec now, t;
	int have_timeout;

	db_rep = env->rep_handle;

	/*
	 * There are two factors to consider: are heartbeats in use?  and, do we
	 * have any sites with broken connections that we ought to retry?
	 */
	have_timeout = __repmgr_next_timeout(env, &t, NULL);

	/* List items are in order, so we only have to examine the first one. */
	if (!TAILQ_EMPTY(&db_rep->retries)) {
		retry = TAILQ_FIRST(&db_rep->retries);
		if (have_timeout) {
			/* Choose earliest timeout deadline. */
			t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
		} else {
			t = retry->time;
			have_timeout = TRUE;
		}
	}

	if (have_timeout) {
		__os_gettime(env, &now, 1);
		if (timespeccmp(&now, &t, >=))
			timespecclear(timeout);
		else {
			*timeout = t;
			timespecsub(timeout, &now);
		}
	}

	return (have_timeout);
}

/*
 * Figures out the next heartbeat-related thing to be done, and when it should
 * be done.  The code is factored this way because this computation needs to be
 * done both before each select() call, and after (when we're checking for timer
 * expiration).
 */
static int
__repmgr_next_timeout(env, deadline, action)
	ENV *env;
	db_timespec *deadline;
	HEARTBEAT_ACTION *action;
{
	DB_REP *db_rep;
	HEARTBEAT_ACTION my_action;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	db_timespec t;

	db_rep = env->rep_handle;

	if (db_rep->master_eid == SELF_EID && db_rep->heartbeat_frequency > 0) {
		t = db_rep->last_bcast;
		TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_frequency);
		my_action = __repmgr_send_heartbeat;
	} else if ((conn = __repmgr_master_connection(env)) != NULL &&
	    db_rep->heartbeat_monitor_timeout > 0 &&
	    conn->version >= HEARTBEAT_MIN_VERSION) {
		/*
		 * If we have a working connection to a heartbeat-aware master,
		 * let's monitor it.  Otherwise there's really nothing we can
		 * do.
		 */
		site = SITE_FROM_EID(db_rep->master_eid);
		t = site->last_rcvd_timestamp;
		TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_monitor_timeout);
		my_action = __repmgr_call_election;
	} else
		return (FALSE);

	*deadline = t;
	if (action != NULL)
		*action = my_action;
	return (TRUE);
}

static int
__repmgr_send_heartbeat(env)
	ENV *env;
{
	DBT control, rec;
	u_int unused1, unused2;

	DB_INIT_DBT(control, NULL, 0);
	DB_INIT_DBT(rec, NULL, 0);
	return (__repmgr_send_broadcast(env,
	    REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2));
}

static REPMGR_CONNECTION *
__repmgr_master_connection(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *master;

	db_rep = env->rep_handle;

	if (db_rep->master_eid == SELF_EID ||
	    !IS_VALID_EID(db_rep->master_eid))
		return (NULL);
	master = SITE_FROM_EID(db_rep->master_eid);
	if (master->state != SITE_CONNECTED)
		return (NULL);
	conn = master->ref.conn;
	if (IS_READY_STATE(conn->state))
		return (conn);
	return (NULL);
}

static int
__repmgr_call_election(env)
	ENV *env;
{
	REPMGR_CONNECTION *conn;

	conn = __repmgr_master_connection(env);
	DB_ASSERT(env, conn != NULL);
	RPRINT(env, DB_VERB_REPMGR_MISC,
	    (env, "heartbeat monitor timeout expired"));
	return (__repmgr_bust_connection(env, conn));
}

/*
 * PUBLIC: int __repmgr_check_timeouts __P((ENV *));
 *
 * !!!
 * Assumes caller holds the mutex.
 */
int
__repmgr_check_timeouts(env)
	ENV *env;
{
	db_timespec when, now;
	HEARTBEAT_ACTION action;
	int ret;

	/*
	 * Figure out the next heartbeat-related thing to be done.  Then, if
	 * it's time to do it, do so.
	 */
	if (__repmgr_next_timeout(env, &when, &action)) {
		__os_gettime(env, &now, 1);
		if (timespeccmp(&when, &now, <=) &&
		    (ret = (*action)(env)) != 0)
			return (ret);
	}

	return (__repmgr_retry_connections(env));
}

/*
 * Initiates connection attempts for any sites on the idle list whose retry
 * times have expired.
 */
static int
__repmgr_retry_connections(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_RETRY *retry;
	db_timespec now;
	u_int eid;
	int ret;

	db_rep = env->rep_handle;
	__os_gettime(env, &now, 1);

	while (!TAILQ_EMPTY(&db_rep->retries)) {
		retry = TAILQ_FIRST(&db_rep->retries);
		if (timespeccmp(&retry->time, &now, >=))
			break;	/* since items are in time order */

		TAILQ_REMOVE(&db_rep->retries, retry, entries);

		eid = retry->eid;
		__os_free(env, retry);

		if ((ret = __repmgr_try_one(env, eid)) != 0)
			return (ret);
	}
	return (0);
}

/*
 * PUBLIC: int __repmgr_first_try_connections __P((ENV *));
 *
 * !!!
 * Assumes caller holds the mutex.
 */
int
__repmgr_first_try_connections(env)
	ENV *env;
{
	DB_REP *db_rep;
	u_int eid;
	int ret;

	db_rep = env->rep_handle;
	for (eid=0; eid<db_rep->site_cnt; eid++)
		if ((ret = __repmgr_try_one(env, eid)) != 0)
			return (ret);
	return (0);
}

/*
 * Makes a best-effort attempt to connect to the indicated site.  Returns a
 * non-zero error indication only for disastrous failures.  For re-tryable
 * errors, we will have scheduled another attempt, and that can be considered
 * success enough.
 */
static int
__repmgr_try_one(env, eid)
	ENV *env;
	u_int eid;
{
	ADDRINFO *list;
	DB_REP *db_rep;
	repmgr_netaddr_t *addr;
	int ret;

	db_rep = env->rep_handle;

	/*
	 * If have never yet successfully resolved this site's host name, try to
	 * do so now.
	 *
	 * Throughout all the rest of repmgr, we almost never do any sort of
	 * blocking operation in the select thread.  This is the sole exception
	 * to that rule.  Fortunately, it should rarely happen:
	 *
	 * - for a site that we only learned about because it connected to us:
	 *   not only were we not configured to know about it, but we also never
	 *   got a NEWSITE message about it.  And even then only if the
	 *   connection fails and we want to retry it from this end;
	 *
	 * - if the name look-up system (e.g., DNS) is not working (let's hope
	 *   it's temporary), or the host name is not found.
	 */
	addr = &SITE_FROM_EID(eid)->net_addr;
	if (ADDR_LIST_FIRST(addr) == NULL) {
		if ((ret = __repmgr_getaddr(
		    env, addr->host, addr->port, 0, &list)) == 0) {
			addr->address_list = list;
			(void)ADDR_LIST_FIRST(addr);
		} else if (ret == DB_REP_UNAVAIL)
			return (__repmgr_schedule_connection_attempt(
			    env, eid, FALSE));
		else
			return (ret);
	}

	/* Here, when we have a valid address. */
	return (__repmgr_connect_site(env, eid));
}

/*
 * Tries to establish a connection with the site indicated by the given eid,
 * starting with the "current" element of its address list and trying as many
 * addresses as necessary until the list is exhausted.
 *
 * PUBLIC: int __repmgr_connect_site __P((ENV *, u_int eid));
 */
int
__repmgr_connect_site(env, eid)
	ENV *env;
	u_int eid;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *con;
	REPMGR_SITE *site;
	socket_t s;
	int state;
	int ret;
#ifdef DB_WIN32
	long desired_event;
	WSAEVENT event_obj;
#endif

	db_rep = env->rep_handle;
	site = SITE_FROM_EID(eid);

	switch (ret = __repmgr_connect(env, &s, site)) {
	case 0:
		state = CONN_CONNECTED;
#ifdef DB_WIN32
		desired_event = FD_READ|FD_CLOSE;
#endif
		break;
	case INPROGRESS:
		state = CONN_CONNECTING;
#ifdef DB_WIN32
		desired_event = FD_CONNECT;
#endif
		break;
	default:
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (
		    __repmgr_schedule_connection_attempt(env, eid, FALSE));
	}

#ifdef DB_WIN32
	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
		ret = net_errno;
		__db_err(env, ret, "can't create WSA event");
		(void)closesocket(s);
		return (ret);
	}
	if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't set desired event bits");
		(void)WSACloseEvent(event_obj);
		(void)closesocket(s);
		return (ret);
	}
#endif

	if ((ret = __repmgr_new_connection(env, &con, s, state))
	    != 0) {
#ifdef DB_WIN32
		(void)WSACloseEvent(event_obj);
#endif
		(void)closesocket(s);
		return (ret);
	}
#ifdef DB_WIN32
	con->event_object = event_obj;
#endif

	con->eid = (int)eid;
	site->ref.conn = con;
	site->state = SITE_CONNECTED;

	if (state == CONN_CONNECTED) {
		switch (ret = __repmgr_propose_version(env, con)) {
		case 0:
			break;
		case DB_REP_UNAVAIL:
			return (__repmgr_bust_connection(env, con));
		default:
			return (ret);
		}
	}

	return (0);
}

static int
__repmgr_connect(env, socket_result, site)
	ENV *env;
	socket_t *socket_result;
	REPMGR_SITE *site;
{
	repmgr_netaddr_t *addr;
	ADDRINFO *ai;
	socket_t s;
	char *why;
	int ret;
	SITE_STRING_BUFFER buffer;

	/*
	 * Lint doesn't know about DB_ASSERT, so it can't tell that this
	 * loop will always get executed at least once, giving 'why' a value.
	 */
	COMPQUIET(why, "");
	addr = &site->net_addr;
	ai = ADDR_LIST_CURRENT(addr);
	DB_ASSERT(env, ai != NULL);
	for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) {

		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) {
			why = "can't create socket to connect";
			continue;
		}

		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
			__db_err(env,
			    ret, "can't make nonblock socket to connect");
			(void)closesocket(s);
			return (ret);
		}

		if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0)
			ret = net_errno;

		if (ret == 0 || ret == INPROGRESS) {
			*socket_result = s;
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "init connection to %s with result %d",
			    __repmgr_format_site_loc(site, buffer), ret));
			return (ret);
		}

		why = "connection failed";
		(void)closesocket(s);
	}

	/* We've exhausted all possible addresses. */
	ret = net_errno;
	__db_err(env, ret, "%s to %s", why,
	    __repmgr_format_site_loc(site, buffer));
	return (ret);
}

/*
 * Sends a proposal for version negotiation.
 *
 * PUBLIC: int __repmgr_propose_version __P((ENV *, REPMGR_CONNECTION *));
 */
int
__repmgr_propose_version(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	__repmgr_version_proposal_args versions;
	repmgr_netaddr_t *my_addr;
	size_t hostname_len, rec_length;
	u_int8_t *buf, *p;
	int ret;

	db_rep = env->rep_handle;
	my_addr = &db_rep->my_addr;

	/*
	 * In repmgr wire protocol version 1, a handshake message had a rec part
	 * that looked like this:
	 *
	 *  +-----------------+----+
	 *  |  host name ...  | \0 |
	 *  +-----------------+----+
	 *
	 * To ensure its own sanity, the old repmgr would write a NUL into the
	 * last byte of a received message, and then use normal C library string
	 * operations (e.g., * strlen, strcpy).
	 *
	 * Now, a version proposal has a rec part that looks like this:
	 *
	 *  +-----------------+----+------------------+------+
	 *  |  host name ...  | \0 |  extra info ...  |  \0  |
	 *  +-----------------+----+------------------+------+
	 *
	 * The "extra info" contains the version parameters, in marshaled form.
	 */

	hostname_len = strlen(my_addr->host);
	rec_length = hostname_len + 1 +
	    __REPMGR_VERSION_PROPOSAL_SIZE + 1;
	if ((ret = __os_malloc(env, rec_length, &buf)) != 0)
		goto out;
	p = buf;
	(void)strcpy((char*)p, my_addr->host);

	p += hostname_len + 1;
	versions.min = DB_REPMGR_MIN_VERSION;
	versions.max = DB_REPMGR_VERSION;
	__repmgr_version_proposal_marshal(env, &versions, p);

	ret = send_v1_handshake(env, conn, buf, rec_length);
	__os_free(env, buf);
out:
	return (ret);
}

static int
send_v1_handshake(env, conn, buf, len)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *buf;
	size_t len;
{
	DB_REP *db_rep;
	REP *rep;
	repmgr_netaddr_t *my_addr;
	DB_REPMGR_V1_HANDSHAKE buffer;
	DBT cntrl, rec;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	my_addr = &db_rep->my_addr;

	buffer.version = 1;
	buffer.priority = htonl(rep->priority);
	buffer.port = my_addr->port;
	cntrl.data = &buffer;
	cntrl.size = sizeof(buffer);

	rec.data = buf;
	rec.size = (u_int32_t)len;

	/*
	 * It would of course be disastrous to block the select() thread, so
	 * pass the "blockable" argument as FALSE.  Fortunately blocking should
	 * never be necessary here, because the hand-shake is always the first
	 * thing we send.  Which is a good thing, because it would be almost as
	 * disastrous if we allowed ourselves to drop a handshake.
	 */
	return (__repmgr_send_one(env,
	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
}

/*
 * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *));
 *
 * !!!
 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
 */
int
__repmgr_read_from_site(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	SITE_STRING_BUFFER buffer;
	size_t nr;
	int ret;

	db_rep = env->rep_handle;
	/*
	 * Keep reading pieces as long as we're making some progress, or until
	 * we complete the current read phase.
	 */
	for (;;) {
		if ((ret = __repmgr_readv(conn->fd,
		    &conn->iovecs.vectors[conn->iovecs.offset],
		    conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) {
			switch (ret) {
#ifndef DB_WIN32
			case EINTR:
				continue;
#endif
			case WOULDBLOCK:
				return (0);
			default:
				(void)__repmgr_format_eid_loc(env->rep_handle,
				    conn->eid, buffer);
				__db_err(env, ret,
				    "can't read from %s", buffer);
				STAT(env->rep_handle->
				    region->mstat.st_connection_drop++);
				return (DB_REP_UNAVAIL);
			}
		}

		if (nr > 0) {
			if (IS_VALID_EID(conn->eid)) {
				site = SITE_FROM_EID(conn->eid);
				__os_gettime(
				    env, &site->last_rcvd_timestamp, 1);
			}
			if (__repmgr_update_consumed(&conn->iovecs, nr))
				return (dispatch_phase_completion(env,
					    conn));
		} else {
			(void)__repmgr_format_eid_loc(env->rep_handle,
			    conn->eid, buffer);
			__db_errx(env, "EOF on connection from %s", buffer);
			STAT(env->rep_handle->
			    region->mstat.st_connection_drop++);
			return (DB_REP_UNAVAIL);
		}
	}
}

/*
 * Handles whatever needs to be done upon the completion of a reading phase on a
 * given connection.
 */
static int
dispatch_phase_completion(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
#define	MEM_ALIGN sizeof(double)
	DBT *dbt;
	u_int32_t control_size, rec_size;
	size_t memsize, control_offset, rec_offset;
	void *membase;
	int ret;

	switch (conn->reading_phase) {
	case SIZES_PHASE:
		/*
		 * We've received the header: a message type and the lengths of
		 * the two pieces of the message.  Set up buffers to read the
		 * two pieces.  This set-up is a bit different for a
		 * REPMGR_REP_MESSAGE, because we plan to pass it off to the msg
		 * threads.
		 */
		__repmgr_iovec_init(&conn->iovecs);
		control_size = ntohl(conn->control_size_buf);
		rec_size = ntohl(conn->rec_size_buf);

		if (conn->msg_type == REPMGR_REP_MESSAGE) {
			if (control_size == 0) {
				__db_errx(
				    env, "illegal size for rep msg");
				return (DB_REP_UNAVAIL);
			}
			/*
			 * Allocate a block of memory large enough to hold a
			 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
			 * data areas that it points to.  Start by calculating
			 * the total memory needed, rounding up for the start of
			 * each DBT, to ensure possible alignment requirements.
			 */
			memsize = (size_t)
			    DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
			control_offset = memsize;
			memsize += control_size;
			if (rec_size > 0) {
				memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN);
				rec_offset = memsize;
				memsize += rec_size;
			} else
				COMPQUIET(rec_offset, 0);
			if ((ret = __os_malloc(env, memsize, &membase)) != 0)
				return (ret);
			conn->input.rep_message = membase;

			conn->input.rep_message->originating_eid = conn->eid;
			DB_INIT_DBT(conn->input.rep_message->control,
			    (u_int8_t*)membase + control_offset, control_size);
			__repmgr_add_dbt(&conn->iovecs,
			    &conn->input.rep_message->control);

			if (rec_size > 0) {
				DB_INIT_DBT(conn->input.rep_message->rec,
				    (rec_size > 0 ?
					(u_int8_t*)membase + rec_offset : NULL),
				    rec_size);
				__repmgr_add_dbt(&conn->iovecs,
				    &conn->input.rep_message->rec);
			} else
				DB_INIT_DBT(conn->input.rep_message->rec,
				    NULL, 0);
		} else {
			conn->input.repmgr_msg.cntrl.size = control_size;
			conn->input.repmgr_msg.rec.size = rec_size;

			if (control_size > 0) {
				dbt = &conn->input.repmgr_msg.cntrl;
				if ((ret = __os_malloc(env, control_size,
				    &dbt->data)) != 0)
					return (ret);
				__repmgr_add_dbt(&conn->iovecs, dbt);
			}

			if (rec_size > 0) {
				dbt = &conn->input.repmgr_msg.rec;
				if ((ret = __os_malloc(env, rec_size,
				     &dbt->data)) != 0) {
					if (control_size > 0)
						__os_free(env,
						    conn->input.repmgr_msg.
						    cntrl.data);
					return (ret);
				}
				__repmgr_add_dbt(&conn->iovecs, dbt);
			}
		}

		conn->reading_phase = DATA_PHASE;

		if (control_size > 0 || rec_size > 0)
			break;

		/*
		 * However, if they're both 0, we're ready to complete
		 * DATA_PHASE.
		 */
		/* FALLTHROUGH */

	case DATA_PHASE:
		return (dispatch_msgin(env, conn));

	default:
		DB_ASSERT(env, FALSE);
	}

	return (0);
}

/*
 * Processes an incoming message, depending on our current state.
 */
static int
dispatch_msgin(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DBT *dbt;
	char *hostname;
	int given, ret;

	given = FALSE;

	switch (conn->state) {
	case CONN_CONNECTED:
		/*
		 * In this state, we know we're working with an outgoing
		 * connection.  We've sent a version proposal, and now expect
		 * the response (which could be a dumb old V1 handshake).
		 */
		ONLY_HANDSHAKE(env, conn);
		if ((ret = read_version_response(env, conn)) != 0)
			return (ret);
		break;

	case CONN_NEGOTIATE:
		/*
		 * Since we're in this state, we know we're working with an
		 * incoming connection, and this is the first message we've
		 * received.  So it must be a version negotiation proposal (or a
		 * legacy V1 handshake).  (We'll verify this of course.)
		 */
		ONLY_HANDSHAKE(env, conn);
		if ((ret = send_version_response(env, conn)) != 0)
			return (ret);
		break;

	case CONN_PARAMETERS:
		/*
		 * We've previously agreed on a (>1) version, and are now simply
		 * awaiting the other side's parameters handshake.
		 */
		ONLY_HANDSHAKE(env, conn);
		dbt = &conn->input.repmgr_msg.rec;
		hostname = dbt->data;
		hostname[dbt->size-1] = '\0';
		if ((ret = accept_handshake(env, conn, hostname)) != 0)
			return (ret);
		conn->state = CONN_READY;
		break;

	case CONN_READY:	/* FALLTHROUGH */
	case CONN_CONGESTED:
		/*
		 * We have a complete message, so process it.  Acks and
		 * handshakes get processed here, in line.  Regular rep messages
		 * get posted to a queue, to be handled by a thread from the
		 * message thread pool.
		 */
		switch (conn->msg_type) {
		case REPMGR_ACK:
			if ((ret = record_ack(env, conn)) != 0)
				return (ret);
			break;

		case REPMGR_HEARTBEAT:
			/*
			 * The underlying byte-receiving mechanism will already
			 * have noted the fact that we got some traffic on this
			 * connection.  And that's all we really have to do, so
			 * there's nothing more needed at this point.
			 */
			break;

		case REPMGR_REP_MESSAGE:
			if ((ret = __repmgr_queue_put(env,
			    conn->input.rep_message)) != 0)
				return (ret);
			/*
			 * The queue has taken over responsibility for the
			 * rep_message buffer, and will free it later.
			 */
			given = TRUE;
			break;

		default:
			__db_errx(env,
			    "unexpected msg type rcvd in ready state: %d",
			    (int)conn->msg_type);
			return (DB_REP_UNAVAIL);
		}
		break;

	case CONN_DEFUNCT:
		break;

	default:
		DB_ASSERT(env, FALSE);
	}

	if (!given) {
		dbt = &conn->input.repmgr_msg.cntrl;
		if (dbt->size > 0)
			__os_free(env, dbt->data);
		dbt = &conn->input.repmgr_msg.rec;
		if (dbt->size > 0)
			__os_free(env, dbt->data);
	}
	__repmgr_reset_for_reading(conn);
	return (0);
}

/*
 * Examine and verify the incoming version proposal message, and send an
 * appropriate response.
 */
static int
send_version_response(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	__repmgr_version_proposal_args versions;
	__repmgr_version_confirmation_args conf;
	repmgr_netaddr_t *my_addr;
	char *hostname;
	u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
	DBT vi;
	int ret;

	db_rep = env->rep_handle;
	my_addr = &db_rep->my_addr;

	if ((ret = find_version_info(env, conn, &vi)) != 0)
		return (ret);
	if (vi.size == 0) {
		/* No version info, so we must be talking to a v1 site. */
		hostname = conn->input.repmgr_msg.rec.data;
		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
			return (ret);
		if ((ret = send_v1_handshake(env,
		    conn, my_addr->host, strlen(my_addr->host) + 1)) != 0)
			return (ret);
		conn->state = CONN_READY;
	} else {
		if ((ret = __repmgr_version_proposal_unmarshal(env,
		    &versions, vi.data, vi.size, NULL)) != 0)
			return (DB_REP_UNAVAIL);

		/* For now version 2 is the only thing we know here. */
		DB_ASSERT(env, DB_REPMGR_VERSION == 2);

		if (DB_REPMGR_VERSION >= versions.min &&
		    DB_REPMGR_VERSION <= versions.max)
			conf.version = DB_REPMGR_VERSION;
		else if (versions.max >= DB_REPMGR_MIN_VERSION &&
		    versions.max <= DB_REPMGR_VERSION)
			conf.version = versions.max;
		else {
			/*
			 * User must have wired up a combination of versions
			 * exceeding what we said we'd support.
			 */
			__db_errx(env,
			    "No available version between %lu and %lu",
			    (u_long)versions.min, (u_long)versions.max);
			return (DB_REP_UNAVAIL);
		}
		conn->version = conf.version;

		__repmgr_version_confirmation_marshal(env, &conf, buf);
		if ((ret = send_handshake(env, conn, buf, sizeof(buf))) != 0)
			return (ret);

		conn->state = CONN_PARAMETERS;
	}
	return (ret);
}

static int
send_handshake(env, conn, opt, optlen)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *opt;
	size_t optlen;
{
	DB_REP *db_rep;
	REP *rep;
	DBT cntrl, rec;
	__repmgr_handshake_args hs;
	repmgr_netaddr_t *my_addr;
	size_t hostname_len, rec_len;
	void *buf;
	u_int8_t *p;
	u_int32_t cntrl_len;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	my_addr = &db_rep->my_addr;

	/*
	 * The cntrl part has port and priority.  The rec part has the host
	 * name, followed by whatever optional extra data was passed to us.
	 */
	cntrl_len = __REPMGR_HANDSHAKE_SIZE;
	hostname_len = strlen(my_addr->host);
	rec_len = hostname_len + 1 +
	    (opt == NULL ? 0 : optlen);

	if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
		return (ret);

	cntrl.data = p = buf;
	hs.port = my_addr->port;
	hs.priority = rep->priority;
	__repmgr_handshake_marshal(env, &hs, p);
	cntrl.size = cntrl_len;

	p = rec.data = &p[cntrl_len];
	(void)strcpy((char*)p, my_addr->host);
	p += hostname_len + 1;
	if (opt != NULL) {
		memcpy(p, opt, optlen);
		p += optlen;
	}
	rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);

	/* Never block on select thread: pass blockable as FALSE. */
	ret = __repmgr_send_one(env,
	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE);
	__os_free(env, buf);
	return (ret);
}

static int
read_version_response(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	__repmgr_version_confirmation_args conf;
	DBT vi;
	char *hostname;
	int ret;

	if ((ret = find_version_info(env, conn, &vi)) != 0)
		return (ret);
	hostname = conn->input.repmgr_msg.rec.data;
	if (vi.size == 0) {
		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
			return (ret);
	} else {
		if ((ret = __repmgr_version_confirmation_unmarshal(env,
		    &conf, vi.data, vi.size, NULL)) != 0)
			return (DB_REP_UNAVAIL);
		if (conf.version >= DB_REPMGR_MIN_VERSION &&
		    conf.version <= DB_REPMGR_VERSION)
			conn->version = conf.version;
		else {
			/*
			 * Remote site "confirmed" a version outside of the
			 * range we proposed.  It should never do that.
			 */
			__db_errx(env,
			    "Can't support confirmed version %lu",
			    (u_long)conf.version);
			return (DB_REP_UNAVAIL);
		}

		if ((ret = accept_handshake(env, conn, hostname)) != 0)
			return (ret);
		if ((ret = send_handshake(env, conn, NULL, 0)) != 0)
			return (ret);
	}
	conn->state = CONN_READY;
	return (ret);
}

/*
 * Examine the rec part of a handshake message to see if it has any version
 * information in it.  This is the magic that lets us allows version-aware sites
 * to exchange information, and yet avoids tripping up v1 sites, which don't
 * know how to look for it.
 */
static int
find_version_info(env, conn, vi)
	ENV *env;
	REPMGR_CONNECTION *conn;
	DBT *vi;
{
	DBT *dbt;
	char *hostname;
	size_t hostname_len;

	dbt = &conn->input.repmgr_msg.rec;
	if (dbt->size == 0) {
		__db_errx(env, "handshake is missing rec part");
		return (DB_REP_UNAVAIL);
	}
	hostname = dbt->data;
	hostname[dbt->size-1] = '\0';
	hostname_len = strlen(hostname);
	if (hostname_len + 1 == dbt->size) {
		/*
		 * The rec DBT held only the host name.  This is a simple legacy
		 * V1 handshake; it contains no version information.
		 */
		vi->size = 0;
	} else {
		/*
		 * There's more data than just the host name.  The remainder is
		 * available to be treated as a normal byte buffer (and read in
		 * by one of the unmarshal functions).  Note that the remaining
		 * length should not include the padding byte that we have
		 * already clobbered.
		 */
		vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1];
		vi->size = (dbt->size - (hostname_len+1)) - 1;
	}
	return (0);
}

static int
accept_handshake(env, conn, hostname)
	ENV *env;
	REPMGR_CONNECTION *conn;
	char *hostname;
{
	__repmgr_handshake_args hs;

	/* Extract port and priority from cntrl. */
	if (__repmgr_handshake_unmarshal(env, &hs,
	    conn->input.repmgr_msg.cntrl.data,
	    conn->input.repmgr_msg.cntrl.size, NULL) != 0)
		return (DB_REP_UNAVAIL);

	return (process_parameters(env, conn, hostname, hs.port, hs.priority));
}

static int
accept_v1_handshake(env, conn, hostname)
	ENV *env;
	REPMGR_CONNECTION *conn;
	char *hostname;
{
	DB_REPMGR_V1_HANDSHAKE *handshake;
	u_int32_t prio;

	handshake = conn->input.repmgr_msg.cntrl.data;
	if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
	    handshake->version != 1) {
		__db_errx(env, "malformed V1 handshake");
		return (DB_REP_UNAVAIL);
	}

	conn->version = 1;
	prio = ntohl(handshake->priority);
	return (process_parameters(env, conn, hostname, handshake->port, prio));
}

static int
process_parameters(env, conn, host, port, priority)
	ENV *env;
	REPMGR_CONNECTION *conn;
	char *host;
	u_int port;
	u_int32_t priority;
{
	DB_REP *db_rep;
	REPMGR_RETRY *retry;
	REPMGR_SITE *site;
	repmgr_netaddr_t addr;
	int ret, eid;

	db_rep = env->rep_handle;

	if (F_ISSET(conn, CONN_INCOMING)) {
		/*
		 * Now that we've been given the host and port, use them to find
		 * the site (or create a new one if necessary, etc.).
		 */
		if (IS_VALID_EID(eid = __repmgr_find_site(env, host, port))) {
			site = SITE_FROM_EID(eid);
			if (site->state == SITE_IDLE) {
				RPRINT(env, DB_VERB_REPMGR_MISC, (env,
				    "handshake from idle site %s:%u",
				     host, port));
				retry = site->ref.retry;
				TAILQ_REMOVE(&db_rep->retries, retry, entries);
				__os_free(env, retry);
			} else {
				/*
				 * We got an incoming connection for a site we
				 * were already connected to; at least we
				 * thought we were.
				 */
				RPRINT(env, DB_VERB_REPMGR_MISC, (env,
				    "connection from %s:%u supersedes existing",
				     host, port));

				/*
				 * No need to schedule a retry for later, since
				 * we now have a replacement connection.
				 */
				DISABLE_CONNECTION(site->ref.conn);
			}
			conn->eid = eid;
			site->state = SITE_CONNECTED;
			site->ref.conn = conn;
		} else {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "handshake introduces unknown site %s:%u",
			    host, port));
			if ((ret = __repmgr_pack_netaddr(env,
			    host, port, NULL, &addr)) != 0)
				return (ret);
			if ((ret = __repmgr_new_site(env,
			    &site, &addr, SITE_CONNECTED)) != 0) {
				__repmgr_cleanup_netaddr(env, &addr);
				return (ret);
			}
			conn->eid = EID_FROM_SITE(site);
			site->ref.conn = conn;
		}
	} else {
		/*
		 * Since we initiated this as an outgoing connection, we
		 * obviously already know the host, port and site.  We just need
		 * the other site's priority.
		 */
		DB_ASSERT(env, IS_VALID_EID(conn->eid));
		site = SITE_FROM_EID(conn->eid);
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "handshake from connection to %s:%lu",
		    site->net_addr.host, (u_long)site->net_addr.port));
	}

	site->priority = priority;
	F_SET(site, SITE_HAS_PRIO);

	/*
	 * If we're moping around wishing we knew who the master was, then
	 * getting in touch with another site might finally provide sufficient
	 * connectivity to find out.  But just do this once, because otherwise
	 * we get messages while the subsequent rep_start operations are going
	 * on, and rep tosses them in that case.
	 */
	if (db_rep->master_eid == DB_EID_INVALID && !db_rep->done_one) {
		db_rep->done_one = TRUE;
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "handshake with no known master to wake election thread"));
		if ((ret = __repmgr_init_election(env, ELECT_REPSTART)) != 0)
			return (ret);
	}

	return (0);
}

static int
record_ack(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	__repmgr_ack_args *ackp, ack;
	SITE_STRING_BUFFER location;
	int ret;

	db_rep = env->rep_handle;

	DB_ASSERT(env, conn->version > 0 &&
	    IS_READY_STATE(conn->state) && IS_VALID_EID(conn->eid));
	site = SITE_FROM_EID(conn->eid);

	/*
	 * Extract the LSN.  Save it only if it is an improvement over what the
	 * site has already ack'ed.
	 */
	if (conn->version == 1) {
		ackp = conn->input.repmgr_msg.cntrl.data;
		if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
		    conn->input.repmgr_msg.rec.size != 0) {
			__db_errx(env, "bad ack msg size");
			return (DB_REP_UNAVAIL);
		}
	} else {
		ackp = &ack;
		if ((ret = __repmgr_ack_unmarshal(env, ackp,
			 conn->input.repmgr_msg.cntrl.data,
			 conn->input.repmgr_msg.cntrl.size, NULL)) != 0)
			return (DB_REP_UNAVAIL);
	}

	/* Ignore stale acks. */
	if (ackp->generation < db_rep->generation) {
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "ignoring stale ack (%lu<%lu), from %s",
		     (u_long)ackp->generation, (u_long)db_rep->generation,
		     __repmgr_format_site_loc(site, location)));
		return (0);
	}
	RPRINT(env, DB_VERB_REPMGR_MISC, (env,
	    "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
	    (u_long)ackp->lsn.offset, (u_long)ackp->generation,
	    __repmgr_format_site_loc(site, location)));

	if (ackp->generation == db_rep->generation &&
	    log_compare(&ackp->lsn, &site->max_ack) == 1) {
		memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
		if ((ret = __repmgr_wake_waiting_senders(env)) != 0)
			return (ret);
	}
	return (0);
}

/*
 * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *));
 */
int
__repmgr_write_some(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	QUEUED_OUTPUT *output;
	REPMGR_FLAT *msg;
	int bytes, ret;

	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
		output = STAILQ_FIRST(&conn->outbound_queue);
		msg = output->msg;
		if ((bytes = send(conn->fd, &msg->data[output->offset],
		    (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) {
			if ((ret = net_errno) == WOULDBLOCK)
				return (0);
			else {
				__db_err(env, ret, "writing data");
				STAT(env->rep_handle->
				    region->mstat.st_connection_drop++);
				return (DB_REP_UNAVAIL);
			}
		}

		if ((output->offset += (size_t)bytes) >= msg->length) {
			STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
			__os_free(env, output);
			conn->out_queue_length--;
			if (--msg->ref_count <= 0)
				__os_free(env, msg);

			/*
			 * We've achieved enough movement to free up at least
			 * one space in the outgoing queue.  Wake any message
			 * threads that may be waiting for space.  Leave
			 * CONGESTED state so that when the queue reaches the
			 * high-water mark again, the filling thread will be
			 * allowed to try waiting again.
			 */
			conn->state = CONN_READY;
			if (conn->blockers > 0 &&
			    (ret = __repmgr_signal(&conn->drained)) != 0)
				return (ret);
		}
	}

#ifdef DB_WIN32
	/*
	 * With the queue now empty, it's time to relinquish ownership of this
	 * connection again, so that the next call to send() can write the
	 * message in line, instead of posting it to the queue for us.
	 */
	if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE)
	    == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't remove FD_WRITE event bit");
		return (ret);
	}
#endif

	return (0);
}