/* cyrusdb_berkeley.c -- Berkeley DB backends for the cyrusdb interface */
#include <config.h>
#include <db.h>
#include <syslog.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include "cyrusdb.h"
#include "exitcodes.h"
#include "libcyr_cfg.h"
#include "xmalloc.h"
/* Fatal-error handler defined outside this file; init() calls it when the
 * Berkeley DB library found at run time does not match the version this
 * file was compiled against. */
extern void fatal(const char *, int);
/* Deadlock detection policy handed to set_lk_detect() by init(). */
#define CONFIG_DEADLOCK_DETECTION DB_LOCK_YOUNGEST
/* Inclusive bounds init() applies to the configured cache size; init()
 * multiplies an accepted value by 1024 before calling set_cachesize(). */
#define MIN_CACHESIZE 20
#define MAX_CACHESIZE 4194303
/* Version shims so the rest of the file can use one function-style
 * spelling for transaction and log calls:
 *  - DB_VERSION_MAJOR >= 4: each wrapper forwards to a method on its first
 *    argument (the environment or transaction handle);
 *  - otherwise, when DB_VERSION_MINOR == 3: only log_archive() is wrapped.
 * Both log_archive() wrappers drop the fourth argument. */
#if DB_VERSION_MAJOR >= 4
#define txn_checkpoint(xx1,xx2,xx3,xx4) (xx1)->txn_checkpoint(xx1,xx2,xx3,xx4)
#define txn_id(xx1) (xx1)->id(xx1)
#define log_archive(xx1,xx2,xx3,xx4) (xx1)->log_archive(xx1,xx2,xx3)
#define txn_begin(xx1,xx2,xx3,xx4) (xx1)->txn_begin(xx1,xx2,xx3,xx4)
#define txn_commit(xx1,xx2) (xx1)->commit(xx1,xx2)
#define txn_abort(xx1) (xx1)->abort(xx1)
#elif DB_VERSION_MINOR == 3
#define log_archive(xx1,xx2,xx3,xx4) log_archive(xx1,xx2,xx3)
#endif
/* Usage counter maintained by init() and done(); the database operations
 * below assert that it is nonzero. */
static int dbinit = 0;
/* Environment handle created by init() and used by every operation in
 * this file. */
static DB_ENV *dbenv;
/* Forward declarations for functions defined near the end of the file. */
static int commit_txn(struct db *db, struct txn *tid);
static int abort_txn(struct db *db, struct txn *tid);
/*
 * Panic callback installed on the environment by init() through
 * set_paniccall().
 *
 * Logs a critical message and terminates the process with EC_TEMPFAIL;
 * it does not return.  Neither argument is used.
 *
 * The second parameter is deliberately not spelled "errno": this file
 * includes <errno.h>, where errno is a macro, so using that name for a
 * parameter is not portable.
 */
static void db_panic(DB_ENV *env __attribute__((unused)),
                     int errval __attribute__((unused)))
{
    syslog(LOG_CRIT, "DBERROR: critical database situation");
    exit(EC_TEMPFAIL);
}
/*
 * Error callback installed on the environment by init() through
 * set_errcall(); forwards library messages to syslog.
 *
 * db_prfx: prefix string supplied by the library (init() sets it with
 *          set_errpfx()).
 * buffer:  message text.
 *
 * Messages containing the word "lockers" are logged at LOG_DEBUG, all
 * others at LOG_WARNING.  A NULL prefix or message is logged as "(null)"
 * instead of being handed to a %s conversion, which is undefined.
 */
static void db_err(const char *db_prfx, char *buffer)
{
    const char *prefix = (db_prfx != NULL) ? db_prfx : "(null)";
    const char *message = (buffer != NULL) ? buffer : "(null)";
    int priority = LOG_WARNING;

    if (buffer != NULL && strstr(buffer, "lockers") != NULL) {
        priority = LOG_DEBUG;
    }

    syslog(priority, "DBERROR %s: %s", prefix, message);
}
/*
 * Create, configure and open the shared Berkeley DB environment.
 *
 * dbdir:   environment home directory passed to DB_ENV->open().
 * myflags: CYRUSDB_* flags; CYRUSDB_RECOVER adds DB_RECOVER | DB_CREATE
 *          to the open flags.
 *
 * Returns 0 when the environment is open (including when it was already
 * open, in which case only the usage counter is incremented), or
 * CYRUSDB_IOERROR on failure.  A library version mismatch is reported
 * through fatal(); a rejected locks/transactions limit calls abort().
 *
 * The usage counter is only set once the environment is really open, so
 * a failed call leaves the backend in the "not initialized" state and a
 * later call tries again.  On failure the environment handle is closed
 * and cleared so that no stale handle is left behind.
 */
static int init(const char *dbdir, int myflags)
{
    int r, do_retry = 1;
    int flags = 0;
    int maj, min, patch;
    static char errpfx[10];
    int opt;

    /* Already open: just account for the additional user. */
    if (dbinit > 0) {
        dbinit++;
        return 0;
    }

    /* Only the numeric version is needed; the returned string is unused. */
    (void) db_version(&maj, &min, &patch);
    if (maj != DB_VERSION_MAJOR || min != DB_VERSION_MINOR ||
        DB_VERSION_PATCH > patch) {
        syslog(LOG_CRIT, "incorrect version of Berkeley db: "
               "compiled against %d.%d.%d, linked against %d.%d.%d",
               DB_VERSION_MAJOR, DB_VERSION_MINOR, DB_VERSION_PATCH,
               maj, min, patch);
        fatal("wrong db version", EC_SOFTWARE);
    }

    if (myflags & CYRUSDB_RECOVER) {
        flags |= DB_RECOVER | DB_CREATE;
    }

    if ((r = db_env_create(&dbenv, 0)) != 0) {
        syslog(LOG_ERR, "DBERROR: db_appinit failed: %s", db_strerror(r));
        dbenv = NULL;
        return CYRUSDB_IOERROR;
    }

    dbenv->set_paniccall(dbenv, (void (*)(DB_ENV *, int)) &db_panic);
    if (CONFIG_DB_VERBOSE) {
        dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
        dbenv->set_verbose(dbenv, DB_VERB_WAITSFOR, 1);
    }
    if (CONFIG_DB_VERBOSE > 1) {
#ifdef DB_VERB_CHKPOINT
        dbenv->set_verbose(dbenv, DB_VERB_CHKPOINT, 1);
#endif
    }

    dbenv->set_errcall(dbenv, db_err);
    /* errpfx is static because the library is given a pointer to it. */
    snprintf(errpfx, sizeof(errpfx), "db%d", DB_VERSION_MAJOR);
    dbenv->set_errpfx(dbenv, errpfx);

    dbenv->set_lk_detect(dbenv, CONFIG_DEADLOCK_DETECTION);

    if ((opt = libcyrus_config_getint(CYRUSOPT_BERKELEY_LOCKS_MAX)) < 0) {
        syslog(LOG_WARNING,
               "DBERROR: invalid berkeley_locks_max value, using internal default");
    } else {
        r = dbenv->set_lk_max(dbenv, opt);
        if (r) {
            dbenv->err(dbenv, r, "set_lk_max");
            syslog(LOG_ERR, "DBERROR: set_lk_max(): %s", db_strerror(r));
            abort();
        }
    }

    if ((opt = libcyrus_config_getint(CYRUSOPT_BERKELEY_TXNS_MAX)) < 0) {
        syslog(LOG_WARNING,
               "DBERROR: invalid berkeley_txns_max value, using internal default");
    } else {
        r = dbenv->set_tx_max(dbenv, opt);
        if (r) {
            dbenv->err(dbenv, r, "set_tx_max");
            syslog(LOG_ERR, "DBERROR: set_tx_max(): %s", db_strerror(r));
            abort();
        }
    }

    opt = libcyrus_config_getint(CYRUSOPT_BERKELEY_CACHESIZE);
    if (opt < MIN_CACHESIZE || opt > MAX_CACHESIZE) {
        syslog(LOG_WARNING,
               "DBERROR: invalid berkeley_cachesize value, using internal default");
    } else {
        /* Multiply in unsigned arithmetic: MAX_CACHESIZE * 1024 does not
         * fit in a 32-bit int but does fit in 32 unsigned bits. */
        r = dbenv->set_cachesize(dbenv, 0, (unsigned long) opt * 1024UL, 0);
        if (r) {
            dbenv->err(dbenv, r, "set_cachesize");
            dbenv->close(dbenv, 0);
            dbenv = NULL;
            syslog(LOG_ERR, "DBERROR: set_cachesize(): %s", db_strerror(r));
            return CYRUSDB_IOERROR;
        }
    }

 retry:
    flags |= DB_INIT_LOCK | DB_INIT_MPOOL |
             DB_INIT_LOG | DB_INIT_TXN;
#if (DB_VERSION_MAJOR > 3) || ((DB_VERSION_MAJOR == 3) && (DB_VERSION_MINOR > 0))
    r = dbenv->open(dbenv, dbdir, flags, 0644);
#else
    r = dbenv->open(dbenv, dbdir, NULL, flags, 0644);
#endif
    if (r) {
        /* A missing environment is created on the second (and last) try. */
        if (do_retry && (r == ENOENT)) {
            flags |= DB_CREATE;
            do_retry = 0;
            goto retry;
        }
        syslog(LOG_ERR, "DBERROR: dbenv->open '%s' failed: %s", dbdir,
               db_strerror(r));
        /* The handle is unusable after a failed open; discard it. */
        dbenv->close(dbenv, 0);
        dbenv = NULL;
        return CYRUSDB_IOERROR;
    }

    dbinit = 1;
    return 0;
}
/*
 * Release one use of the shared environment; the last release closes it.
 *
 * Returns 0 when nothing had to be closed or the close succeeded, and
 * CYRUSDB_IOERROR (after logging) when DB_ENV->close() reports an error.
 *
 * A call that is not matched by an outstanding use is ignored instead of
 * pushing the counter below zero, and the handle is cleared after it has
 * been closed so it cannot be reused by accident.
 */
static int done(void)
{
    int r;

    if (dbinit <= 0) return 0;      /* nothing outstanding to release */
    if (--dbinit) return 0;         /* other users remain */

    r = dbenv->close(dbenv, 0);
    dbenv = NULL;
    dbinit = 0;

    if (r) {
        syslog(LOG_ERR, "DBERROR: error exiting application: %s",
               db_strerror(r));
        return CYRUSDB_IOERROR;
    }

    return 0;
}
/*
 * Checkpoint the transaction subsystem of the shared environment.
 *
 * Returns 0 on success, or CYRUSDB_IOERROR (after logging) when the
 * checkpoint call reports an error.  Asserts that init() has run.
 *
 * Except when DB_VERSION_MAJOR is 4 and DB_VERSION_MINOR is at least 1,
 * the checkpoint is repeated for as long as it returns DB_INCOMPLETE.
 */
static int mysync(void)
{
int r;
assert(dbinit);
#if !(DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1)
do {
#endif
/* The checkpoint call takes one argument fewer on the oldest supported
 * library version. */
#if (DB_VERSION_MAJOR > 3) || ((DB_VERSION_MAJOR == 3) && (DB_VERSION_MINOR > 0))
r = txn_checkpoint(dbenv, 0, 0, 0);
#else
r = txn_checkpoint(dbenv, 0, 0);
#endif
#if !(DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1)
} while (r == DB_INCOMPLETE);
#endif
if (r) {
syslog(LOG_ERR, "DBERROR: couldn't checkpoint: %s",
db_strerror(r));
return CYRUSDB_IOERROR;
}
return 0;
}
/*
 * Archive the environment into the directory dirname.
 *
 * fnames:  NULL-terminated array of database file names; only database
 *          files reported by the library that also appear here are copied.
 * dirname: destination directory.  Each copied file keeps its base name
 *          (the part of the source path from the last '/' on).
 *
 * Three passes are made over lists obtained from log_archive():
 *   1. log files listed with DB_ARCH_ABS are unlinked;
 *   2. database files (DB_ARCH_DATA) named in fnames are copied;
 *   3. log files listed with DB_ARCH_LOG are copied.
 *
 * Returns 0 on success or CYRUSDB_IOERROR (after logging) on the first
 * failure.  A source path without a '/' or a destination that does not
 * fit in the local path buffer is treated as a failure for that file.
 * The list returned by log_archive() is freed on every exit path.
 */
static int myarchive(const char **fnames, const char *dirname)
{
    int r;
    char **begin, **list;
    const char **fname;
    const char *slash;
    char dstname[1024], *dp;
    int length, rest;

    /* dp marks where the per-file base name is appended; rest is the
     * space left there, including the terminating NUL. */
    strlcpy(dstname, dirname, sizeof(dstname));
    length = strlen(dstname);
    dp = dstname + length;
    rest = sizeof(dstname) - length;

    /* Pass 1: remove the log files reported by the library. */
    r = log_archive(dbenv, &list, DB_ARCH_ABS, NULL);
    if (r) {
        syslog(LOG_ERR, "DBERROR: error listing log files: %s",
               db_strerror(r));
        return CYRUSDB_IOERROR;
    }
    if (list != NULL) {
        for (begin = list; *list != NULL; ++list) {
            syslog(LOG_DEBUG, "removing log file: %s", *list);
            r = unlink(*list);
            if (r) {
                syslog(LOG_ERR, "DBERROR: error removing log file: %s",
                       *list);
                free(begin);
                return CYRUSDB_IOERROR;
            }
        }
        free(begin);
    }

    /* Pass 2: copy the database files the caller asked for. */
    r = log_archive(dbenv, &list, DB_ARCH_ABS | DB_ARCH_DATA, NULL);
    if (r) {
        syslog(LOG_ERR, "DBERROR: error listing database files: %s",
               db_strerror(r));
        return CYRUSDB_IOERROR;
    }
    if (list != NULL) {
        for (begin = list; *list != NULL; ++list) {
            for (fname = fnames; *fname != NULL; ++fname) {
                if (!strcmp(*list, *fname)) break;
            }
            if (*fname == NULL) continue;   /* not requested by the caller */

            syslog(LOG_DEBUG, "archiving database file: %s", *fname);
            slash = strrchr(*fname, '/');
            if (slash == NULL ||
                strlcpy(dp, slash, rest) >= (size_t) rest) {
                r = -1;                     /* no base name, or it does not fit */
            } else {
                r = cyrusdb_copyfile(*fname, dstname);
            }
            if (r) {
                syslog(LOG_ERR,
                       "DBERROR: error archiving database file: %s",
                       *fname);
                free(begin);
                return CYRUSDB_IOERROR;
            }
        }
        free(begin);
    }

    /* Pass 3: copy the log files. */
    r = log_archive(dbenv, &list, DB_ARCH_ABS | DB_ARCH_LOG, NULL);
    if (r) {
        syslog(LOG_ERR, "DBERROR: error listing log files: %s",
               db_strerror(r));
        return CYRUSDB_IOERROR;
    }
    if (list != NULL) {
        for (begin = list; *list != NULL; ++list) {
            syslog(LOG_DEBUG, "archiving log file: %s", *list);
            /* Bounded copy, as in pass 2: dstname is a fixed-size buffer. */
            slash = strrchr(*list, '/');
            if (slash == NULL ||
                strlcpy(dp, slash, rest) >= (size_t) rest) {
                r = -1;
            } else {
                r = cyrusdb_copyfile(*list, dstname);
            }
            if (r) {
                syslog(LOG_ERR, "DBERROR: error archiving log file: %s",
                       *list);
                free(begin);
                return CYRUSDB_IOERROR;
            }
        }
        free(begin);
    }

    return 0;
}
/*
 * Open the B-tree database file fname inside the shared environment.
 *
 * flags: CYRUSDB_CREATE requests creation of a missing file.
 * ret:   receives the opened handle, or NULL on failure.
 *
 * Returns 0 on success and CYRUSDB_IOERROR otherwise.  An open failure
 * is logged at LOG_ERR when creation was requested and at LOG_DEBUG when
 * it was not; the half-built handle is closed before returning.
 */
static int myopen(const char *fname, int flags, struct db **ret)
{
    DB *handle = NULL;
    int open_flags = 0;
    int rc;

    if (flags & CYRUSDB_CREATE) {
        open_flags = DB_CREATE;
    }

    assert(dbinit && fname && ret);
    *ret = NULL;

    rc = db_create(&handle, dbenv, 0);
    if (rc != 0) {
        syslog(LOG_ERR, "DBERROR: opening %s (creating database handle): %s", fname, db_strerror(rc));
        return CYRUSDB_IOERROR;
    }

#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1
    rc = handle->open(handle, NULL, fname, NULL, DB_BTREE, open_flags | DB_AUTO_COMMIT, 0664);
#else
    rc = handle->open(handle, fname, NULL, DB_BTREE, open_flags, 0664);
#endif

    if (rc == 0) {
        *ret = (struct db *) handle;
        return rc;
    }

    {
        /* Failing to open without CYRUSDB_CREATE is only worth a debug line. */
        int level = LOG_DEBUG;

        if (flags & CYRUSDB_CREATE) {
            level = LOG_ERR;
        }
        syslog(level, "DBERROR: opening %s: %s", fname, db_strerror(rc));

        rc = handle->close(handle, DB_NOSYNC);
        if (rc != 0) {
            syslog(level, "DBERROR: closing %s: %s", fname, db_strerror(rc));
        }
    }

    return CYRUSDB_IOERROR;
}
static int myclose(struct db *db)
{
int r;
DB *a = (DB *) db;
assert(dbinit && db);
r = a->close(a, DB_NOSYNC);
if (r != 0) {
syslog(LOG_ERR, "DBERROR: error closing: %s", db_strerror(r));
r = CYRUSDB_IOERROR;
}
return r;
}
/*
 * Resolve the transaction an operation should run in.
 *
 * mytid: the caller's transaction slot.  NULL means "no transaction";
 *        in that case nothing is done and *tid is left untouched.
 *        If the slot holds a transaction it is reused; if the slot is
 *        empty a new transaction is begun and stored in the slot.
 * tid:   receives the Berkeley DB transaction handle to use.
 * where: name of the calling operation, used in log messages only.
 *
 * Returns 0 on success, or CYRUSDB_IOERROR if a new transaction could not
 * be started.
 */
static int gettid(struct txn **mytid, DB_TXN **tid, char *where)
{
    int rc;

    /* Caller is not using transactions at all. */
    if (!mytid) {
        return 0;
    }

    if (*mytid == NULL) {
        rc = txn_begin(dbenv, NULL, tid, 0);
        if (rc != 0) {
            syslog(LOG_ERR, "DBERROR: error beginning txn (%s): %s", where,
                   db_strerror(rc));
            return CYRUSDB_IOERROR;
        }
        if (CONFIG_DB_VERBOSE) {
            syslog(LOG_DEBUG, "%s: starting txn %lu", where,
                   (unsigned long) txn_id(*tid));
        }
    } else {
        assert((txn_id((DB_TXN *)*mytid) != 0));
        *tid = (DB_TXN *) *mytid;
        if (CONFIG_DB_VERBOSE) {
            syslog(LOG_DEBUG, "%s: reusing txn %lu", where,
                   (unsigned long) txn_id(*tid));
        }
    }

    /* Hand the (possibly new) transaction back to the caller. */
    *mytid = (struct txn *) *tid;
    return 0;
}
static int myfetch(struct db *mydb,
const char *key, int keylen,
const char **data, int *datalen,
struct txn **mytid, int flags)
{
int r = 0;
DBT k, d;
DB *db = (DB *) mydb;
DB_TXN *tid = NULL;
assert(dbinit && db);
if (data) *data = NULL;
if (datalen) *datalen = 0;
r = gettid(mytid, &tid, "myfetch");
if (r) return r;
memset(&k, 0, sizeof(k));
memset(&d, 0, sizeof(d));
k.data = (char *) key;
k.size = keylen;
r = db->get(db, tid, &k, &d, flags);
switch (r) {
case 0:
if (data) *data = d.data;
if (datalen) *datalen = d.size;
break;
case DB_NOTFOUND:
r = CYRUSDB_NOTFOUND;
break;
case DB_LOCK_DEADLOCK:
if (mytid) {
abort_txn(mydb, *mytid);
*mytid = NULL;
}
r = CYRUSDB_AGAIN;
break;
default:
syslog(LOG_ERR, "DBERROR: error fetching %s: %s", key,
db_strerror(r));
r = CYRUSDB_IOERROR;
break;
}
return r;
}
/*
 * Backend "fetch" entry: myfetch() with no DB->get() flags.
 * Arguments and return codes are those of myfetch().
 */
static int fetch(struct db *mydb,
                 const char *key, int keylen,
                 const char **data, int *datalen,
                 struct txn **mytid)
{
    const int get_flags = 0;

    return myfetch(mydb, key, keylen, data, datalen, mytid, get_flags);
}
/*
 * Backend "fetchlock" entry: myfetch() with DB_RMW passed to DB->get().
 * Arguments and return codes are those of myfetch().
 */
static int fetchlock(struct db *mydb,
                     const char *key, int keylen,
                     const char **data, int *datalen,
                     struct txn **mytid)
{
    const int get_flags = DB_RMW;

    return myfetch(mydb, key, keylen, data, datalen, mytid, get_flags);
}
/*
 * OPENCURSOR(): open a cursor on `db` under transaction `tid`.
 * Relies on the enclosing function providing the locals `r`, `db`, `tid`
 * and `cursor` and a label `done`.  On failure the error is logged,
 * `cursor` is set to NULL and control jumps to `done` with the library
 * error code left in the enclosing `r`.
 */
#define OPENCURSOR() do { \
r = db->cursor(db, tid, &cursor, 0); \
if (r != 0) { \
syslog(LOG_ERR, "DBERROR: unable to create cursor: %s", \
db_strerror(r)); \
cursor = NULL; \
goto done; \
} \
} while (0)
/*
 * CLOSECURSOR(): close `cursor`.
 * The result is kept in a block-local `r` that shadows the enclosing one,
 * so the enclosing function's `r` is left unchanged whether the close
 * succeeds or fails.  On failure the error is logged, `cursor` is set to
 * NULL and control jumps to `done`.  On success `cursor` is NOT cleared;
 * callers that need that do it themselves.
 */
#define CLOSECURSOR() do { \
int r = cursor->c_close(cursor); \
if (r) { \
syslog(LOG_ERR, "DBERROR: error closing cursor: %s", \
db_strerror(r)); \
cursor = NULL; \
goto done; \
} \
} while (0)
/*
 * Walk the records of a database, optionally restricted to keys that
 * start with a prefix, and invoke a callback on the selected ones.
 *
 * mydb:              handle from myopen().
 * prefix, prefixlen: when prefix is non-NULL and non-empty the walk starts
 *                    at the first key >= prefix (DB_SET_RANGE), otherwise
 *                    at the first record; when prefixlen is nonzero the
 *                    walk stops at the first key whose leading prefixlen
 *                    bytes differ from prefix.
 * goodp:             optional filter; when NULL every record is selected,
 *                    otherwise only those for which it returns nonzero.
 * cb, rock:          callback and its opaque argument.  A nonzero return
 *                    stops the walk; a negative one is logged.  The
 *                    callback's value is not returned to the caller.
 * mytid:             optional transaction slot, see gettid().
 *
 * Returns 0 when the walk finished (including when cb stopped it),
 * CYRUSDB_AGAIN on deadlock inside a transaction, CYRUSDB_IOERROR on any
 * other error.  In both error cases the caller's transaction, if any, is
 * aborted and *mytid cleared.
 *
 * NOTE(review): the prefix comparison does not look at k.size, and after
 * the callback the cursor is repositioned using the k filled in before
 * the previous cursor was closed -- confirm both are safe for the keys
 * and library version in use.
 */
static int foreach(struct db *mydb,
char *prefix, int prefixlen,
foreach_p *goodp,
foreach_cb *cb, void *rock,
struct txn **mytid)
{
int r = 0;
DBT k, d;
DBC *cursor = NULL;
DB *db = (DB *) mydb;
DB_TXN *tid = NULL;
assert(dbinit && db);
assert(cb);
memset(&k, 0, sizeof(k));
memset(&d, 0, sizeof(d));
r = gettid(mytid, &tid, "foreach");
if (r) return r;
/* The "restart" label is only reachable through the goto below: it closes
 * the current cursor and falls through to open a fresh one. */
if (0) {
restart:
CLOSECURSOR();
}
OPENCURSOR();
/* Position on the first candidate record. */
if (prefix && *prefix) {
k.data = prefix;
k.size = prefixlen;
r = cursor->c_get(cursor, &k, &d, DB_SET_RANGE);
} else {
r = cursor->c_get(cursor, &k, &d, DB_FIRST);
}
/* Outside a transaction a deadlock on the initial positioning is retried. */
if (!tid && r == DB_LOCK_DEADLOCK) goto restart;
while (!r) {
if (prefixlen && memcmp(k.data, prefix, prefixlen)) break;
if (!goodp || goodp(rock, k.data, k.size, d.data, d.size)) {
/* The cursor is closed for the duration of the callback. */
CLOSECURSOR(); cursor = NULL;
r = cb(rock, k.data, k.size, d.data, d.size);
if (r != 0) {
if (r < 0) {
syslog(LOG_ERR, "DBERROR: foreach cb() failed");
}
/* A callback stop is not reported as a database error. */
r = 0;
break;
}
/* Reopen and return to the record just processed; if it is still there
 * step to the next one, if it is gone take the first key after it. */
OPENCURSOR();
r = cursor->c_get(cursor, &k, &d, DB_SET);
switch (r) {
case 0:
r = cursor->c_get(cursor, &k, &d, DB_NEXT);
break;
case DB_NOTFOUND:
r = cursor->c_get(cursor, &k, &d, DB_SET_RANGE);
break;
default:
break;
}
} else {
r = cursor->c_get(cursor, &k, &d, DB_NEXT);
}
/* Deadlock while advancing: inside a transaction give up (handled after
 * the loop); otherwise reopen the cursor and reposition until the
 * deadlock clears. */
while (r == DB_LOCK_DEADLOCK) {
if (tid) {
break;
}
CLOSECURSOR();
OPENCURSOR();
r = cursor->c_get(cursor, &k, &d, DB_SET);
switch (r) {
case 0:
r = cursor->c_get(cursor, &k, &d, DB_NEXT);
break;
case DB_LOCK_DEADLOCK:
continue;
case DB_NOTFOUND:
r = cursor->c_get(cursor, &k, &d, DB_SET_RANGE);
break;
}
}
}
done:
if (cursor) {
CLOSECURSOR();
}
/* Translate the library result into a CYRUSDB_* code. */
switch (r) {
case 0:
break;
case DB_NOTFOUND:
/* Running off the end of the database is the normal way to finish. */
r = 0;
break;
case DB_LOCK_DEADLOCK:
if (mytid) {
abort_txn(mydb, *mytid);
*mytid = NULL;
}
r = CYRUSDB_AGAIN;
break;
default:
if (mytid) {
abort_txn(mydb, *mytid);
*mytid = NULL;
}
syslog(LOG_ERR, "DBERROR: error advancing: %s", db_strerror(r));
r = CYRUSDB_IOERROR;
break;
}
return r;
}
static int mystore(struct db *mydb,
const char *key, int keylen,
const char *data, int datalen,
struct txn **mytid, int putflags, int txnflags)
{
int r = 0;
DBT k, d;
DB_TXN *tid;
DB *db = (DB *) mydb;
assert(dbinit && db);
assert(key && keylen);
r = gettid(mytid, &tid, "mystore");
if (r) return r;
memset(&k, 0, sizeof(k));
memset(&d, 0, sizeof(d));
k.data = (char *) key;
k.size = keylen;
d.data = (char *) data;
d.size = datalen;
if (!mytid) {
restart:
r = txn_begin(dbenv, NULL, &tid, 0);
if (r != 0) {
syslog(LOG_ERR, "DBERROR: mystore: error beginning txn: %s",
db_strerror(r));
return CYRUSDB_IOERROR;
}
if (CONFIG_DB_VERBOSE)
syslog(LOG_DEBUG, "mystore: starting txn %lu",
(unsigned long) txn_id(tid));
}
r = db->put(db, tid, &k, &d, putflags);
if (!mytid) {
if (r) {
int r2;
if (CONFIG_DB_VERBOSE)
syslog(LOG_DEBUG, "mystore: aborting txn %lu",
(unsigned long) txn_id(tid));
r2 = txn_abort(tid);
if (r2) {
syslog(LOG_ERR, "DBERROR: mystore: error aborting txn: %s",
db_strerror(r));
return CYRUSDB_IOERROR;
}
if (r == DB_LOCK_DEADLOCK) {
goto restart;
}
} else {
if (CONFIG_DB_VERBOSE)
syslog(LOG_DEBUG, "mystore: committing txn %lu",
(unsigned long) txn_id(tid));
r = txn_commit(tid, txnflags);
}
}
if ( r != 0) {
if (mytid) {
abort_txn(mydb, *mytid);
*mytid = NULL;
}
if (r == DB_LOCK_DEADLOCK) {
r = CYRUSDB_AGAIN;
} else {
syslog(LOG_ERR, "DBERROR: mystore: error storing %s: %s",
key, db_strerror(r));
r = CYRUSDB_IOERROR;
}
}
return r;
}
/*
 * Backend "create" entry: mystore() with DB_NOOVERWRITE as the put flag
 * and 0 as the commit flag.  Arguments and return codes are those of
 * mystore().
 */
static int create(struct db *db,
                  const char *key, int keylen,
                  const char *data, int datalen,
                  struct txn **tid)
{
    const int put_flags = DB_NOOVERWRITE;
    const int commit_flags = 0;

    return mystore(db, key, keylen, data, datalen, tid,
                   put_flags, commit_flags);
}
/*
 * Backend "store" entry: mystore() with no put flags and 0 as the commit
 * flag.  Arguments and return codes are those of mystore().
 */
static int store(struct db *db,
                 const char *key, int keylen,
                 const char *data, int datalen,
                 struct txn **tid)
{
    const int put_flags = 0;
    const int commit_flags = 0;

    return mystore(db, key, keylen, data, datalen, tid,
                   put_flags, commit_flags);
}
/*
 * "berkeley-nosync" variant of create(): mystore() with DB_NOOVERWRITE
 * as the put flag and DB_TXN_NOSYNC as the commit flag.
 */
static int create_nosync(struct db *db,
                         const char *key, int keylen,
                         const char *data, int datalen,
                         struct txn **tid)
{
    const int put_flags = DB_NOOVERWRITE;
    const int commit_flags = DB_TXN_NOSYNC;

    return mystore(db, key, keylen, data, datalen, tid,
                   put_flags, commit_flags);
}
/*
 * "berkeley-nosync" variant of store(): mystore() with no put flags and
 * DB_TXN_NOSYNC as the commit flag.
 */
static int store_nosync(struct db *db,
                        const char *key, int keylen,
                        const char *data, int datalen,
                        struct txn **tid)
{
    const int put_flags = 0;
    const int commit_flags = DB_TXN_NOSYNC;

    return mystore(db, key, keylen, data, datalen, tid,
                   put_flags, commit_flags);
}
static int mydelete(struct db *mydb,
const char *key, int keylen,
struct txn **mytid, int txnflags, int force)
{
int r = 0;
DBT k;
DB_TXN *tid;
DB *db = (DB *) mydb;
assert(dbinit && db);
assert(key && keylen);
r = gettid(mytid, &tid, "delete");
if (r) return r;
memset(&k, 0, sizeof(k));
k.data = (char *) key;
k.size = keylen;
if (!mytid) {
restart:
r = txn_begin(dbenv, NULL, &tid, 0);
if (r != 0) {
syslog(LOG_ERR, "DBERROR: mydelete: error beginning txn: %s",
db_strerror(r));
return CYRUSDB_IOERROR;
}
if (CONFIG_DB_VERBOSE)
syslog(LOG_DEBUG, "mydelete: starting txn %lu",
(unsigned long) txn_id(tid));
}
r = db->del(db, tid, &k, 0);
if (!mytid) {
if (r) {
int r2;
if (CONFIG_DB_VERBOSE)
syslog(LOG_DEBUG, "mydelete: aborting txn %lu",
(unsigned long) txn_id(tid));
r2 = txn_abort(tid);
if (r2) {
syslog(LOG_ERR, "DBERROR: mydelete: error aborting txn: %s",
db_strerror(r));
return CYRUSDB_IOERROR;
}
if (r == DB_LOCK_DEADLOCK) {
goto restart;
}
} else {
if (CONFIG_DB_VERBOSE)
syslog(LOG_DEBUG, "mydelete: committing txn %lu",
(unsigned long) txn_id(tid));
r = txn_commit(tid, txnflags);
}
}
if (r != 0) {
if (mytid) {
abort_txn(mydb, *mytid);
*mytid = NULL;
}
if (r == DB_LOCK_DEADLOCK) {
r = CYRUSDB_AGAIN;
} else if (force && r == DB_NOTFOUND) {
r = CYRUSDB_OK;
} else {
syslog(LOG_ERR, "DBERROR: mydelete: error deleting %s: %s",
key, db_strerror(r));
r = CYRUSDB_IOERROR;
}
}
return r;
}
/*
 * Backend "delete" entry: mydelete() with 0 as the commit flag.
 * Arguments and return codes are those of mydelete().
 */
static int delete(struct db *db,
                  const char *key, int keylen,
                  struct txn **tid, int force)
{
    const int commit_flags = 0;

    return mydelete(db, key, keylen, tid, commit_flags, force);
}
/*
 * "berkeley-nosync" variant of delete(): mydelete() with DB_TXN_NOSYNC
 * as the commit flag.
 */
static int delete_nosync(struct db *db,
                         const char *key, int keylen,
                         struct txn **tid, int force)
{
    const int commit_flags = DB_TXN_NOSYNC;

    return mydelete(db, key, keylen, tid, commit_flags, force);
}
/*
 * Commit a transaction.
 *
 * db:       unused.
 * tid:      transaction to commit (must be non-NULL).
 * txnflags: flags passed to the commit call.
 *
 * Returns 0 on success and CYRUSDB_IOERROR otherwise.  EINVAL from the
 * commit is logged as a warning about an already aborted transaction;
 * every other failure is logged as an error with the library's text.
 */
static int mycommit(struct db *db __attribute__((unused)),
                    struct txn *tid, int txnflags)
{
    DB_TXN *handle = (DB_TXN *) tid;
    int rc;

    assert(dbinit && tid);

    if (CONFIG_DB_VERBOSE) {
        syslog(LOG_DEBUG, "mycommit: committing txn %lu",
               (unsigned long) txn_id(handle));
    }

    rc = txn_commit(handle, txnflags);
    if (rc == 0) {
        return rc;
    }

    if (rc == EINVAL) {
        syslog(LOG_WARNING, "mycommit: tried to commit an already aborted transaction");
    } else {
        syslog(LOG_ERR, "DBERROR: mycommit failed on commit: %s",
               db_strerror(rc));
    }
    return CYRUSDB_IOERROR;
}
/*
 * Backend "commit" entry: mycommit() with 0 as the commit flag.
 */
static int commit_txn(struct db *db, struct txn *tid)
{
    const int commit_flags = 0;

    return mycommit(db, tid, commit_flags);
}
/*
 * "berkeley-nosync" variant of commit_txn(): mycommit() with
 * DB_TXN_NOSYNC as the commit flag.
 */
static int commit_nosync(struct db *db, struct txn *tid)
{
    const int commit_flags = DB_TXN_NOSYNC;

    return mycommit(db, tid, commit_flags);
}
/*
 * Abort a transaction.
 *
 * db:  unused.
 * tid: transaction to abort (must be non-NULL).
 *
 * Returns 0 on success, or CYRUSDB_IOERROR after logging the library's
 * error text.
 */
static int abort_txn(struct db *db __attribute__((unused)),
                     struct txn *tid)
{
    DB_TXN *handle = (DB_TXN *) tid;
    int rc;

    assert(dbinit && tid);

    if (CONFIG_DB_VERBOSE) {
        syslog(LOG_DEBUG, "abort_txn: aborting txn %lu",
               (unsigned long) txn_id(handle));
    }

    rc = txn_abort(handle);
    if (rc == 0) {
        return 0;
    }

    syslog(LOG_ERR, "DBERROR: abort_txn: error aborting txn: %s",
           db_strerror(rc));
    return CYRUSDB_IOERROR;
}
/*
 * Backend descriptor named "berkeley".
 *
 * The initializer is positional; the meaning of each slot is fixed by
 * struct cyrusdb_backend, which is declared outside this file.  This
 * variant uses the create/store/delete/commit routines that pass 0 as
 * the transaction commit flag.
 *
 * NOTE(review): the two trailing NULL slots are presumably optional
 * operations this backend does not provide -- confirm against the
 * struct declaration.
 */
struct cyrusdb_backend cyrusdb_berkeley =
{
"berkeley",
&init,
&done,
&mysync,
&myarchive,
&myopen,
&myclose,
&fetch,
&fetchlock,
&foreach,
&create,
&store,
&delete,
&commit_txn,
&abort_txn,
NULL,
NULL
};
/*
 * Backend descriptor named "berkeley-nosync".
 *
 * Same positional layout and same init/done/sync/archive/open/close/
 * fetch/foreach/abort routines as cyrusdb_berkeley; the create, store,
 * delete and commit slots use the *_nosync routines, which pass
 * DB_TXN_NOSYNC as the transaction commit flag.
 *
 * NOTE(review): the two trailing NULL slots are presumably optional
 * operations this backend does not provide -- confirm against the
 * struct declaration.
 */
struct cyrusdb_backend cyrusdb_berkeley_nosync =
{
"berkeley-nosync",
&init,
&done,
&mysync,
&myarchive,
&myopen,
&myclose,
&fetch,
&fetchlock,
&foreach,
&create_nosync,
&store_nosync,
&delete_nosync,
&commit_nosync,
&abort_txn,
NULL,
NULL
};