repmgr_msg.c   [plain text]


/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_msg.c,v 1.42 2008/01/08 20:58:48 bostic Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"

static int message_loop __P((ENV *));
static int process_message __P((ENV*, DBT*, DBT*, int));
static int handle_newsite __P((ENV *, const DBT *));
static int ack_message __P((ENV *, u_int32_t, DB_LSN *));

/*
 * PUBLIC: void *__repmgr_msg_thread __P((void *));
 */
void *
__repmgr_msg_thread(args)
	void *args;
{
	ENV *env = args;
	int ret;

	if ((ret = message_loop(env)) != 0) {
		__db_err(env, ret, "message thread failed");
		__repmgr_thread_failure(env, ret);
	}
	return (NULL);
}

static int
message_loop(env)
	ENV *env;
{
	REPMGR_MESSAGE *msg;
	int ret;

	while ((ret = __repmgr_queue_get(env, &msg)) == 0) {
		while ((ret = process_message(env, &msg->control, &msg->rec,
		    msg->originating_eid)) == DB_LOCK_DEADLOCK)
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "repmgr deadlock retry"));

		__os_free(env, msg);
		if (ret != 0)
			return (ret);
	}

	return (ret == DB_REP_UNAVAIL ? 0 : ret);
}

static int
process_message(env, control, rec, eid)
	ENV *env;
	DBT *control, *rec;
	int eid;
{
	DB_LSN permlsn;
	DB_REP *db_rep;
	REP *rep;
	int ret;
	u_int32_t generation;

	db_rep = env->rep_handle;

	/*
	 * Save initial generation number, in case it changes in a close race
	 * with a NEWMASTER.  See msgdir.10000/10039/msg00086.html.
	 */
	generation = db_rep->generation;

	switch (ret =
	    __rep_process_message(env->dbenv, control, rec, eid, &permlsn)) {
	case 0:
		if (db_rep->takeover_pending) {
			db_rep->takeover_pending = FALSE;
			return (__repmgr_become_master(env));
		}
		break;

	case DB_REP_NEWSITE:
		return (handle_newsite(env, rec));

	case DB_REP_HOLDELECTION:
		LOCK_MUTEX(db_rep->mutex);
		ret = __repmgr_init_election(env, ELECT_ELECTION);
		UNLOCK_MUTEX(db_rep->mutex);
		if (ret != 0)
			return (ret);
		break;

	case DB_REP_DUPMASTER:
		if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0)
			return (ret);
		LOCK_MUTEX(db_rep->mutex);
		ret = __repmgr_init_election(env, ELECT_ELECTION);
		UNLOCK_MUTEX(db_rep->mutex);
		if (ret != 0)
			return (ret);
		break;

	case DB_REP_ISPERM:
		/*
		 * Don't bother sending ack if master doesn't care about it.
		 */
		rep = db_rep->region;
		if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE ||
		    (IS_PEER_POLICY(db_rep->perm_policy) &&
		    rep->priority == 0))
			break;

		if ((ret = ack_message(env, generation, &permlsn)) != 0)
			return (ret);

		break;

	case DB_REP_NOTPERM: /* FALLTHROUGH */
	case DB_REP_IGNORE: /* FALLTHROUGH */
	case DB_LOCK_DEADLOCK:
		break;

	default:
		__db_err(env, ret, "DB_ENV->rep_process_message");
		return (ret);
	}
	return (0);
}

/*
 * Handle replication-related events.  Returns only 0 or DB_EVENT_NOT_HANDLED;
 * no other error returns are tolerated.
 *
 * PUBLIC: int __repmgr_handle_event __P((ENV *, u_int32_t, void *));
 */
int
__repmgr_handle_event(env, event, info)
	ENV *env;
	u_int32_t event;
	void *info;
{
	DB_REP *db_rep;

	db_rep = env->rep_handle;

	if (db_rep->selector == NULL) {
		/* Repmgr is not in use, so all events go to application. */
		return (DB_EVENT_NOT_HANDLED);
	}

	switch (event) {
	case DB_EVENT_REP_ELECTED:
		DB_ASSERT(env, info == NULL);

		db_rep->found_master = TRUE;
		db_rep->takeover_pending = TRUE;

		/*
		 * The application doesn't really need to see this, because the
		 * purpose of this event is to tell the winning site that it
		 * should call rep_start(MASTER), and in repmgr we do that
		 * automatically.  Still, they could conceivably be curious, and
		 * it doesn't hurt anything to let them know.
		 */
		break;
	case DB_EVENT_REP_NEWMASTER:
		DB_ASSERT(env, info != NULL);

		db_rep->found_master = TRUE;
		db_rep->master_eid = *(int *)info;
		__repmgr_stash_generation(env);

		/* Application still needs to see this. */
		break;
	default:
		break;
	}
	return (DB_EVENT_NOT_HANDLED);
}

/*
 * Acknowledges a message.
 */
static int
ack_message(env, generation, lsn)
	ENV *env;
	u_int32_t generation;
	DB_LSN *lsn;
{
	DBT control2, rec2;
	DB_REP *db_rep;
	__repmgr_ack_args ack;
	u_int8_t buf[__REPMGR_ACK_SIZE];
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	int ret;

	db_rep = env->rep_handle;
	/*
	 * Regardless of where a message came from, all ack's go to the master
	 * site.  If we're not in touch with the master, we drop it, since
	 * there's not much else we can do.
	 */
	if (!IS_VALID_EID(db_rep->master_eid) ||
	    db_rep->master_eid == SELF_EID) {
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "dropping ack with master %d", db_rep->master_eid));
		return (0);
	}

	ret = 0;
	LOCK_MUTEX(db_rep->mutex);
	site = SITE_FROM_EID(db_rep->master_eid);
	if (site->state == SITE_CONNECTED &&
	    site->ref.conn->state == CONN_READY) {
		conn = site->ref.conn;
		DB_ASSERT(env, conn->version > 0);
		ack.generation = generation;
		memcpy(&ack.lsn, lsn, sizeof(DB_LSN));
		if (conn->version == 1) {
			control2.data = &ack;
			control2.size = sizeof(ack);
		} else {
			__repmgr_ack_marshal(env, &ack, buf);
			control2.data = buf;
			control2.size = __REPMGR_ACK_SIZE;
		}
		rec2.size = 0;
		/*
		 * It's hard to imagine anyone would care about a lost ack if
		 * the path to the master is so congested as to need blocking;
		 * so pass "blockable" argument as FALSE.
		 */
		if ((ret = __repmgr_send_one(env, conn, REPMGR_ACK,
		    &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
			ret = __repmgr_bust_connection(env, conn);
	}

	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}

/*
 * Does everything necessary to handle the processing of a NEWSITE return.
 */
static int
handle_newsite(env, rec)
	ENV *env;
	const DBT *rec;
{
	ADDRINFO *ai;
	DB_REP *db_rep;
	REPMGR_SITE *site;
	SITE_STRING_BUFFER buffer;
	repmgr_netaddr_t *addr;
	size_t hlen;
	u_int16_t port;
	int ret;
	char *host;

	db_rep = env->rep_handle;
	/*
	 * Check if we got sent connect information and if we did, if
	 * this is me or if we already have a connection to this new
	 * site.  If we don't, establish a new one.
	 *
	 * Unmarshall the cdata: a 2-byte port number, in network byte order,
	 * followed by the host name string, which should already be
	 * null-terminated, but let's make sure.
	 */
	if (rec->size < sizeof(port) + 1) {
		__db_errx(env, "unexpected cdata size, msg ignored");
		return (0);
	}
	memcpy(&port, rec->data, sizeof(port));
	port = ntohs(port);

	host = (char*)((u_int8_t*)rec->data + sizeof(port));
	hlen = (rec->size - sizeof(port)) - 1;
	host[hlen] = '\0';

	/* It's me, do nothing. */
	if (strcmp(host, db_rep->my_addr.host) == 0 &&
	    port == db_rep->my_addr.port) {
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "repmgr ignores own NEWSITE info"));
		return (0);
	}

	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_add_site(env, host, port, &site)) == EEXIST) {
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "NEWSITE info from %s was already known",
		    __repmgr_format_site_loc(site, buffer)));
		/*
		 * In case we already know about this site only because it
		 * first connected to us, we may not yet have had a chance to
		 * look up its addresses.  Even though we don't need them just
		 * now, this is an advantageous opportunity to get them since we
		 * can do so away from the critical select thread.  Give up only
		 * for a disastrous failure.
		 */
		addr = &site->net_addr;
		if (addr->address_list == NULL) {
			if ((ret = __repmgr_getaddr(env,
			    addr->host, addr->port, 0, &ai)) == 0)
				addr->address_list = ai;
			else if (ret != DB_REP_UNAVAIL)
				goto unlock;
		}

		ret = 0;
		if (site->state == SITE_CONNECTED)
			goto unlock; /* Nothing to do. */
	} else {
		if (ret != 0)
			goto unlock;
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "NEWSITE info added %s",
		    __repmgr_format_site_loc(site, buffer)));
	}

	/*
	 * Wake up the main thread to connect to the new or reawakened
	 * site.
	 */
	ret = __repmgr_wake_main_thread(env);

unlock: UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}

/*
 * PUBLIC: void __repmgr_stash_generation __P((ENV *));
 */
void
__repmgr_stash_generation(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	db_rep->generation = rep->gen;
}