#include "db_config.h"
#define __INCLUDE_NETWORKING 1
#include "db_int.h"
#include "dbinc/mp.h"
/*
 * Working state for one outgoing message: the list of buffers to be written
 * plus the small header fields that some of those buffers point into.
 */
struct sending_msg {
/* Buffers making up the message, in the order setup_sending_msg adds them. */
REPMGR_IOVECS iovecs;
/* Message type byte; setup_sending_msg adds its address as the first buffer. */
u_int8_t type;
/* Control and rec lengths as stored by htonl(); also referenced by iovecs. */
u_int32_t control_size_buf, rec_size_buf;
/* Flattened heap copy of the message; NULL until flatten() allocates it. */
REPMGR_FLAT *fmsg;
};
/* Forward declarations for the file-local helpers defined later in this file. */
static int __repmgr_close_connection __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_destroy_connection __P((ENV *, REPMGR_CONNECTION *));
static void setup_sending_msg
__P((struct sending_msg *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
__P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
static int enqueue_msg
__P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static int flatten __P((ENV *, struct sending_msg *));
static REPMGR_SITE *__repmgr_available_site __P((ENV *, int));
/*
 * __repmgr_send --
 *	Send a replication message either to a single site (eid) or, when eid
 *	is DB_EID_BROADCAST, to every ready connection.  When DB_REP_PERMANENT
 *	is set in flags, additionally wait for acknowledgement as dictated by
 *	db_rep->perm_policy.
 *
 *	Returns 0 on success; DB_REP_UNAVAIL when the target site is not
 *	available or too few sites were reached to ever satisfy the policy;
 *	otherwise the error reported by the send or await helper.  On a
 *	non-zero return with DB_REP_PERMANENT set, st_perm_failed is counted
 *	and DB_EVENT_REP_PERM_FAILED is raised.
 */
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
DB_ENV *dbenv;
const DBT *control, *rec;
const DB_LSN *lsnp;
int eid;
u_int32_t flags;
{
DB_REP *db_rep;
ENV *env;
REPMGR_CONNECTION *conn;
REPMGR_SITE *site;
u_int available, nclients, needed, npeers_sent, nsites_sent;
int ret, t_ret;
env = dbenv->env;
db_rep = env->rep_handle;
/* Released at the "out" label below. */
LOCK_MUTEX(db_rep->mutex);
if (eid == DB_EID_BROADCAST) {
if ((ret = __repmgr_send_broadcast(env, REPMGR_REP_MESSAGE,
control, rec, &nsites_sent, &npeers_sent)) != 0)
goto out;
} else {
/*
 * A request flagged DB_REP_ANYWHERE (and not DB_REP_REREQUEST) is
 * redirected to db_rep->peer when that peer is valid and available;
 * otherwise the message goes to the site named by eid, if available.
 */
if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
DB_REP_ANYWHERE &&
IS_VALID_EID(db_rep->peer) &&
(site = __repmgr_available_site(env, db_rep->peer)) !=
NULL) {
RPRINT(env, DB_VERB_REPMGR_MISC,
(env, "sending request to peer"));
} else if ((site = __repmgr_available_site(env, eid)) ==
NULL) {
RPRINT(env, DB_VERB_REPMGR_MISC, (env,
"ignoring message sent to unavailable site"));
ret = DB_REP_UNAVAIL;
goto out;
}
conn = site->ref.conn;
/*
 * The "blockable" argument is passed as TRUE.  If the send reports
 * DB_REP_UNAVAIL the connection is busted; a failure of the bust
 * itself replaces the return code.
 */
if ((ret = __repmgr_send_one(env, conn, REPMGR_REP_MESSAGE,
control, rec, TRUE)) == DB_REP_UNAVAIL &&
(t_ret = __repmgr_bust_connection(env, conn)) != 0)
ret = t_ret;
if (ret != 0)
goto out;
nsites_sent = 1;
npeers_sent = site->priority > 0 ? 1 : 0;
}
/*
 * nsites_sent/npeers_sent now hold how many sites/peers the message was
 * handed to, which bounds the number of acks that can arrive.  If that
 * bound is already below what the policy needs, fail without waiting.
 */
if (LF_ISSET(DB_REP_PERMANENT)) {
/* Group size minus one. */
nclients = __repmgr_get_nsites(db_rep) - 1;
switch (db_rep->perm_policy) {
case DB_REPMGR_ACKS_NONE:
needed = 0;
COMPQUIET(available, 0);
break;
case DB_REPMGR_ACKS_ONE:
needed = 1;
available = nsites_sent;
break;
case DB_REPMGR_ACKS_ALL:
needed = nclients;
available = nsites_sent;
break;
case DB_REPMGR_ACKS_ONE_PEER:
needed = 1;
available = npeers_sent;
break;
case DB_REPMGR_ACKS_ALL_PEERS:
/* No peer total is computed here, so only "at least one" is checked. */
needed = 1;
available = npeers_sent;
break;
case DB_REPMGR_ACKS_QUORUM:
/*
 * With more than one other site, or with REP_C_2SITE_STRICT set, half
 * of the other sites (rounded down) is required; otherwise one.
 */
if (nclients > 1 ||
FLD_ISSET(db_rep->region->config,
REP_C_2SITE_STRICT))
needed = nclients / 2;
else
needed = 1;
available = npeers_sent;
break;
default:
COMPQUIET(available, 0);
COMPQUIET(needed, 0);
(void)__db_unknown_path(env, "__repmgr_send");
break;
}
/* Nothing to wait for: ret is still 0 from the successful send. */
if (needed == 0)
goto out;
if (available < needed) {
ret = DB_REP_UNAVAIL;
goto out;
}
RPRINT(env, DB_VERB_REPMGR_MISC, (env,
"will await acknowledgement: need %u", needed));
ret = __repmgr_await_ack(env, lsnp);
}
out: UNLOCK_MUTEX(db_rep->mutex);
/* Any failure of a permanent message is counted and reported as an event. */
if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
STAT(db_rep->region->mstat.st_perm_failed++);
DB_EVENT(env, DB_EVENT_REP_PERM_FAILED, NULL);
}
return (ret);
}
/*
 * __repmgr_available_site --
 *	Return the site for eid if it is in SITE_CONNECTED state and its
 *	connection is in CONN_READY state; otherwise return NULL.
 */
static REPMGR_SITE *
__repmgr_available_site(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *candidate;

	/*
	 * NOTE(review): db_rep keeps this exact name because SITE_FROM_EID
	 * presumably refers to it -- confirm against the macro definition.
	 */
	db_rep = env->rep_handle;
	candidate = SITE_FROM_EID(eid);

	/* The connection is examined only once the site is known connected. */
	if (candidate->state == SITE_CONNECTED &&
	    candidate->ref.conn->state == CONN_READY)
		return (candidate);
	return (NULL);
}
int
__repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp)
ENV *env;
u_int type;
const DBT *control, *rec;
u_int *nsitesp, *npeersp;
{
DB_REP *db_rep;
struct sending_msg msg;
REPMGR_CONNECTION *conn;
REPMGR_SITE *site;
u_int nsites, npeers;
int ret;
static const u_int version_max_msg_type[] = {
0, REPMGR_MAX_V1_MSG_TYPE, REPMGR_MAX_V2_MSG_TYPE
};
db_rep = env->rep_handle;
__os_gettime(env, &db_rep->last_bcast, 1);
setup_sending_msg(&msg, type, control, rec);
nsites = npeers = 0;
TAILQ_FOREACH(conn, &db_rep->connections, entries) {
if (conn->state != CONN_READY)
continue;
DB_ASSERT(env, IS_VALID_EID(conn->eid) &&
conn->version > 0 &&
conn->version <= DB_REPMGR_VERSION);
if (type > version_max_msg_type[conn->version])
continue;
if ((ret = __repmgr_send_internal(env,
conn, &msg, FALSE)) == 0) {
site = SITE_FROM_EID(conn->eid);
nsites++;
if (site->priority > 0)
npeers++;
} else if (ret == DB_REP_UNAVAIL) {
if ((ret = __repmgr_bust_connection(env, conn)) != 0)
return (ret);
} else
return (ret);
}
*nsitesp = nsites;
*npeersp = npeers;
return (0);
}
int
__repmgr_send_one(env, conn, msg_type, control, rec, blockable)
ENV *env;
REPMGR_CONNECTION *conn;
u_int msg_type;
const DBT *control, *rec;
int blockable;
{
struct sending_msg msg;
setup_sending_msg(&msg, msg_type, control, rec);
return (__repmgr_send_internal(env, conn, &msg, blockable));
}
/*
 * __repmgr_send_internal --
 *	Try to send msg on conn.  If the connection already has queued
 *	output, the message is appended to the queue (optionally after
 *	waiting for the queue to drain when "blockable" is true) or dropped
 *	when the queue is at OUT_QUEUE_LIMIT.  Otherwise the message is
 *	written directly; whatever cannot be written without blocking is
 *	queued and the main thread is woken.
 *
 *	Returns 0 on success (including a dropped non-blockable message),
 *	DB_TIMEOUT when a blockable message could not be queued or the
 *	handle is finished, DB_REP_UNAVAIL on a socket write failure, or
 *	another error from the helpers called.
 */
static int
__repmgr_send_internal(env, conn, msg, blockable)
ENV *env;
REPMGR_CONNECTION *conn;
struct sending_msg *msg;
int blockable;
{
DB_REP *db_rep;
REPMGR_IOVECS iovecs;
SITE_STRING_BUFFER buffer;
db_timeout_t drain_to;
int ret;
size_t nw;
size_t total_written;
db_rep = env->rep_handle;
DB_ASSERT(env,
conn->state != CONN_CONNECTING && conn->state != CONN_DEFUNCT);
/* Pending output exists: do not write in-line, only queue (or drop). */
if (!STAILQ_EMPTY(&conn->outbound_queue)) {
RPRINT(env, DB_VERB_REPMGR_MISC,
(env, "msg to %s to be queued",
__repmgr_format_eid_loc(env->rep_handle,
conn->eid, buffer)));
/*
 * Queue full, caller may block, and the connection is not already
 * marked congested: wait for the queue to drain.
 */
if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
blockable && conn->state != CONN_CONGESTED) {
RPRINT(env, DB_VERB_REPMGR_MISC, (env,
"block msg thread, await queue space"));
/* A zero ack timeout falls back to the default value. */
if ((drain_to = db_rep->ack_timeout) == 0)
drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
RPRINT(env, DB_VERB_REPMGR_MISC,
(env, "will await drain"));
/* blockers is raised for the duration of the wait. */
conn->blockers++;
ret = __repmgr_await_drain(env,
conn, drain_to * OUT_QUEUE_LIMIT);
conn->blockers--;
RPRINT(env, DB_VERB_REPMGR_MISC, (env,
"drain returned %d (%d,%d)", ret,
db_rep->finished, conn->out_queue_length));
if (db_rep->finished)
return (DB_TIMEOUT);
if (ret != 0)
return (ret);
/* Queue fully drained while waiting: write directly after all. */
if (STAILQ_EMPTY(&conn->outbound_queue))
goto empty;
}
if (conn->out_queue_length < OUT_QUEUE_LIMIT)
return (enqueue_msg(env, conn, msg, 0));
else {
/* Message is dropped; only a blockable caller sees an error. */
RPRINT(env, DB_VERB_REPMGR_MISC,
(env, "queue limit exceeded"));
STAT(env->rep_handle->
region->mstat.st_msgs_dropped++);
return (blockable ? DB_TIMEOUT : 0);
}
}
empty:
/*
 * Work on a scratch copy of the iovecs so that msg->iovecs is left
 * unchanged by the per-write bookkeeping done on the copy.
 */
memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
total_written = 0;
/* Keep writing while writes succeed; done once everything is consumed. */
while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
iovecs.count-iovecs.offset, &nw)) == 0) {
total_written += nw;
if (__repmgr_update_consumed(&iovecs, nw))
return (0);
}
/* Anything other than WOULDBLOCK is reported as an unavailable site. */
if (ret != WOULDBLOCK) {
__db_err(env, ret, "socket writing failure");
return (DB_REP_UNAVAIL);
}
RPRINT(env, DB_VERB_REPMGR_MISC, (env, "wrote only %lu bytes to %s",
(u_long)total_written,
__repmgr_format_eid_loc(env->rep_handle, conn->eid, buffer)));
/* Queue the message, recording how many bytes were already written. */
if ((ret = enqueue_msg(env, conn, msg, total_written)) != 0)
return (ret);
STAT(env->rep_handle->region->mstat.st_msgs_queued++);
#ifdef DB_WIN32
/* On Windows, also ask for FD_WRITE events on this socket. */
if (WSAEventSelect(conn->fd, conn->event_object,
FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
ret = net_errno;
__db_err(env, ret, "can't add FD_WRITE event bit");
return (ret);
}
#endif
/* The queue just became non-empty; wake the main thread. */
return (__repmgr_wake_main_thread(env));
}
/*
 * __repmgr_is_permanent --
 *	Decide whether the acknowledgements recorded so far (site->max_ack
 *	compared against *lsnp) satisfy db_rep->perm_policy.  Returns TRUE or
 *	FALSE; an unrecognized policy is reported and treated as FALSE.
 */
int
__repmgr_is_permanent(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int eid, acked_sites, acked_peers;
	int durable, peer_unaccounted;

	db_rep = env->rep_handle;

	/* With no acks required, everything is permanent immediately. */
	if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE)
		return (TRUE);

	acked_sites = 0;
	acked_peers = 0;
	peer_unaccounted = FALSE;
	for (eid = 0; eid < db_rep->site_cnt; eid++) {
		site = SITE_FROM_EID(eid);

		/* Without SITE_HAS_PRIO the site is counted as a missing peer. */
		if (!F_ISSET(site, SITE_HAS_PRIO)) {
			peer_unaccounted = TRUE;
			continue;
		}

		/* Site has not acknowledged up to lsnp yet. */
		if (log_compare(&site->max_ack, lsnp) < 0) {
			if (site->priority > 0)
				peer_unaccounted = TRUE;
			continue;
		}

		acked_sites++;
		if (site->priority > 0)
			acked_peers++;
	}

	switch (db_rep->perm_policy) {
	case DB_REPMGR_ACKS_ONE:
		durable = (acked_sites >= 1);
		break;
	case DB_REPMGR_ACKS_ONE_PEER:
		durable = (acked_peers >= 1);
		break;
	case DB_REPMGR_ACKS_QUORUM:
		/*
		 * A two-site group without REP_C_2SITE_STRICT needs one peer
		 * ack; every other configuration needs half of the other
		 * sites, rounded down.
		 */
		if (__repmgr_get_nsites(db_rep) != 2 ||
		    FLD_ISSET(db_rep->region->config, REP_C_2SITE_STRICT))
			durable = (acked_peers >=
			    (__repmgr_get_nsites(db_rep)-1)/2);
		else
			durable = (acked_peers >= 1);
		break;
	case DB_REPMGR_ACKS_ALL:
		/* get_nsites is reduced by one before comparing. */
		durable = (acked_sites >= __repmgr_get_nsites(db_rep) - 1);
		break;
	case DB_REPMGR_ACKS_ALL_PEERS:
		/* Fewer known sites than expected counts as a missing peer. */
		if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1)
			peer_unaccounted = TRUE;
		durable = !peer_unaccounted;
		break;
	default:
		durable = FALSE;
		(void)__db_unknown_path(env, "__repmgr_is_permanent");
	}
	return (durable);
}
/*
 * __repmgr_bust_connection --
 *	Disable a connection.  For a connection with a valid EID, schedule a
 *	reconnection attempt and, if the connection was past the CONNECTING
 *	state and belonged to the current master, interrupt memp sync and
 *	start a failure election.  For a connection without a valid EID,
 *	just wake the main thread.  Returns 0 or the first helper error.
 */
int
__repmgr_bust_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	int eid, ret, was_connecting;

	db_rep = env->rep_handle;
	ret = 0;

	/* Capture the facts needed below before disabling the connection. */
	was_connecting = (conn->state == CONN_CONNECTING);
	eid = conn->eid;

	DISABLE_CONNECTION(conn);

	if (!IS_VALID_EID(eid))
		return (__repmgr_wake_main_thread(env));

	if ((ret = __repmgr_schedule_connection_attempt(
	    env, (u_int)eid, FALSE)) != 0)
		return (ret);

	/* Only an established connection to the master triggers an election. */
	if (was_connecting || eid != db_rep->master_eid)
		return (ret);

	(void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
	ret = __repmgr_init_election(env, ELECT_FAILURE_ELECTION);
	return (ret);
}
/*
 * __repmgr_cleanup_connection --
 *	Close a connection's socket and, unless some thread is still blocked
 *	on it, unlink it from db_rep->connections and destroy it.  When
 *	blockers remain, only signal conn->drained and leave the struct in
 *	place.  Returns 0 or the first error encountered.
 */
int
__repmgr_cleanup_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;

	ret = __repmgr_close_connection(env, conn);
	if (ret != 0)
		return (ret);

	/* A blocked thread still references conn: wake it, don't free. */
	if (conn->blockers > 0)
		return (__repmgr_signal(&conn->drained));

	TAILQ_REMOVE(&db_rep->connections, conn, entries);
	return (__repmgr_destroy_connection(env, conn));
}
/*
 * __repmgr_close_connection --
 *	Close the connection's socket if it is open and mark the descriptor
 *	INVALID_SOCKET.  Returns 0 if nothing was open or the close
 *	succeeded, otherwise the network error code.
 */
static int
__repmgr_close_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	int ret;

	DB_ASSERT(env,
	    conn->state == CONN_DEFUNCT || env->rep_handle->finished);

	/* Already closed: nothing to do. */
	if (conn->fd == INVALID_SOCKET)
		return (0);

	ret = closesocket(conn->fd);
	conn->fd = INVALID_SOCKET;
	if (ret == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "closing socket");
	}
#ifdef DB_WIN32
	/* The event object is closed too; the first error wins. */
	if (!WSACloseEvent(conn->event_object) && ret == 0)
		ret = net_errno;
#endif
	return (ret);
}
static int
__repmgr_destroy_connection(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
QUEUED_OUTPUT *out;
REPMGR_FLAT *msg;
DBT *dbt;
int ret;
if (conn->reading_phase == DATA_PHASE) {
if (conn->msg_type == REPMGR_REP_MESSAGE)
__os_free(env, conn->input.rep_message);
else {
dbt = &conn->input.repmgr_msg.cntrl;
if (dbt->size > 0)
__os_free(env, dbt->data);
dbt = &conn->input.repmgr_msg.rec;
if (dbt->size > 0)
__os_free(env, dbt->data);
}
}
while (!STAILQ_EMPTY(&conn->outbound_queue)) {
out = STAILQ_FIRST(&conn->outbound_queue);
STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
msg = out->msg;
if (--msg->ref_count <= 0)
__os_free(env, msg);
__os_free(env, out);
}
ret = __repmgr_free_cond(&conn->drained);
__os_free(env, conn);
return (ret);
}
static int
enqueue_msg(env, conn, msg, offset)
ENV *env;
REPMGR_CONNECTION *conn;
struct sending_msg *msg;
size_t offset;
{
QUEUED_OUTPUT *q_element;
int ret;
if (msg->fmsg == NULL && ((ret = flatten(env, msg)) != 0))
return (ret);
if ((ret = __os_malloc(env, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
return (ret);
q_element->msg = msg->fmsg;
msg->fmsg->ref_count++;
q_element->offset = offset;
STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
conn->out_queue_length++;
return (0);
}
/*
 * setup_sending_msg --
 *	Initialize *msg for sending: a one-byte type, the control and rec
 *	lengths as stored by htonl(), then the control and rec payloads
 *	(each only when non-empty).  Either "control" or "rec" may be NULL,
 *	which is treated as a zero-length payload.  msg->fmsg starts out
 *	NULL; a flattened copy is created later only if needed.
 */
static void
setup_sending_msg(msg, type, control, rec)
	struct sending_msg *msg;
	u_int type;
	const DBT *control, *rec;
{
	u_int32_t control_size, rec_size;

	__repmgr_iovec_init(&msg->iovecs);

	/* Header: type byte followed by the two length words. */
	msg->type = type;
	__repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type));

	control_size = control == NULL ? 0 : control->size;
	msg->control_size_buf = htonl(control_size);
	__repmgr_add_buffer(&msg->iovecs,
	    &msg->control_size_buf, sizeof(msg->control_size_buf));

	rec_size = rec == NULL ? 0 : rec->size;
	msg->rec_size_buf = htonl(rec_size);
	__repmgr_add_buffer(
	    &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf));

	/*
	 * Test the sizes computed above rather than dereferencing the DBT
	 * pointers again: a NULL "control" yields control_size == 0 and must
	 * not be dereferenced here.
	 */
	if (control_size > 0)
		__repmgr_add_dbt(&msg->iovecs, control);

	if (rec_size > 0)
		__repmgr_add_dbt(&msg->iovecs, rec);

	msg->fmsg = NULL;
}
/*
 * flatten --
 *	Copy every buffer referenced by msg->iovecs into one newly allocated
 *	REPMGR_FLAT (reference count 0) and then re-point msg->iovecs at that
 *	single contiguous copy.  Returns 0 or the allocation error.
 */
static int
flatten(env, msg)
	ENV *env;
	struct sending_msg *msg;
{
	u_int8_t *dest;
	size_t total, seg_len;
	int i, ret;

	DB_ASSERT(env, msg->fmsg == NULL);

	total = msg->iovecs.total_bytes;
	ret = __os_malloc(env, sizeof(*msg->fmsg) + total, &msg->fmsg);
	if (ret != 0)
		return (ret);
	msg->fmsg->length = total;
	msg->fmsg->ref_count = 0;

	/* Concatenate the segments in order into the flat data area. */
	dest = &msg->fmsg->data[0];
	for (i = 0; i < msg->iovecs.count; i++) {
		seg_len = msg->iovecs.vectors[i].iov_len;
		memcpy(dest, msg->iovecs.vectors[i].iov_base, seg_len);
		dest += seg_len;
	}

	/* From now on the message is described by one buffer. */
	__repmgr_iovec_init(&msg->iovecs);
	__repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], total);
	return (0);
}
/*
 * __repmgr_find_site --
 *	Search the known sites for one whose host string and port both match.
 *	Returns the index of the first match, or -1 if there is none.
 */
int
__repmgr_find_site(env, host, port)
	ENV *env;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_SITE *candidate;
	u_int idx;

	db_rep = env->rep_handle;
	for (idx = 0; idx < db_rep->site_cnt; idx++) {
		candidate = &db_rep->sites[idx];

		if (strcmp(candidate->net_addr.host, host) != 0 ||
		    candidate->net_addr.port != port)
			continue;
		return ((int)idx);
	}

	return (-1);
}
/*
 * __repmgr_pack_netaddr --
 *	Fill in *addr with a private copy of host, the port narrowed to 16
 *	bits, and the given address list (stored as is, not copied), with no
 *	current address selected.  Returns 0 or the strdup error, in which
 *	case the remaining fields of *addr are left unset.
 */
int
__repmgr_pack_netaddr(env, host, port, list, addr)
	ENV *env;
	const char *host;
	u_int port;
	ADDRINFO *list;
	repmgr_netaddr_t *addr;
{
	int ret;

	DB_ASSERT(env, host != NULL);

	ret = __os_strdup(env, host, &addr->host);
	if (ret == 0) {
		addr->port = (u_int16_t)port;
		addr->address_list = list;
		addr->current = NULL;
	}
	return (ret);
}
/*
 * __repmgr_getaddr --
 *	Resolve host/port into a stream-socket address list stored in
 *	*result.  Returns EINVAL for a port above UINT16_MAX, DB_REP_UNAVAIL
 *	for any failure of __os_getaddrinfo (its own error code is not
 *	passed on), and 0 on success.
 */
int
__repmgr_getaddr(env, host, port, flags, result)
	ENV *env;
	const char *host;
	u_int port;
	int flags;
	ADDRINFO **result;
{
	ADDRINFO hints, *found;
	char service[10];		/* Decimal text of the port number. */
#ifdef DB_WIN32
	int ret;
#endif

	if (port > UINT16_MAX) {
		__db_errx(env, "port %u larger than max port %u",
		    port, UINT16_MAX);
		return (EINVAL);
	}

#ifdef DB_WIN32
	/* Winsock is initialized here if that has not happened yet. */
	if (!env->rep_handle->wsa_inited) {
		if ((ret = __repmgr_wsa_init(env)) != 0)
			return (ret);
	}
#endif

	(void)snprintf(service, sizeof(service), "%u", port);

	memset(&hints, 0, sizeof(hints));
	hints.ai_flags = flags;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_family = AF_UNSPEC;

	if (__os_getaddrinfo(env, host, port, service, &hints, &found) != 0)
		return (DB_REP_UNAVAIL);

	*result = found;
	return (0);
}
/*
 * __repmgr_add_site --
 *	Add the site host:port to the list of known sites.
 *
 *	Returns 0 after creating a new site, or EEXIST if the site is
 *	already known; in both cases the site is stored in *newsitep when
 *	newsitep is non-NULL.  Any other return is an error and leaves
 *	*newsitep untouched.  A DB_REP_UNAVAIL result from the address
 *	lookup is tolerated: the site is created with a NULL address list.
 */
int
__repmgr_add_site(env, host, port, newsitep)
	ENV *env;
	const char *host;
	u_int port;
	REPMGR_SITE **newsitep;
{
	ADDRINFO *address_list;
	DB_REP *db_rep;
	repmgr_netaddr_t addr;
	REPMGR_SITE *site;
	int ret, eid;

	ret = 0;
	db_rep = env->rep_handle;

	if (IS_VALID_EID(eid = __repmgr_find_site(env, host, port))) {
		site = SITE_FROM_EID(eid);
		ret = EEXIST;
		goto out;
	}

	if ((ret = __repmgr_getaddr(
	    env, host, port, 0, &address_list)) == DB_REP_UNAVAIL) {
		/* Lookup failed for now; carry on without addresses. */
		address_list = NULL;
	} else if (ret != 0)
		return (ret);

	if ((ret = __repmgr_pack_netaddr(
	    env, host, port, address_list, &addr)) != 0) {
		/*
		 * address_list is NULL when the lookup above was deferred.
		 * Don't hand NULL to __os_freeaddrinfo: if it forwards to
		 * freeaddrinfo(), NULL is not a documented valid argument.
		 */
		if (address_list != NULL)
			__os_freeaddrinfo(env, address_list);
		return (ret);
	}

	if ((ret = __repmgr_new_site(env, &site, &addr, SITE_IDLE)) != 0) {
		__repmgr_cleanup_netaddr(env, &addr);
		return (ret);
	}

	/* A connection attempt is scheduled only when a selector exists. */
	if (db_rep->selector != NULL &&
	    (ret = __repmgr_schedule_connection_attempt(
	    env, (u_int)EID_FROM_SITE(site), TRUE)) != 0)
		return (ret);

	/* Reached only for success and EEXIST. */
out:
	if (newsitep != NULL)
		*newsitep = site;
	return (ret);
}
/*
 * __repmgr_net_create --
 *	Put the networking fields of a DB_REP handle into their initial
 *	state: empty connection and retry lists, no listen socket, and no
 *	known master.  Always returns 0.
 */
int
__repmgr_net_create(db_rep)
	DB_REP *db_rep;
{
	TAILQ_INIT(&db_rep->connections);
	TAILQ_INIT(&db_rep->retries);

	db_rep->master_eid = DB_EID_INVALID;
	db_rep->listen_fd = INVALID_SOCKET;

	return (0);
}
/*
 * __repmgr_listen --
 *	Create the listening socket.  Each address of db_rep->my_addr is
 *	tried in turn until one can be bound; the resulting non-blocking
 *	socket is stored in db_rep->listen_fd.  Returns 0 on success,
 *	otherwise an error code after reporting the failure.
 */
int
__repmgr_listen(env)
	ENV *env;
{
	ADDRINFO *ai;
	DB_REP *db_rep;
	char *why;
	int sockopt, ret;
	socket_t s;

	db_rep = env->rep_handle;

	/* INVALID_SOCKET doubles as the "nothing to close" marker. */
	s = INVALID_SOCKET;
	ai = ADDR_LIST_FIRST(&db_rep->my_addr);

	/*
	 * The assertion expects at least one address, in which case the loop
	 * sets "why" before it is used; the initialization is a safeguard.
	 */
	COMPQUIET(why, "");
	DB_ASSERT(env, ai != NULL);
	for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {

		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
			why = "can't create listen socket";
			continue;
		}

		sockopt = 1;
		if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
		    sizeof(sockopt)) != 0) {
			why = "can't set REUSEADDR socket option";
			break;
		}

		/* A bind failure is not fatal: move on to the next address. */
		if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
			why = "can't bind socket to listening address";
			(void)closesocket(s);
			s = INVALID_SOCKET;
			continue;
		}

		if (listen(s, 5) != 0) {
			why = "listen()";
			break;
		}

		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
			__db_err(env, ret, "can't unblock listen socket");
			goto clean;
		}

		db_rep->listen_fd = s;
		return (0);
	}

	ret = net_errno;
	/* Pass the reason as data, never as the format string itself. */
	__db_err(env, ret, "%s", why);
clean:	if (s != INVALID_SOCKET)
		(void)closesocket(s);
	return (ret);
}
/*
 * __repmgr_net_close --
 *	Shut down networking: close and destroy every connection, close the
 *	listening socket, and undo platform-specific setup.  Does nothing
 *	(returns 0) when no listening socket is open.  Returns the first
 *	error encountered, but still carries out every step.
 */
int
__repmgr_net_close(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
#ifndef DB_WIN32
	struct sigaction sigact;
#endif
	int ret, t_ret;

	db_rep = env->rep_handle;
	if (db_rep->listen_fd == INVALID_SOCKET)
		return (0);

	ret = 0;
	while ((conn = TAILQ_FIRST(&db_rep->connections)) != NULL) {
		t_ret = __repmgr_close_connection(env, conn);
		if (t_ret != 0 && ret == 0)
			ret = t_ret;

		/* Unlink before destroying: destroy frees the struct. */
		TAILQ_REMOVE(&db_rep->connections, conn, entries);
		t_ret = __repmgr_destroy_connection(env, conn);
		if (t_ret != 0 && ret == 0)
			ret = t_ret;
	}

	if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0)
		ret = net_errno;

#ifdef DB_WIN32
	if (WSACleanup() == SOCKET_ERROR && ret == 0)
		ret = net_errno;
	db_rep->wsa_inited = FALSE;
#else
	/* Set SIGPIPE back to SIG_DFL if chg_sig_handler is set. */
	if (db_rep->chg_sig_handler) {
		memset(&sigact, 0, sizeof(sigact));
		sigact.sa_handler = SIG_DFL;
		if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0)
			ret = errno;
	}
#endif
	db_rep->listen_fd = INVALID_SOCKET;
	return (ret);
}
/*
 * __repmgr_net_destroy --
 *	Release the memory held by the networking part of a DB_REP handle:
 *	the local address, all pending retries, all remaining connection
 *	structures, and the site array with each site's address.  After the
 *	local address is cleaned up, nothing further is done when
 *	db_rep->sites is NULL.
 */
void
__repmgr_net_destroy(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
	REPMGR_CONNECTION *conn;
	REPMGR_RETRY *retry;
	REPMGR_SITE *site;
	u_int i;

	__repmgr_cleanup_netaddr(env, &db_rep->my_addr);

	if (db_rep->sites == NULL)
		return;

	while (!TAILQ_EMPTY(&db_rep->retries)) {
		retry = TAILQ_FIRST(&db_rep->retries);
		TAILQ_REMOVE(&db_rep->retries, retry, entries);
		__os_free(env, retry);
	}

	/*
	 * __repmgr_destroy_connection frees the structure but does not
	 * unlink it, so it must be removed from the list first.  Otherwise
	 * the list head keeps pointing at freed memory and this loop would
	 * destroy the same connection again.
	 */
	while (!TAILQ_EMPTY(&db_rep->connections)) {
		conn = TAILQ_FIRST(&db_rep->connections);
		TAILQ_REMOVE(&db_rep->connections, conn, entries);
		(void)__repmgr_destroy_connection(env, conn);
	}

	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];
		__repmgr_cleanup_netaddr(env, &site->net_addr);
	}
	__os_free(env, db_rep->sites);
	db_rep->sites = NULL;
}