/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2005,2008 Oracle. All rights reserved. * * $Id: repmgr_msg.c,v 1.42 2008/01/08 20:58:48 bostic Exp $ */ #include "db_config.h" #define __INCLUDE_NETWORKING 1 #include "db_int.h" static int message_loop __P((ENV *)); static int process_message __P((ENV*, DBT*, DBT*, int)); static int handle_newsite __P((ENV *, const DBT *)); static int ack_message __P((ENV *, u_int32_t, DB_LSN *)); /* * PUBLIC: void *__repmgr_msg_thread __P((void *)); */ void * __repmgr_msg_thread(args) void *args; { ENV *env = args; int ret; if ((ret = message_loop(env)) != 0) { __db_err(env, ret, "message thread failed"); __repmgr_thread_failure(env, ret); } return (NULL); } static int message_loop(env) ENV *env; { REPMGR_MESSAGE *msg; int ret; while ((ret = __repmgr_queue_get(env, &msg)) == 0) { while ((ret = process_message(env, &msg->control, &msg->rec, msg->originating_eid)) == DB_LOCK_DEADLOCK) RPRINT(env, DB_VERB_REPMGR_MISC, (env, "repmgr deadlock retry")); __os_free(env, msg); if (ret != 0) return (ret); } return (ret == DB_REP_UNAVAIL ? 0 : ret); } static int process_message(env, control, rec, eid) ENV *env; DBT *control, *rec; int eid; { DB_LSN permlsn; DB_REP *db_rep; REP *rep; int ret; u_int32_t generation; db_rep = env->rep_handle; /* * Save initial generation number, in case it changes in a close race * with a NEWMASTER. See msgdir.10000/10039/msg00086.html. */ generation = db_rep->generation; switch (ret = __rep_process_message(env->dbenv, control, rec, eid, &permlsn)) { case 0: if (db_rep->takeover_pending) { db_rep->takeover_pending = FALSE; return (__repmgr_become_master(env)); } break; case DB_REP_NEWSITE: return (handle_newsite(env, rec)); case DB_REP_HOLDELECTION: LOCK_MUTEX(db_rep->mutex); ret = __repmgr_init_election(env, ELECT_ELECTION); UNLOCK_MUTEX(db_rep->mutex); if (ret != 0) return (ret); break; case DB_REP_DUPMASTER: if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0) return (ret); LOCK_MUTEX(db_rep->mutex); ret = __repmgr_init_election(env, ELECT_ELECTION); UNLOCK_MUTEX(db_rep->mutex); if (ret != 0) return (ret); break; case DB_REP_ISPERM: /* * Don't bother sending ack if master doesn't care about it. */ rep = db_rep->region; if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE || (IS_PEER_POLICY(db_rep->perm_policy) && rep->priority == 0)) break; if ((ret = ack_message(env, generation, &permlsn)) != 0) return (ret); break; case DB_REP_NOTPERM: /* FALLTHROUGH */ case DB_REP_IGNORE: /* FALLTHROUGH */ case DB_LOCK_DEADLOCK: break; default: __db_err(env, ret, "DB_ENV->rep_process_message"); return (ret); } return (0); } /* * Handle replication-related events. Returns only 0 or DB_EVENT_NOT_HANDLED; * no other error returns are tolerated. * * PUBLIC: int __repmgr_handle_event __P((ENV *, u_int32_t, void *)); */ int __repmgr_handle_event(env, event, info) ENV *env; u_int32_t event; void *info; { DB_REP *db_rep; db_rep = env->rep_handle; if (db_rep->selector == NULL) { /* Repmgr is not in use, so all events go to application. */ return (DB_EVENT_NOT_HANDLED); } switch (event) { case DB_EVENT_REP_ELECTED: DB_ASSERT(env, info == NULL); db_rep->found_master = TRUE; db_rep->takeover_pending = TRUE; /* * The application doesn't really need to see this, because the * purpose of this event is to tell the winning site that it * should call rep_start(MASTER), and in repmgr we do that * automatically. Still, they could conceivably be curious, and * it doesn't hurt anything to let them know. */ break; case DB_EVENT_REP_NEWMASTER: DB_ASSERT(env, info != NULL); db_rep->found_master = TRUE; db_rep->master_eid = *(int *)info; __repmgr_stash_generation(env); /* Application still needs to see this. */ break; default: break; } return (DB_EVENT_NOT_HANDLED); } /* * Acknowledges a message. */ static int ack_message(env, generation, lsn) ENV *env; u_int32_t generation; DB_LSN *lsn; { DBT control2, rec2; DB_REP *db_rep; __repmgr_ack_args ack; u_int8_t buf[__REPMGR_ACK_SIZE]; REPMGR_CONNECTION *conn; REPMGR_SITE *site; int ret; db_rep = env->rep_handle; /* * Regardless of where a message came from, all ack's go to the master * site. If we're not in touch with the master, we drop it, since * there's not much else we can do. */ if (!IS_VALID_EID(db_rep->master_eid) || db_rep->master_eid == SELF_EID) { RPRINT(env, DB_VERB_REPMGR_MISC, (env, "dropping ack with master %d", db_rep->master_eid)); return (0); } ret = 0; LOCK_MUTEX(db_rep->mutex); site = SITE_FROM_EID(db_rep->master_eid); if (site->state == SITE_CONNECTED && site->ref.conn->state == CONN_READY) { conn = site->ref.conn; DB_ASSERT(env, conn->version > 0); ack.generation = generation; memcpy(&ack.lsn, lsn, sizeof(DB_LSN)); if (conn->version == 1) { control2.data = &ack; control2.size = sizeof(ack); } else { __repmgr_ack_marshal(env, &ack, buf); control2.data = buf; control2.size = __REPMGR_ACK_SIZE; } rec2.size = 0; /* * It's hard to imagine anyone would care about a lost ack if * the path to the master is so congested as to need blocking; * so pass "blockable" argument as FALSE. */ if ((ret = __repmgr_send_one(env, conn, REPMGR_ACK, &control2, &rec2, FALSE)) == DB_REP_UNAVAIL) ret = __repmgr_bust_connection(env, conn); } UNLOCK_MUTEX(db_rep->mutex); return (ret); } /* * Does everything necessary to handle the processing of a NEWSITE return. */ static int handle_newsite(env, rec) ENV *env; const DBT *rec; { ADDRINFO *ai; DB_REP *db_rep; REPMGR_SITE *site; SITE_STRING_BUFFER buffer; repmgr_netaddr_t *addr; size_t hlen; u_int16_t port; int ret; char *host; db_rep = env->rep_handle; /* * Check if we got sent connect information and if we did, if * this is me or if we already have a connection to this new * site. If we don't, establish a new one. * * Unmarshall the cdata: a 2-byte port number, in network byte order, * followed by the host name string, which should already be * null-terminated, but let's make sure. */ if (rec->size < sizeof(port) + 1) { __db_errx(env, "unexpected cdata size, msg ignored"); return (0); } memcpy(&port, rec->data, sizeof(port)); port = ntohs(port); host = (char*)((u_int8_t*)rec->data + sizeof(port)); hlen = (rec->size - sizeof(port)) - 1; host[hlen] = '\0'; /* It's me, do nothing. */ if (strcmp(host, db_rep->my_addr.host) == 0 && port == db_rep->my_addr.port) { RPRINT(env, DB_VERB_REPMGR_MISC, (env, "repmgr ignores own NEWSITE info")); return (0); } LOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_add_site(env, host, port, &site)) == EEXIST) { RPRINT(env, DB_VERB_REPMGR_MISC, (env, "NEWSITE info from %s was already known", __repmgr_format_site_loc(site, buffer))); /* * In case we already know about this site only because it * first connected to us, we may not yet have had a chance to * look up its addresses. Even though we don't need them just * now, this is an advantageous opportunity to get them since we * can do so away from the critical select thread. Give up only * for a disastrous failure. */ addr = &site->net_addr; if (addr->address_list == NULL) { if ((ret = __repmgr_getaddr(env, addr->host, addr->port, 0, &ai)) == 0) addr->address_list = ai; else if (ret != DB_REP_UNAVAIL) goto unlock; } ret = 0; if (site->state == SITE_CONNECTED) goto unlock; /* Nothing to do. */ } else { if (ret != 0) goto unlock; RPRINT(env, DB_VERB_REPMGR_MISC, (env, "NEWSITE info added %s", __repmgr_format_site_loc(site, buffer))); } /* * Wake up the main thread to connect to the new or reawakened * site. */ ret = __repmgr_wake_main_thread(env); unlock: UNLOCK_MUTEX(db_rep->mutex); return (ret); } /* * PUBLIC: void __repmgr_stash_generation __P((ENV *)); */ void __repmgr_stash_generation(env) ENV *env; { DB_REP *db_rep; REP *rep; db_rep = env->rep_handle; rep = db_rep->region; db_rep->generation = rep->gen; }