#include "db_config.h"
#define __INCLUDE_NETWORKING 1
#include "db_int.h"
/*
 * Convert a timeout into the count passed to the Windows wait functions in
 * this file, dividing by US_PER_MS and rounding up so that a small non-zero
 * timeout never truncates to 0.
 * NOTE(review): presumably (t) is in microseconds and the result is in
 * milliseconds, going by the name US_PER_MS -- confirm.
 */
#define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
/*
 * One slot in the table of threads waiting for an acknowledgement.
 */
typedef struct __ack_waiter {
/* Event the waiting thread blocks on; set by __repmgr_wake_waiting_senders. */
HANDLE event;
/* LSN the waiter is waiting on; NULL marks the slot as not in use. */
const DB_LSN *lsnp;
/* Link to the next slot on the table's free list (valid only when free). */
struct __ack_waiter *next_free;
} ACK_WAITER;
/* True when a thread currently occupies slot (w), i.e. its lsnp is set. */
#define WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL)
/*
 * Growable table of ACK_WAITER slots.  Slots are handed out either from the
 * free list or, when that is empty, from the never-yet-used tail of the
 * array (see allocate_wait_slot).
 */
struct __ack_waiters_table {
/* Heap-allocated array of slots; may be reallocated when it fills up. */
struct __ack_waiter *array;
/* Number of slots the array currently has room for. */
int size;
/* Index of the first slot that has never been handed out. */
int next_avail;
/* Head of the list of previously used slots that have been released. */
struct __ack_waiter *first_free;
};
/* Forward declarations for the file-local helpers. */
static int allocate_wait_slot __P((ENV *, ACK_WAITER **));
static void free_wait_slot __P((ENV *, ACK_WAITER *));
static int handle_completion __P((ENV *, REPMGR_CONNECTION *));
static int finish_connecting __P((ENV *, REPMGR_CONNECTION *,
LPWSANETWORKEVENTS));
/*
 * Start a new thread that runs runnable->run with env as its argument, and
 * record the resulting thread handle in runnable->thread_id.
 *
 * Returns 0 on success, or the GetLastError() code if the thread could not
 * be created.
 */
int
__repmgr_thread_start(env, runnable)
	ENV *env;
	REPMGR_RUNNABLE *runnable;
{
	HANDLE h;

	/* Clear the flag before the thread can possibly run. */
	runnable->finished = FALSE;

	h = CreateThread(NULL,
	    0, (LPTHREAD_START_ROUTINE)runnable->run, env, 0, NULL);
	if (h != NULL) {
		runnable->thread_id = h;
		return (0);
	}
	return (GetLastError());
}
int
__repmgr_thread_join(thread)
REPMGR_RUNNABLE *thread;
{
if (WaitForSingleObject(thread->thread_id, INFINITE) == WAIT_OBJECT_0)
return (0);
return (GetLastError());
}
/*
 * Put socket s into non-blocking mode.
 *
 * Returns 0 on success, or the WSAGetLastError() code on failure.
 */
int
__repmgr_set_nonblocking(s)
	SOCKET s;
{
	u_long enable;

	/* A non-zero FIONBIO argument turns non-blocking mode on. */
	enable = 1;
	if (ioctlsocket(s, FIONBIO, &enable) != SOCKET_ERROR)
		return (0);
	return (WSAGetLastError());
}
/*
 * Wake every waiting thread whose LSN __repmgr_is_permanent() now reports
 * as permanent, by setting that waiter's event.
 *
 * All slots are visited even if a SetEvent() call fails; the return value
 * is 0, or the GetLastError() code of the first failure.
 */
int
__repmgr_wake_waiting_senders(env)
	ENV *env;
{
	ACK_WAITER *w;
	DB_REP *db_rep;
	int first_err, idx;

	db_rep = env->rep_handle;
	first_err = 0;

	/* Only slots below next_avail have ever been handed out. */
	idx = 0;
	while (idx < db_rep->waiters->next_avail) {
		w = &db_rep->waiters->array[idx++];
		if (WAITER_SLOT_IN_USE(w) &&
		    __repmgr_is_permanent(env, w->lsnp) &&
		    !SetEvent(w->event) && first_err == 0)
			first_err = GetLastError();
	}
	return (first_err);
}
/*
 * Wait until the waiter slot's event is set for the given LSN, or until
 * db_rep->ack_timeout expires (no time limit when ack_timeout is not
 * positive).
 *
 * db_rep->mutex is handed to SignalObjectAndWait() for the duration of the
 * wait and locked again before the slot is released.
 *
 * Returns 0 when the event was signaled, DB_REP_UNAVAIL on timeout, the
 * GetLastError() code if the wait failed, or the error from
 * allocate_wait_slot().
 */
int
__repmgr_await_ack(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	ACK_WAITER *slot;
	DB_REP *db_rep;
	DWORD status, wait_ms;
	int ret;

	db_rep = env->rep_handle;

	if ((ret = allocate_wait_slot(env, &slot)) != 0)
		return (ret);

	if (db_rep->ack_timeout > 0)
		wait_ms = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout);
	else
		wait_ms = INFINITE;

	/* Publishing the LSN is what marks the slot as in use. */
	slot->lsnp = lsnp;

	status = SignalObjectAndWait(db_rep->mutex,
	    slot->event, wait_ms, FALSE);
	switch (status) {
	case WAIT_FAILED:
		/* Grab the error code before any other API call. */
		ret = (int)GetLastError();
		break;
	case WAIT_TIMEOUT:
		ret = DB_REP_UNAVAIL;
		break;
	default:
		DB_ASSERT(env, status == WAIT_OBJECT_0);
		ret = (int)status;
		break;
	}

	LOCK_MUTEX(db_rep->mutex);
	free_wait_slot(env, slot);
	return (ret);
}
static int
allocate_wait_slot(env, resultp)
ENV *env;
ACK_WAITER **resultp;
{
ACK_WAITER *w;
ACK_WAITERS_TABLE *table;
DB_REP *db_rep;
int ret;
db_rep = env->rep_handle;
table = db_rep->waiters;
if (table->first_free == NULL) {
if (table->next_avail >= table->size) {
table->size *= 2;
w = table->array;
if ((ret = __os_realloc(env, table->size * sizeof(*w),
&w)) != 0)
return (ret);
table->array = w;
}
w = &table->array[table->next_avail++];
if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) ==
NULL) {
--table->next_avail;
return (GetLastError());
}
} else {
w = table->first_free;
table->first_free = w->next_free;
}
*resultp = w;
return (0);
}
static void
free_wait_slot(env, slot)
ENV *env;
ACK_WAITER *slot;
{
DB_REP *db_rep;
db_rep = env->rep_handle;
slot->lsnp = NULL;
slot->next_free = db_rep->waiters->first_free;
db_rep->waiters->first_free = slot;
}
/*
 * Wait for a connection's output queue to fall below OUT_QUEUE_LIMIT, for
 * at most the given timeout.
 *
 * db_rep->mutex is handed to SignalObjectAndWait() while waiting on
 * conn->drained, and is locked again after each wait.
 *
 * Returns:
 *	0		the queue is below the limit; or the deadline passed
 *			(conn->state is then set to CONN_CONGESTED); or
 *			db_rep->finished was set.
 *	DB_REP_UNAVAIL	the connection became CONN_DEFUNCT while waiting.
 *	other		a GetLastError() code from a failed Windows call.
 */
int
__repmgr_await_drain(env, conn, timeout)
	ENV *env;
	REPMGR_CONNECTION *conn;
	db_timeout_t timeout;
{
	DB_REP *db_rep;
	db_timespec deadline, delta, now;
	db_timeout_t t;
	DWORD duration, ret, wait_err;
	int round_up;

	db_rep = env->rep_handle;

	/* Fix the absolute deadline once, so repeated waits can't extend it. */
	__os_gettime(env, &deadline, 1);
	TIMESPEC_ADD_DB_TIMEOUT(&deadline, timeout);

	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
		if (!ResetEvent(conn->drained))
			return (GetLastError());

		/* How long until the deadline? */
		__os_gettime(env, &now, 1);
		if (timespeccmp(&now, &deadline, >=)) {
			conn->state = CONN_CONGESTED;
			return (0);
		}
		delta = deadline;
		timespecsub(&delta, &now);
		round_up = TRUE;
		DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
		duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);

		ret = SignalObjectAndWait(db_rep->mutex,
		    conn->drained, duration, FALSE);
		/*
		 * Capture the failure code immediately: re-acquiring the
		 * mutex below is another API call and may overwrite the
		 * thread's last-error value.
		 */
		wait_err = (ret == WAIT_FAILED) ? GetLastError() : 0;
		LOCK_MUTEX(db_rep->mutex);
		if (ret == WAIT_FAILED)
			return (wait_err);
		else if (ret == WAIT_TIMEOUT) {
			conn->state = CONN_CONGESTED;
			return (0);
		} else
			DB_ASSERT(env, ret == WAIT_OBJECT_0);

		if (db_rep->finished)
			return (0);
		if (conn->state == CONN_DEFUNCT)
			return (DB_REP_UNAVAIL);
	}
	return (0);
}
/*
 * Create a condition variable, implemented as an unnamed manual-reset
 * event that starts out non-signaled, and store it in *c.
 *
 * Returns 0 on success, or the GetLastError() code; *c is left untouched on
 * failure.
 */
int
__repmgr_alloc_cond(c)
	cond_var_t *c;
{
	HANDLE h;

	h = CreateEvent(NULL, TRUE, FALSE, NULL);
	if (h != NULL) {
		*c = h;
		return (0);
	}
	return (GetLastError());
}
/*
 * Destroy a condition variable by closing its event handle.
 *
 * Returns 0 on success, or the GetLastError() code.
 */
int
__repmgr_free_cond(c)
	cond_var_t *c;
{
	if (!CloseHandle(*c))
		return (GetLastError());
	return (0);
}
/*
 * Create the synchronization objects kept in db_rep: the signaler,
 * queue_nonempty and check_election events, the mutex, and the (initially
 * empty) table of ack waiters.
 *
 * Returns 0 on success.  On failure everything created so far is released,
 * db_rep->waiters is set to NULL, and the GetLastError() code or the
 * __os_calloc() error is returned.
 */
int
__repmgr_init_sync(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
#define INITIAL_ALLOCATION 5
	ACK_WAITERS_TABLE *waiters;
	int ret;

	/* Start from a known state so cleanup can tell what was created. */
	waiters = NULL;
	db_rep->mutex = NULL;
	db_rep->check_election = NULL;
	db_rep->queue_nonempty = NULL;
	db_rep->signaler = NULL;

	/* signaler and check_election auto-reset; queue_nonempty is manual. */
	db_rep->signaler = CreateEvent(NULL, FALSE, FALSE, NULL);
	if (db_rep->signaler == NULL)
		goto geterr;

	db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL);
	if (db_rep->queue_nonempty == NULL)
		goto geterr;

	db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL);
	if (db_rep->check_election == NULL)
		goto geterr;

	db_rep->mutex = CreateMutex(NULL, FALSE, NULL);
	if (db_rep->mutex == NULL)
		goto geterr;

	ret = __os_calloc(env, 1, sizeof(ACK_WAITERS_TABLE), &waiters);
	if (ret != 0)
		goto err;
	ret = __os_calloc(env,
	    INITIAL_ALLOCATION, sizeof(ACK_WAITER), &waiters->array);
	if (ret != 0)
		goto err;

	waiters->next_avail = 0;
	waiters->first_free = NULL;
	waiters->size = INITIAL_ALLOCATION;

	db_rep->waiters = waiters;
	return (0);

geterr:
	ret = GetLastError();
err:
	/* Undo whatever was created before the failure. */
	if (db_rep->mutex != NULL)
		CloseHandle(db_rep->mutex);
	if (db_rep->check_election != NULL)
		CloseHandle(db_rep->check_election);
	if (db_rep->queue_nonempty != NULL)
		CloseHandle(db_rep->queue_nonempty);
	if (db_rep->signaler != NULL)
		CloseHandle(db_rep->signaler);
	if (waiters != NULL)
		__os_free(env, waiters);
	db_rep->waiters = NULL;
	return (ret);
}
int
__repmgr_close_sync(env)
ENV *env;
{
DB_REP *db_rep;
int i, ret;
db_rep = env->rep_handle;
if (!(REPMGR_SYNC_INITED(db_rep)))
return (0);
ret = 0;
for (i = 0; i < db_rep->waiters->next_avail; i++) {
if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0)
ret = GetLastError();
}
__os_free(env, db_rep->waiters->array);
__os_free(env, db_rep->waiters);
if (!CloseHandle(db_rep->check_election) && ret == 0)
ret = GetLastError();
if (!CloseHandle(db_rep->queue_nonempty) && ret == 0)
ret = GetLastError();
if (!CloseHandle(db_rep->signaler) && ret == 0)
ret = GetLastError();
if (!CloseHandle(db_rep->mutex) && ret == 0)
ret = GetLastError();
db_rep->waiters = NULL;
return (ret);
}
int
__repmgr_net_init(env, db_rep)
ENV *env;
DB_REP *db_rep;
{
int ret;
if (!db_rep->wsa_inited && (ret = __repmgr_wsa_init(env)) != 0)
goto err;
if ((ret = __repmgr_listen(env)) == 0)
return (0);
if (WSACleanup() == SOCKET_ERROR) {
ret = net_errno;
__db_err(env, ret, "WSACleanup");
}
err: db_rep->listen_fd = INVALID_SOCKET;
return (ret);
}
/*
 * Start Winsock, requesting version 2.2, and record the fact in
 * env->rep_handle->wsa_inited.
 *
 * Returns 0 on success; on failure the WSAStartup() error code is reported
 * through __db_err() and returned.
 */
int
__repmgr_wsa_init(env)
	ENV *env;
{
	WSADATA data;
	int ret;

	ret = WSAStartup(MAKEWORD(2, 2), &data);
	if (ret == 0) {
		env->rep_handle->wsa_inited = TRUE;
		return (0);
	}

	__db_err(env, ret, "unable to initialize Windows networking");
	return (ret);
}
int
__repmgr_lock_mutex(mutex)
mgr_mutex_t *mutex;
{
if (WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0)
return (0);
return (GetLastError());
}
/*
 * Release the mutex.
 *
 * Returns 0 on success, otherwise the GetLastError() code.
 */
int
__repmgr_unlock_mutex(mutex)
	mgr_mutex_t *mutex;
{
	if (!ReleaseMutex(*mutex))
		return (GetLastError());
	return (0);
}
/*
 * Signal a condition variable by setting its event.
 *
 * Returns 0 on success, otherwise the GetLastError() code.
 */
int
__repmgr_signal(v)
	cond_var_t *v;
{
	if (SetEvent(*v))
		return (0);
	return (GetLastError());
}
/*
 * Set the signaler event, which is the first event the select loop waits
 * on.
 *
 * Returns 0 on success, otherwise the GetLastError() code.
 */
int
__repmgr_wake_main_thread(env)
	ENV *env;
{
	return (SetEvent(env->rep_handle->signaler) ? 0 : GetLastError());
}
int
__repmgr_writev(fd, iovec, buf_count, byte_count_p)
socket_t fd;
db_iovec_t *iovec;
int buf_count;
size_t *byte_count_p;
{
DWORD bytes;
if (WSASend(fd, iovec,
(DWORD)buf_count, &bytes, 0, NULL, NULL) == SOCKET_ERROR)
return (net_errno);
*byte_count_p = (size_t)bytes;
return (0);
}
int
__repmgr_readv(fd, iovec, buf_count, xfr_count_p)
socket_t fd;
db_iovec_t *iovec;
int buf_count;
size_t *xfr_count_p;
{
DWORD bytes, flags;
flags = 0;
if (WSARecv(fd, iovec,
(DWORD)buf_count, &bytes, &flags, NULL, NULL) == SOCKET_ERROR)
return (net_errno);
*xfr_count_p = (size_t)bytes;
return (0);
}
/*
 * Main event loop: wait for activity on the signaler event, the listening
 * socket, or any eligible connection, and dispatch it.
 *
 * Each pass rebuilds the wait set under db_rep->mutex: events[0] is the
 * signaler, events[1] the listen event, and events[2..] the event objects
 * of the selected connections (connections[] is indexed in parallel, so its
 * first two entries are unused).  Defunct connections are cleaned up while
 * the list is walked.  The mutex is released for the duration of the wait.
 *
 * Returns 0 when db_rep->finished is observed after a wait; otherwise the
 * first error encountered.
 *
 * NOTE(review): nothing bounds nevents by WSA_MAXIMUM_WAIT_EVENTS while the
 * arrays are filled; confirm that the number of connections is limited
 * elsewhere.
 */
int
__repmgr_select_loop(env)
	ENV *env;
{
	DB_REP *db_rep;
	DWORD nevents, ret, wait_err;
	DWORD select_timeout;
	REPMGR_CONNECTION *conn, *next;
	REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS];
	WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
	db_timespec timeout;
	WSAEVENT listen_event;
	WSANETWORKEVENTS net_events;
	int flow_control, i;

	db_rep = env->rep_handle;
	wait_err = 0;

	if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) {
		/*
		 * Save the error code before reporting it: the reporting
		 * call may itself change the thread's last-error value.
		 */
		ret = net_errno;
		__db_err(
		    env, ret, "can't create event for listen socket");
		return (ret);
	}
	if (WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) ==
	    SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't enable event for listener");
		goto out;
	}

	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_first_try_connections(env)) != 0)
		goto unlock;
	flow_control = FALSE;
	for (;;) {
		/* Slots 0 and 1 are fixed; connections start at slot 2. */
		events[0] = db_rep->signaler;
		events[1] = listen_event;
		nevents = 2;
		for (conn = TAILQ_FIRST(&db_rep->connections);
		    conn != NULL;
		    conn = next) {
			/* Fetch the successor first: cleanup may drop conn. */
			next = TAILQ_NEXT(conn, entries);
			if (conn->state == CONN_DEFUNCT) {
				if ((ret = __repmgr_cleanup_connection(env,
				    conn)) != 0)
					goto unlock;
				continue;
			}
			if (conn->state == CONN_CONNECTING ||
			    !STAILQ_EMPTY(&conn->outbound_queue) ||
			    (!flow_control || !IS_VALID_EID(conn->eid))) {
				events[nevents] = conn->event_object;
				connections[nevents++] = conn;
			}
		}

		/* A non-zero result means "timeout" holds a time limit. */
		if (__repmgr_compute_timeout(env, &timeout))
			select_timeout =
			    (DWORD)(timeout.tv_sec * MS_PER_SEC +
			    timeout.tv_nsec / NS_PER_MS);
		else {
			select_timeout = WSA_INFINITE;
		}

		UNLOCK_MUTEX(db_rep->mutex);
		ret = WSAWaitForMultipleEvents(
		    nevents, events, FALSE, select_timeout, FALSE);
		/*
		 * Capture a wait failure's error code now; re-acquiring the
		 * mutex below is another API call that may overwrite it.
		 */
		if (ret == WSA_WAIT_FAILED)
			wait_err = net_errno;
		if (db_rep->finished) {
			/* The mutex is not held here, so skip the unlock. */
			ret = 0;
			goto out;
		}
		LOCK_MUTEX(db_rep->mutex);

		if (ret >= WSA_WAIT_EVENT_0 &&
		    ret < WSA_WAIT_EVENT_0 + nevents) {
			switch (i = ret - WSA_WAIT_EVENT_0) {
			case 0:
				/* Signaler: just go around and rebuild. */
				break;
			case 1:
				/* Listening socket: accept a connection. */
				if ((ret = WSAEnumNetworkEvents(
				    db_rep->listen_fd, listen_event,
				    &net_events)) == SOCKET_ERROR) {
					ret = net_errno;
					goto unlock;
				}
				DB_ASSERT(env,
				    net_events.lNetworkEvents & FD_ACCEPT);
				if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT])
				    != 0)
					goto unlock;
				if ((ret = __repmgr_accept(env)) != 0)
					goto unlock;
				break;
			default:
				/* The state may have changed during the wait. */
				if (connections[i]->state != CONN_DEFUNCT &&
				    (ret = handle_completion(env,
				    connections[i])) != 0)
					goto unlock;
				break;
			}
		} else if (ret == WSA_WAIT_TIMEOUT) {
			if ((ret = __repmgr_check_timeouts(env)) != 0)
				goto unlock;
		} else if (ret == WSA_WAIT_FAILED) {
			ret = wait_err;
			goto unlock;
		}
	}

unlock:
	UNLOCK_MUTEX(db_rep->mutex);
out:
	if (!CloseHandle(listen_event) && ret == 0)
		ret = GetLastError();
	return (ret);
}
static int
handle_completion(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
int ret;
WSANETWORKEVENTS events;
if ((ret = WSAEnumNetworkEvents(conn->fd, conn->event_object, &events))
== SOCKET_ERROR) {
__db_err(env, net_errno, "EnumNetworkEvents");
STAT(env->rep_handle->region->mstat.st_connection_drop++);
ret = DB_REP_UNAVAIL;
goto err;
}
if (conn->state == CONN_CONNECTING) {
if ((ret = finish_connecting(env, conn, &events)) != 0)
goto err;
} else {
if (events.lNetworkEvents & FD_CLOSE) {
__db_err(env,
events.iErrorCode[FD_CLOSE_BIT],
"connection closed");
STAT(env->rep_handle->
region->mstat.st_connection_drop++);
ret = DB_REP_UNAVAIL;
goto err;
}
if (events.lNetworkEvents & FD_WRITE) {
if (events.iErrorCode[FD_WRITE_BIT] != 0) {
__db_err(env,
events.iErrorCode[FD_WRITE_BIT],
"error writing");
STAT(env->rep_handle->
region->mstat.st_connection_drop++);
ret = DB_REP_UNAVAIL;
goto err;
} else if ((ret =
__repmgr_write_some(env, conn)) != 0)
goto err;
}
if (events.lNetworkEvents & FD_READ) {
if (events.iErrorCode[FD_READ_BIT] != 0) {
__db_err(env,
events.iErrorCode[FD_READ_BIT],
"error reading");
STAT(env->rep_handle->
region->mstat.st_connection_drop++);
ret = DB_REP_UNAVAIL;
goto err;
} else if ((ret =
__repmgr_read_from_site(env, conn)) != 0)
goto err;
}
}
err:
if (ret == DB_REP_UNAVAIL)
ret = __repmgr_bust_connection(env, conn);
return (ret);
}
/*
 * Complete a non-blocking connect once its network events are available.
 *
 * Returns 0 without doing anything if FD_CONNECT is not among the events.
 * On a successful connect the socket's event selection is switched to
 * FD_READ | FD_CLOSE and the result of __repmgr_propose_version() is
 * returned.
 *
 * On a failed connect: if the site has no further address
 * (ADDR_LIST_NEXT() yields NULL) the failure is counted and DB_REP_UNAVAIL
 * is returned; otherwise this connection is disabled and the result of
 * __repmgr_connect_site() for the same site is returned.
 */
static int
finish_connecting(env, conn, events)
	ENV *env;
	REPMGR_CONNECTION *conn;
	LPWSANETWORKEVENTS events;
{
	DB_REP *db_rep;
	u_int eid;
	int ret;

	if ((events->lNetworkEvents & FD_CONNECT) == 0)
		return (0);

	/* The state is updated before the connect result is examined. */
	conn->state = CONN_CONNECTED;

	ret = events->iErrorCode[FD_CONNECT_BIT];
	if (ret == 0) {
		if (WSAEventSelect(conn->fd,
		    conn->event_object, FD_READ | FD_CLOSE) != SOCKET_ERROR)
			return (__repmgr_propose_version(env, conn));

		ret = net_errno;
		__db_err(env, ret, "setting event bits for reading");
		return (ret);
	}

	/* The connect attempt failed. */
	__db_err(env, ret, "connecting");

	db_rep = env->rep_handle;
	eid = conn->eid;
	DB_ASSERT(env, IS_VALID_EID(eid));

	if (ADDR_LIST_NEXT(&SITE_FROM_EID(eid)->net_addr) == NULL) {
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (DB_REP_UNAVAIL);
	}

	DISABLE_CONNECTION(conn);

	ret = __repmgr_connect_site(env, eid);
	DB_ASSERT(env, ret != DB_REP_UNAVAIL);
	return (ret);
}