#include "db_config.h"
#ifndef lint
static const char revid[] = "$Id: rep_record.c,v 1.2 2004/03/30 01:23:56 jtownsen Exp $";
#endif
#ifndef NO_SYSTEM_INCLUDES
#include <stdlib.h>
#include <string.h>
#endif
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_shash.h"
#include "dbinc/db_am.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *, DB_LSN *));
static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));
static int __rep_dorecovery __P((DB_ENV *, DB_LSN *, DB_LSN *));
static int __rep_lsn_cmp __P((const void *, const void *));
static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DB_LSN *));
static int __rep_verify_match __P((DB_ENV *, REP_CONTROL *, time_t));
#define IS_SIMPLE(R) ((R) != DB___txn_regop && (R) != DB___txn_xa_regop && \
(R) != DB___txn_ckp && (R) != DB___dbreg_register)
#ifdef DIAGNOSTIC
#define MASTER_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_F_MASTER)) { \
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) { \
__db_err(dbenv, "Master record received on client"); \
__rep_print_message(dbenv, \
*eidp, rp, "rep_process_message"); \
} \
ret = EINVAL; \
goto errlock; \
} \
} while (0)
#define CLIENT_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_ISCLIENT)) { \
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) { \
__db_err(dbenv, "Client record received on master"); \
__rep_print_message(dbenv, \
*eidp, rp, "rep_process_message"); \
} \
(void)__rep_send_message(dbenv, \
DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0); \
ret = DB_REP_DUPMASTER; \
goto errlock; \
} \
} while (0)
#define MASTER_CHECK(dbenv, eid, rep) \
do { \
if (rep->master_id == DB_EID_INVALID) { \
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) \
__db_err(dbenv, \
"Received record from %d, master is INVALID",\
eid); \
ret = 0; \
(void)__rep_send_message(dbenv, \
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0); \
goto errlock; \
} \
if (eid != rep->master_id) { \
__db_err(dbenv, \
"Received master record from %d, master is %d", \
eid, rep->master_id); \
ret = EINVAL; \
goto errlock; \
} \
} while (0)
#else
#define MASTER_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_F_MASTER)) { \
ret = EINVAL; \
goto errlock; \
} \
} while (0)
#define CLIENT_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_ISCLIENT)) { \
(void)__rep_send_message(dbenv, \
DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0); \
ret = DB_REP_DUPMASTER; \
goto errlock; \
} \
} while (0)
#define MASTER_CHECK(dbenv, eid, rep) \
do { \
if (rep->master_id == DB_EID_INVALID) { \
ret = 0; \
(void)__rep_send_message(dbenv, \
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0); \
goto errlock; \
} \
if (eid != rep->master_id) { \
__db_err(dbenv, \
"Received master record from %d, master is %d", \
eid, rep->master_id); \
ret = EINVAL; \
goto errlock; \
} \
} while (0)
#endif
#define ANYSITE(rep)
int
__rep_process_message(dbenv, control, rec, eidp, ret_lsnp)
DB_ENV *dbenv;
DBT *control, *rec;
int *eidp;
DB_LSN *ret_lsnp;
{
DB_LOG *dblp;
DB_LOGC *logc;
DB_LSN endlsn, lsn, oldfilelsn;
DB_REP *db_rep;
DBT *d, data_dbt, mylog;
LOG *lp;
REP *rep;
REP_CONTROL *rp;
REP_VOTE_INFO *vi;
u_int32_t bytes, egen, flags, gen, gbytes, type;
int check_limit, cmp, done, do_req;
int master, old, recovering, ret, t_ret;
time_t savetime;
PANIC_CHECK(dbenv);
ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_process_message",
DB_INIT_REP);
if (control == NULL || control->size == 0) {
__db_err(dbenv,
"DB_ENV->rep_process_message: control argument must be specified");
return (EINVAL);
}
if (!IS_REP_MASTER(dbenv) && !IS_REP_CLIENT(dbenv)) {
__db_err(dbenv,
"Environment not configured as replication master or client");
return (EINVAL);
}
ret = 0;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
rp = (REP_CONTROL *)control->data;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
if (rep->start_th != 0) {
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (F_ISSET(rp, DB_LOG_PERM)) {
if (ret_lsnp != NULL)
*ret_lsnp = rp->lsn;
return (DB_REP_NOTPERM);
} else
return (0);
}
if (rep->in_recovery != 0) {
rep->stat.st_msgs_recover++;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
return (0);
}
rep->msg_th++;
gen = rep->gen;
recovering = rep->in_recovery ||
F_ISSET(rep, REP_F_READY | REP_F_RECOVER);
savetime = rep->timestamp;
rep->stat.st_msgs_processed++;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__rep_print_message(dbenv, *eidp, rp, "rep_process_message");
#endif
if (rp->rep_version != DB_REPVERSION) {
__db_err(dbenv,
"unexpected replication message version %lu, expected %d",
(u_long)rp->rep_version, DB_REPVERSION);
ret = EINVAL;
goto errlock;
}
if (rp->log_version != DB_LOGVERSION) {
__db_err(dbenv,
"unexpected log record version %lu, expected %d",
(u_long)rp->log_version, DB_LOGVERSION);
ret = EINVAL;
goto errlock;
}
if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
rp->rectype != REP_DUPMASTER) {
rep->stat.st_msgs_badgen++;
goto errlock;
}
if (rp->gen > gen) {
if (F_ISSET(rep, REP_F_MASTER)) {
rep->stat.st_dupmasters++;
ret = DB_REP_DUPMASTER;
if (rp->rectype != REP_DUPMASTER)
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_DUPMASTER,
NULL, NULL, 0);
goto errlock;
}
if (rp->rectype == REP_ALIVE ||
rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Updating gen from %lu to %lu",
(u_long)gen, (u_long)rp->gen);
#endif
gen = rep->gen = rp->gen;
if (rep->egen <= gen)
rep->egen = rep->gen + 1;
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Updating egen to %lu",
(u_long)rep->egen);
#endif
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
} else if (rp->rectype != REP_NEWMASTER) {
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0);
goto errlock;
}
}
if (recovering) {
switch (rp->rectype) {
case REP_VERIFY:
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
cmp = log_compare(&lp->verify_lsn, &rp->lsn);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (cmp != 0)
goto skip;
break;
case REP_ALIVE:
case REP_ALIVE_REQ:
case REP_DUPMASTER:
case REP_NEWCLIENT:
case REP_NEWMASTER:
case REP_NEWSITE:
case REP_VERIFY_FAIL:
case REP_VOTE1:
case REP_VOTE2:
break;
default:
skip:
rep->stat.st_msgs_recover++;
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
do_req = ++lp->rcvd_recs >= lp->wait_recs;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (do_req) {
lp->wait_recs *= 2;
if (lp->wait_recs > rep->max_gap)
lp->wait_recs = rep->max_gap;
lp->rcvd_recs = 0;
lsn = lp->verify_lsn;
}
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (do_req) {
if (rep->master_id == DB_EID_INVALID &&
rp->rectype != REP_MASTER_REQ)
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST,
REP_MASTER_REQ,
NULL, NULL, 0);
else if (*eidp == rep->master_id)
(void)__rep_send_message(
dbenv, *eidp,
REP_VERIFY_REQ,
&lsn, NULL, 0);
}
goto errlock;
}
}
switch (rp->rectype) {
case REP_ALIVE:
ANYSITE(rep);
egen = *(u_int32_t *)rec->data;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Received ALIVE egen of %lu, mine %lu",
(u_long)egen, (u_long)rep->egen);
#endif
if (egen > rep->egen)
rep->egen = egen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
break;
case REP_ALIVE_REQ:
ANYSITE(rep);
dblp = dbenv->lg_handle;
R_LOCK(dbenv, &dblp->reginfo);
lsn = ((LOG *)dblp->reginfo.primary)->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
egen = rep->egen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
data_dbt.data = &egen;
data_dbt.size = sizeof(egen);
(void)__rep_send_message(dbenv,
*eidp, REP_ALIVE, &lsn, &data_dbt, 0);
goto errlock;
case REP_DUPMASTER:
if (F_ISSET(rep, REP_F_MASTER))
ret = DB_REP_DUPMASTER;
goto errlock;
case REP_ALL_REQ:
MASTER_ONLY(rep, rp);
gbytes = bytes = 0;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
gbytes = rep->gbytes;
bytes = rep->bytes;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
check_limit = gbytes != 0 || bytes != 0;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
memset(&data_dbt, 0, sizeof(data_dbt));
oldfilelsn = lsn = rp->lsn;
type = REP_LOG;
flags = IS_ZERO_LSN(rp->lsn) ||
IS_INIT_LSN(rp->lsn) ? DB_FIRST : DB_SET;
for (ret = __log_c_get(logc, &lsn, &data_dbt, flags);
ret == 0 && type == REP_LOG;
ret = __log_c_get(logc, &lsn, &data_dbt, DB_NEXT)) {
if (lsn.file != oldfilelsn.file)
(void)__rep_send_message(dbenv,
*eidp, REP_NEWFILE, &oldfilelsn, NULL, 0);
if (check_limit) {
while (bytes <
data_dbt.size + sizeof(REP_CONTROL)) {
if (gbytes > 0) {
bytes += GIGABYTE;
--gbytes;
continue;
}
rep->stat.st_nthrottles++;
type = REP_LOG_MORE;
goto send;
}
bytes -= (data_dbt.size + sizeof(REP_CONTROL));
}
send: if (__rep_send_message(dbenv,
*eidp, type, &lsn, &data_dbt, 0) != 0)
break;
oldfilelsn = lsn;
oldfilelsn.offset += logc->c_len;
}
if (ret == DB_NOTFOUND)
ret = 0;
if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
goto errlock;
#ifdef NOTYET
case REP_FILE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
break;
case REP_FILE_REQ:
MASTER_ONLY(rep, rp);
ret = __rep_send_file(dbenv, rec, *eidp);
goto errlock;
#endif
case REP_LOG:
case REP_LOG_MORE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
if ((ret = __rep_apply(dbenv, rp, rec, ret_lsnp)) != 0)
goto errlock;
if (rp->rectype == REP_LOG_MORE) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
master = rep->master_id;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
if (master == DB_EID_INVALID)
ret = 0;
else
if (__rep_send_message(dbenv,
master, REP_ALL_REQ, &lsn, NULL, 0) != 0)
break;
}
goto errlock;
case REP_LOG_REQ:
MASTER_ONLY(rep, rp);
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION) &&
rec != NULL && rec->size != 0) {
__db_err(dbenv,
"[%lu][%lu]: LOG_REQ max lsn: [%lu][%lu]",
(u_long) rp->lsn.file, (u_long)rp->lsn.offset,
(u_long)((DB_LSN *)rec->data)->file,
(u_long)((DB_LSN *)rec->data)->offset);
}
#endif
lsn = rp->lsn;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
memset(&data_dbt, 0, sizeof(data_dbt));
ret = __log_c_get(logc, &rp->lsn, &data_dbt, DB_SET);
if (ret == 0)
(void)__rep_send_message(dbenv,
*eidp, REP_LOG, &rp->lsn, &data_dbt, 0);
else if (ret == DB_NOTFOUND) {
R_LOCK(dbenv, &dblp->reginfo);
endlsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
if (endlsn.file > lsn.file) {
endlsn.file = lsn.file + 1;
endlsn.offset = 0;
if ((ret = __log_c_get(logc,
&endlsn, &data_dbt, DB_SET)) != 0 ||
(ret = __log_c_get(logc,
&endlsn, &data_dbt, DB_PREV)) != 0) {
if (FLD_ISSET(dbenv->verbose,
DB_VERB_REPLICATION))
__db_err(dbenv,
"Unable to get prev of [%lu][%lu]",
(u_long)lsn.file,
(u_long)lsn.offset);
ret = DB_REP_OUTDATED;
} else {
endlsn.offset += logc->c_len;
(void)__rep_send_message(dbenv, *eidp,
REP_NEWFILE, &endlsn, NULL, 0);
}
} else {
DB_ASSERT(0);
__db_err(dbenv,
"Request for LSN [%lu][%lu] fails",
(u_long)lsn.file, (u_long)lsn.offset);
ret = EINVAL;
}
}
while (ret == 0 && rec != NULL && rec->size != 0) {
if ((ret =
__log_c_get(logc, &lsn, &data_dbt, DB_NEXT)) != 0) {
if (ret == DB_NOTFOUND)
ret = 0;
break;;
}
if (log_compare(&lsn, (DB_LSN *)rec->data) >= 0)
break;
if (__rep_send_message(dbenv,
*eidp, REP_LOG, &lsn, &data_dbt, 0) != 0)
break;
}
if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
goto errlock;
case REP_NEWSITE:
rep->stat.st_newsites++;
if (F_ISSET(rep, REP_F_MASTER)) {
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
(void)__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0);
}
ret = DB_REP_NEWSITE;
goto errlock;
case REP_NEWCLIENT:
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0);
ret = DB_REP_NEWSITE;
if (F_ISSET(rep, REP_F_UPGRADE)) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
egen = rep->egen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
data_dbt.data = &egen;
data_dbt.size = sizeof(egen);
(void)__rep_send_message(dbenv,
*eidp, REP_ALIVE, &rp->lsn, &data_dbt, 0);
goto errlock;
}
case REP_MASTER_REQ:
if (F_ISSET(rep, REP_F_MASTER)) {
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0);
}
goto errlock;
case REP_NEWFILE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
ret = __rep_apply(dbenv, rp, rec, ret_lsnp);
goto errlock;
case REP_NEWMASTER:
ANYSITE(rep);
if (F_ISSET(rep, REP_F_MASTER) &&
*eidp != dbenv->rep_eid) {
rep->stat.st_dupmasters++;
ret = DB_REP_DUPMASTER;
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0);
goto errlock;
}
ret = __rep_new_master(dbenv, rp, *eidp);
goto errlock;
case REP_PAGE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
break;
case REP_PAGE_REQ:
MASTER_ONLY(rep, rp);
break;
case REP_PLIST:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
break;
case REP_PLIST_REQ:
MASTER_ONLY(rep, rp);
break;
case REP_VERIFY:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
DB_ASSERT((F_ISSET(rep, REP_F_RECOVER) &&
!IS_ZERO_LSN(lp->verify_lsn)) ||
(!F_ISSET(rep, REP_F_RECOVER) &&
IS_ZERO_LSN(lp->verify_lsn)));
if (IS_ZERO_LSN(lp->verify_lsn))
goto errlock;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
memset(&mylog, 0, sizeof(mylog));
if ((ret = __log_c_get(logc, &rp->lsn, &mylog, DB_SET)) != 0)
goto rep_verify_err;
if (mylog.size == rec->size &&
memcmp(mylog.data, rec->data, rec->size) == 0) {
ret = __rep_verify_match(dbenv, rp, savetime);
} else if ((ret =
__log_c_get(logc, &lsn, &mylog, DB_PREV)) == 0) {
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
lp->verify_lsn = lsn;
lp->rcvd_recs = 0;
lp->wait_recs = rep->request_gap;
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
(void)__rep_send_message(dbenv,
*eidp, REP_VERIFY_REQ, &lsn, NULL, 0);
} else if (ret == DB_NOTFOUND) {
ret = DB_REP_OUTDATED;
if (rp->lsn.file != 1)
__db_err(dbenv,
"Too few log files to sync with master");
else
__db_err(dbenv,
"Client was never part of master's environment");
}
rep_verify_err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
goto errlock;
case REP_VERIFY_FAIL:
rep->stat.st_outdated++;
ret = DB_REP_OUTDATED;
goto errlock;
case REP_VERIFY_REQ:
MASTER_ONLY(rep, rp);
type = REP_VERIFY;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
d = &data_dbt;
memset(d, 0, sizeof(data_dbt));
F_SET(logc, DB_LOG_SILENT_ERR);
ret = __log_c_get(logc, &rp->lsn, d, DB_SET);
if (ret == DB_NOTFOUND &&
__log_is_outdated(dbenv, rp->lsn.file, &old) == 0 &&
old != 0)
type = REP_VERIFY_FAIL;
if (ret != 0)
d = NULL;
(void)__rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0);
ret = __log_c_close(logc);
goto errlock;
case REP_VOTE1:
if (F_ISSET(rep, REP_F_MASTER)) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Master received vote");
#endif
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
(void)__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0);
goto errlock;
}
vi = (REP_VOTE_INFO *)rec->data;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
if (vi->egen < rep->egen) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Received old vote %lu, egen %lu, ignoring vote1",
(u_long)vi->egen, (u_long)rep->egen);
#endif
goto errunlock;
}
if (vi->egen > rep->egen) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Received VOTE1 from egen %lu, my egen %lu; reset",
(u_long)vi->egen, (u_long)rep->egen);
#endif
__rep_elect_done(dbenv, rep);
rep->egen = vi->egen;
}
if (!IN_ELECTION(rep))
F_SET(rep, REP_F_TALLY);
if (vi->nsites > rep->nsites)
rep->nsites = vi->nsites;
if (rep->sites + 1 > rep->nsites)
rep->nsites = rep->sites + 1;
if (rep->nsites > rep->asites &&
(ret = __rep_grow_sites(dbenv, rep->nsites)) != 0) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Grow sites returned error %d", ret);
#endif
goto errunlock;
}
if (F_ISSET(rep, REP_F_EPHASE2)) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "In phase 2, ignoring vote1");
#endif
goto errunlock;
}
if ((ret = __rep_tally(dbenv, rep, *eidp, &rep->sites,
vi->egen, rep->tally_off)) != 0) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Tally returned %d, sites %d",
ret, rep->sites);
#endif
ret = 0;
goto errunlock;
}
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
__db_err(dbenv,
"Incoming vote: (eid)%d (pri)%d (gen)%lu (egen)%lu [%lu,%lu]",
*eidp, vi->priority,
(u_long)rp->gen, (u_long)vi->egen,
(u_long)rp->lsn.file, (u_long)rp->lsn.offset);
if (rep->sites > 1)
__db_err(dbenv,
"Existing vote: (eid)%d (pri)%d (gen)%lu (sites)%d [%lu,%lu]",
rep->winner, rep->w_priority,
(u_long)rep->w_gen, rep->sites,
(u_long)rep->w_lsn.file,
(u_long)rep->w_lsn.offset);
}
#endif
__rep_cmp_vote(dbenv, rep, eidp, &rp->lsn, vi->priority,
rp->gen, vi->tiebreaker);
if (!IN_ELECTION(rep)) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Not in election, but received vote1 0x%x",
rep->flags);
#endif
ret = DB_REP_HOLDELECTION;
goto errunlock;
}
master = rep->winner;
lsn = rep->w_lsn;
done = rep->sites >= rep->nsites && rep->w_priority != 0;
if (done) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
__db_err(dbenv, "Phase1 election done");
__db_err(dbenv, "Voting for %d%s",
master, master == rep->eid ? "(self)" : "");
}
#endif
egen = rep->egen;
F_SET(rep, REP_F_EPHASE2);
F_CLR(rep, REP_F_EPHASE1);
if (master == rep->eid) {
(void)__rep_tally(dbenv, rep, rep->eid,
&rep->votes, egen, rep->v2tally_off);
goto errunlock;
}
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
__rep_send_vote(dbenv, NULL, 0, 0, 0, egen,
master, REP_VOTE2);
} else
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
break;
case REP_VOTE2:
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "We received a vote%s",
F_ISSET(rep, REP_F_MASTER) ?
" (master)" : "");
#endif
if (F_ISSET(rep, REP_F_MASTER)) {
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
rep->stat.st_elections_won++;
(void)__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0);
goto errlock;
}
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
DB_ASSERT(rep->priority != 0);
vi = (REP_VOTE_INFO *)rec->data;
if (!IN_ELECTION_TALLY(rep) && vi->egen >= rep->egen) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Not in election gen %lu, at %lu, got vote",
(u_long)vi->egen, (u_long)rep->egen);
#endif
ret = DB_REP_HOLDELECTION;
goto errunlock;
}
if ((ret = __rep_cmp_vote2(dbenv, rep, *eidp, vi->egen)) != 0) {
ret = 0;
goto errunlock;
}
if ((ret = __rep_tally(dbenv, rep, *eidp, &rep->votes,
vi->egen, rep->v2tally_off)) != 0) {
ret = 0;
goto errunlock;
}
done = rep->votes > rep->nsites / 2;
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Counted vote %d", rep->votes);
#endif
if (done) {
__rep_elect_master(dbenv, rep, eidp);
ret = DB_REP_NEWMASTER;
goto errunlock;
} else
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
break;
default:
__db_err(dbenv,
"DB_ENV->rep_process_message: unknown replication message: type %lu",
(u_long)rp->rectype);
ret = EINVAL;
goto errlock;
}
errlock:
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
errunlock:
rep->msg_th--;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
return (ret);
}
static int
__rep_apply(dbenv, rp, rec, ret_lsnp)
DB_ENV *dbenv;
REP_CONTROL *rp;
DBT *rec;
DB_LSN *ret_lsnp;
{
__dbreg_register_args dbreg_args;
__txn_ckp_args ckp_args;
DB_REP *db_rep;
DBT control_dbt, key_dbt, lsn_dbt;
DBT max_lsn_dbt, *max_lsn_dbtp, nextrec_dbt, rec_dbt;
DB *dbp;
DBC *dbc;
DB_LOG *dblp;
DB_LSN ckp_lsn, max_lsn, next_lsn;
LOG *lp;
REP *rep;
REP_CONTROL *grp;
u_int32_t rectype, txnid;
int cmp, do_req, eid, gap, ret, t_ret;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
dbp = db_rep->rep_db;
dbc = NULL;
ret = gap = 0;
memset(&control_dbt, 0, sizeof(control_dbt));
memset(&rec_dbt, 0, sizeof(rec_dbt));
max_lsn_dbtp = NULL;
dblp = dbenv->lg_handle;
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
lp = dblp->reginfo.primary;
cmp = log_compare(&rp->lsn, &lp->ready_lsn);
if (cmp == 0) {
if (rp->rectype == REP_NEWFILE) {
ret = __rep_newfile(dbenv, rp, &lp->ready_lsn);
rectype = 0;
} else {
if (F_ISSET(rp, DB_LOG_PERM)) {
gap = 1;
max_lsn = rp->lsn;
}
ret = __log_rep_put(dbenv, &rp->lsn, rec);
memcpy(&rectype, rec->data, sizeof(rectype));
if (ret == 0)
rep->stat.st_log_records++;
}
lp->rcvd_recs = 0;
while (ret == 0 && IS_SIMPLE(rectype) &&
log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
gap_check: max_lsn_dbtp = NULL;
lp->wait_recs = 0;
lp->rcvd_recs = 0;
ZERO_LSN(lp->max_wait_lsn);
if (dbc == NULL &&
(ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
goto err;
F_SET(&control_dbt, DB_DBT_REALLOC);
F_SET(&rec_dbt, DB_DBT_REALLOC);
if ((ret = __db_c_get(dbc,
&control_dbt, &rec_dbt, DB_RMW | DB_FIRST)) != 0)
goto err;
rp = (REP_CONTROL *)control_dbt.data;
rec = &rec_dbt;
memcpy(&rectype, rec->data, sizeof(rectype));
if (rp->rectype != REP_NEWFILE) {
ret = __log_rep_put(dbenv, &rp->lsn, rec);
if (ret == 0)
rep->stat.st_log_records++;
} else {
ret = __rep_newfile(dbenv, rp, &lp->ready_lsn);
rectype = 0;
}
if ((ret = __db_c_del(dbc, 0)) != 0)
goto err;
if (F_ISSET(rp, DB_LOG_PERM)) {
gap = 1;
max_lsn = rp->lsn;
}
--rep->stat.st_log_queued;
memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
memset(&lsn_dbt, 0, sizeof(lsn_dbt));
ret = __db_c_get(dbc, &lsn_dbt, &nextrec_dbt, DB_NEXT);
if (ret != DB_NOTFOUND && ret != 0)
goto err;
if (ret == DB_NOTFOUND) {
ZERO_LSN(lp->waiting_lsn);
break;
}
grp = (REP_CONTROL *)lsn_dbt.data;
lp->waiting_lsn = grp->lsn;
if (!IS_SIMPLE(rectype))
break;
}
do_req = 0;
if (!IS_ZERO_LSN(lp->waiting_lsn) &&
log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
next_lsn = lp->ready_lsn;
do_req = ++lp->rcvd_recs >= lp->wait_recs;
if (do_req) {
lp->wait_recs = rep->request_gap;
lp->rcvd_recs = 0;
if (log_compare(&rp->lsn,
&lp->max_wait_lsn) == 0) {
lp->max_wait_lsn = lp->waiting_lsn;
memset(&max_lsn_dbt,
0, sizeof(max_lsn_dbt));
max_lsn_dbt.data = &lp->waiting_lsn;
max_lsn_dbt.size =
sizeof(lp->waiting_lsn);
max_lsn_dbtp = &max_lsn_dbt;
}
}
} else {
lp->wait_recs = 0;
ZERO_LSN(lp->max_wait_lsn);
}
if (dbc != NULL)
if ((ret = __db_c_close(dbc)) != 0)
goto err;
dbc = NULL;
if (do_req) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
eid = db_rep->region->master_id;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (eid != DB_EID_INVALID) {
rep->stat.st_log_requested++;
(void)__rep_send_message(dbenv, eid,
REP_LOG_REQ, &next_lsn, max_lsn_dbtp, 0);
}
}
} else if (cmp > 0) {
memset(&key_dbt, 0, sizeof(key_dbt));
key_dbt.data = rp;
key_dbt.size = sizeof(*rp);
R_LOCK(dbenv, &dblp->reginfo);
next_lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
do_req = 0;
if (lp->wait_recs == 0) {
lp->wait_recs = rep->request_gap;
lp->rcvd_recs = 0;
ZERO_LSN(lp->max_wait_lsn);
}
if (++lp->rcvd_recs >= lp->wait_recs) {
do_req = 1;
lp->rcvd_recs = 0;
lp->wait_recs *= 2;
if (lp->wait_recs > rep->max_gap)
lp->wait_recs = rep->max_gap;
if (IS_ZERO_LSN(lp->max_wait_lsn)) {
lp->max_wait_lsn = lp->waiting_lsn;
memset(&max_lsn_dbt, 0, sizeof(max_lsn_dbt));
max_lsn_dbt.data = &lp->waiting_lsn;
max_lsn_dbt.size = sizeof(lp->waiting_lsn);
max_lsn_dbtp = &max_lsn_dbt;
} else {
max_lsn_dbtp = NULL;
lp->max_wait_lsn = next_lsn;
}
}
ret = __db_put(dbp, NULL, &key_dbt, rec, 0);
rep->stat.st_log_queued++;
rep->stat.st_log_queued_total++;
if (rep->stat.st_log_queued_max < rep->stat.st_log_queued)
rep->stat.st_log_queued_max = rep->stat.st_log_queued;
if (ret != 0)
goto done;
if (IS_ZERO_LSN(lp->waiting_lsn) ||
log_compare(&rp->lsn, &lp->waiting_lsn) < 0)
lp->waiting_lsn = rp->lsn;
if (do_req) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
eid = db_rep->region->master_id;
if (eid != DB_EID_INVALID) {
rep->stat.st_log_requested++;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
(void)__rep_send_message(dbenv, eid,
REP_LOG_REQ, &next_lsn, max_lsn_dbtp, 0);
} else {
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_MASTER_REQ,
NULL, NULL, 0);
}
}
if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) {
if (ret_lsnp != NULL)
*ret_lsnp = rp->lsn;
ret = DB_REP_NOTPERM;
}
goto done;
} else {
rep->stat.st_log_duplicated++;
goto done;
}
if (ret != 0 || cmp < 0 || (cmp == 0 && IS_SIMPLE(rectype)))
goto done;
switch (rectype) {
case DB___dbreg_register:
memcpy(&txnid, (u_int8_t *)rec->data +
((u_int8_t *)&dbreg_args.txnid - (u_int8_t *)&dbreg_args),
sizeof(u_int32_t));
if (txnid == TXN_INVALID &&
!F_ISSET(rep, REP_F_LOGSONLY))
ret = __db_dispatch(dbenv, dbenv->recover_dtab,
dbenv->recover_dtab_size, rec, &rp->lsn,
DB_TXN_APPLY, NULL);
break;
case DB___txn_ckp:
memcpy(&ckp_lsn, (u_int8_t *)rec->data +
((u_int8_t *)&ckp_args.ckp_lsn - (u_int8_t *)&ckp_args),
sizeof(DB_LSN));
if (!F_ISSET(rep, REP_F_LOGSONLY))
ret = __memp_sync(dbenv, &ckp_lsn);
else
ret = __log_flush(dbenv, &ckp_lsn);
if (ret == 0)
__txn_updateckp(dbenv, &rp->lsn);
else {
__db_err(dbenv, "Error syncing ckp [%lu][%lu]",
(u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
__db_panic(dbenv, ret);
}
break;
case DB___txn_regop:
if (!F_ISSET(rep, REP_F_LOGSONLY))
do {
ret = __rep_process_txn(dbenv, rec);
} while (ret == DB_LOCK_DEADLOCK);
if (ret == 0 && !F_ISSET(dbenv, DB_ENV_TXN_NOSYNC))
ret = __log_flush(dbenv, NULL);
if (ret != 0) {
__db_err(dbenv, "Error processing txn [%lu][%lu]",
(u_long)rp->lsn.file, (u_long)rp->lsn.offset);
__db_panic(dbenv, ret);
}
break;
case DB___txn_xa_regop:
ret = __log_flush(dbenv, NULL);
break;
default:
goto err;
}
if (ret == 0) {
if (log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0)
goto gap_check;
}
done:
err: if (dbc != NULL && (t_ret = __db_c_close(dbc)) != 0 && ret == 0)
ret = t_ret;
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (ret == 0 && F_ISSET(dbenv, DB_ENV_LOG_AUTOREMOVE) &&
rp->rectype == REP_NEWFILE)
__log_autoremove(dbenv);
if (control_dbt.data != NULL)
__os_ufree(dbenv, control_dbt.data);
if (rec_dbt.data != NULL)
__os_ufree(dbenv, rec_dbt.data);
if (ret == 0 && gap) {
if (ret_lsnp != NULL)
*ret_lsnp = max_lsn;
ret = DB_REP_ISPERM;
}
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
if (ret == DB_REP_ISPERM)
__db_err(dbenv, "Returning ISPERM [%lu][%lu]",
(u_long)ret_lsnp->file, (u_long)ret_lsnp->offset);
else if (ret == DB_REP_NOTPERM)
__db_err(dbenv, "Returning NOTPERM [%lu][%lu]",
(u_long)ret_lsnp->file, (u_long)ret_lsnp->offset);
else if (ret != 0)
__db_err(dbenv, "Returning %d [%lu][%lu]", ret,
(u_long)ret_lsnp->file, (u_long)ret_lsnp->offset);
}
#endif
return (ret);
}
int
__rep_process_txn(dbenv, rec)
DB_ENV *dbenv;
DBT *rec;
{
DBT data_dbt, *lock_dbt;
DB_LOCKREQ req, *lvp;
DB_LOGC *logc;
DB_LSN prev_lsn, *lsnp;
DB_REP *db_rep;
LSN_COLLECTION lc;
REP *rep;
__txn_regop_args *txn_args;
__txn_xa_regop_args *prep_args;
u_int32_t lockid, rectype;
int i, ret, t_ret;
void *txninfo;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
logc = NULL;
txninfo = NULL;
memset(&data_dbt, 0, sizeof(data_dbt));
if (F_ISSET(dbenv, DB_ENV_THREAD))
F_SET(&data_dbt, DB_DBT_REALLOC);
memcpy(&rectype, rec->data, sizeof(rectype));
memset(&lc, 0, sizeof(lc));
if (rectype == DB___txn_regop) {
if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0)
return (ret);
if (txn_args->opcode != TXN_COMMIT) {
__os_free(dbenv, txn_args);
return (0);
}
prev_lsn = txn_args->prev_lsn;
lock_dbt = &txn_args->locks;
} else {
DB_ASSERT(rectype == DB___txn_xa_regop);
if ((ret =
__txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0)
return (ret);
prev_lsn = prep_args->prev_lsn;
lock_dbt = &prep_args->locks;
}
if ((ret = __lock_id(dbenv, &lockid)) != 0)
goto err1;
if ((ret =
__lock_get_list(dbenv, lockid, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
goto err;
if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0)
goto err;
qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0)
goto err;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto err;
for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
if ((ret = __log_c_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
__db_err(dbenv, "failed to read the log at [%lu][%lu]",
(u_long)lsnp->file, (u_long)lsnp->offset);
goto err;
}
if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab,
dbenv->recover_dtab_size, &data_dbt, lsnp,
DB_TXN_APPLY, txninfo)) != 0) {
__db_err(dbenv, "transaction failed at [%lu][%lu]",
(u_long)lsnp->file, (u_long)lsnp->offset);
goto err;
}
}
err: memset(&req, 0, sizeof(req));
req.op = DB_LOCK_PUT_ALL;
if ((t_ret =
__lock_vec(dbenv, lockid, 0, &req, 1, &lvp)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret = __lock_id_free(dbenv, lockid)) != 0 && ret == 0)
ret = t_ret;
err1: if (rectype == DB___txn_regop)
__os_free(dbenv, txn_args);
else
__os_free(dbenv, prep_args);
if (lc.nalloc != 0)
__os_free(dbenv, lc.array);
if (logc != NULL && (t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (txninfo != NULL)
__db_txnlist_end(dbenv, txninfo);
if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
__os_ufree(dbenv, data_dbt.data);
if (ret == 0)
rep->stat.st_txns_applied++;
return (ret);
}
static int
__rep_collect_txn(dbenv, lsnp, lc)
DB_ENV *dbenv;
DB_LSN *lsnp;
LSN_COLLECTION *lc;
{
__txn_child_args *argp;
DB_LOGC *logc;
DB_LSN c_lsn;
DBT data;
u_int32_t rectype;
int nalloc, ret, t_ret;
memset(&data, 0, sizeof(data));
F_SET(&data, DB_DBT_REALLOC);
if ((ret = __log_cursor(dbenv, &logc)) != 0)
return (ret);
while (!IS_ZERO_LSN(*lsnp) &&
(ret = __log_c_get(logc, lsnp, &data, DB_SET)) == 0) {
memcpy(&rectype, data.data, sizeof(rectype));
if (rectype == DB___txn_child) {
if ((ret = __txn_child_read(dbenv,
data.data, &argp)) != 0)
goto err;
c_lsn = argp->c_lsn;
*lsnp = argp->prev_lsn;
__os_free(dbenv, argp);
ret = __rep_collect_txn(dbenv, &c_lsn, lc);
} else {
if (lc->nalloc < lc->nlsns + 1) {
nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
if ((ret = __os_realloc(dbenv,
nalloc * sizeof(DB_LSN), &lc->array)) != 0)
goto err;
lc->nalloc = nalloc;
}
lc->array[lc->nlsns++] = *lsnp;
memcpy(lsnp, (u_int8_t *)data.data +
sizeof(u_int32_t) + sizeof(u_int32_t),
sizeof(DB_LSN));
}
if (ret != 0)
goto err;
}
if (ret != 0)
__db_err(dbenv, "collect failed at: [%lu][%lu]",
(u_long)lsnp->file, (u_long)lsnp->offset);
err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (data.data != NULL)
__os_ufree(dbenv, data.data);
return (ret);
}
static int
__rep_lsn_cmp(lsn1, lsn2)
const void *lsn1, *lsn2;
{
return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2));
}
static int
__rep_newfile(dbenv, rc, lsnp)
DB_ENV *dbenv;
REP_CONTROL *rc;
DB_LSN *lsnp;
{
DB_LOG *dblp;
LOG *lp;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
if (rc->lsn.file + 1 > lp->lsn.file)
return (__log_newfile(dblp, lsnp));
else {
*lsnp = lp->lsn;
return (0);
}
}
int
__rep_tally(dbenv, rep, eid, countp, egen, vtoff)
DB_ENV *dbenv;
REP *rep;
int eid, *countp;
u_int32_t egen, vtoff;
{
REP_VTALLY *tally, *vtp;
int i;
#ifndef DIAGNOSTIC
COMPQUIET(rep, NULL);
#endif
tally = R_ADDR((REGINFO *)dbenv->reginfo, vtoff);
i = 0;
vtp = &tally[i];
while (i < *countp) {
if (vtp->eid == eid) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Tally found[%d] (%d, %lu), this vote (%d, %lu)",
i, vtp->eid, (u_long)vtp->egen,
eid, (u_long)egen);
#endif
if (vtp->egen >= egen)
return (1);
else {
vtp->egen = egen;
return (0);
}
}
i++;
vtp = &tally[i];
}
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
if (vtoff == rep->tally_off)
__db_err(dbenv, "Tallying VOTE1[%d] (%d, %lu)",
i, eid, (u_long)egen);
else
__db_err(dbenv, "Tallying VOTE2[%d] (%d, %lu)",
i, eid, (u_long)egen);
}
#endif
vtp->eid = eid;
vtp->egen = egen;
(*countp)++;
return (0);
}
void
__rep_cmp_vote(dbenv, rep, eidp, lsnp, priority, gen, tiebreaker)
DB_ENV *dbenv;
REP *rep;
int *eidp;
DB_LSN *lsnp;
int priority, gen, tiebreaker;
{
int cmp;
#ifndef DIAGNOSTIC
COMPQUIET(dbenv, NULL);
#endif
cmp = log_compare(lsnp, &rep->w_lsn);
if (rep->sites > 1 && priority != 0) {
if (cmp > 0 ||
(cmp == 0 && (priority > rep->w_priority ||
(priority == rep->w_priority &&
(tiebreaker > rep->w_tiebreaker))))) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Accepting new vote");
#endif
rep->winner = *eidp;
rep->w_priority = priority;
rep->w_lsn = *lsnp;
rep->w_gen = gen;
rep->w_tiebreaker = tiebreaker;
}
} else if (rep->sites == 1) {
if (priority != 0) {
rep->winner = *eidp;
rep->w_priority = priority;
rep->w_gen = gen;
rep->w_lsn = *lsnp;
rep->w_tiebreaker = tiebreaker;
} else {
rep->winner = DB_EID_INVALID;
rep->w_priority = 0;
rep->w_gen = 0;
ZERO_LSN(rep->w_lsn);
rep->w_tiebreaker = 0;
}
}
return;
}
int
__rep_cmp_vote2(dbenv, rep, eid, egen)
DB_ENV *dbenv;
REP *rep;
int eid;
u_int32_t egen;
{
int i;
REP_VTALLY *tally, *vtp;
tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
i = 0;
vtp = &tally[i];
for (i = 0; i < rep->sites; i++) {
vtp = &tally[i];
if (vtp->eid == eid && vtp->egen == egen) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Found matching vote1 (%d, %lu), at %d of %d",
eid, (u_long)egen, i, rep->sites);
#endif
return (0);
}
}
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Did not find vote1 for eid %d, egen %lu",
eid, (u_long)egen);
#endif
return (1);
}
static int
__rep_dorecovery(dbenv, lsnp, trunclsnp)
DB_ENV *dbenv;
DB_LSN *lsnp, *trunclsnp;
{
DB_LSN lsn;
DBT mylog;
DB_LOGC *logc;
int ret, t_ret, undo;
u_int32_t rectype;
__txn_regop_args *txnrec;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
return (ret);
memset(&mylog, 0, sizeof(mylog));
undo = 0;
while (undo == 0 &&
(ret = __log_c_get(logc, &lsn, &mylog, DB_PREV)) == 0 &&
log_compare(&lsn, lsnp) > 0) {
memcpy(&rectype, mylog.data, sizeof(rectype));
if (rectype == DB___txn_regop) {
if ((ret =
__txn_regop_read(dbenv, mylog.data, &txnrec)) != 0)
goto err;
if (txnrec->opcode != TXN_ABORT) {
undo = 1;
}
__os_free(dbenv, txnrec);
}
}
ret = __db_apprec(dbenv, lsnp, trunclsnp, undo, 0);
err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
static int
__rep_verify_match(dbenv, rp, savetime)
DB_ENV *dbenv;
REP_CONTROL *rp;
time_t savetime;
{
DB_LOG *dblp;
DB_LSN ckplsn, trunclsn;
DB_REP *db_rep;
LOG *lp;
REP *rep;
int done, master, ret, wait_cnt;
u_int32_t unused;
dblp = dbenv->lg_handle;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
lp = dblp->reginfo.primary;
ret = 0;
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
done = savetime != rep->timestamp;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (done) {
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
return (0);
}
ZERO_LSN(lp->verify_lsn);
R_LOCK(dbenv, &dblp->reginfo);
done = rp->lsn.file == lp->lsn.file &&
rp->lsn.offset + lp->len == lp->lsn.offset;
if (done) {
lp->ready_lsn = lp->lsn;
ZERO_LSN(lp->waiting_lsn);
}
R_UNLOCK(dbenv, &dblp->reginfo);
if (done)
goto finish;
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (F_ISSET(rep, REP_F_LOGSONLY)) {
INIT_LSN(ckplsn);
if ((ret = __log_flush(dbenv, &rp->lsn)) != 0 || (ret =
__log_vtruncate(dbenv, &rp->lsn, &ckplsn, &trunclsn)) != 0)
return (ret);
} else {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
if (F_ISSET(rep, REP_F_READY) || rep->in_recovery != 0) {
rep->stat.st_msgs_recover++;
goto errunlock;
}
F_SET(rep, REP_F_READY);
for (wait_cnt = 0; rep->op_cnt != 0;) {
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
__os_sleep(dbenv, 1, 0);
#ifdef DIAGNOSTIC
if (++wait_cnt % 60 == 0)
__db_err(dbenv,
"Waiting for txn_cnt to run replication recovery for %d minutes",
wait_cnt / 60);
#endif
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
}
rep->in_recovery = 1;
for (wait_cnt = 0; rep->handle_cnt != 0 || rep->msg_th > 1;) {
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
__os_sleep(dbenv, 1, 0);
#ifdef DIAGNOSTIC
if (++wait_cnt % 60 == 0)
__db_err(dbenv,
"Waiting for handle/thread count to run replication recovery for %d minutes",
wait_cnt / 60);
#endif
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
}
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if ((ret = __rep_dorecovery(dbenv, &rp->lsn, &trunclsn)) != 0) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
rep->in_recovery = 0;
F_CLR(rep, REP_F_READY);
goto errunlock;
}
}
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
lp->ready_lsn = trunclsn;
finish: ZERO_LSN(lp->waiting_lsn);
lp->wait_recs = 0;
lp->rcvd_recs = 0;
ZERO_LSN(lp->verify_lsn);
F_SET(db_rep->rep_db, DB_AM_RECOVER);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
ret = __db_truncate(db_rep->rep_db, NULL, &unused, 0);
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
F_CLR(db_rep->rep_db, DB_AM_RECOVER);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
rep->stat.st_log_queued = 0;
rep->in_recovery = 0;
F_CLR(rep, REP_F_NOARCHIVE | REP_F_READY | REP_F_RECOVER);
if (ret != 0)
goto errunlock;
master = rep->master_id;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (master == DB_EID_INVALID)
ret = 0;
else
(void)__rep_send_message(dbenv,
master, REP_ALL_REQ, &rp->lsn, NULL, 0);
if (0) {
errunlock:
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}
return (ret);
}