repmgr_windows.c   [plain text]


/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_windows.c,v 1.32 2008/03/13 17:31:28 mbrey Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"

/*
 * Convert a time-out from microseconds to milliseconds (the unit taken by the
 * Win32 wait functions), rounding up so that a non-zero time-out never turns
 * into a zero-length wait.
 */
#define	DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)

/*
 * One slot of the ack-waiters table: describes a thread blocked in
 * __repmgr_await_ack() until the LSN it recorded becomes permanent.
 *
 * event	Auto-reset Win32 Event Object the waiter sleeps on; it is
 *		signaled by __repmgr_wake_waiting_senders().
 * lsnp		The LSN being waited for; reset to NULL when the slot is
 *		released, which marks it as not in use.
 * next_free	Link in the table's free list; meaningful only while the
 *		slot is on that list.
 */
typedef struct __ack_waiter {
	HANDLE event;
	const DB_LSN *lsnp;
	struct __ack_waiter *next_free;
} ACK_WAITER;

/* A slot is in use exactly when it records an LSN to wait for. */
#define	WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL)

/*
 * Table of ACK_WAITER slots, reached through db_rep->waiters.
 *
 * array	Slot storage, with room for "size" slots.  allocate_wait_slot()
 *		doubles it when every slot has been handed out.
 * size		Number of slots allocated in "array".
 * next_avail	Number of slots that have been initialized so far.
 * first_free	Head of the free list of initialized, not-in-use slots.
 *
 * Array slots [0:next_avail-1] are initialized, and each is either in use or
 * on the free list.  Slots beyond that are virgin territory, whose memory
 * contents could be garbage.  In particular, note that every slot in
 * [0:next_avail-1] has a Win32 Event Object created for it, which has to be
 * closed when cleaning up this data structure.
 *
 * "first_free" points to a list of not-in-use slots, threaded through the
 * first section of the array via their next_free fields.
 */
struct __ack_waiters_table {
	struct __ack_waiter *array;
	int size;
	int next_avail;
	struct __ack_waiter *first_free;
};

/* Forward declarations of the static helpers defined later in this file. */
static int allocate_wait_slot __P((ENV *, ACK_WAITER **));
static void free_wait_slot __P((ENV *, ACK_WAITER *));
static int handle_completion __P((ENV *, REPMGR_CONNECTION *));
static int finish_connecting __P((ENV *, REPMGR_CONNECTION *,
				     LPWSANETWORKEVENTS));

/*
 * __repmgr_thread_start --
 *	Launch a thread that runs runnable->run(env).  On success the new
 *	thread's handle is stored in runnable->thread_id and 0 is returned;
 *	otherwise the Win32 error code is returned and thread_id is untouched.
 */
int
__repmgr_thread_start(env, runnable)
	ENV *env;
	REPMGR_RUNNABLE *runnable;
{
	HANDLE handle;

	runnable->finished = FALSE;

	handle = CreateThread(NULL, 0,
	    (LPTHREAD_START_ROUTINE)runnable->run, env, 0, NULL);
	if (handle != NULL) {
		runnable->thread_id = handle;
		return (0);
	}
	return (GetLastError());
}

/*
 * __repmgr_thread_join --
 *	Wait for a thread started by __repmgr_thread_start() to exit, then
 *	release its thread handle.
 *
 * CreateThread() returns a handle that stays open until it is explicitly
 * closed.  Nothing else in this file closes it, so without the CloseHandle()
 * here every start/join cycle leaks a kernel handle.  The handle is closed
 * even if the wait fails, since the caller is done with this runnable either
 * way.  Returns 0, or the first Win32 error encountered.
 */
int
__repmgr_thread_join(thread)
	REPMGR_RUNNABLE *thread;
{
	int ret;

	ret = 0;
	if (WaitForSingleObject(thread->thread_id, INFINITE) != WAIT_OBJECT_0)
		ret = GetLastError();
	if (!CloseHandle(thread->thread_id) && ret == 0)
		ret = GetLastError();

	return (ret);
}

/*
 * __repmgr_set_nonblocking --
 *	Put socket "s" into non-blocking mode.  Returns 0, or the Winsock
 *	error code if ioctlsocket() fails.
 */
int
__repmgr_set_nonblocking(s)
	SOCKET s;
{
	u_long nonblocking;

	nonblocking = 1;	/* FIONBIO: any non-zero value enables it */
	return (ioctlsocket(s, FIONBIO, &nonblocking) == SOCKET_ERROR ?
	    WSAGetLastError() : 0);
}

/*
 * Wake any send()-ing threads waiting for an acknowledgement.
 *
 * !!!
 * Caller must hold the repmgr->mutex, if this thread synchronization is to work
 * properly.
 */
int
__repmgr_wake_waiting_senders(env)
	ENV *env;
{
	ACK_WAITER *slot;
	DB_REP *db_rep;
	int i, ret;

	ret = 0;
	db_rep = env->rep_handle;
	for (i=0; i<db_rep->waiters->next_avail; i++) {
		 slot = &db_rep->waiters->array[i];
		 if (!WAITER_SLOT_IN_USE(slot))
			 continue;
		 if (__repmgr_is_permanent(env, slot->lsnp))
			 if (!SetEvent(slot->event) && ret == 0)
				 ret = GetLastError();
	}
	return (ret);
}

/*
 * __repmgr_await_ack --
 *	Block until __repmgr_wake_waiting_senders() reports the LSN at *lsnp
 *	as permanent, or until db_rep->ack_timeout expires (an ack_timeout of
 *	0 means wait forever).  Returns 0 when woken, DB_REP_UNAVAIL on
 *	time-out, or an error code.
 *
 * !!!
 * Caller must hold mutex.  It is released atomically for the duration of the
 * wait and re-acquired before the wait slot is released.
 */
int
__repmgr_await_ack(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	ACK_WAITER *me;
	DB_REP *db_rep;
	DWORD ret, timeout;

	db_rep = env->rep_handle;

	if ((ret = allocate_wait_slot(env, &me)) != 0)
		return (ret);

	if (db_rep->ack_timeout > 0)
		timeout = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout);
	else
		timeout = INFINITE;	/* No time-out configured. */
	me->lsnp = lsnp;

	/* Atomically drop the mutex and sleep on our private event. */
	switch (ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
	    FALSE)) {
	case WAIT_FAILED:
		ret = GetLastError();
		break;
	case WAIT_TIMEOUT:
		ret = DB_REP_UNAVAIL;
		break;
	default:
		DB_ASSERT(env, ret == WAIT_OBJECT_0);
		break;
	}

	LOCK_MUTEX(db_rep->mutex);
	free_wait_slot(env, me);
	return (ret);
}

/*
 * !!!
 * Caller must hold the mutex.
 */
static int
allocate_wait_slot(env, resultp)
	ENV *env;
	ACK_WAITER **resultp;
{
	ACK_WAITER *w;
	ACK_WAITERS_TABLE *table;
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;
	table = db_rep->waiters;
	if (table->first_free == NULL) {
		if (table->next_avail >= table->size) {
			/*
			 * Grow the array.
			 */
			table->size *= 2;
			w = table->array;
			if ((ret = __os_realloc(env, table->size * sizeof(*w),
			     &w)) != 0)
				return (ret);
			table->array = w;
		}
		/*
		 * Here if, one way or another, we're good to go for using the
		 * next slot (for the first time).
		 */
		w = &table->array[table->next_avail++];
		if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) ==
		    NULL) {
			/*
			 * Maintain the sanctity of our rule that
			 * [0:next_avail-1] contain valid Event Objects.
			 */
			--table->next_avail;
			return (GetLastError());
		}
	} else {
		w = table->first_free;
		table->first_free = w->next_free;
	}
	*resultp = w;
	return (0);
}

static void
free_wait_slot(env, slot)
	ENV *env;
	ACK_WAITER *slot;
{
	DB_REP *db_rep;

	db_rep = env->rep_handle;

	slot->lsnp = NULL;	/* show it's not in use */
	slot->next_free = db_rep->waiters->first_free;
	db_rep->waiters->first_free = slot;
}

/*
 * __repmgr_await_drain --
 *	Wait, for at most "timeout" microseconds overall, until conn's
 *	outgoing queue is shorter than OUT_QUEUE_LIMIT.  (See requirements
 *	described in repmgr_posix.c.)
 *
 *	Returns 0 in three cases: the queue drained; the deadline passed (in
 *	which case conn->state is set to CONN_CONGESTED); or db_rep->finished
 *	was set.  Returns DB_REP_UNAVAIL if the connection became defunct while
 *	we waited, and otherwise a Win32 error code.
 *
 * !!!
 * Caller must hold the mutex.  It is released while waiting and re-acquired
 * afterwards.
 */
int
__repmgr_await_drain(env, conn, timeout)
	ENV *env;
	REPMGR_CONNECTION *conn;
	db_timeout_t timeout;
{
	DB_REP *db_rep;
	db_timespec deadline, delta, now;
	db_timeout_t t;
	DWORD duration, ret, wait_err;
	int round_up;

	db_rep = env->rep_handle;

	__os_gettime(env, &deadline, 1);
	TIMESPEC_ADD_DB_TIMEOUT(&deadline, timeout);

	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
		if (!ResetEvent(conn->drained))
			return (GetLastError());

		/* How long until the deadline? */
		__os_gettime(env, &now, 1);
		if (timespeccmp(&now, &deadline, >=)) {
			conn->state = CONN_CONGESTED;
			return (0);
		}
		delta = deadline;
		timespecsub(&delta, &now);
		round_up = TRUE;
		DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
		duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);

		ret = SignalObjectAndWait(db_rep->mutex,
		    conn->drained, duration, FALSE);
		/*
		 * Capture the failure reason before re-locking.  Acquiring the
		 * mutex makes further Win32 calls, which may overwrite the
		 * thread's last-error value.
		 */
		wait_err = ret == WAIT_FAILED ? GetLastError() : 0;
		LOCK_MUTEX(db_rep->mutex);
		if (ret == WAIT_FAILED)
			return (wait_err);
		else if (ret == WAIT_TIMEOUT) {
			conn->state = CONN_CONGESTED;
			return (0);
		} else
			DB_ASSERT(env, ret == WAIT_OBJECT_0);

		if (db_rep->finished)
			return (0);
		if (conn->state == CONN_DEFUNCT)
			return (DB_REP_UNAVAIL);
	}
	return (0);
}

/*
 * __repmgr_alloc_cond --
 *	Create the Event Object backing a repmgr "condition variable" and
 *	store it in *c.  Returns 0, or the Win32 error code (in which case *c
 *	is untouched).
 *
 * It is a manual-reset event, which is usually our best choice when we may
 * have multiple threads waiting on a single event.
 */
int
__repmgr_alloc_cond(c)
	cond_var_t *c;
{
	HANDLE h;

	h = CreateEvent(NULL,	/* default security */
	    TRUE,		/* manual reset */
	    FALSE,		/* initially non-signaled */
	    NULL);		/* unnamed */
	if (h == NULL)
		return (GetLastError());
	*c = h;
	return (0);
}

/*
 * __repmgr_free_cond --
 *	Close the Event Object backing condition variable *c.  Returns 0, or
 *	the Win32 error code if CloseHandle() fails.
 */
int
__repmgr_free_cond(c)
	cond_var_t *c;
{
	return (CloseHandle(*c) ? 0 : GetLastError());
}

/*
 * __repmgr_init_sync --
 *	Create repmgr's Win32 synchronization objects and the ack-waiters
 *	table:
 *	- the signaler event (auto-reset);
 *	- the queue_nonempty event (manual-reset);
 *	- the check_election event (auto-reset);
 *	- the mutex;
 *	- the waiters table, with INITIAL_ALLOCATION slots.
 *
 * Make resource allocation an all-or-nothing affair, outside of this and the
 * close_sync function.  db_rep->waiters should be non-NULL iff all of these
 * resources have been created.  On failure, everything created so far is
 * released and the error code is returned.
 */
int
__repmgr_init_sync(env, db_rep)
     ENV *env;
     DB_REP *db_rep;
{
#define	INITIAL_ALLOCATION 5		/* arbitrary size */
	ACK_WAITERS_TABLE *table;
	int ret;

	table = NULL;
	db_rep->mutex = NULL;
	db_rep->check_election = NULL;
	db_rep->queue_nonempty = NULL;
	db_rep->signaler = NULL;

	/* Arguments: security attr, manual reset?, initially signaled?, name */
	db_rep->signaler = CreateEvent(NULL, FALSE, FALSE, NULL);
	if (db_rep->signaler == NULL)
		goto geterr;

	db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL);
	if (db_rep->queue_nonempty == NULL)
		goto geterr;

	db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL);
	if (db_rep->check_election == NULL)
		goto geterr;

	db_rep->mutex = CreateMutex(NULL, FALSE, NULL);
	if (db_rep->mutex == NULL)
		goto geterr;

	if ((ret = __os_calloc(env,
	    1, sizeof(ACK_WAITERS_TABLE), &table)) != 0 ||
	    (ret = __os_calloc(env,
	    INITIAL_ALLOCATION, sizeof(ACK_WAITER), &table->array)) != 0)
		goto err;

	table->size = INITIAL_ALLOCATION;
	table->next_avail = 0;
	table->first_free = NULL;

	/* There's a restaurant joke in there somewhere. */
	db_rep->waiters = table;
	return (0);

geterr:
	ret = GetLastError();
err:
	/* Undo whatever was created before the failure. */
	if (db_rep->check_election != NULL)
		CloseHandle(db_rep->check_election);
	if (db_rep->queue_nonempty != NULL)
		CloseHandle(db_rep->queue_nonempty);
	if (db_rep->signaler != NULL)
		CloseHandle(db_rep->signaler);
	if (db_rep->mutex != NULL)
		CloseHandle(db_rep->mutex);
	if (table != NULL)
		__os_free(env, table);
	db_rep->waiters = NULL;
	return (ret);
}

int
__repmgr_close_sync(env)
     ENV *env;
{
	DB_REP *db_rep;
	int i, ret;

	db_rep = env->rep_handle;
	if (!(REPMGR_SYNC_INITED(db_rep)))
		return (0);

	ret = 0;
	for (i = 0; i < db_rep->waiters->next_avail; i++) {
		if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0)
			ret = GetLastError();
	}
	__os_free(env, db_rep->waiters->array);
	__os_free(env, db_rep->waiters);

	if (!CloseHandle(db_rep->check_election) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->queue_nonempty) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->signaler) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->mutex) && ret == 0)
		ret = GetLastError();

	db_rep->waiters = NULL;
	return (ret);
}

/*
 * Performs net-related resource initialization other than memory initialization
 * and allocation.  A valid db_rep->listen_fd acts as the "all-or-nothing"
 * sentinel signifying that these resources are allocated (except that now the
 * new wsa_inited flag may be used to indicate that WSAStartup has already been
 * called).
 */
int
__repmgr_net_init(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
	int ret;

	/* Initialize the Windows sockets DLL. */
	if (!db_rep->wsa_inited && (ret = __repmgr_wsa_init(env)) != 0)
		goto err;

	if ((ret = __repmgr_listen(env)) == 0)
		return (0);

	if (WSACleanup() == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "WSACleanup");
	}

err:	db_rep->listen_fd = INVALID_SOCKET;
	return (ret);
}

/*
 * __repmgr_wsa_init --
 *	Initialize the Windows sockets DLL.  Requests Winsock version 2.2 and,
 *	on success, records that fact in db_rep->wsa_inited.  Returns 0 or the
 *	WSAStartup() error code.
 *
 * PUBLIC: int __repmgr_wsa_init __P((ENV *));
 */
int
__repmgr_wsa_init(env)
	ENV *env;
{
	WSADATA wsaData;
	int ret;

	ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (ret != 0) {
		__db_err(env, ret, "unable to initialize Windows networking");
		return (ret);
	}
	env->rep_handle->wsa_inited = TRUE;
	return (0);
}

int
__repmgr_lock_mutex(mutex)
	mgr_mutex_t  *mutex;
{
	if (WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0)
		return (0);
	return (GetLastError());
}

/*
 * __repmgr_unlock_mutex --
 *	Release *mutex.  Returns 0, or the Win32 error code if ReleaseMutex()
 *	fails.
 */
int
__repmgr_unlock_mutex(mutex)
	mgr_mutex_t  *mutex;
{
	return (ReleaseMutex(*mutex) ? 0 : GetLastError());
}

/*
 * __repmgr_signal --
 *	Set the event backing condition variable *v.  Returns 0, or the Win32
 *	error code if SetEvent() fails.
 */
int
__repmgr_signal(v)
	cond_var_t *v;
{
	if (!SetEvent(*v))
		return (GetLastError());
	return (0);
}

/*
 * __repmgr_wake_main_thread --
 *	Set db_rep->signaler, which __repmgr_select_loop() always waits on,
 *	so that the select loop wakes up.  Returns 0 or the Win32 error code.
 */
int
__repmgr_wake_main_thread(env)
	ENV *env;
{
	return (SetEvent(env->rep_handle->signaler) ? 0 : GetLastError());
}

int
__repmgr_writev(fd, iovec, buf_count, byte_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *byte_count_p;
{
	DWORD bytes;

	if (WSASend(fd, iovec,
	    (DWORD)buf_count, &bytes, 0, NULL, NULL) == SOCKET_ERROR)
		return (net_errno);

	*byte_count_p = (size_t)bytes;
	return (0);
}

int
__repmgr_readv(fd, iovec, buf_count, xfr_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *xfr_count_p;
{
	DWORD bytes, flags;

	flags = 0;
	if (WSARecv(fd, iovec,
	    (DWORD)buf_count, &bytes, &flags, NULL, NULL) == SOCKET_ERROR)
		return (net_errno);

	*xfr_count_p = (size_t)bytes;
	return (0);
}

/*
 * __repmgr_select_loop --
 *	Body of repmgr's Windows I/O thread.  Each pass does the following:
 *	- cleans up defunct connections;
 *	- waits on the signaler event, an event for the listening socket, and
 *	  one event per live connection;
 *	- services whichever fired: a wake-up from another thread, an incoming
 *	  connection to accept, or activity on a connection.  When the computed
 *	  time-out expires, it runs the time-based checks instead.
 *	Returns 0 when db_rep->finished is seen after a wait, or an error code.
 *
 * The mutex is held while examining shared state and released only for the
 * duration of the wait.
 */
int
__repmgr_select_loop(env)
	ENV *env;
{
	DB_REP *db_rep;
	DWORD nevents, ret;
	DWORD select_timeout;
	REPMGR_CONNECTION *conn, *next;
	REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS];
	WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
	db_timespec timeout;
	WSAEVENT listen_event;
	WSANETWORKEVENTS net_events;
	int flow_control, i;

	db_rep = env->rep_handle;

	if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) {
		/*
		 * Capture the error before reporting it.  __db_err may make
		 * calls that overwrite the thread's last-error value.
		 */
		ret = net_errno;
		__db_err(env, ret, "can't create event for listen socket");
		return (ret);
	}
	if (WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) ==
	    SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't enable event for listener");
		goto out;
	}

	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_first_try_connections(env)) != 0)
		goto unlock;
	flow_control = FALSE;
	for (;;) {
		/* Start with the two events that we always wait for. */
		events[0] = db_rep->signaler;
		events[1] = listen_event;
		nevents = 2;

		/*
		 * Add an event for each surviving socket that we're interested
		 * in.  (For now [until we implement flow control], that's all
		 * of them, in one form or another.)  Clean up defunct
		 * connections; note that this is the only place where elements
		 * get deleted from this list.
		 *     Loop just like TAILQ_FOREACH, except that we need to be
		 * able to unlink a list entry.
		 */
		for (conn = TAILQ_FIRST(&db_rep->connections);
		     conn != NULL;
		     conn = next) {
			next = TAILQ_NEXT(conn, entries);

			if (conn->state == CONN_DEFUNCT) {
				if ((ret = __repmgr_cleanup_connection(env,
				    conn)) != 0)
					goto unlock;
				continue;
			}

			/*
			 * Note that even if we're suffering flow control, we
			 * nevertheless still read if we haven't even yet gotten
			 * a handshake.  Why?  (1) Handshakes are important; and
			 * (2) they don't hurt anything flow-control-wise.
			 */
			if (conn->state == CONN_CONNECTING ||
			    !STAILQ_EMPTY(&conn->outbound_queue) ||
			    (!flow_control || !IS_VALID_EID(conn->eid))) {
				/*
				 * WSAWaitForMultipleEvents can watch at most
				 * WSA_MAXIMUM_WAIT_EVENTS handles, which is
				 * also the size of our arrays.  Rather than
				 * overrun them, leave further connections
				 * unwatched on this pass.  The list walk
				 * continues, so defunct ones are still
				 * cleaned up.  NOTE(review): such connections
				 * are not serviced here until others go away.
				 */
				if (nevents < WSA_MAXIMUM_WAIT_EVENTS) {
					events[nevents] = conn->event_object;
					connections[nevents++] = conn;
				}
			}
		}

		if (__repmgr_compute_timeout(env, &timeout))
			select_timeout =
			    (DWORD)(timeout.tv_sec * MS_PER_SEC +
			    timeout.tv_nsec / NS_PER_MS);
		else {
			/* No time-based events to wake us up. */
			select_timeout = WSA_INFINITE;
		}

		UNLOCK_MUTEX(db_rep->mutex);
		ret = WSAWaitForMultipleEvents(
		    nevents, events, FALSE, select_timeout, FALSE);
		if (db_rep->finished) {
			ret = 0;
			goto out;
		}
		LOCK_MUTEX(db_rep->mutex);

		/*
		 * !!!
		 * Note that `ret' remains set as the return code from
		 * WSAWaitForMultipleEvents, above.
		 */
		if (ret >= WSA_WAIT_EVENT_0 &&
		    ret < WSA_WAIT_EVENT_0 + nevents) {
			switch (i = ret - WSA_WAIT_EVENT_0) {
			case 0:
				/* Another thread woke us. */
				break;
			case 1:
				if ((ret = WSAEnumNetworkEvents(
				    db_rep->listen_fd, listen_event,
				    &net_events)) == SOCKET_ERROR) {
					ret = net_errno;
					goto unlock;
				}
				DB_ASSERT(env,
				    net_events.lNetworkEvents & FD_ACCEPT);
				if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT])
				    != 0)
					goto unlock;
				if ((ret = __repmgr_accept(env)) != 0)
					goto unlock;
				break;
			default:
				if (connections[i]->state != CONN_DEFUNCT &&
				    (ret = handle_completion(env,
				    connections[i])) != 0)
					goto unlock;
				break;
			}
		} else if (ret == WSA_WAIT_TIMEOUT) {
			if ((ret = __repmgr_check_timeouts(env)) != 0)
				goto unlock;
		} else if (ret == WSA_WAIT_FAILED) {
			ret = net_errno;
			goto unlock;
		}
	}

unlock:
	UNLOCK_MUTEX(db_rep->mutex);
out:
	if (!CloseHandle(listen_event) && ret == 0)
		ret = GetLastError();
	return (ret);
}

/*
 * handle_completion --
 *	Service the network events that fired on conn's event object.  For a
 *	pending outgoing connection, finish connecting.  Otherwise handle close,
 *	write readiness and read readiness, in that order, stopping at the
 *	first failure.  A DB_REP_UNAVAIL result busts the connection, and the
 *	bust's result is returned instead; other errors are returned as-is.
 */
static int
handle_completion(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	WSANETWORKEVENTS events;
	int ret;

	if (WSAEnumNetworkEvents(conn->fd,
	    conn->event_object, &events) == SOCKET_ERROR) {
		__db_err(env, net_errno, "EnumNetworkEvents");
		STAT(env->rep_handle->region->mstat.st_connection_drop++);
		ret = DB_REP_UNAVAIL;
	} else if (conn->state == CONN_CONNECTING)
		ret = finish_connecting(env, conn, &events);
	else if (events.lNetworkEvents & FD_CLOSE) {
		__db_err(env,
		    events.iErrorCode[FD_CLOSE_BIT], "connection closed");
		STAT(env->rep_handle->region->mstat.st_connection_drop++);
		ret = DB_REP_UNAVAIL;
	} else {
		/* Check writing, then reading. */
		ret = 0;
		if (events.lNetworkEvents & FD_WRITE) {
			if (events.iErrorCode[FD_WRITE_BIT] == 0)
				ret = __repmgr_write_some(env, conn);
			else {
				__db_err(env,
				    events.iErrorCode[FD_WRITE_BIT],
				    "error writing");
				STAT(env->rep_handle->
				    region->mstat.st_connection_drop++);
				ret = DB_REP_UNAVAIL;
			}
		}
		if (ret == 0 && (events.lNetworkEvents & FD_READ)) {
			if (events.iErrorCode[FD_READ_BIT] == 0)
				ret = __repmgr_read_from_site(env, conn);
			else {
				__db_err(env,
				    events.iErrorCode[FD_READ_BIT],
				    "error reading");
				STAT(env->rep_handle->
				    region->mstat.st_connection_drop++);
				ret = DB_REP_UNAVAIL;
			}
		}
	}

	if (ret == DB_REP_UNAVAIL)
		ret = __repmgr_bust_connection(env, conn);
	return (ret);
}

/*
 * finish_connecting --
 *	Complete an outgoing connection attempt once FD_CONNECT has been
 *	reported; if it has not, do nothing and return 0.
 *
 *	If the connect succeeded, switch the socket's event selection to
 *	FD_READ | FD_CLOSE and send our version proposal.  If it failed, try
 *	the site's next address if there is one.  Otherwise count the failure
 *	and return DB_REP_UNAVAIL, which makes handle_completion() bust the
 *	connection.
 */
static int
finish_connecting(env, conn, events)
	ENV *env;
	REPMGR_CONNECTION *conn;
	LPWSANETWORKEVENTS events;
{
	DB_REP *db_rep;
	u_int eid;
	int ret;

	if ((events->lNetworkEvents & FD_CONNECT) == 0)
		return (0);

	conn->state = CONN_CONNECTED;

	if ((ret = events->iErrorCode[FD_CONNECT_BIT]) == 0) {
		if (WSAEventSelect(conn->fd, conn->event_object,
		    FD_READ | FD_CLOSE) == SOCKET_ERROR) {
			ret = net_errno;
			__db_err(env, ret, "setting event bits for reading");
			return (ret);
		}
		return (__repmgr_propose_version(env, conn));
	}

	/* The connect attempt itself failed. */
	__db_err(env, ret, "connecting");

	db_rep = env->rep_handle;
	eid = conn->eid;
	DB_ASSERT(env, IS_VALID_EID(eid));

	if (ADDR_LIST_NEXT(&SITE_FROM_EID(eid)->net_addr) == NULL) {
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (DB_REP_UNAVAIL);
	}

	/*
	 * Since we're immediately trying the next address in the list, simply
	 * disable the failed connection, without the usual recovery.
	 */
	DISABLE_CONNECTION(conn);

	ret = __repmgr_connect_site(env, eid);
	DB_ASSERT(env, ret != DB_REP_UNAVAIL);
	return (ret);
}