/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2005,2008 Oracle. All rights reserved. * * $Id: repmgr_method.c,v 1.48 2008/04/25 19:02:47 alanb Exp $ */ #include "db_config.h" #define __INCLUDE_NETWORKING 1 #include "db_int.h" static int __repmgr_await_threads __P((ENV *)); /* * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); */ int __repmgr_start(dbenv, nthreads, flags) DB_ENV *dbenv; int nthreads; u_int32_t flags; { DBT my_addr; DB_REP *db_rep; ENV *env; REPMGR_RUNNABLE *selector, *messenger; int ret, i; env = dbenv->env; db_rep = env->rep_handle; if (!F_ISSET(env, ENV_THREAD)) { __db_errx(env, "Replication Manager needs an environment with DB_THREAD"); return (EINVAL); } /* Check that the required initialization has been done. */ if (db_rep->my_addr.port == 0) { __db_errx(env, "repmgr_set_local_site must be called before repmgr_start"); return (EINVAL); } if (db_rep->selector != NULL || db_rep->finished) { __db_errx(env, "DB_ENV->repmgr_start may not be called more than once"); return (EINVAL); } switch (flags) { case DB_REP_CLIENT: case DB_REP_ELECTION: case DB_REP_MASTER: break; default: __db_errx(env, "repmgr_start: unrecognized flags parameter value"); return (EINVAL); } if (nthreads <= 0) { __db_errx(env, "repmgr_start: nthreads parameter must be >= 1"); return (EINVAL); } if ((ret = __os_calloc(env, (u_int)nthreads, sizeof(REPMGR_RUNNABLE *), &db_rep->messengers)) != 0) return (ret); db_rep->nthreads = nthreads; if ((ret = __repmgr_net_init(env, db_rep)) != 0 || (ret = __repmgr_init_sync(env, db_rep)) != 0 || (ret = __rep_set_transport(dbenv, SELF_EID, __repmgr_send)) != 0) return (ret); /* * Make some sort of call to rep_start before starting other threads, to * ensure that incoming messages being processed always have a rep * context properly configured. */ if ((db_rep->init_policy = flags) == DB_REP_MASTER) ret = __repmgr_become_master(env); else { if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0) return (ret); ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT); __os_free(env, my_addr.data); if (ret == 0) { LOCK_MUTEX(db_rep->mutex); ret = __repmgr_init_election(env, ELECT_SEEK_MASTER); UNLOCK_MUTEX(db_rep->mutex); } } if (ret != 0) return (ret); if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), &selector)) != 0) return (ret); selector->env = env; selector->run = __repmgr_select_thread; if ((ret = __repmgr_thread_start(env, selector)) != 0) { __db_err(env, ret, "can't start selector thread"); __os_free(env, selector); return (ret); } db_rep->selector = selector; for (i=0; ienv = env; messenger->run = __repmgr_msg_thread; if ((ret = __repmgr_thread_start(env, messenger)) != 0) { __os_free(env, messenger); return (ret); } db_rep->messengers[i] = messenger; } return (ret); } /* * PUBLIC: int __repmgr_close __P((ENV *)); */ int __repmgr_close(env) ENV *env; { DB_REP *db_rep; int ret, t_ret; ret = 0; db_rep = env->rep_handle; if (db_rep->selector != NULL) { RPRINT(env, DB_VERB_REPMGR_MISC, (env, "Stopping repmgr threads")); ret = __repmgr_stop_threads(env); if ((t_ret = __repmgr_await_threads(env)) != 0 && ret == 0) ret = t_ret; RPRINT(env, DB_VERB_REPMGR_MISC, (env, "Repmgr threads are finished")); } if ((t_ret = __repmgr_net_close(env)) != 0 && ret == 0) ret = t_ret; if ((t_ret = __repmgr_close_sync(env)) != 0 && ret == 0) ret = t_ret; return (ret); } /* * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int)); */ int __repmgr_set_ack_policy(dbenv, policy) DB_ENV *dbenv; int policy; { ENV *env; env = dbenv->env; switch (policy) { case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */ case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */ case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */ case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */ case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */ case DB_REPMGR_ACKS_QUORUM: env->rep_handle->perm_policy = policy; return (0); default: __db_errx(env, "unknown ack_policy in DB_ENV->repmgr_set_ack_policy"); return (EINVAL); } } /* * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *)); */ int __repmgr_get_ack_policy(dbenv, policy) DB_ENV *dbenv; int *policy; { ENV *env; env = dbenv->env; *policy = env->rep_handle->perm_policy; return (0); } /* * PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *)); */ int __repmgr_env_create(env, db_rep) ENV *env; DB_REP *db_rep; { int ret; /* Set some default values. */ db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT; db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; db_rep->config_nsites = 0; db_rep->peer = DB_EID_INVALID; db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; #ifdef DB_WIN32 db_rep->waiters = NULL; #else db_rep->read_pipe = db_rep->write_pipe = -1; #endif if ((ret = __repmgr_net_create(db_rep)) == 0) ret = __repmgr_queue_create(env, db_rep); return (ret); } /* * PUBLIC: void __repmgr_env_destroy __P((ENV *, DB_REP *)); */ void __repmgr_env_destroy(env, db_rep) ENV *env; DB_REP *db_rep; { __repmgr_queue_destroy(env); __repmgr_net_destroy(env, db_rep); if (db_rep->messengers != NULL) { __os_free(env, db_rep->messengers); db_rep->messengers = NULL; } } /* * PUBLIC: int __repmgr_stop_threads __P((ENV *)); */ int __repmgr_stop_threads(env) ENV *env; { DB_REP *db_rep; REPMGR_CONNECTION *conn; int ret; db_rep = env->rep_handle; /* * Hold mutex for the purpose of waking up threads, but then get out of * the way to let them clean up and exit. */ LOCK_MUTEX(db_rep->mutex); db_rep->finished = TRUE; if (db_rep->elect_thread != NULL && (ret = __repmgr_signal(&db_rep->check_election)) != 0) goto unlock; if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) goto unlock; TAILQ_FOREACH(conn, &db_rep->connections, entries) { if (conn->blockers > 0 && ((ret = __repmgr_signal(&conn->drained)) != 0)) goto unlock; } UNLOCK_MUTEX(db_rep->mutex); return (__repmgr_wake_main_thread(env)); unlock: UNLOCK_MUTEX(db_rep->mutex); return (ret); } static int __repmgr_await_threads(env) ENV *env; { DB_REP *db_rep; REPMGR_RUNNABLE *messenger; int ret, t_ret, i; db_rep = env->rep_handle; ret = 0; if (db_rep->elect_thread != NULL) { ret = __repmgr_thread_join(db_rep->elect_thread); __os_free(env, db_rep->elect_thread); db_rep->elect_thread = NULL; } for (i=0; inthreads && db_rep->messengers[i] != NULL; i++) { messenger = db_rep->messengers[i]; if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0) ret = t_ret; __os_free(env, messenger); db_rep->messengers[i] = NULL; /* necessary? */ } __os_free(env, db_rep->messengers); db_rep->messengers = NULL; if (db_rep->selector != NULL) { if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 && ret == 0) ret = t_ret; __os_free(env, db_rep->selector); db_rep->selector = NULL; } return (ret); } /* * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int, * PUBLIC: u_int32_t)); */ int __repmgr_set_local_site(dbenv, host, port, flags) DB_ENV *dbenv; const char *host; u_int port; u_int32_t flags; { ADDRINFO *address_list; DB_REP *db_rep; ENV *env; repmgr_netaddr_t addr; int locked, ret; env = dbenv->env; if (flags != 0) return (__db_ferr(env, "DB_ENV->repmgr_set_local_site", 0)); db_rep = env->rep_handle; if (db_rep->my_addr.port != 0) { __db_errx(env, "Listen address already set"); return (EINVAL); } if (host == NULL) { __db_errx(env, "repmgr_set_local_site: host name is required"); return (EINVAL); } if ((ret = __repmgr_getaddr( env, host, port, AI_PASSIVE, &address_list)) != 0) return (ret); if ((ret = __repmgr_pack_netaddr(env, host, port, address_list, &addr)) != 0) { __os_freeaddrinfo(env, address_list); return (ret); } if (REPMGR_SYNC_INITED(db_rep)) { LOCK_MUTEX(db_rep->mutex); locked = TRUE; } else locked = FALSE; memcpy(&db_rep->my_addr, &addr, sizeof(addr)); if (locked) UNLOCK_MUTEX(db_rep->mutex); return (0); } /* * If the application only calls this method from a single thread (e.g., during * its initialization), it will avoid the problems with the non-thread-safe host * name lookup. In any case, if we relegate the blocking lookup to here it * won't affect our select() loop. * * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int, * PUBLIC: int *, u_int32_t)); */ int __repmgr_add_remote_site(dbenv, host, port, eidp, flags) DB_ENV *dbenv; const char *host; u_int port; int *eidp; u_int32_t flags; { DB_REP *db_rep; ENV *env; REPMGR_SITE *site; int eid, locked, ret; env = dbenv->env; if ((ret = __db_fchk(env, "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0) return (ret); if (host == NULL) { __db_errx(env, "repmgr_add_remote_site: host name is required"); return (EINVAL); } db_rep = env->rep_handle; if (REPMGR_SYNC_INITED(db_rep)) { LOCK_MUTEX(db_rep->mutex); locked = TRUE; } else locked = FALSE; switch (ret = __repmgr_add_site(env, host, port, &site)) { case 0: case EEXIST: ret = 0; break; default: goto unlock; } eid = EID_FROM_SITE(site); if (LF_ISSET(DB_REPMGR_PEER)) db_rep->peer = eid; if (eidp != NULL) *eidp = eid; unlock: if (locked) UNLOCK_MUTEX(db_rep->mutex); return (ret); }