#include "db_config.h"
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/qam.h"
static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
static int __qamc_close __P((DBC *, db_pgno_t, int *));
static int __qamc_del __P((DBC *));
static int __qamc_destroy __P((DBC *));
static int __qamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int __qamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
#define DONT_NEED_LOCKS(dbc) ((dbc)->txn == NULL || \
F_ISSET(dbc, DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED))
int
__qam_position(dbc, recnop, lock_mode, get_mode, exactp)
DBC *dbc;
db_recno_t *recnop;
db_lockmode_t lock_mode;
u_int32_t get_mode;
int *exactp;
{
QUEUE_CURSOR *cp;
DB *dbp;
QAMDATA *qp;
db_pgno_t pg;
int ret, t_ret;
dbp = dbc->dbp;
cp = (QUEUE_CURSOR *)dbc->internal;
pg = QAM_RECNO_PAGE(dbp, *recnop);
if ((ret = __db_lget(dbc, 0, pg, lock_mode, 0, &cp->lock)) != 0)
return (ret);
cp->page = NULL;
*exactp = 0;
if ((ret = __qam_fget(dbp, &pg,
dbc->txn, get_mode, &cp->page)) != 0) {
if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE) &&
(ret == DB_PAGE_NOTFOUND || ret == ENOENT))
ret = 0;
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
cp->pgno = pg;
cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
if (PGNO(cp->page) == 0) {
if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE)) {
*exactp = 0;
return (0);
}
DB_ASSERT(dbp->dbenv, FLD_ISSET(get_mode, DB_MPOOL_CREATE));
PGNO(cp->page) = pg;
TYPE(cp->page) = P_QAMDATA;
}
qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
*exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
return (ret);
}
int
__qam_pitem(dbc, pagep, indx, recno, data)
DBC *dbc;
QPAGE *pagep;
u_int32_t indx;
db_recno_t recno;
DBT *data;
{
DB_ENV *dbenv;
DB *dbp;
DBT olddata, pdata, *datap;
QAMDATA *qp;
QUEUE *t;
u_int8_t *dest, *p;
int allocated, ret;
dbp = dbc->dbp;
dbenv = dbp->dbenv;
t = (QUEUE *)dbp->q_internal;
allocated = ret = 0;
if (data->size > t->re_len)
return (__db_rec_toobig(dbenv, data->size, t->re_len));
qp = QAM_GET_RECORD(dbp, pagep, indx);
p = qp->data;
datap = data;
if (F_ISSET(data, DB_DBT_PARTIAL)) {
if (data->doff + data->dlen > t->re_len) {
__db_errx(dbenv,
"%s: data offset plus length larger than record size of %lu",
"Record length error", (u_long)t->re_len);
return (EINVAL);
}
if (data->size != data->dlen)
return (__db_rec_repl(dbenv, data->size, data->dlen));
if (data->size == t->re_len)
goto no_partial;
if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
datap = &pdata;
memset(datap, 0, sizeof(*datap));
if ((ret = __os_malloc(dbenv,
t->re_len, &datap->data)) != 0)
return (ret);
allocated = 1;
datap->size = t->re_len;
dest = datap->data;
if (F_ISSET(qp, QAM_VALID))
memcpy(dest, p, t->re_len);
else
memset(dest, (int)t->re_pad, t->re_len);
dest += data->doff;
memcpy(dest, data->data, data->size);
} else {
datap = data;
p += data->doff;
}
}
no_partial:
if (DBC_LOGGING(dbc)) {
olddata.size = 0;
if (F_ISSET(qp, QAM_SET)) {
olddata.data = qp->data;
olddata.size = t->re_len;
}
if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
0, &LSN(pagep), pagep->pgno,
indx, recno, datap, qp->flags,
olddata.size == 0 ? NULL : &olddata)) != 0)
goto err;
} else if (!F_ISSET((dbc), DBC_RECOVER))
LSN_NOT_LOGGED(LSN(pagep));
F_SET(qp, QAM_VALID | QAM_SET);
memcpy(p, datap->data, datap->size);
if (!F_ISSET(data, DB_DBT_PARTIAL))
memset(p + datap->size,
(int)t->re_pad, t->re_len - datap->size);
err: if (allocated)
__os_free(dbenv, datap->data);
return (ret);
}
static int
__qamc_put(dbc, key, data, flags, pgnop)
DBC *dbc;
DBT *key, *data;
u_int32_t flags;
db_pgno_t *pgnop;
{
DB *dbp;
DB_ENV *dbenv;
DB_LOCK lock;
DB_MPOOLFILE *mpf;
QMETA *meta;
QUEUE_CURSOR *cp;
db_pgno_t pg;
db_recno_t new_cur, new_first;
u_int32_t opcode;
int exact, ret, t_ret, writelock;
dbp = dbc->dbp;
dbenv = dbp->dbenv;
mpf = dbp->mpf;
if (pgnop != NULL)
*pgnop = PGNO_INVALID;
cp = (QUEUE_CURSOR *)dbc->internal;
switch (flags) {
case DB_KEYFIRST:
case DB_KEYLAST:
case DB_NOOVERWRITE:
if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
return (ret);
case DB_CURRENT:
break;
default:
return (__db_ferr(dbenv, "DBC->put", 0));
}
if ((ret = __db_lget(dbc, LCK_COUPLE,
cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
return (ret);
lock = cp->lock;
if ((ret = __qam_position(dbc, &cp->recno, DB_LOCK_WRITE,
DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0) {
(void)__LPUT(dbc, lock);
return (ret);
}
if (exact != 0 && flags == DB_NOOVERWRITE)
ret = DB_KEYEXIST;
else
ret = __qam_pitem(dbc,
(QPAGE *)cp->page, cp->indx, cp->recno, data);
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret = __qam_fput(dbp,
cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
cp->page = NULL;
cp->lock = lock;
cp->lock_mode = DB_LOCK_WRITE;
if (ret != 0)
return (ret);
pg = ((QUEUE *)dbp->q_internal)->q_meta;
writelock = 0;
if ((ret = __memp_fget(mpf, &pg, dbc->txn, 0, &meta)) != 0)
return (ret);
if ((ret = __db_lget(dbc, LCK_COUPLE,
pg, DB_LOCK_READ, 0, &cp->lock)) != 0) {
(void)__memp_fput(mpf, meta, dbc->priority);
return (ret);
}
opcode = 0;
new_cur = new_first = 0;
recheck:
if (meta->first_recno == meta->cur_recno) {
new_first = cp->recno;
new_cur = cp->recno + 1;
if (new_cur == RECNO_OOB)
new_cur++;
opcode |= QAM_SETFIRST;
opcode |= QAM_SETCUR;
} else {
if (QAM_BEFORE_FIRST(meta, cp->recno)) {
new_first = cp->recno;
opcode |= QAM_SETFIRST;
}
if (QAM_AFTER_CURRENT(meta, cp->recno)) {
new_cur = cp->recno + 1;
if (new_cur == RECNO_OOB)
new_cur++;
opcode |= QAM_SETCUR;
}
}
if (opcode == 0)
goto done;
if (writelock == 0 && (ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
pg, DB_LOCK_WRITE, 0, &cp->lock)) != 0) {
(void)__memp_fput(mpf, meta, dbc->priority);
return (ret);
}
if (writelock++ == 0)
goto recheck;
if (((ret = __memp_dirty(mpf,
&meta, dbc->txn, dbc->priority, DB_MPOOL_DIRTY)) != 0 ||
(DBC_LOGGING(dbc) &&
(ret = __qam_mvptr_log(dbp, dbc->txn,
&meta->dbmeta.lsn, 0, opcode, meta->first_recno,
new_first, meta->cur_recno, new_cur,
&meta->dbmeta.lsn, PGNO_BASE_MD)) != 0)))
opcode = 0;
if (opcode & QAM_SETCUR)
meta->cur_recno = new_cur;
if (opcode & QAM_SETFIRST)
meta->first_recno = new_first;
done: if ((t_ret = __memp_fput(mpf,
meta, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
int
__qam_append(dbc, key, data)
DBC *dbc;
DBT *key, *data;
{
DB *dbp;
DB_LOCK lock;
DB_MPOOLFILE *mpf;
QMETA *meta;
QPAGE *page;
QUEUE *qp;
QUEUE_CURSOR *cp;
db_pgno_t pg;
db_recno_t recno;
int ret, t_ret;
dbp = dbc->dbp;
mpf = dbp->mpf;
cp = (QUEUE_CURSOR *)dbc->internal;
pg = ((QUEUE *)dbp->q_internal)->q_meta;
if ((ret = __memp_fget(mpf, &pg, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0)
return (ret);
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
(void)__memp_fput(mpf, meta, dbc->priority);
return (ret);
}
recno = meta->cur_recno;
meta->cur_recno++;
if (meta->cur_recno == RECNO_OOB)
meta->cur_recno++;
if (meta->cur_recno == meta->first_recno) {
meta->cur_recno--;
if (meta->cur_recno == RECNO_OOB)
meta->cur_recno--;
ret = __LPUT(dbc, lock);
if (ret == 0)
ret = EFBIG;
goto err;
}
if (QAM_BEFORE_FIRST(meta, recno))
meta->first_recno = recno;
ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock);
if (dbc->dbp->db_append_recno != NULL &&
(t_ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0 &&
ret == 0)
ret = t_ret;
if (ret != 0) {
(void)__LPUT(dbc, lock);
goto err;
}
cp->lock = lock;
cp->lock_mode = DB_LOCK_WRITE;
pg = QAM_RECNO_PAGE(dbp, recno);
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
goto err;
if ((ret = __qam_fget(dbp, &pg, dbc->txn,
DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &page)) != 0) {
(void)__LPUT(dbc, lock);
goto err;
}
if (page->pgno == 0) {
page->pgno = pg;
page->type = P_QAMDATA;
}
ret = __qam_pitem(dbc, page,
QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret =
__qam_fput(dbp, pg, page, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
if (ret == 0 && key != NULL)
ret = __db_retcopy(dbp->dbenv, key,
&recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
cp->recno = recno;
qp = (QUEUE *) dbp->q_internal;
if (qp->page_ext != 0 &&
(recno % (qp->page_ext * qp->rec_page) == 0 ||
recno == UINT32_MAX)) {
if ((ret = __db_lget(dbc,
0, ((QUEUE *)dbp->q_internal)->q_meta,
DB_LOCK_WRITE, 0, &lock)) != 0)
goto err;
if (!QAM_AFTER_CURRENT(meta, recno))
ret = __qam_fclose(dbp, pg);
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
ret = t_ret;
}
err:
if ((t_ret = __memp_fput(mpf, meta, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
static int
__qamc_del(dbc)
DBC *dbc;
{
DB *dbp;
DBT data;
DB_LOCK lock, metalock;
DB_MPOOLFILE *mpf;
PAGE *pagep;
QAMDATA *qp;
QMETA *meta;
QUEUE_CURSOR *cp;
db_pgno_t pg;
int exact, ret, t_ret;
dbp = dbc->dbp;
mpf = dbp->mpf;
cp = (QUEUE_CURSOR *)dbc->internal;
LOCK_INIT(lock);
pg = ((QUEUE *)dbp->q_internal)->q_meta;
if ((ret = __memp_fget(mpf, &pg, dbc->txn, 0, &meta)) != 0)
return (ret);
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &metalock)) != 0) {
(void)__memp_fput(mpf, meta, dbc->priority);
return (ret);
}
if (QAM_NOT_VALID(meta, cp->recno))
ret = DB_NOTFOUND;
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
ret = t_ret;
if (ret != 0)
goto err;
if ((ret = __db_lget(dbc, LCK_COUPLE,
cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
goto err;
cp->lock_mode = DB_LOCK_WRITE;
lock = cp->lock;
if ((ret = __qam_position(dbc, &cp->recno, DB_LOCK_WRITE,
DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0)
goto err;
if (!exact) {
ret = DB_NOTFOUND;
goto err;
}
pagep = cp->page;
qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
if (DBC_LOGGING(dbc)) {
if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
((QUEUE *)dbp->q_internal)->re_len == 0) {
if ((ret = __qam_del_log(dbp,
dbc->txn, &LSN(pagep), 0, &LSN(pagep),
pagep->pgno, cp->indx, cp->recno)) != 0)
goto err;
} else {
data.size = ((QUEUE *)dbp->q_internal)->re_len;
data.data = qp->data;
if ((ret = __qam_delext_log(dbp,
dbc->txn, &LSN(pagep), 0, &LSN(pagep),
pagep->pgno, cp->indx, cp->recno, &data)) != 0)
goto err;
}
} else
LSN_NOT_LOGGED(LSN(pagep));
F_CLR(qp, QAM_VALID);
if (cp->recno == meta->first_recno) {
pg = ((QUEUE *)dbp->q_internal)->q_meta;
if ((ret =
__db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &metalock)) != 0)
goto err;
if (cp->recno == meta->first_recno)
ret = __qam_consume(dbc, meta, meta->first_recno);
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
ret = t_ret;
}
err: if ((t_ret = __memp_fput(mpf, meta, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
if (cp->page != NULL &&
(t_ret = __qam_fput(dbp,
cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
cp->page = NULL;
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
ret = t_ret;
cp->lock = lock;
return (ret);
}
#ifdef DEBUG_WOP
#define QDEBUG
#endif
static int
__qamc_get(dbc, key, data, flags, pgnop)
DBC *dbc;
DBT *key, *data;
u_int32_t flags;
db_pgno_t *pgnop;
{
DB *dbp;
DBC *dbcdup;
DBT tmp;
DB_ENV *dbenv;
DB_LOCK lock, pglock, metalock;
DB_MPOOLFILE *mpf;
PAGE *pg;
QAMDATA *qp;
QMETA *meta;
QUEUE *t;
QUEUE_CURSOR *cp;
db_lockmode_t lock_mode, meta_mode;
db_pgno_t metapno;
db_recno_t first;
int exact, inorder, is_first, locked, ret, t_ret, wait, with_delete;
int retrying;
dbp = dbc->dbp;
dbenv = dbp->dbenv;
mpf = dbp->mpf;
cp = (QUEUE_CURSOR *)dbc->internal;
LOCK_INIT(lock);
LOCK_INIT(pglock);
lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
meta_mode = DB_LOCK_READ;
meta = NULL;
*pgnop = 0;
pg = NULL;
retrying = t_ret = wait = with_delete = 0;
if (flags == DB_CONSUME_WAIT) {
wait = 1;
flags = DB_CONSUME;
}
if (flags == DB_CONSUME) {
with_delete = 1;
flags = DB_FIRST;
meta_mode = lock_mode = DB_LOCK_WRITE;
}
inorder = F_ISSET(dbp, DB_AM_INORDER) && with_delete;
DEBUG_LREAD(dbc, dbc->txn, "qamc_get",
flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
locked = 0;
is_first = 0;
first = 0;
t = (QUEUE *)dbp->q_internal;
metapno = t->q_meta;
LOCK_INIT(metalock);
if ((ret = __memp_fget(mpf, &metapno, dbc->txn, 0, &meta)) != 0)
return (ret);
get_next:
switch (flags) {
case DB_NEXT:
case DB_NEXT_NODUP:
case DB_FIRST:
case DB_PREV:
case DB_PREV_NODUP:
case DB_LAST:
if ((ret = __db_lget(dbc,
0, metapno, meta_mode, 0, &metalock)) != 0)
goto err;
locked = 1;
break;
default:
break;
}
if ((ret = __TLPUT(dbc, cp->lock)) != 0)
goto err;
retry:
switch (flags) {
case DB_CURRENT:
break;
case DB_NEXT_DUP:
case DB_PREV_DUP:
ret = DB_NOTFOUND;
goto err;
case DB_NEXT:
case DB_NEXT_NODUP:
if (cp->recno != RECNO_OOB) {
++cp->recno;
if (cp->recno == RECNO_OOB)
cp->recno++;
if (QAM_AFTER_CURRENT(meta, cp->recno)) {
pg = NULL;
if (!wait) {
ret = DB_NOTFOUND;
goto err;
}
flags = DB_FIRST;
if (first == 0) {
retrying = 1;
goto retry;
}
if (CDB_LOCKING(dbenv)) {
ret = __memp_fput(mpf,
meta, dbc->priority);
meta = NULL;
if (ret != 0)
goto err;
if ((ret = __lock_get(
dbenv, dbc->locker,
DB_LOCK_SWITCH, &dbc->lock_dbt,
DB_LOCK_WAIT, &dbc->mylock)) != 0)
goto err;
if ((ret = __memp_fget(mpf, &metapno,
dbc->txn, 0, &meta)) != 0)
goto err;
if ((ret = __lock_get(
dbenv, dbc->locker,
DB_LOCK_UPGRADE, &dbc->lock_dbt,
DB_LOCK_WRITE, &dbc->mylock)) != 0)
goto err;
goto retry;
}
if (locked == 0) {
if ((ret = __db_lget(dbc, 0, metapno,
meta_mode, 0, &metalock)) != 0)
goto err;
locked = 1;
if (cp->recno != RECNO_OOB &&
!QAM_AFTER_CURRENT(meta, cp->recno))
goto retry;
}
ret = __memp_fput(mpf, meta, dbc->priority);
meta = NULL;
if (ret != 0)
goto err;
if ((ret = __db_lget(dbc,
0, metapno, DB_LOCK_WAIT,
DB_LOCK_SWITCH, &metalock)) != 0) {
if (ret == DB_LOCK_DEADLOCK)
ret = DB_LOCK_NOTGRANTED;
goto err;
}
if ((ret = __memp_fget(mpf, &metapno, dbc->txn,
0, &meta)) != 0)
goto err;
if ((ret = __db_lget(dbc, 0,
PGNO_INVALID, DB_LOCK_WRITE,
DB_LOCK_UPGRADE, &metalock)) != 0) {
if (ret == DB_LOCK_DEADLOCK)
ret = DB_LOCK_NOTGRANTED;
goto err;
}
locked = 1;
goto retry;
}
break;
}
case DB_FIRST:
flags = DB_NEXT;
is_first = 1;
cp->recno = first = meta->first_recno;
break;
case DB_PREV:
case DB_PREV_NODUP:
if (cp->recno != RECNO_OOB) {
if (cp->recno == meta->first_recno ||
QAM_BEFORE_FIRST(meta, cp->recno)) {
ret = DB_NOTFOUND;
goto err;
}
--cp->recno;
if (cp->recno == RECNO_OOB)
--cp->recno;
break;
}
case DB_LAST:
if (meta->first_recno == meta->cur_recno) {
ret = DB_NOTFOUND;
goto err;
}
cp->recno = meta->cur_recno - 1;
if (cp->recno == RECNO_OOB)
cp->recno--;
break;
case DB_SET:
case DB_SET_RANGE:
case DB_GET_BOTH:
case DB_GET_BOTH_RANGE:
if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
goto err;
break;
default:
ret = __db_unknown_flag(dbenv, "__qamc_get", flags);
goto err;
}
if (locked) {
if ((ret = __LPUT(dbc, metalock)) != 0)
goto err;
locked = 0;
}
if (((ret = __db_lget(dbc, LCK_COUPLE, cp->recno, lock_mode,
(with_delete && !inorder && !retrying) ?
DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
&lock)) == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) &&
with_delete) {
#ifdef QDEBUG
if (DBC_LOGGING(dbc))
(void)__log_printf(dbenv,
dbc->txn, "Queue S: %x %d %d %d",
dbc->locker ? dbc->locker->id : 0,
cp->recno, first, meta->first_recno);
#endif
first = 0;
if ((ret =
__db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
goto err;
locked = 1;
goto retry;
}
if (ret != 0)
goto err;
switch (flags) {
default:
if (inorder) {
if (first != cp->recno)
break;
} else if (with_delete || !is_first)
break;
case DB_SET:
case DB_SET_RANGE:
case DB_GET_BOTH:
case DB_GET_BOTH_RANGE:
case DB_LAST:
if ((ret =
__db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
goto lerr;
locked = 1;
if ((is_first && cp->recno != meta->first_recno) ||
(flags == DB_LAST && cp->recno != meta->cur_recno - 1)) {
if ((ret = __LPUT(dbc, lock)) != 0)
goto err;
if (is_first)
flags = DB_FIRST;
goto retry;
} else if (!is_first && flags != DB_LAST) {
if (QAM_BEFORE_FIRST(meta, cp->recno)) {
if (flags == DB_SET_RANGE ||
flags == DB_GET_BOTH_RANGE) {
if ((ret = __LPUT(dbc, metalock)) != 0)
goto err;
locked = 0;
cp->lock = lock;
LOCK_INIT(lock);
goto release_retry;
}
ret = DB_NOTFOUND;
goto lerr;
}
if (QAM_AFTER_CURRENT(meta, cp->recno)) {
ret = DB_NOTFOUND;
goto lerr;
}
}
if ((ret = __LPUT(dbc, metalock)) != 0)
goto err;
locked = 0;
}
if ((ret = __qam_position(dbc, &cp->recno,
lock_mode, 0, &exact)) != 0) {
(void)__LPUT(dbc, lock);
goto err;
}
pg = cp->page;
pglock = cp->lock;
cp->lock = lock;
cp->lock_mode = lock_mode;
LOCK_INIT(lock);
if (!exact) {
release_retry:
if (pg != NULL)
(void)__qam_fput(dbp, cp->pgno, pg, dbc->priority);
cp->page = pg = NULL;
if ((ret = __LPUT(dbc, pglock)) != 0)
goto err1;
if (with_delete) {
if ((ret = __LPUT(dbc, cp->lock)) != 0)
goto err1;
} else if ((ret = __TLPUT(dbc, cp->lock)) != 0)
goto err1;
switch (flags) {
case DB_NEXT:
case DB_NEXT_NODUP:
if (!with_delete)
is_first = 0;
if (QAM_BEFORE_FIRST(meta, cp->recno) &&
DONT_NEED_LOCKS(dbc))
flags = DB_FIRST;
break;
case DB_LAST:
case DB_PREV:
case DB_PREV_NODUP:
if (QAM_AFTER_CURRENT(meta, cp->recno) &&
DONT_NEED_LOCKS(dbc))
flags = DB_LAST;
else
flags = DB_PREV;
break;
case DB_GET_BOTH_RANGE:
case DB_SET_RANGE:
if (QAM_BEFORE_FIRST(meta, cp->recno) &&
DONT_NEED_LOCKS(dbc))
flags = DB_FIRST;
else
flags = DB_NEXT;
break;
default:
ret = DB_KEYEMPTY;
goto err1;
}
retrying = 0;
goto get_next;
}
qp = QAM_GET_RECORD(dbp, pg, cp->indx);
if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
tmp.data = qp->data;
tmp.size = t->re_len;
if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
if (flags == DB_GET_BOTH_RANGE)
goto release_retry;
ret = DB_NOTFOUND;
goto err1;
}
}
if (key != NULL && !F_ISSET(key, DB_DBT_ISSET)) {
if ((ret = __db_retcopy(dbp->dbenv,
key, &cp->recno, sizeof(cp->recno),
&dbc->rkey->data, &dbc->rkey->ulen)) != 0)
goto err1;
F_SET(key, DB_DBT_ISSET);
}
if (data != NULL &&
!F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
!F_ISSET(data, DB_DBT_ISSET)) {
if ((ret = __db_retcopy(dbp->dbenv, data, qp->data, t->re_len,
&dbc->rdata->data, &dbc->rdata->ulen)) != 0)
goto err1;
F_SET(data, DB_DBT_ISSET);
}
if (with_delete) {
DB_ASSERT(dbenv, !F_ISSET(dbp, DB_AM_SECONDARY));
if ((ret = __qam_dirty(dbp, cp->pgno, &cp->page,
dbc->txn, dbc->priority)) != 0)
goto err1;
pg = cp->page;
if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
if ((ret = __dbc_idup(dbc,
&dbcdup, DB_POSITION)) != 0)
goto err1;
if ((ret = __dbc_del_primary(dbcdup)) != 0) {
(void)__dbc_close(dbcdup);
goto err1;
}
if ((ret = __dbc_close(dbcdup)) != 0)
goto err1;
}
if (DBC_LOGGING(dbc)) {
if (t->page_ext == 0 || t->re_len == 0) {
if ((ret = __qam_del_log(dbp, dbc->txn,
&LSN(pg), 0, &LSN(pg),
pg->pgno, cp->indx, cp->recno)) != 0)
goto err1;
} else {
tmp.data = qp->data;
tmp.size = t->re_len;
if ((ret = __qam_delext_log(dbp,
dbc->txn, &LSN(pg), 0, &LSN(pg),
pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
goto err1;
}
} else
LSN_NOT_LOGGED(LSN(pg));
F_CLR(qp, QAM_VALID);
if ((ret = __LPUT(dbc, pglock)) != 0)
goto err1;
if (locked == 0 && (ret = __db_lget(
dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
goto err1;
locked = 1;
#ifdef QDEBUG
if (DBC_LOGGING(dbc))
(void)__log_printf(dbenv,
dbc->txn, "Queue D: %x %d %d %d",
dbc->locker ? dbc->locker->id : 0,
cp->recno, first, meta->first_recno);
#endif
if (first == 0)
first = cp->recno;
if (first != meta->first_recno)
goto done;
if ((ret = __qam_consume(dbc, meta, first)) != 0)
goto err1;
}
done:
err1: if (cp->page != NULL) {
if ((t_ret = __qam_fput(dbp,
cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret = __LPUT(dbc, pglock)) != 0 && ret == 0)
ret = t_ret;
cp->page = NULL;
}
if (0) {
lerr: (void)__LPUT(dbc, lock);
}
err: if (meta) {
if ((t_ret = __memp_fput(mpf,
meta, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
if (locked)
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
ret = t_ret;
}
DB_ASSERT(dbenv, !LOCK_ISSET(metalock));
return ((ret == DB_LOCK_NOTGRANTED &&
!F_ISSET(dbenv, DB_ENV_TIME_NOTGRANTED)) ?
DB_LOCK_DEADLOCK : ret);
}
static int
__qam_consume(dbc, meta, first)
DBC *dbc;
QMETA *meta;
db_recno_t first;
{
DB *dbp;
DB_LOCK lock, save_lock;
DB_MPOOLFILE *mpf;
QUEUE_CURSOR *cp;
db_indx_t save_indx;
db_pgno_t save_page;
db_recno_t current, save_recno;
u_int32_t rec_extent;
int exact, ret, t_ret, wrapped;
dbp = dbc->dbp;
mpf = dbp->mpf;
cp = (QUEUE_CURSOR *)dbc->internal;
ret = 0;
save_page = cp->pgno;
save_indx = cp->indx;
save_recno = cp->recno;
save_lock = cp->lock;
if (first != cp->recno) {
ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
if (ret == DB_LOCK_DEADLOCK) {
ret = 0;
goto done;
}
if (ret != 0)
goto done;
if ((ret =
__qam_fput(dbp, cp->pgno, cp->page, dbc->priority)) != 0)
goto done;
cp->page = NULL;
if ((ret = __qam_position(dbc,
&first, DB_LOCK_READ, 0, &exact)) != 0 || exact != 0) {
(void)__LPUT(dbc, lock);
goto done;
}
if ((ret =__LPUT(dbc, lock)) != 0)
goto done;
if ((ret = __LPUT(dbc, cp->lock)) != 0)
goto done;
}
current = meta->cur_recno;
wrapped = 0;
if (first > current)
wrapped = 1;
rec_extent = meta->page_ext * meta->rec_page;
for (;;) {
if (cp->page != NULL && rec_extent != 0 &&
((exact = (first % rec_extent == 0)) ||
(first % meta->rec_page == 0) ||
first == UINT32_MAX)) {
if (exact == 1 && (ret = __db_lget(dbc,
0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
break;
#ifdef QDEBUG
if (DBC_LOGGING(dbc))
(void)__log_printf(dbp->dbenv, dbc->txn,
"Queue R: %x %d %d %d",
dbc->locker ? dbc->locker->id : 0,
cp->pgno, first, meta->first_recno);
#endif
if ((ret = __qam_fput(dbp,
cp->pgno, cp->page, DB_PRIORITY_VERY_LOW)) != 0)
break;
cp->page = NULL;
if (exact == 1) {
ret = __qam_fremove(dbp, cp->pgno);
if ((t_ret =
__LPUT(dbc, cp->lock)) != 0 && ret == 0)
ret = t_ret;
}
if (ret != 0)
break;
} else if (cp->page != NULL && (ret = __qam_fput(dbp,
cp->pgno, cp->page, dbc->priority)) != 0)
break;
cp->page = NULL;
first++;
if (first == RECNO_OOB) {
wrapped = 0;
first++;
}
if (!wrapped && first >= current)
break;
ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
if (ret == DB_LOCK_DEADLOCK) {
ret = 0;
break;
}
if (ret != 0)
break;
if ((ret = __qam_position(dbc,
&first, DB_LOCK_READ, 0, &exact)) != 0) {
(void)__LPUT(dbc, lock);
break;
}
if ((ret =__LPUT(dbc, lock)) != 0 ||
(ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
if ((t_ret = __qam_fput(dbp, cp->pgno,
cp->page, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
cp->page = NULL;
break;
}
}
cp->pgno = save_page;
cp->indx = save_indx;
cp->recno = save_recno;
cp->lock = save_lock;
if (ret == 0 && meta->first_recno != first) {
if ((ret = __memp_dirty(mpf,
&meta, dbc->txn, dbc->priority, 0)) != 0)
goto done;
#ifdef QDEBUG
if (DBC_LOGGING(dbc))
(void)__log_printf(dbp->dbenv, dbc->txn,
"Queue M: %x %d %d %d",
dbc->locker ? dbc->locker->id : 0,
cp->recno, first, meta->first_recno);
#endif
if (DBC_LOGGING(dbc)) {
if ((ret = __qam_incfirst_log(dbp,
dbc->txn, &meta->dbmeta.lsn, 0,
cp->recno, PGNO_BASE_MD)) != 0)
goto done;
} else
LSN_NOT_LOGGED(meta->dbmeta.lsn);
meta->first_recno = first;
}
done:
return (ret);
}
static int
__qam_bulk(dbc, data, flags)
DBC *dbc;
DBT *data;
u_int32_t flags;
{
DB *dbp;
DB_LOCK metalock, rlock;
DB_MPOOLFILE *mpf;
PAGE *pg;
QMETA *meta;
QAMDATA *qp;
QUEUE_CURSOR *cp;
db_indx_t indx;
db_lockmode_t lkmode;
db_pgno_t metapno;
u_int32_t *endp, *offp;
u_int32_t pagesize, re_len, recs;
u_int8_t *dbuf, *dp, *np;
int exact, ret, t_ret, valid;
int is_key, need_pg, size, space;
dbp = dbc->dbp;
mpf = dbp->mpf;
cp = (QUEUE_CURSOR *)dbc->internal;
lkmode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
pagesize = dbp->pgsize;
re_len = ((QUEUE *)dbp->q_internal)->re_len;
recs = ((QUEUE *)dbp->q_internal)->rec_page;
metapno = ((QUEUE *)dbp->q_internal)->q_meta;
is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
size = 0;
if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
return (ret);
if ((ret = __memp_fget(mpf, &metapno, dbc->txn, 0, &meta)) != 0) {
(void)__LPUT(dbc, metalock);
return (ret);
}
dbuf = data->data;
np = dp = dbuf;
space = (int)data->ulen;
space -= (int)sizeof(*offp);
endp = (u_int32_t *)((u_int8_t *)dbuf + data->ulen);
endp--;
offp = endp;
rlock = cp->lock;
LOCK_INIT(cp->lock);
next_pg:
if (cp->recno == RECNO_OOB)
cp->recno++;
if ((ret = __qam_position(dbc, &cp->recno, lkmode, 0, &exact)) != 0)
goto done;
pg = cp->page;
indx = cp->indx;
need_pg = 1;
do {
valid = 0;
if (pg != NULL) {
if ((ret = __db_lget(dbc, LCK_COUPLE,
cp->recno, lkmode, DB_LOCK_RECORD, &rlock)) != 0)
goto done;
qp = QAM_GET_RECORD(dbp, pg, indx);
if (F_ISSET(qp, QAM_VALID)) {
valid = 1;
space -= (int)
((is_key ? 3 : 2) * sizeof(*offp));
if (space < 0)
goto get_space;
if (need_pg) {
dp = np;
size = (int)pagesize - QPAGE_SZ(dbp);
if (space < size) {
get_space:
if (offp == endp) {
data->size = (u_int32_t)
DB_ALIGN((u_int32_t)
size + pagesize,
sizeof(u_int32_t));
ret = DB_BUFFER_SMALL;
break;
}
if (indx != 0)
indx--;
cp->recno--;
space = 0;
break;
}
memcpy(dp,
(u_int8_t *)pg + QPAGE_SZ(dbp),
(u_int)size);
need_pg = 0;
space -= size;
np += size;
}
if (is_key)
*offp-- = cp->recno;
*offp-- = (u_int32_t)((((u_int8_t *)qp -
(u_int8_t *)pg) - QPAGE_SZ(dbp)) +
(dp - dbuf) + SSZA(QAMDATA, data));
*offp-- = re_len;
}
}
if (!valid && is_key == 0) {
*offp-- = 0;
*offp-- = 0;
}
cp->recno++;
} while (++indx < recs && cp->recno != RECNO_OOB &&
!QAM_AFTER_CURRENT(meta, cp->recno));
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
ret = t_ret;
if (cp->page != NULL) {
if ((t_ret = __qam_fput(dbp,
cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
cp->page = NULL;
}
if (ret == 0 && space > 0 &&
(indx >= recs || cp->recno == RECNO_OOB) &&
!QAM_AFTER_CURRENT(meta, cp->recno))
goto next_pg;
if (cp->recno == RECNO_OOB || (space == 0 && indx == recs))
cp->recno--;
if (is_key == 1)
*offp = RECNO_OOB;
else
*offp = (u_int32_t)-1;
done:
if ((t_ret = __memp_fput(mpf, meta, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
ret = t_ret;
cp->lock = rlock;
return (ret);
}
static int
__qamc_close(dbc, root_pgno, rmroot)
DBC *dbc;
db_pgno_t root_pgno;
int *rmroot;
{
QUEUE_CURSOR *cp;
int ret;
COMPQUIET(root_pgno, 0);
COMPQUIET(rmroot, NULL);
cp = (QUEUE_CURSOR *)dbc->internal;
ret = __TLPUT(dbc, cp->lock);
LOCK_INIT(cp->lock);
cp->page = NULL;
cp->pgno = PGNO_INVALID;
cp->indx = 0;
cp->lock_mode = DB_LOCK_NG;
cp->recno = RECNO_OOB;
cp->flags = 0;
return (ret);
}
int
__qamc_dup(orig_dbc, new_dbc)
DBC *orig_dbc, *new_dbc;
{
QUEUE_CURSOR *orig, *new;
orig = (QUEUE_CURSOR *)orig_dbc->internal;
new = (QUEUE_CURSOR *)new_dbc->internal;
new->recno = orig->recno;
return (0);
}
int
__qamc_init(dbc)
DBC *dbc;
{
QUEUE_CURSOR *cp;
DB *dbp;
int ret;
dbp = dbc->dbp;
cp = (QUEUE_CURSOR *)dbc->internal;
if (cp == NULL) {
if ((ret =
__os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
return (ret);
dbc->internal = (DBC_INTERNAL *)cp;
}
dbc->close = dbc->c_close = __dbc_close_pp;
dbc->count = dbc->c_count = __dbc_count_pp;
dbc->del = dbc->c_del = __dbc_del_pp;
dbc->dup = dbc->c_dup = __dbc_dup_pp;
dbc->get = dbc->c_get = __dbc_get_pp;
dbc->pget = dbc->c_pget = __dbc_pget_pp;
dbc->put = dbc->c_put = __dbc_put_pp;
dbc->am_bulk = __qam_bulk;
dbc->am_close = __qamc_close;
dbc->am_del = __qamc_del;
dbc->am_destroy = __qamc_destroy;
dbc->am_get = __qamc_get;
dbc->am_put = __qamc_put;
dbc->am_writelock = NULL;
return (0);
}
static int
__qamc_destroy(dbc)
DBC *dbc;
{
__os_free(dbc->dbp->dbenv, dbc->internal);
return (0);
}
static int
__qam_getno(dbp, key, rep)
DB *dbp;
const DBT *key;
db_recno_t *rep;
{
if ((*rep = *(db_recno_t *)key->data) == 0) {
__db_errx(dbp->dbenv, "illegal record number of 0");
return (EINVAL);
}
return (0);
}
int
__qam_truncate(dbc, countp)
DBC *dbc;
u_int32_t *countp;
{
DB *dbp;
DB_LOCK metalock;
DB_MPOOLFILE *mpf;
QMETA *meta;
db_pgno_t metapno;
u_int32_t count;
int ret, t_ret;
dbp = dbc->dbp;
for (count = 0;
(ret = __qamc_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0;)
count++;
if (ret != DB_NOTFOUND)
return (ret);
metapno = ((QUEUE *)dbp->q_internal)->q_meta;
if ((ret =
__db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
return (ret);
mpf = dbp->mpf;
if ((ret = __memp_fget(mpf, &metapno, dbc->txn,
DB_MPOOL_DIRTY, &meta)) != 0) {
(void)__LPUT(dbc, metalock);
return (ret);
}
if (meta->cur_recno > 1 && ((QUEUE *)dbp->q_internal)->page_ext != 0) {
if ((ret = __qam_fremove(dbp,
QAM_RECNO_PAGE(dbp, meta->cur_recno - 1))) != 0)
return (ret);
}
if (DBC_LOGGING(dbc)) {
ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
} else
LSN_NOT_LOGGED(meta->dbmeta.lsn);
if (ret == 0)
meta->first_recno = meta->cur_recno = 1;
if ((t_ret = __memp_fput(mpf, meta, dbc->priority)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
ret = t_ret;
if (countp != NULL)
*countp = count;
return (ret);
}
int
__qam_delete(dbc, key)
DBC *dbc;
DBT *key;
{
QUEUE_CURSOR *cp;
int ret;
cp = (QUEUE_CURSOR *)dbc->internal;
if ((ret = __qam_getno(dbc->dbp, key, &cp->recno)) != 0)
goto err;
ret = __qamc_del(dbc);
err: return (ret);
}