#include "db_config.h"
#define __INCLUDE_NETWORKING 1
#include "db_int.h"
/* Forward declarations for the file-local (static) helpers defined below. */
static int __repmgr_is_ready __P((ENV *));
static int __repmgr_elect_main __P((ENV *));
static void *__repmgr_elect_thread __P((void *));
static int start_election_thread __P((ENV *));
/*
 * __repmgr_init_election --
 *	Post a request for the election thread and make sure a thread exists
 *	to service it.
 *
 *	initial_operation is stored in db_rep->operation_needed.  Depending on
 *	the state of db_rep->elect_thread, this then either starts a new
 *	thread, joins and frees a finished one before starting a replacement,
 *	or signals the check_election condition of a still-running one.
 *
 *	Returns 0 on success (including the case where repmgr is already
 *	finished and the request is ignored), otherwise the error from the
 *	join, thread start or signal call.
 *
 *	NOTE(review): shared DB_REP fields are read and written here without
 *	taking db_rep->mutex; presumably the caller holds it -- confirm.
 */
int
__repmgr_init_election(env, initial_operation)
	ENV *env;
	int initial_operation;
{
	DB_REP *db_rep;
	int rc;

	db_rep = env->rep_handle;

	/* After shutdown, requests are dropped without error. */
	if (db_rep->finished) {
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "ignoring elect thread request %d; repmgr is finished",
		    initial_operation));
		return (0);
	}

	db_rep->operation_needed = initial_operation;

	/* A thread that is still running just needs to be woken up. */
	if (db_rep->elect_thread != NULL && !db_rep->elect_thread->finished) {
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "reusing existing elect thread"));
		rc = __repmgr_signal(&db_rep->check_election);
		if (rc != 0)
			__db_err(env, rc, "can't signal election thread");
		return (rc);
	}

	/* A thread that has already finished must be reaped first. */
	if (db_rep->elect_thread != NULL) {
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "join dead elect thread"));
		rc = __repmgr_thread_join(db_rep->elect_thread);
		if (rc != 0)
			return (rc);
		__os_free(env, db_rep->elect_thread);
		db_rep->elect_thread = NULL;
	}

	/* No thread (any more): create a fresh one. */
	return (start_election_thread(env));
}
/*
 * start_election_thread --
 *	Allocate a REPMGR_RUNNABLE whose entry point is __repmgr_elect_thread
 *	and start it.  On success the runnable is recorded in
 *	db_rep->elect_thread; on failure it is freed again.
 *
 *	Returns 0 on success, or the error from __os_malloc or
 *	__repmgr_thread_start.
 */
static int
start_election_thread(env)
	ENV *env;
{
	REPMGR_RUNNABLE *runnable;
	int rc;

	rc = __os_malloc(env, sizeof(REPMGR_RUNNABLE), &runnable);
	if (rc != 0)
		return (rc);

	runnable->env = env;
	runnable->run = __repmgr_elect_thread;

	rc = __repmgr_thread_start(env, runnable);
	if (rc != 0) {
		/* The thread never started, so the runnable is still ours. */
		__os_free(env, runnable);
		return (rc);
	}

	env->rep_handle->elect_thread = runnable;
	return (0);
}
/*
 * __repmgr_elect_thread --
 *	Thread entry point for the election thread.  args is used directly as
 *	the ENV handle.  Runs __repmgr_elect_main; a non-zero result is logged
 *	and passed to __repmgr_thread_failure.  Always returns NULL.
 */
static void *
__repmgr_elect_thread(args)
	void *args;
{
	ENV *env;
	int status;

	env = args;

	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "starting election thread"));

	status = __repmgr_elect_main(env);
	if (status != 0) {
		__db_err(env, status, "election thread failed");
		__repmgr_thread_failure(env, status);
	}

	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "election thread is exiting"));

	return (NULL);
}
/*
 * __repmgr_elect_main --
 *	Main loop of the election thread.  Each iteration performs the current
 *	operation (an election through __rep_elect, a client __rep_start, or
 *	nothing), then waits on db_rep->check_election, then decides under the
 *	mutex what to do on the next pass.
 *
 *	Requests arrive through db_rep->operation_needed, which is read and
 *	cleared while holding db_rep->mutex, both before the loop and at the
 *	bottom of every iteration.
 *
 *	Returns 0 when the loop ends normally; in that case
 *	db_rep->elect_thread->finished has been set to TRUE.  Returns the
 *	non-zero error from __rep_elect, __repmgr_become_master,
 *	__repmgr_prepare_my_addr or __rep_start otherwise; those error paths
 *	return without setting elect_thread->finished.
 */
static int
__repmgr_elect_main(env)
ENV *env;
{
DBT my_addr;
DB_ENV *dbenv;
DB_REP *db_rep;
#ifdef DB_WIN32
DWORD duration;
#else
struct timespec deadline;
#endif
u_int32_t nsites, nvotes;
int done, failure_recovery, last_op;
int need_success, ret, succeeded, to_do;
/* need_success is assigned in the switch below; this only quiets compilers. */
COMPQUIET(need_success, TRUE);
dbenv = env->dbenv;
db_rep = env->rep_handle;
last_op = 0;
failure_recovery = succeeded = FALSE;
/*
 * Fetch and clear the initial request under the mutex.  If repmgr has
 * already finished, mark this thread finished and leave immediately.
 */
LOCK_MUTEX(db_rep->mutex);
if (db_rep->finished) {
db_rep->elect_thread->finished = TRUE;
UNLOCK_MUTEX(db_rep->mutex);
return (0);
}
to_do = db_rep->operation_needed;
db_rep->operation_needed = 0;
UNLOCK_MUTEX(db_rep->mutex);
/*
 * Translate the request into the first operation to perform (to_do) and
 * the completion criterion (need_success): election requests require a
 * successful election; the other requests are satisfied once a valid
 * master_eid is seen (see the "done" test at the bottom of the loop).
 */
switch (to_do) {
case ELECT_FAILURE_ELECTION:
failure_recovery = TRUE;
to_do = ELECT_ELECTION;
/* FALLTHROUGH */
case ELECT_ELECTION:
need_success = TRUE;
break;
case ELECT_SEEK_MASTER:
/* Nothing to perform on the first pass. */
to_do = 0;
/* FALLTHROUGH */
case ELECT_REPSTART:
need_success = FALSE;
break;
default:
/* Any other request value is unexpected. */
DB_ASSERT(env, FALSE);
}
for (;;) {
RPRINT(env, DB_VERB_REPMGR_MISC,
(env, "elect thread to do: %d", to_do));
switch (to_do) {
case ELECT_ELECTION:
/*
 * With exactly two sites and REP_C_2SITE_STRICT not configured, a single
 * vote is enough; otherwise ELECTION_MAJORITY(nsites) votes are required.
 */
nsites = __repmgr_get_nsites(db_rep);
if (nsites == 2 &&
!FLD_ISSET(db_rep->region->config,
REP_C_2SITE_STRICT))
nvotes = 1;
else
nvotes = ELECTION_MAJORITY(nsites);
/*
 * In failure-recovery mode pass one fewer site to __rep_elect, but only
 * when nsites still exceeds nvotes; nvotes itself is not changed.
 * NOTE(review): presumably because the failed master is not expected to
 * take part -- confirm.
 */
if (failure_recovery && nsites > nvotes)
nsites--;
switch (ret =
__rep_elect(dbenv, nsites, nvotes, 0)) {
case DB_REP_UNAVAIL:
/* Not treated as an error: succeeded stays FALSE and the loop goes on. */
break;
case 0:
succeeded = TRUE;
/* If a takeover is pending, clear the flag and become master now. */
if (db_rep->takeover_pending) {
db_rep->takeover_pending = FALSE;
if ((ret =
__repmgr_become_master(env)) != 0)
return (ret);
}
break;
default:
__db_err(
env, ret, "unexpected election failure");
return (ret);
}
last_op = ELECT_ELECTION;
break;
case ELECT_REPSTART:
/* (Re)start as a client; my_addr.data is freed whether or not that works. */
if ((ret =
__repmgr_prepare_my_addr(env, &my_addr)) != 0)
return (ret);
ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT);
__os_free(env, my_addr.data);
if (ret != 0) {
__db_err(env, ret, "rep_start");
return (ret);
}
last_op = ELECT_REPSTART;
break;
case 0:
/* Nothing to do on this pass. */
last_op = 0;
break;
default:
DB_ASSERT(env, FALSE);
}
/*
 * Failure-recovery mode covers at most one pass; it is only turned on
 * again by a new ELECT_FAILURE_ELECTION request.
 */
failure_recovery = FALSE;
/*
 * Unless an election has succeeded, wait on check_election until
 * __repmgr_is_ready() reports a pending request or shutdown, or until
 * election_retry_wait has elapsed.  The mutex is held from here through
 * the UNLOCK_MUTEX at the bottom of the loop, except while waiting.
 */
LOCK_MUTEX(db_rep->mutex);
while (!succeeded && !__repmgr_is_ready(env)) {
#ifdef DB_WIN32
/* election_retry_wait is scaled by US_PER_MS for the Windows wait call. */
duration = db_rep->election_retry_wait / US_PER_MS;
ret = SignalObjectAndWait(db_rep->mutex,
db_rep->check_election, duration, FALSE);
/* The mutex is re-taken explicitly once the wait returns. */
LOCK_MUTEX(db_rep->mutex);
if (ret == WAIT_TIMEOUT)
break;
DB_ASSERT(env, ret == WAIT_OBJECT_0);
#else
/* The deadline is recomputed on every pass through the wait loop. */
__repmgr_compute_wait_deadline(env, &deadline,
db_rep->election_retry_wait);
if ((ret = pthread_cond_timedwait(
&db_rep->check_election, &db_rep->mutex, &deadline))
== ETIMEDOUT)
break;
DB_ASSERT(env, ret == 0);
#endif
}
/*
 * Decide the next step while still holding the mutex, so that
 * operation_needed is examined and cleared in one step:
 *
 *  1. A new request takes priority.  ELECT_FAILURE_ELECTION and
 *     ELECT_ELECTION switch to an election and set need_success;
 *     ELECT_SEEK_MASTER becomes a no-op pass; any other value is used as
 *     the next to_do unchanged.  Unlike the initial switch above, the
 *     last two cases leave need_success as it was.
 *  2. Otherwise the thread is done if an election succeeded, or success
 *     is not required and master_eid is valid, or repmgr is finished.
 *  3. Otherwise retry: after an election do a rep_start; after anything
 *     else do an election, except that a site whose init_policy is
 *     DB_REP_CLIENT and which has not found a master does rep_start again.
 */
done = FALSE;
if ((to_do = db_rep->operation_needed) != 0) {
db_rep->operation_needed = 0;
switch (to_do) {
case ELECT_FAILURE_ELECTION:
failure_recovery = TRUE;
to_do = ELECT_ELECTION;
/* FALLTHROUGH */
case ELECT_ELECTION:
need_success = TRUE;
break;
case ELECT_SEEK_MASTER:
to_do = 0;
break;
default:
break;
}
} else if ((done = (succeeded ||
(!need_success && IS_VALID_EID(db_rep->master_eid)) ||
db_rep->finished)))
db_rep->elect_thread->finished = TRUE;
else {
if (last_op == ELECT_ELECTION)
to_do = ELECT_REPSTART;
else {
to_do = ELECT_ELECTION;
if (db_rep->init_policy == DB_REP_CLIENT &&
!db_rep->found_master)
to_do = ELECT_REPSTART;
}
}
UNLOCK_MUTEX(db_rep->mutex);
if (done)
return (0);
}
}
/*
 * __repmgr_is_ready --
 *	Wait-loop predicate for the election thread: returns non-zero when a
 *	request is pending in db_rep->operation_needed or db_rep->finished is
 *	set, and 0 otherwise.  The current master_eid is logged but is not
 *	part of the result.
 */
static int
__repmgr_is_ready(env)
	ENV *env;
{
	DB_REP *db_rep;

	db_rep = env->rep_handle;

	RPRINT(env, DB_VERB_REPMGR_MISC, (env,
	    "repmgr elect: opcode %d, finished %d, master %d",
	    db_rep->operation_needed, db_rep->finished, db_rep->master_eid));

	if (db_rep->operation_needed)
		return (1);
	if (db_rep->finished)
		return (1);
	return (0);
}
/*
 * __repmgr_become_master --
 *	Record this site as the master (master_eid = SELF_EID, found_master =
 *	TRUE) and call __rep_start with DB_REP_MASTER.  When that succeeds,
 *	__repmgr_stash_generation is called.
 *
 *	Returns 0 on success, or the error from __repmgr_prepare_my_addr or
 *	__rep_start.  The DB_REP fields are updated before either call, so
 *	they stay set even when an error is returned.
 */
int
__repmgr_become_master(env)
	ENV *env;
{
	DBT addr;
	DB_REP *db_rep;
	int rc;

	db_rep = env->rep_handle;
	db_rep->master_eid = SELF_EID;
	db_rep->found_master = TRUE;

	rc = __repmgr_prepare_my_addr(env, &addr);
	if (rc != 0)
		return (rc);

	rc = __rep_start(env->dbenv, &addr, DB_REP_MASTER);
	/* The address buffer is released whether or not rep_start worked. */
	__os_free(env, addr.data);
	if (rc != 0)
		return (rc);

	__repmgr_stash_generation(env);
	return (0);
}