#include "db_config.h"
#ifndef lint
static const char revid[] = "$Id: xa.c,v 1.2 2004/03/30 01:24:48 jtownsen Exp $";
#endif
#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#endif
#include "db_int.h"
#include "dbinc/txn.h"
/*
 * Forward declarations of the file-local routines: the ten XA entry points
 * that are installed in db_xa_switch, and the helper that releases the
 * handle obtained from __xa_get_txn.
 */
static int __db_xa_close __P((char *, int, long));
static int __db_xa_commit __P((XID *, int, long));
static int __db_xa_complete __P((int *, int *, int, long));
static int __db_xa_end __P((XID *, int, long));
static int __db_xa_forget __P((XID *, int, long));
static int __db_xa_open __P((char *, int, long));
static int __db_xa_prepare __P((XID *, int, long));
static int __db_xa_recover __P((XID *, long, int, long));
static int __db_xa_rollback __P((XID *, int, long));
static int __db_xa_start __P((XID *, int, long));
static void __xa_put_txn __P((DB_ENV *, DB_TXN *));
/*
 * db_xa_switch --
 *	The XA switch exported by this file: a name, two integer fields and
 *	the ten entry points defined below.
 *
 * The initializer is positional.  The slot names in the trailing comments
 * follow the X/Open XA layout of struct xa_switch_t, which is declared
 * outside this file -- confirm against the xa.h in use.
 */
const struct xa_switch_t db_xa_switch = {
"Berkeley DB",		/* name */
TMNOMIGRATE,		/* flags */
0,			/* version */
__db_xa_open,		/* xa_open_entry */
__db_xa_close,		/* xa_close_entry */
__db_xa_start,		/* xa_start_entry */
__db_xa_end,		/* xa_end_entry */
__db_xa_rollback,	/* xa_rollback_entry */
__db_xa_prepare,	/* xa_prepare_entry */
__db_xa_commit,		/* xa_commit_entry */
__db_xa_recover,	/* xa_recover_entry */
__db_xa_forget,		/* xa_forget_entry */
__db_xa_complete	/* xa_complete_entry */
};
/*
 * XA_MULTI_THREAD is explicitly undefined here, so every
 * "#ifdef XA_MULTI_THREAD" section that follows is compiled out and only the
 * matching #else branches are built.
 */
#undef XA_MULTI_THREAD
/*
 * __xa_get_txn --
 *	Hand back, through *txnp, the DB_TXN handle kept on env->xa_txn for
 *	XA work.  Returns 0 on success, otherwise the error from the
 *	allocator (or EINVAL in the XA_MULTI_THREAD branch).
 *
 * While XA_MULTI_THREAD is undefined (this file #undefs it) only the #else
 * branches are compiled: the first handle on the list is reused, and if the
 * list is empty a zero-filled handle is allocated, its txnid is set to
 * TXN_INVALID and it is inserted at the head.  In that configuration
 * do_init is ignored.
 *
 * NOTE(review): the XA_MULTI_THREAD branch is unfinished -- the line
 * "tid = FILL ME IN" is a placeholder, not valid C -- so that branch cannot
 * be enabled as written.
 */
int
__xa_get_txn(env, txnp, do_init)
DB_ENV *env;
DB_TXN **txnp;
int do_init;
{
int ret;
#ifdef XA_MULTI_THREAD
DB_TXN *t;
u_int32_t tid;
DB_TXNMGR *mgr;
#else
/* do_init is unused in the single-handle build; quiet the compiler. */
COMPQUIET(do_init, 0);
#endif
ret = 0;
#ifdef XA_MULTI_THREAD
/* Placeholder: the caller's thread id still has to be supplied here. */
tid = FILL ME IN
*txnp = NULL;
mgr = (DB_TXNMGR *)env->tx_handle;
DB_ASSERT(env->tx_handle != NULL);
/* Search the list for the handle whose tid matches, under the mutex. */
MUTEX_THREAD_LOCK(mgr->mutexp);
for (t = TAILQ_FIRST(&env->xa_txn);
t != NULL;
t = TAILQ_NEXT(t, xalinks))
if (t->tid == tid) {
*txnp = t;
break;
}
MUTEX_THREAD_UNLOCK(mgr->mutexp);
/*
 * No match: fail with EINVAL unless do_init is set, in which case a new
 * handle is allocated with __os_malloc, tagged with tid and linked in.
 */
if (*txnp == NULL) {
if (!do_init)
ret = EINVAL;
else if ((ret =
__os_malloc(env, sizeof(DB_TXN), NULL, txnp)) == 0) {
(*txnp)->tid = tid;
MUTEX_THREAD_LOCK(mgr->mutexp);
TAILQ_INSERT_HEAD(&env->xa_txn, *txnp, xalinks);
MUTEX_THREAD_UNLOCK(mgr->mutexp);
}
}
#else
/* Reuse the list head; create and link a handle only if the list is empty. */
*txnp = TAILQ_FIRST(&env->xa_txn);
if (*txnp == NULL &&
(ret = __os_calloc(env, 1, sizeof(DB_TXN), txnp)) == 0) {
(*txnp)->txnid = TXN_INVALID;
TAILQ_INSERT_HEAD(&env->xa_txn, *txnp, xalinks);
}
#endif
return (ret);
}
/*
 * __xa_put_txn --
 *	Give back a handle previously obtained from __xa_get_txn.
 *
 * With XA_MULTI_THREAD the handle is unlinked from env->xa_txn while the
 * transaction manager's mutex is held and is then freed.  Otherwise the
 * handle stays on the list and is only flagged as unused by resetting its
 * txnid to TXN_INVALID.
 */
static void
__xa_put_txn(env, txnp)
	DB_ENV *env;
	DB_TXN *txnp;
{
#ifdef XA_MULTI_THREAD
	DB_TXNMGR *txnmgr = (DB_TXNMGR *)env->tx_handle;

	MUTEX_THREAD_LOCK(txnmgr->mutexp);
	TAILQ_REMOVE(&env->xa_txn, txnp, xalinks);
	MUTEX_THREAD_UNLOCK(txnmgr->mutexp);

	__os_free(env, txnp);
#else
	/* Mark the single shared handle as free for the next caller. */
	txnp->txnid = TXN_INVALID;

	/* env is not needed in this configuration. */
	COMPQUIET(env, NULL);
#endif
}
/*
 * XA_FLAGS --
 *	Flag set handed to DB_ENV->open when the XA open entry point creates
 *	an environment; DB_THREAD is added only under XA_MULTI_THREAD.
 *
 * The expansion is wrapped in parentheses so the macro always acts as one
 * operand, whatever operators surround it at the point of use.
 */
#ifdef XA_MULTI_THREAD
#define	XA_FLAGS \
	(DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | \
	DB_INIT_TXN | DB_THREAD)
#else
#define	XA_FLAGS \
	(DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | \
	DB_INIT_TXN)
#endif
/*
 * __db_xa_open --
 *	XA open entry point: open the environment named by xa_info and
 *	associate it with the resource manager id rmid.
 *
 * Returns:
 *	XAER_ASYNC	TMASYNC was requested.
 *	XAER_INVAL	any other flag was set.
 *	XA_OK		rmid already maps to an environment, or the new
 *			environment was opened and mapped.
 *	XAER_RMERR	creating, opening or mapping the environment failed.
 */
static int
__db_xa_open(xa_info, rmid, flags)
	char *xa_info;
	int rmid;
	long flags;
{
	DB_ENV *env;

	if (LF_ISSET(TMASYNC))
		return (XAER_ASYNC);
	if (flags != TMNOFLAGS)
		return (XAER_INVAL);

	/* Opening an rmid that is already mapped is a successful no-op. */
	if (__db_rmid_to_env(rmid, &env) == 0)
		return (XA_OK);

	/*
	 * db_env_create allocates the handle and stores it through its first
	 * argument, so nothing is allocated here beforehand: a separate
	 * allocation would be overwritten by this call and never freed.
	 */
	if (db_env_create(&env, 0) != 0)
		return (XAER_RMERR);
	if (env->open(env, xa_info, XA_FLAGS, 0) != 0)
		goto err;

	/* Record the rmid -> environment association. */
	if (__db_map_rmid(rmid, env) != 0)
		goto err;

	/* Start with an empty list of XA transaction handles. */
	TAILQ_INIT(&env->xa_txn);

	return (XA_OK);

	/* The close result is deliberately ignored; the open already failed. */
err:	(void)env->close(env, 0);
	return (XAER_RMERR);
}
/*
 * __db_xa_close --
 *	XA close entry point: drop the rmid mapping, free the handles held on
 *	env->xa_txn and close the environment.
 *
 * Returns:
 *	XAER_ASYNC	TMASYNC was requested.
 *	XAER_INVAL	any other flag was set.
 *	XA_OK		rmid is not mapped (nothing to close), or everything
 *			was torn down without error.
 *	XAER_PROTO	the first handle on the list still carries a
 *			transaction id other than TXN_INVALID.
 *	XAER_RMERR	unmapping the rmid or closing the environment failed.
 */
static int
__db_xa_close(xa_info, rmid, flags)
	char *xa_info;
	int rmid;
	long flags;
{
	DB_ENV *env;
	DB_TXN *cur;
	int close_ret, ret;

	COMPQUIET(xa_info, NULL);

	if (LF_ISSET(TMASYNC))
		return (XAER_ASYNC);
	if (flags != TMNOFLAGS)
		return (XAER_INVAL);

	/* An rmid with no environment is treated as already closed. */
	if (__db_rmid_to_env(rmid, &env) != 0)
		return (XA_OK);

	/* Refuse to close while the head handle is still in use. */
	cur = TAILQ_FIRST(&env->xa_txn);
	if (cur != NULL && cur->txnid != TXN_INVALID)
		return (XAER_PROTO);

	ret = __db_unmap_rmid(rmid);

	/* Pop and free handles until the list is empty. */
	for (cur = TAILQ_FIRST(&env->xa_txn);
	    cur != NULL; cur = TAILQ_FIRST(&env->xa_txn)) {
		TAILQ_REMOVE(&env->xa_txn, cur, xalinks);
		__os_free(env, cur);
	}

	/* Always close; keep the first error that was seen. */
	close_ret = env->close(env, 0);
	if (ret == 0 && close_ret != 0)
		ret = close_ret;

	if (ret != 0)
		return (XAER_RMERR);
	return (XA_OK);
}
static int
__db_xa_start(xid, rmid, flags)
XID *xid;
int rmid;
long flags;
{
DB_ENV *env;
DB_TXN *txnp;
TXN_DETAIL *td;
size_t off;
int is_known;
#define OK_FLAGS (TMJOIN | TMRESUME | TMNOWAIT | TMASYNC | TMNOFLAGS)
if (LF_ISSET(~OK_FLAGS))
return (XAER_INVAL);
if (LF_ISSET(TMJOIN) && LF_ISSET(TMRESUME))
return (XAER_INVAL);
if (LF_ISSET(TMASYNC))
return (XAER_ASYNC);
if (__db_rmid_to_env(rmid, &env) != 0)
return (XAER_PROTO);
is_known = __db_xid_to_txn(env, xid, &off) == 0;
if (is_known && !LF_ISSET(TMRESUME) && !LF_ISSET(TMJOIN))
return (XAER_DUPID);
if (!is_known && LF_ISSET(TMRESUME | TMJOIN))
return (XAER_NOTA);
if (is_known) {
td = (TXN_DETAIL *)
R_ADDR(&((DB_TXNMGR *)env->tx_handle)->reginfo, off);
if (td->xa_status == TXN_XA_SUSPENDED &&
!LF_ISSET(TMRESUME | TMJOIN))
return (XAER_PROTO);
if (td->xa_status == TXN_XA_DEADLOCKED)
return (XA_RBDEADLOCK);
if (td->xa_status == TXN_XA_ABORTED)
return (XA_RBOTHER);
if (__xa_get_txn(env, &txnp, 1) != 0)
return (XAER_RMERR);
__txn_continue(env, txnp, td, off);
td->xa_status = TXN_XA_STARTED;
} else {
if (__xa_get_txn(env, &txnp, 1) != 0)
return (XAER_RMERR);
if (__txn_xa_begin(env, txnp))
return (XAER_RMERR);
(void)__db_map_xid(env, xid, txnp->off);
td = (TXN_DETAIL *)
R_ADDR(&((DB_TXNMGR *)env->tx_handle)->reginfo, txnp->off);
td->xa_status = TXN_XA_STARTED;
}
return (XA_OK);
}
static int
__db_xa_end(xid, rmid, flags)
XID *xid;
int rmid;
long flags;
{
DB_ENV *env;
DB_TXN *txn;
TXN_DETAIL *td;
size_t off;
if (flags != TMNOFLAGS && !LF_ISSET(TMSUSPEND | TMSUCCESS | TMFAIL))
return (XAER_INVAL);
if (__db_rmid_to_env(rmid, &env) != 0)
return (XAER_PROTO);
if (__db_xid_to_txn(env, xid, &off) != 0)
return (XAER_NOTA);
if (__xa_get_txn(env, &txn, 0) != 0)
return (XAER_RMERR);
if (off != txn->off)
return (XAER_PROTO);
td = (TXN_DETAIL *)R_ADDR(&((DB_TXNMGR *)env->tx_handle)->reginfo, off);
if (td->xa_status == TXN_XA_DEADLOCKED)
return (XA_RBDEADLOCK);
if (td->status == TXN_ABORTED)
return (XA_RBOTHER);
if (td->xa_status != TXN_XA_STARTED)
return (XAER_PROTO);
td->last_lsn = txn->last_lsn;
if (LF_ISSET(TMSUSPEND))
td->xa_status = TXN_XA_SUSPENDED;
else
td->xa_status = TXN_XA_ENDED;
__xa_put_txn(env, txn);
return (XA_OK);
}
static int
__db_xa_prepare(xid, rmid, flags)
XID *xid;
int rmid;
long flags;
{
DB_ENV *env;
DB_TXN *txnp;
TXN_DETAIL *td;
size_t off;
if (LF_ISSET(TMASYNC))
return (XAER_ASYNC);
if (flags != TMNOFLAGS)
return (XAER_INVAL);
if (__db_rmid_to_env(rmid, &env) != 0)
return (XAER_PROTO);
if (__db_xid_to_txn(env, xid, &off) != 0)
return (XAER_NOTA);
td = (TXN_DETAIL *)R_ADDR(&((DB_TXNMGR *)env->tx_handle)->reginfo, off);
if (td->xa_status == TXN_XA_DEADLOCKED)
return (XA_RBDEADLOCK);
if (td->xa_status != TXN_XA_ENDED && td->xa_status != TXN_XA_SUSPENDED)
return (XAER_PROTO);
if (__xa_get_txn(env, &txnp, 0) != 0)
return (XAER_PROTO);
__txn_continue(env, txnp, td, off);
if (txnp->prepare(txnp, (u_int8_t *)xid->data) != 0)
return (XAER_RMERR);
td->xa_status = TXN_XA_PREPARED;
__xa_put_txn(env, txnp);
return (XA_OK);
}
static int
__db_xa_commit(xid, rmid, flags)
XID *xid;
int rmid;
long flags;
{
DB_ENV *env;
DB_TXN *txnp;
TXN_DETAIL *td;
size_t off;
if (LF_ISSET(TMASYNC))
return (XAER_ASYNC);
#undef OK_FLAGS
#define OK_FLAGS (TMNOFLAGS | TMNOWAIT | TMONEPHASE)
if (LF_ISSET(~OK_FLAGS))
return (XAER_INVAL);
if (__db_rmid_to_env(rmid, &env) != 0)
return (XAER_PROTO);
if (__db_xid_to_txn(env, xid, &off) != 0)
return (XAER_NOTA);
td = (TXN_DETAIL *)R_ADDR(&((DB_TXNMGR *)env->tx_handle)->reginfo, off);
if (td->xa_status == TXN_XA_DEADLOCKED)
return (XA_RBDEADLOCK);
if (td->xa_status == TXN_XA_ABORTED)
return (XA_RBOTHER);
if (LF_ISSET(TMONEPHASE) &&
td->xa_status != TXN_XA_ENDED && td->xa_status != TXN_XA_SUSPENDED)
return (XAER_PROTO);
if (!LF_ISSET(TMONEPHASE) && td->xa_status != TXN_XA_PREPARED)
return (XAER_PROTO);
if (__xa_get_txn(env, &txnp, 0) != 0)
return (XAER_RMERR);
__txn_continue(env, txnp, td, off);
if (txnp->commit(txnp, 0) != 0)
return (XAER_RMERR);
__xa_put_txn(env, txnp);
return (XA_OK);
}
/*
 * __db_xa_recover --
 *	XA recover entry point: fill xids (room for count entries) with
 *	prepared transactions, as reported by __txn_get_prepared.
 *
 * The scan position handed to __txn_get_prepared is DB_FIRST when
 * TMSTARTRSCAN is set, else DB_LAST when TMENDRSCAN is set, else DB_NEXT.
 *
 * Returns XAER_PROTO if rmid has no environment, XAER_RMERR if the lookup
 * fails, and otherwise the count reported by __txn_get_prepared.
 */
static int
__db_xa_recover(xids, count, rmid, flags)
	XID *xids;
	long count, flags;
	int rmid;
{
	DB_ENV *env;
	u_int32_t scan;
	long found;

	if (__db_rmid_to_env(rmid, &env) != 0)
		return (XAER_PROTO);

	/* TMSTARTRSCAN takes precedence when both scan flags are present. */
	scan = DB_NEXT;
	if (LF_ISSET(TMSTARTRSCAN))
		scan = DB_FIRST;
	else if (LF_ISSET(TMENDRSCAN))
		scan = DB_LAST;

	found = 0;
	if (__txn_get_prepared(env, xids, NULL, count, &found, scan) != 0)
		return (XAER_RMERR);

	return (found);
}
static int
__db_xa_rollback(xid, rmid, flags)
XID *xid;
int rmid;
long flags;
{
DB_ENV *env;
DB_TXN *txnp;
TXN_DETAIL *td;
size_t off;
if (LF_ISSET(TMASYNC))
return (XAER_ASYNC);
if (flags != TMNOFLAGS)
return (XAER_INVAL);
if (__db_rmid_to_env(rmid, &env) != 0)
return (XAER_PROTO);
if (__db_xid_to_txn(env, xid, &off) != 0)
return (XAER_NOTA);
td = (TXN_DETAIL *)R_ADDR(&((DB_TXNMGR *)env->tx_handle)->reginfo, off);
if (td->xa_status == TXN_XA_DEADLOCKED)
return (XA_RBDEADLOCK);
if (td->xa_status == TXN_XA_ABORTED)
return (XA_RBOTHER);
if (td->xa_status != TXN_XA_ENDED &&
td->xa_status != TXN_XA_SUSPENDED &&
td->xa_status != TXN_XA_PREPARED)
return (XAER_PROTO);
if (__xa_get_txn(env, &txnp, 0) != 0)
return (XAER_RMERR);
__txn_continue(env, txnp, td, off);
if (txnp->abort(txnp) != 0)
return (XAER_RMERR);
__xa_put_txn(env, txnp);
return (XA_OK);
}
/*
 * __db_xa_forget --
 *	XA forget entry point: remove the mapping for xid if one exists.
 *
 * Returns XAER_ASYNC for TMASYNC, XAER_INVAL for any other flag,
 * XAER_PROTO if rmid has no environment, and XA_OK otherwise -- whether or
 * not the xid was mapped.
 */
static int
__db_xa_forget(xid, rmid, flags)
	XID *xid;
	int rmid;
	long flags;
{
	DB_ENV *env;
	size_t off;

	if (LF_ISSET(TMASYNC))
		return (XAER_ASYNC);
	if (flags != TMNOFLAGS)
		return (XAER_INVAL);

	if (__db_rmid_to_env(rmid, &env) != 0)
		return (XAER_PROTO);

	/* An unknown xid is not an error: there is simply nothing to unmap. */
	if (__db_xid_to_txn(env, xid, &off) == 0)
		__db_unmap_xid(env, xid, off);

	return (XA_OK);
}
/*
 * __db_xa_complete --
 *	XA complete entry point.  Every argument is ignored and the call
 *	always returns XAER_INVAL.
 */
static int
__db_xa_complete(handle, retval, rmid, flags)
	int *handle, *retval, rmid;
	long flags;
{
	/* None of the arguments are used; keep the compiler quiet. */
	COMPQUIET(flags, 0);
	COMPQUIET(rmid, 0);
	COMPQUIET(retval, NULL);
	COMPQUIET(handle, NULL);

	return (XAER_INVAL);
}