/* cyrusdb_skiplist.c -- cyrusdb backend that keeps its records in an on-disk skiplist */
#include <config.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <assert.h>
#include <errno.h>
#include <syslog.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <netinet/in.h>
#include "cyrusdb.h"
#include "libcyr_cfg.h"
#include "lock.h"
#include "map.h"
#include "retry.h"
#include "util.h"
#include "xmalloc.h"
/* Threshold used by randlvl(): a new node is promoted one more level each
 * time a random draw in [0,1] falls below this value. */
#define PROB (0.5)
/* Record type tags.  Every record in the file starts with one of these as
 * a 32-bit word in network byte order (see the TYPE() macro). */
enum {
/* node written in key order by mycheckpoint() */
INORDER = 1,
/* node appended to the log by mystore() */
ADD = 2,
/* log record naming the file offset of a node that was removed */
DELETE = 4,
/* log record appended by mycommit() to close a transaction */
COMMIT = 255,
/* list head node stored right after the file header */
DUMMY = 257
};
/* In-memory handle for one open skiplist file. */
struct db {
/* path of the database file (heap copy made in myopen()) */
char *fname;
/* descriptor of the database file, or -1 when not open */
int fd;
/* base address and length of the current mapping of the file,
 * maintained through map_refresh()/map_free() */
const char *map_base;
unsigned long map_len;
/* logical size of the file: st_size at the last lock, or the end of the
 * current transaction's log after update_lock() */
unsigned long map_size;
/* inode of the file that is currently mapped */
ino_t map_ino;
/* cached copies of the header fields (see read_header()/write_header()) */
int version;
int version_minor;
int maxlevel;
int curlevel;
int listsize;
/* file offset at which the log (ADD/DELETE/COMMIT records) begins */
int logstart;
/* time recovery() last completed on this file */
time_t last_recovery;
};
/* State of one transaction in progress. */
struct txn {
/* non-zero when this struct was heap-allocated and must be free()d by
 * mycommit()/myabort() */
int ismalloc;
/* descriptor used for log writes (set by getsyncfd()), -1 when unset */
int syncfd;
/* file offset where this transaction's log records start */
int logstart;
/* file offset just past the last log record written so far */
int logend;
};
/* Time stamp loaded (or freshly written) by myinit() from the "skipstamp"
 * file; 0 when it could not be read.  myopen() runs recovery() on any
 * database whose last_recovery is older than this value. */
static time_t global_recovery = 0;
/* True unless the CYRUSOPT_SKIPLIST_UNSAFE switch is set; gates every
 * fsync()/fdatasync() call in this file. */
#define DO_FSYNC (!libcyrus_config_getswitch(CYRUSOPT_SKIPLIST_UNSAFE))
/* Compile-time tunables. */
enum {
/* when non-zero, mystore()/mydelete()/mycommit() assert myconsistent() */
be_paranoid = 0,
/* when non-zero, log writes go through a separate O_DSYNC descriptor
 * (see getsyncfd()) instead of explicit fdatasync() calls */
use_osync = 0
};
/*
 * Initialise a caller-provided transaction structure.
 *
 * The transaction's log window starts out empty and is positioned at the
 * current logical end of the database file (db->map_size).  No sync
 * descriptor is attached yet and the structure is marked as not being
 * heap-allocated.
 */
static void newtxn(struct db *db, struct txn *t)
{
    t->syncfd = -1;
    t->ismalloc = 0;

    /* both ends of the log window begin at the current end of file */
    t->logstart = t->logend = db->map_size;
    assert(t->logstart != -1);
}
/*
 * Make sure the transaction has a descriptor to write log records to.
 *
 * Without use_osync the database descriptor itself is used.  With
 * use_osync a separate O_DSYNC descriptor on the same file is opened the
 * first time it is needed and kept in t->syncfd.
 */
static void getsyncfd(struct db *db, struct txn *t)
{
    if (use_osync) {
        if (t->syncfd == -1) {
            t->syncfd = open(db->fname, O_RDWR | O_DSYNC, 0666);
            assert(t->syncfd != -1);
        }
        return;
    }

    t->syncfd = db->fd;
}
/*
 * Detach the transaction's sync descriptor.
 *
 * The descriptor is only close()d when use_osync is enabled, because
 * otherwise it is merely an alias for db->fd.  In both cases t->syncfd is
 * reset to -1.
 */
static void closesyncfd(struct db *db __attribute__((unused)),
                        struct txn *t)
{
    if (use_osync) {
        if (t->syncfd != -1) {
            close(t->syncfd);
        }
    }
    t->syncfd = -1;
}
/*
 * Backend initialisation.
 *
 * dbdir   - directory that holds the "skipstamp" file
 * myflags - when CYRUSDB_RECOVER is set, the current time is recorded as
 *           the new global recovery stamp and written to the stamp file;
 *           otherwise the stamp is read back from that file.
 *
 * The stamp is stored as one 32-bit word in network byte order.  It is
 * moved through a uint32_t so that exactly those four bytes are read and
 * written regardless of the size or byte order of time_t.
 *
 * Returns 0, or CYRUSDB_IOERROR when the stamp cannot be written.  A stamp
 * that cannot be read is not an error: global_recovery is set to 0, which
 * makes myopen() run recovery on every database.
 *
 * Also seeds rand(), which randlvl() relies on.
 */
static int myinit(const char *dbdir, int myflags)
{
    char sfile[1024];
    uint32_t stamp = 0;
    int fd;
    int r = 0;

    snprintf(sfile, sizeof(sfile), "%s/skipstamp", dbdir);

    if (myflags & CYRUSDB_RECOVER) {
        global_recovery = time(NULL);
        stamp = htonl((uint32_t) global_recovery);

        fd = open(sfile, O_RDWR | O_CREAT, 0644);
        if (fd == -1) r = -1;
        if (r != -1) r = ftruncate(fd, 0);
        if (r != -1) {
            ssize_t n = write(fd, &stamp, sizeof(stamp));
            if (n != (ssize_t) sizeof(stamp)) {
                /* a short write leaves errno untouched; make %m meaningful */
                if (n >= 0) errno = EIO;
                r = -1;
            }
        }
        if (r == -1) {
            syslog(LOG_ERR, "DBERROR: writing %s: %m", sfile);
            if (fd != -1) close(fd);
            return CYRUSDB_IOERROR;
        }
        /* close exactly once; a failing close may mean the data was lost */
        if (close(fd) == -1) {
            syslog(LOG_ERR, "DBERROR: writing %s: %m", sfile);
            return CYRUSDB_IOERROR;
        }
    } else {
        fd = open(sfile, O_RDONLY, 0644);
        if (fd == -1) r = -1;
        if (r != -1) {
            ssize_t n = read(fd, &stamp, sizeof(stamp));
            if (n != (ssize_t) sizeof(stamp)) {
                /* truncated stamp file: treat like a read error */
                if (n >= 0) errno = EIO;
                r = -1;
            }
        }
        if (r == -1) {
            syslog(LOG_ERR, "DBERROR: reading %s, assuming the worst: %m",
                   sfile);
            global_recovery = 0;
            if (fd != -1) close(fd);
        } else if (close(fd) == -1) {
            syslog(LOG_ERR, "DBERROR: reading %s, assuming the worst: %m",
                   sfile);
            global_recovery = 0;
        } else {
            global_recovery = ntohl(stamp);
        }
    }

    srand(time(NULL) * getpid());
    return 0;
}
/* Backend shutdown hook: nothing to release here, always returns 0. */
static int mydone(void)
{
return 0;
}
/* Backend sync hook: this backend has nothing to do here, always returns 0. */
static int mysync(void)
{
return 0;
}
/*
 * Copy each database file named in the NULL-terminated array `fnames`
 * into the directory `dirname`, keeping the file's base name.
 *
 * Returns 0 on success, or CYRUSDB_IOERROR as soon as one file cannot be
 * archived (copy failure or destination path too long).
 */
static int myarchive(const char **fnames, const char *dirname)
{
    int r;
    const char **fname;
    char dstname[1024], *dp;
    int length, rest;

    strlcpy(dstname, dirname, sizeof(dstname));
    length = strlen(dstname);
    dp = dstname + length;
    rest = sizeof(dstname) - length;

    for (fname = fnames; *fname != NULL; ++fname) {
        /* "/basename" part of the source path; NULL when the name has no
         * directory component at all */
        const char *tail = strrchr(*fname, '/');
        int n;

        syslog(LOG_DEBUG, "archiving database file: %s", *fname);

        if (tail) {
            n = snprintf(dp, rest, "%s", tail);
        } else {
            /* bare file name: supply the separator ourselves instead of
             * handing a NULL pointer to the string copy */
            n = snprintf(dp, rest, "/%s", *fname);
        }
        if (n < 0 || n >= rest) {
            /* destination path would have been truncated */
            syslog(LOG_ERR,
                   "DBERROR: error archiving database file: %s", *fname);
            return CYRUSDB_IOERROR;
        }

        r = cyrusdb_copyfile(*fname, dstname);
        if (r) {
            syslog(LOG_ERR,
                   "DBERROR: error archiving database file: %s", *fname);
            return CYRUSDB_IOERROR;
        }
    }
    return 0;
}
/* Format constants for files created by this code. */
enum {
/* major/minor version written into new headers; read_header() rejects
 * files whose major version differs */
SKIPLIST_VERSION = 1,
SKIPLIST_VERSION_MINOR = 2,
/* upper bound on node levels; also the size of the updateoffsets[] arrays */
SKIPLIST_MAXLEVEL = 20,
/* slack added to the checkpoint threshold in mycommit()
 * NOTE(review): 16834 looks like it may have been meant as 16384 -- confirm */
SKIPLIST_MINREWRITE = 16834
};
/* bit32: an unsigned integer type whose maximum value is 2^32-1, selected
 * at compile time from the limits in <limits.h>.  All on-disk words are
 * accessed through this type. */
#define BIT32_MAX 4294967295U
#if UINT_MAX == BIT32_MAX
typedef unsigned int bit32;
#elif ULONG_MAX == BIT32_MAX
typedef unsigned long bit32;
#elif USHRT_MAX == BIT32_MAX
typedef unsigned short bit32;
#else
#error dont know what to use for bit32
#endif
/* Magic bytes at the very start of every skiplist file; only the first
 * HEADER_MAGIC_SIZE bytes are compared/written. */
#define HEADER_MAGIC ("\241\002\213\015skiplist file\0\0\0")
#define HEADER_MAGIC_SIZE (20)
/* Byte offsets of the header fields.  Each field after the magic is one
 * 32-bit word in network byte order. */
enum {
OFFSET_HEADER = 0,
OFFSET_VERSION = 20,
OFFSET_VERSION_MINOR = 24,
OFFSET_MAXLEVEL = 28,
OFFSET_CURLEVEL = 32,
OFFSET_LISTSIZE = 36,
OFFSET_LOGSTART = 40,
OFFSET_LASTRECOVERY = 44
};
/* Total header length: the last field's offset plus its 4 bytes. */
enum {
HEADER_SIZE = OFFSET_LASTRECOVERY + 4
};
/* Forward declarations for functions used before their definition. */
static int mycommit(struct db *db, struct txn *tid);
static int myabort(struct db *db, struct txn *tid);
static int mycheckpoint(struct db *db, int locked);
static int myconsistent(struct db *db, struct txn *tid, int locked);
static int recovery(struct db *db, int flags);
/* Flag bits for recovery(). */
enum {
/* run even if the file's last_recovery stamp is recent enough */
RECOVERY_FORCE = 1,
/* caller already holds the write lock; recovery() must not take it, and
 * keeps it held on success */
RECOVERY_CALLER_LOCKED = 2
};
/* The dummy (list head) node sits directly after the header. */
#define DUMMY_OFFSET(db) (HEADER_SIZE)
#define DUMMY_PTR(db) ((db)->map_base + HEADER_SIZE)
/* Size of the dummy node: type, key length and data length words, then
 * maxlevel forward pointers, then the trailing padding word.
 * NOTE(review): `db` is not parenthesised in the expansion. */
#define DUMMY_SIZE(db) (4 * (3 + db->maxlevel + 1))
/* Round up to the next multiple of 4. */
#define ROUNDUP(num) (((num) + 3) & 0xFFFFFFFC)
/* Accessors for a node record at `ptr`.  Layout: type word, key length
 * word, key padded to 4 bytes, data length word, data padded to 4 bytes,
 * forward pointers.  Multi-byte fields are converted with ntohl().
 * These macros evaluate `ptr` more than once. */
#define TYPE(ptr) (ntohl(*((bit32 *)(ptr))))
#define KEY(ptr) ((ptr) + 8)
#define KEYLEN(ptr) (ntohl(*((bit32 *)((ptr) + 4))))
#define DATA(ptr) ((ptr) + 8 + ROUNDUP(KEYLEN(ptr)) + 4)
#define DATALEN(ptr) (ntohl(*((bit32 *)((ptr) + 8 + ROUNDUP(KEYLEN(ptr))))))
/* Address of the first forward pointer, of the x-th one, and the x-th
 * forward offset in host byte order. */
#define FIRSTPTR(ptr) ((ptr) + 8 + ROUNDUP(KEYLEN(ptr)) + 4 + ROUNDUP(DATALEN(ptr)))
#define PTR(ptr, x) (FIRSTPTR(ptr) + 4 * (x))
#define FORWARD(ptr, x) (ntohl(*((bit32 *)(FIRSTPTR(ptr) + 4 * (x)))))
/*
 * Number of forward pointers stored in the node at `ptr`.
 *
 * The pointer array is terminated by a word with all bits set, so the
 * level is found by counting words from FIRSTPTR() up to that terminator.
 * Only valid for DUMMY, INORDER and ADD records (asserted).
 */
static int LEVEL(const char *ptr)
{
    const bit32 *fwd;
    int count = 0;

    assert(TYPE(ptr) == DUMMY || TYPE(ptr) == INORDER || TYPE(ptr) == ADD);

    fwd = (bit32 *) FIRSTPTR(ptr);
    while (fwd[count] != (bit32)-1) {
        count++;
    }
    return count;
}
/*
 * Size in bytes of the record at `ptr`, derived from its type word.
 *
 *   DELETE               type word + offset word
 *   COMMIT               type word only
 *   DUMMY/INORDER/ADD    type, key length, padded key, data length,
 *                        padded data, LEVEL() forward pointers, padding word
 *
 * Returns 0 for an unrecognised type.
 */
static int RECSIZE(const char *ptr)
{
    int size;

    switch (TYPE(ptr)) {
    case DELETE:
        return 8;

    case COMMIT:
        return 4;

    case DUMMY:
    case INORDER:
    case ADD:
        size = 8;                           /* type + key length words */
        size += ROUNDUP(KEYLEN(ptr));       /* key, padded */
        size += 4;                          /* data length word */
        size += ROUNDUP(DATALEN(ptr));      /* data, padded */
        size += 4 * (LEVEL(ptr) + 1);       /* forward pointers + padding */
        return size;

    default:
        return 0;
    }
}
/*
 * Inspect the tail of the mapped file before a new transaction appends.
 *
 * Despite the name, the result is NON-ZERO when the tail does NOT look
 * clean; mystore() and mydelete() run a forced recovery() in that case.
 * The tail is considered unclean when:
 *   - the file size is not a multiple of 4, or
 *   - the log is empty (size == logstart) and the last word is not the
 *     all-ones padding word, or
 *   - the log is non-empty and neither the second-to-last word is the
 *     padding word nor the last word is a COMMIT tag.
 */
static int SAFE_TO_APPEND(struct db *db)
{
    const char *eof;

    if (db->map_size % 4) {
        return 1;
    }

    eof = db->map_base + db->map_size;

    if (db->map_size == db->logstart) {
        /* nothing logged yet: file must end with a node's padding word */
        return *((bit32 *)(eof - 4)) != htonl(-1);
    }

    return *((bit32 *)(eof - 8)) != htonl(-1) &&
           *((bit32 *)(eof - 4)) != htonl(COMMIT);
}
/* Last 32-bit word of the record at `ptr` (host byte order); recovery()
 * expects it to be all ones for node records. */
#define PADDING(ptr) (ntohl(*((bit32 *)((ptr) + RECSIZE(ptr) - 4))))
/*
 * Parse and validate the file header from the current mapping and cache
 * its fields in `db`.
 *
 * Checks, in order: the mapping is at least HEADER_SIZE bytes, the magic
 * matches, the major version equals SKIPLIST_VERSION, maxlevel does not
 * exceed SKIPLIST_MAXLEVEL, curlevel does not exceed maxlevel, and the
 * dummy node that follows the header has type DUMMY, empty key and data,
 * and exactly maxlevel forward pointers.
 *
 * Returns 0 on success, CYRUSDB_IOERROR on any failed check.  The caller
 * must hold a lock with the file mapped.
 */
static int read_header(struct db *db)
{
    const char *dptr;
    int r;

    assert(db && db->map_len && db->fname && db->map_base);

    if (db->map_len < HEADER_SIZE) {
        syslog(LOG_ERR,
               "skiplist: file not large enough for header: %s", db->fname);
        /* do not go on to read header fields that are not there */
        return CYRUSDB_IOERROR;
    }

    if (memcmp(db->map_base, HEADER_MAGIC, HEADER_MAGIC_SIZE)) {
        syslog(LOG_ERR, "skiplist: invalid magic header: %s", db->fname);
        return CYRUSDB_IOERROR;
    }

    db->version = ntohl(*((bit32 *)(db->map_base + OFFSET_VERSION)));
    db->version_minor =
        ntohl(*((bit32 *)(db->map_base + OFFSET_VERSION_MINOR)));
    if (db->version != SKIPLIST_VERSION) {
        syslog(LOG_ERR, "skiplist: version mismatch: %s has version %d.%d",
               db->fname, db->version, db->version_minor);
        return CYRUSDB_IOERROR;
    }

    db->maxlevel = ntohl(*((bit32 *)(db->map_base + OFFSET_MAXLEVEL)));
    if(db->maxlevel > SKIPLIST_MAXLEVEL) {
        syslog(LOG_ERR,
               "skiplist %s: MAXLEVEL %d in database beyond maximum %d\n",
               db->fname, db->maxlevel, SKIPLIST_MAXLEVEL);
        return CYRUSDB_IOERROR;
    }

    db->curlevel = ntohl(*((bit32 *)(db->map_base + OFFSET_CURLEVEL)));
    if(db->curlevel > db->maxlevel) {
        syslog(LOG_ERR,
               "skiplist %s: CURLEVEL %d in database beyond maximum %d\n",
               db->fname, db->curlevel, db->maxlevel);
        return CYRUSDB_IOERROR;
    }

    db->listsize = ntohl(*((bit32 *)(db->map_base + OFFSET_LISTSIZE)));
    db->logstart = ntohl(*((bit32 *)(db->map_base + OFFSET_LOGSTART)));
    db->last_recovery =
        ntohl(*((bit32 *)(db->map_base + OFFSET_LASTRECOVERY)));

    /* sanity-check the dummy node; stop at the first problem found */
    dptr = DUMMY_PTR(db);
    r = 0;
    if (!r && TYPE(dptr) != DUMMY) {
        syslog(LOG_ERR, "DBERROR: %s: first node not type DUMMY",
               db->fname);
        r = CYRUSDB_IOERROR;
    }
    if (!r && KEYLEN(dptr) != 0) {
        syslog(LOG_ERR, "DBERROR: %s: DUMMY has non-zero KEYLEN",
               db->fname);
        r = CYRUSDB_IOERROR;
    }
    if (!r && DATALEN(dptr) != 0) {
        syslog(LOG_ERR, "DBERROR: %s: DUMMY has non-zero DATALEN",
               db->fname);
        r = CYRUSDB_IOERROR;
    }
    if (!r && LEVEL(dptr) != db->maxlevel) {
        syslog(LOG_ERR, "DBERROR: %s: DUMMY level(%d) != db->maxlevel(%d)",
               db->fname, LEVEL(dptr), db->maxlevel);
        r = CYRUSDB_IOERROR;
    }
    return r;
}
static int write_header(struct db *db)
{
char buf[HEADER_SIZE];
int n;
memcpy(buf + 0, HEADER_MAGIC, HEADER_MAGIC_SIZE);
*((bit32 *)(buf + OFFSET_VERSION)) = htonl(db->version);
*((bit32 *)(buf + OFFSET_VERSION_MINOR)) = htonl(db->version_minor);
*((bit32 *)(buf + OFFSET_MAXLEVEL)) = htonl(db->maxlevel);
*((bit32 *)(buf + OFFSET_CURLEVEL)) = htonl(db->curlevel);
*((bit32 *)(buf + OFFSET_LISTSIZE)) = htonl(db->listsize);
*((bit32 *)(buf + OFFSET_LOGSTART)) = htonl(db->logstart);
*((bit32 *)(buf + OFFSET_LASTRECOVERY)) = htonl(db->last_recovery);
lseek(db->fd, 0, SEEK_SET);
n = retry_write(db->fd, buf, HEADER_SIZE);
if (n != HEADER_SIZE) {
syslog(LOG_ERR, "DBERROR: writing skiplist header for %s: %m",
db->fname);
return CYRUSDB_IOERROR;
}
return 0;
}
/*
 * Release everything owned by a database handle: the file name copy, the
 * mapping, the descriptor, and finally the handle itself.  A NULL handle
 * is accepted.  Always returns 0.
 */
static int dispose_db(struct db *db)
{
    if (db) {
        free(db->fname);        /* free(NULL) is a no-op */

        if (db->map_base) {
            map_free(&db->map_base, &db->map_len);
        }
        if (db->fd != -1) {
            close(db->fd);
        }
        free(db);
    }
    return 0;
}
/*
 * Called when a transaction that already holds the write lock touches the
 * database again: refresh the mapping so it covers everything the
 * transaction has logged so far, and record that extent as the logical
 * file size.  Always returns 0.
 */
static int update_lock(struct db *db, struct txn *txn)
{
    const int end = txn->logend;

    map_refresh(db->fd, 0, &db->map_base, &db->map_len, end,
                db->fname, 0);
    db->map_size = end;
    return 0;
}
/*
 * Take the write lock through lock_reopen() and bring the mapping up to
 * date.
 *
 * altname - path to lock instead of db->fname, or NULL
 *
 * If the locked file's inode differs from the one currently mapped, the
 * old mapping is dropped first.  map_size/map_ino are refreshed from the
 * stat data, and if the header has already been loaded (curlevel != 0)
 * curlevel is re-read from the file.
 *
 * Returns 0, or CYRUSDB_IOERROR when the lock cannot be obtained.
 */
static int write_lock(struct db *db, const char *altname)
{
    struct stat st;
    const char *failed_step;
    const char *path = db->fname;

    if (altname) {
        path = altname;
    }

    if (lock_reopen(db->fd, path, &st, &failed_step) < 0) {
        syslog(LOG_ERR, "IOERROR: %s %s: %m", failed_step, path);
        return CYRUSDB_IOERROR;
    }

    if (st.st_ino != db->map_ino) {
        map_free(&db->map_base, &db->map_len);
    }
    db->map_ino = st.st_ino;
    db->map_size = st.st_size;
    map_refresh(db->fd, 0, &db->map_base, &db->map_len, st.st_size,
                path, 0);

    if (db->curlevel != 0) {
        db->curlevel = ntohl(*((bit32 *)(db->map_base + OFFSET_CURLEVEL)));
    }
    return 0;
}
/*
 * Take a shared lock on the database file and bring the mapping up to date.
 *
 * After locking, the inode behind db->fd is compared with the inode that
 * db->fname currently names.  If they differ (the file was replaced, e.g.
 * by a checkpoint's rename), the file is reopened onto the same descriptor
 * number and the lock is retried.
 *
 * Returns 0, or CYRUSDB_IOERROR on lock/stat/open failure.
 */
static int read_lock(struct db *db)
{
struct stat sbuf, sbuffile;
int newfd = -1;
for (;;) {
if (lock_shared(db->fd) < 0) {
syslog(LOG_ERR, "IOERROR: lock_shared %s: %m", db->fname);
return CYRUSDB_IOERROR;
}
if (fstat(db->fd, &sbuf) == -1) {
syslog(LOG_ERR, "IOERROR: fstat %s: %m", db->fname);
lock_unlock(db->fd);
return CYRUSDB_IOERROR;
}
if (stat(db->fname, &sbuffile) == -1) {
syslog(LOG_ERR, "IOERROR: stat %s: %m", db->fname);
lock_unlock(db->fd);
return CYRUSDB_IOERROR;
}
/* same inode: we locked the file the name still refers to */
if (sbuf.st_ino == sbuffile.st_ino) break;
newfd = open(db->fname, O_RDWR, 0644);
if (newfd == -1) {
syslog(LOG_ERR, "IOERROR: open %s: %m", db->fname);
lock_unlock(db->fd);
return CYRUSDB_IOERROR;
}
/* replace the stale descriptor in place so db->fd keeps its number;
 * NOTE(review): the dup2() result is not checked */
dup2(newfd, db->fd);
close(newfd);
}
/* a different file than the one mapped: drop the old mapping */
if (db->map_ino != sbuf.st_ino) {
map_free(&db->map_base, &db->map_len);
}
db->map_size = sbuf.st_size;
db->map_ino = sbuf.st_ino;
map_refresh(db->fd, 0, &db->map_base, &db->map_len, sbuf.st_size,
db->fname, 0);
/* once the header has been loaded, keep curlevel in step with the file */
if (db->curlevel) {
db->curlevel = ntohl(*((bit32 *)(db->map_base + OFFSET_CURLEVEL)));
}
return 0;
}
/*
 * Release whatever lock is held on the database descriptor.
 * Returns 0, or CYRUSDB_IOERROR (after logging) when lock_unlock() fails.
 */
static int unlock(struct db *db)
{
    if (lock_unlock(db->fd) >= 0) {
        return 0;
    }

    syslog(LOG_ERR, "IOERROR: lock_unlock %s: %m", db->fname);
    return CYRUSDB_IOERROR;
}
/*
 * Open (and, with CYRUSDB_CREATE, create) the skiplist file `fname`.
 *
 * An empty file is initialised under the write lock with a fresh header
 * and dummy node; the function then loops back once to re-lock and load
 * the header from the now non-empty file.  If the file's last recovery is
 * older than the global recovery stamp (or no stamp is known), recovery()
 * is run before the handle is returned.
 *
 * On success stores the new handle in *ret and returns 0.  On failure the
 * handle is disposed of and a CYRUSDB error code is returned.
 */
static int myopen(const char *fname, int flags, struct db **ret)
{
    struct db *db = (struct db *) xzmalloc(sizeof(struct db));
    int r;
    int new = 0;

    db->fd = -1;
    db->fname = xstrdup(fname);

    db->fd = open(fname, O_RDWR, 0644);
    if (db->fd == -1 && errno == ENOENT && (flags & CYRUSDB_CREATE)) {
        if (cyrus_mkdir(fname, 0755) == -1) {
            /* do not leak the handle and its file name copy */
            dispose_db(db);
            return CYRUSDB_IOERROR;
        }
        db->fd = open(fname, O_RDWR | O_CREAT, 0644);
        new = 1;
    }

    if (db->fd == -1) {
        int level = (flags & CYRUSDB_CREATE) ? LOG_ERR : LOG_DEBUG;
        syslog(level, "IOERROR: opening %s: %m", fname);
        dispose_db(db);
        return CYRUSDB_IOERROR;
    }

 retry:
    /* curlevel == 0 tells the lock routines the header is not loaded yet */
    db->curlevel = 0;

    if (new) {
        r = write_lock(db, NULL);
        if (r < 0) {
            dispose_db(db);
            return r;
        }
    } else {
        r = read_lock(db);
        if (r < 0) {
            dispose_db(db);
            return r;
        }
    }

    if (new && db->map_size == 0) {
        /* brand new (or empty) file: lay down header and dummy node */
        db->version = SKIPLIST_VERSION;
        db->version_minor = SKIPLIST_VERSION_MINOR;
        db->maxlevel = SKIPLIST_MAXLEVEL;
        db->curlevel = 1;
        db->listsize = 0;
        db->logstart = DUMMY_OFFSET(db) + DUMMY_SIZE(db);
        db->last_recovery = time(NULL);

        r = write_header(db);
        if (!r) {
            int n;
            int dsize = DUMMY_SIZE(db);
            bit32 *buf = (bit32 *) xzmalloc(dsize);

            buf[0] = htonl(DUMMY);
            buf[(dsize / 4) - 1] = htonl(-1);

            lseek(db->fd, DUMMY_OFFSET(db), SEEK_SET);
            n = retry_write(db->fd, (char *) buf, dsize);
            if (n != dsize) {
                syslog(LOG_ERR, "DBERROR: writing dummy node for %s: %m",
                       db->fname);
                r = CYRUSDB_IOERROR;
            }
            free(buf);
        }
        if (!r && DO_FSYNC && (fsync(db->fd) < 0)) {
            syslog(LOG_ERR, "DBERROR: fsync(%s): %m", db->fname);
            r = CYRUSDB_IOERROR;
        }
        if (r) {
            /* initialisation failed: give up instead of looping back and
             * retrying forever; closing the descriptor drops the lock */
            dispose_db(db);
            return r;
        }
    }

    if (db->map_size == 0) {
        /* map_size still reflects the file as it was when we locked it;
         * go round again (with the write lock) to pick up or create the
         * contents */
        new = 1;
        unlock(db);
        goto retry;
    }

    r = read_header(db);
    if (r) {
        dispose_db(db);
        return r;
    }

    unlock(db);

    if (!global_recovery || db->last_recovery < global_recovery) {
        r = recovery(db, 0);
        if (r) {
            dispose_db(db);
            return r;
        }
    }

    *ret = db;
    return 0;
}
/* Close a database handle; simply disposes of it via dispose_db(), which
 * always returns 0. */
int myclose(struct db *db)
{
return dispose_db(db);
}
/*
 * Order two byte strings given by pointer and length.
 *
 * The common prefix is compared byte by byte (as plain `char` values); at
 * the first difference the raw difference s1[i] - s2[i] is returned.  If
 * one string is a prefix of the other, the shorter one sorts first and
 * the result is exactly -1, 0 or 1.
 */
static int compare(const char *s1, int l1, const char *s2, int l2)
{
    int shared = (l2 <= l1) ? l2 : l1;
    int i;

    for (i = 0; i < shared; i++) {
        int diff = s1[i] - s2[i];
        if (diff != 0) {
            return diff;
        }
    }

    /* identical over the shared length: decide on length alone */
    if (l1 > l2) return 1;
    if (l1 < l2) return -1;
    return 0;
}
/*
 * Skiplist search.
 *
 * Starting at the dummy node and at the highest level in use, walk
 * forward while the next node's key compares less than `key`, then drop
 * one level and repeat.
 *
 * updateoffsets - optional array of at least db->maxlevel ints.  When
 *                 given, entry i receives the file offset of the last
 *                 node visited at level i (the dummy node's offset for
 *                 levels that were not searched).
 *
 * Returns a pointer to the first node whose key is not less than `key`.
 * When there is no such node the level-0 forward offset is 0, so the
 * result equals db->map_base.
 */
static const char *find_node(struct db *db,
                             const char *key, int keylen,
                             int *updateoffsets)
{
    const char *base = db->map_base;
    const char *node = base + DUMMY_OFFSET(db);
    int level;

    if (updateoffsets) {
        level = 0;
        while (level < db->maxlevel) {
            updateoffsets[level++] = DUMMY_OFFSET(db);
        }
    }

    for (level = db->curlevel - 1; level >= 0; level--) {
        for (;;) {
            int next = FORWARD(node, level);
            const char *candidate;

            if (!next) break;               /* end of list at this level */
            candidate = base + next;
            if (compare(KEY(candidate), KEYLEN(candidate),
                        key, keylen) >= 0) {
                break;                      /* would overshoot */
            }
            node = candidate;
        }
        if (updateoffsets) {
            updateoffsets[level] = node - base;
        }
    }

    return base + FORWARD(node, 0);
}
/*
 * Look up `key` and hand back a pointer to its data inside the mapping.
 *
 * data/datalen - optional outputs; cleared first, filled on a hit
 * mytid        - NULL: take a read lock for the duration of the call;
 *                points to NULL: take the write lock and start a new
 *                transaction, returned through *mytid;
 *                points to a transaction: continue it.
 *
 * Returns 0 on a hit, CYRUSDB_NOTFOUND on a miss, or a negative error
 * from the locking routines.
 */
int myfetch(struct db *db,
const char *key, int keylen,
const char **data, int *datalen,
struct txn **mytid)
{
const char *ptr;
struct txn t, *tp;
int r = 0;
assert(db != NULL && key != NULL);
if (data) *data = NULL;
if (datalen) *datalen = 0;
if (!mytid) {
if ((r = read_lock(db)) < 0) {
return r;
}
tp = NULL;
} else if (!*mytid) {
if ((r = write_lock(db, NULL)) < 0) {
return r;
}
newtxn(db, &t);
tp = &t;
} else {
tp = *mytid;
update_lock(db, tp);
}
ptr = find_node(db, key, keylen, 0);
/* find_node() yields map_base when it ran off the end of the list */
if (ptr == db->map_base || compare(KEY(ptr), KEYLEN(ptr), key, keylen)) {
r = CYRUSDB_NOTFOUND;
} else {
if (datalen) *datalen = DATALEN(ptr);
if (data) *data = DATA(ptr);
}
if (mytid) {
if (!*mytid) {
/* hand the stack-built transaction to the caller as a heap copy */
*mytid = xmalloc(sizeof(struct txn));
memcpy(*mytid, tp, sizeof(struct txn));
(*mytid)->ismalloc = 1;
}
} else {
int r1;
/* non-transactional: drop the read lock; *data still points into
 * the mapping */
if ((r1 = unlock(db)) < 0) {
return r1;
}
}
return r;
}
/* Backend `fetch` entry point; forwards unchanged to myfetch(). */
static int fetch(struct db *mydb,
const char *key, int keylen,
const char **data, int *datalen,
struct txn **mytid)
{
return myfetch(mydb, key, keylen, data, datalen, mytid);
}
/* Backend `fetchlock` entry point; in this backend it is identical to
 * fetch() and forwards unchanged to myfetch(). */
static int fetchlock(struct db *db,
const char *key, int keylen,
const char **data, int *datalen,
struct txn **mytid)
{
return myfetch(db, key, keylen, data, datalen, mytid);
}
/*
 * Call `cb` for every record whose key starts with `prefix`.
 *
 * goodp - optional filter; when given, `cb` is only called for records
 *         for which goodp() returns non-zero
 * cb    - callback; a non-zero return stops the iteration and becomes
 *         the function's result
 * tid   - NULL: iterate under a read lock that is RELEASED around each
 *         callback; points to NULL: take the write lock and start a
 *         transaction, returned through *tid; points to a transaction:
 *         continue it
 *
 * Because the lock is dropped around callbacks in the non-transactional
 * case, the current key is saved first.  If the file changed meanwhile
 * (different inode or size) the position is re-established by searching
 * for the saved key.
 *
 * Returns a negative locking error, else the last callback result (0 when
 * the iteration ran to completion).
 */
int myforeach(struct db *db,
              char *prefix, int prefixlen,
              foreach_p *goodp,
              foreach_cb *cb, void *rock,
              struct txn **tid)
{
    const char *ptr;
    char *savebuf = NULL;
    size_t savebuflen = 0;
    size_t savebufsize;
    struct txn t, *tp;
    int r = 0, cb_r = 0;
    int need_unlock = 0;    /* non-zero while we hold the read lock */

    assert(db != NULL);
    assert(prefixlen >= 0);

    if (!tid) {
        if ((r = read_lock(db)) < 0) {
            return r;
        }
        tp = NULL;
        need_unlock = 1;
    } else if (!*tid) {
        if ((r = write_lock(db, NULL)) < 0) {
            return r;
        }
        newtxn(db, &t);
        tp = &t;
    } else {
        tp = *tid;
        update_lock(db, tp);
    }

    ptr = find_node(db, prefix, prefixlen, 0);

    while (ptr != db->map_base) {
        /* stop at the first key that no longer carries the prefix */
        if (KEYLEN(ptr) < (bit32) prefixlen) break;
        if (prefixlen && compare(KEY(ptr), prefixlen, prefix, prefixlen)) break;

        if (!goodp ||
            goodp(rock, KEY(ptr), KEYLEN(ptr), DATA(ptr), DATALEN(ptr))) {
            ino_t ino = db->map_ino;
            unsigned long sz = db->map_size;

            if (!tid) {
                /* let other processes in while the callback runs */
                need_unlock = 0;
                if ((r = unlock(db)) < 0) {
                    goto done;
                }
            }

            /* remember where we are in case the file changes under us */
            if (KEYLEN(ptr) > savebuflen) {
                savebuflen = KEYLEN(ptr) + 1024;
                savebuf = xrealloc(savebuf, savebuflen);
            }
            memcpy(savebuf, KEY(ptr), KEYLEN(ptr));
            savebufsize = KEYLEN(ptr);

            cb_r = cb(rock, KEY(ptr), KEYLEN(ptr), DATA(ptr), DATALEN(ptr));
            if (cb_r) break;

            if (!tid) {
                if ((r = read_lock(db)) < 0) {
                    goto done;
                }
                need_unlock = 1;
            } else {
                update_lock(db, tp);
            }

            if (!(ino == db->map_ino && sz == db->map_size)) {
                /* file changed: find the saved key again */
                ptr = find_node(db, savebuf, savebufsize, 0);
                if (ptr != db->map_base &&
                    savebufsize == KEYLEN(ptr) &&
                    !memcmp(savebuf, KEY(ptr), savebufsize)) {
                    /* still there: step past it */
                    ptr = db->map_base + FORWARD(ptr, 0);
                }
                /* otherwise the saved key is gone and find_node() already
                 * left us on its successor (or at the end of the list) */
            } else {
                ptr = db->map_base + FORWARD(ptr, 0);
            }
        } else {
            ptr = db->map_base + FORWARD(ptr, 0);
        }
    }

    if (tid) {
        if (!*tid) {
            /* hand the stack-built transaction to the caller */
            *tid = xmalloc(sizeof(struct txn));
            memcpy(*tid, tp, sizeof(struct txn));
            (*tid)->ismalloc = 1;
        }
    } else if (need_unlock) {
        /* only release a lock we actually still hold */
        r = unlock(db);
    }

 done:
    free(savebuf);
    return r ? r : cb_r;
}
/*
 * Pick the level for a new node.
 *
 * Starts at 1 and keeps adding a level for as long as a fresh rand() draw,
 * scaled to [0,1], is below PROB and the level is still below
 * db->maxlevel.
 */
unsigned int randlvl(struct db *db)
{
    unsigned int lvl;

    for (lvl = 1; ; lvl++) {
        /* the draw is made first, exactly once per candidate promotion */
        if (!(((float) rand() / (float) (RAND_MAX)) < PROB)) break;
        if (!(lvl < db->maxlevel)) break;
    }
    return lvl;
}
/*
 * Insert or replace the record for `key`.
 *
 * tid       - NULL: run as a one-shot transaction committed before
 *             returning; points to NULL: start a transaction and return
 *             it through *tid; points to a transaction: continue it
 * overwrite - when zero and the key already exists, the transaction is
 *             aborted and CYRUSDB_EXISTS is returned
 *
 * The new node is appended to the log as an ADD record (preceded by a
 * DELETE record when replacing), and the forward pointers of its
 * predecessors are rewritten in place to point at it.
 *
 * Returns 0, CYRUSDB_EXISTS, CYRUSDB_IOERROR, or a locking/recovery error.
 */
int mystore(struct db *db,
const char *key, int keylen,
const char *data, int datalen,
struct txn **tid, int overwrite)
{
const char *ptr;
bit32 klen, dlen;
struct iovec iov[50];
unsigned int lvl;
int num_iov;
struct txn t, *tp;
bit32 endpadding = htonl(-1);
bit32 zeropadding[4] = { 0, 0, 0, 0 };
int updateoffsets[SKIPLIST_MAXLEVEL];
int newoffsets[SKIPLIST_MAXLEVEL];
int addrectype = htonl(ADD);
int delrectype = htonl(DELETE);
bit32 todelete;
bit32 newoffset;
int r, i;
assert(db != NULL);
assert(key && keylen);
if (!tid || !*tid) {
if ((r = write_lock(db, NULL)) < 0) {
return r;
}
/* SAFE_TO_APPEND() is non-zero when the file tail looks unclean */
if(SAFE_TO_APPEND(db))
{
if((r = recovery(db, RECOVERY_FORCE | RECOVERY_CALLER_LOCKED)) < 0)
return r;
}
newtxn(db, &t);
tp = &t;
} else {
tp = *tid;
update_lock(db, tp);
}
if (be_paranoid) {
assert(myconsistent(db, tp, 1) == 0);
}
num_iov = 0;
/* the next record goes at the current end of this transaction's log */
newoffset = tp->logend;
ptr = find_node(db, key, keylen, updateoffsets);
if (ptr != db->map_base &&
!compare(KEY(ptr), KEYLEN(ptr), key, keylen)) {
if (!overwrite) {
myabort(db, tp);
return CYRUSDB_EXISTS;
} else {
/* replacing: the new node reuses the old node's level and inherits
 * its forward pointers */
lvl = LEVEL(ptr);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &delrectype, 4);
todelete = htonl(ptr - db->map_base);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &todelete, 4);
/* the DELETE record occupies 8 bytes ahead of the ADD record */
newoffset += 8;
for (i = 0; i < lvl; i++) {
newoffsets[i] = htonl(FORWARD(ptr, i));
}
}
} else {
lvl = randlvl(db);
if (lvl > db->curlevel) {
/* levels not in use yet hang directly off the dummy node */
for (i = db->curlevel; i < lvl; i++) {
updateoffsets[i] = DUMMY_OFFSET(db);
}
db->curlevel = lvl;
write_header(db);
}
for (i = 0; i < lvl; i++) {
newoffsets[i] =
htonl(FORWARD(db->map_base + updateoffsets[i], i));
}
}
klen = htonl(keylen);
dlen = htonl(datalen);
newoffset = htonl(newoffset);
/* point every predecessor at the record about to be written;
 * NOTE(review): these writes are not checked for failure */
for (i = 0; i < lvl; i++) {
lseek(db->fd,
PTR(db->map_base + updateoffsets[i], i) - db->map_base,
SEEK_SET);
retry_write(db->fd, (char *) &newoffset, 4);
}
/* assemble the ADD record: type, key length, key (+pad), data length,
 * data (+pad), forward pointers, terminating all-ones word */
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &addrectype, 4);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &klen, 4);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) key, keylen);
if (ROUNDUP(keylen) - keylen > 0) {
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) zeropadding,
ROUNDUP(keylen) - keylen);
}
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &dlen, 4);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) data, datalen);
if (ROUNDUP(datalen) - datalen > 0) {
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) zeropadding,
ROUNDUP(datalen) - datalen);
}
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) newoffsets, 4 * lvl);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &endpadding, 4);
getsyncfd(db, tp);
lseek(tp->syncfd, tp->logend, SEEK_SET);
r = retry_writev(tp->syncfd, iov, num_iov);
if (r < 0) {
syslog(LOG_ERR, "DBERROR: retry_writev(): %m");
myabort(db, tp);
return CYRUSDB_IOERROR;
}
/* r is the number of bytes appended */
tp->logend += r;
if (tid) {
if (!*tid) {
/* hand the stack-built transaction to the caller as a heap copy */
*tid = xmalloc(sizeof(struct txn));
memcpy(*tid, tp, sizeof(struct txn));
(*tid)->ismalloc = 1;
}
if (be_paranoid) {
assert(myconsistent(db, *tid, 1) == 0);
}
} else {
/* one-shot operation: commit right away */
mycommit(db, tp);
}
return 0;
}
/* Backend `create` entry point: mystore() with overwrite disabled, so an
 * existing key yields CYRUSDB_EXISTS. */
static int create(struct db *db,
const char *key, int keylen,
const char *data, int datalen,
struct txn **tid)
{
return mystore(db, key, keylen, data, datalen, tid, 0);
}
/* Backend `store` entry point: mystore() with overwrite enabled, so an
 * existing key is replaced. */
static int store(struct db *db,
const char *key, int keylen,
const char *data, int datalen,
struct txn **tid)
{
return mystore(db, key, keylen, data, datalen, tid, 1);
}
int mydelete(struct db *db,
const char *key, int keylen,
struct txn **tid, int force __attribute__((unused)))
{
const char *ptr;
int delrectype = htonl(DELETE);
int updateoffsets[SKIPLIST_MAXLEVEL];
bit32 offset;
bit32 writebuf[2];
struct txn t, *tp;
int i;
int r;
if (!tid || !*tid) {
if ((r = write_lock(db, NULL)) < 0) {
return r;
}
if(SAFE_TO_APPEND(db))
{
if((r = recovery(db, RECOVERY_FORCE | RECOVERY_CALLER_LOCKED)) < 0)
return r;
}
newtxn(db, &t);
tp = &t;
} else {
tp = *tid;
update_lock(db, tp);
}
if (be_paranoid) {
assert(myconsistent(db, tp, 1) == 0);
}
ptr = find_node(db, key, keylen, updateoffsets);
if (ptr == db->map_base ||
!compare(KEY(ptr), KEYLEN(ptr), key, keylen)) {
offset = ptr - db->map_base;
for (i = 0; i < db->curlevel; i++) {
int newoffset;
if (FORWARD(db->map_base + updateoffsets[i], i) != offset) {
break;
}
newoffset = htonl(FORWARD(ptr, i));
lseek(db->fd,
PTR(db->map_base + updateoffsets[i], i) - db->map_base,
SEEK_SET);
retry_write(db->fd, (char *) &newoffset, 4);
}
getsyncfd(db, tp);
lseek(tp->syncfd, tp->logend, SEEK_SET);
writebuf[0] = delrectype;
writebuf[1] = htonl(offset);
tp->logend += retry_write(tp->syncfd, (char *) writebuf, 8);
}
if (tid) {
if (!*tid) {
*tid = xmalloc(sizeof(struct txn));
memcpy(*tid, tp, sizeof(struct txn));
(*tid)->ismalloc = 1;
}
if (be_paranoid) {
assert(myconsistent(db, *tid, 1) == 0);
}
} else {
mycommit(db, tp);
}
return 0;
}
/*
 * Commit a transaction.
 *
 * If the transaction logged anything, the log is flushed, a COMMIT record
 * is written at the end of it and flushed again (flushes are skipped when
 * use_osync is on or DO_FSYNC is off).  If the file has grown past
 * 2 * logstart + SKIPLIST_MINREWRITE a checkpoint is run.
 *
 * On success the lock is released, the sync descriptor is detached and a
 * heap-allocated `tid` is freed.  On failure the transaction is rolled
 * back with myabort(), which performs the same cleanup.  Either way `tid`
 * must not be used again by the caller.
 *
 * Returns 0 or a CYRUSDB error code.
 */
int mycommit(struct db *db, struct txn *tid)
{
    bit32 commitrectype = htonl(COMMIT);
    int r = 0;

    assert(db && tid);

    update_lock(db, tid);

    if (be_paranoid) {
        assert(myconsistent(db, tid, 1) == 0);
    }

    /* nothing was logged: nothing to make durable */
    if (tid->logstart == tid->logend) {
        r = 0;
        goto done;
    }

    /* the log records must be on disk before the COMMIT that blesses them */
    if (!use_osync && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR, "IOERROR: writing %s: %m", db->fname);
        r = CYRUSDB_IOERROR;
        goto done;
    }

    assert(tid->syncfd != -1);
    if (lseek(tid->syncfd, tid->logend, SEEK_SET) < 0 ||
        retry_write(tid->syncfd, (char *) &commitrectype, 4) != 4) {
        /* without a COMMIT record recovery would discard this
         * transaction, so this must not be reported as success */
        syslog(LOG_ERR, "IOERROR: writing %s: %m", db->fname);
        r = CYRUSDB_IOERROR;
        goto done;
    }

    if (!use_osync && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR, "IOERROR: writing %s: %m", db->fname);
        r = CYRUSDB_IOERROR;
        goto done;
    }

 done:
    if (!r && tid->logend > (2 * db->logstart + SKIPLIST_MINREWRITE)) {
        r = mycheckpoint(db, 1);
    }

    if (be_paranoid) {
        assert(myconsistent(db, NULL, 1) == 0);
    }

    if (r) {
        int r2;

        r2 = myabort(db, tid);
        if (r2) {
            syslog(LOG_ERR, "DBERROR: skiplist %s: commit AND abort failed",
                   db->fname);
        }
    } else {
        /* release the transaction's resources even if unlocking fails */
        r = unlock(db);
        closesyncfd(db, tid);
        if (tid->ismalloc) {
            free(tid);
        }
    }
    return r;
}
/*
 * Roll back a transaction.
 *
 * The transaction's log records are undone one at a time, newest first:
 * an ADD is undone by unlinking the added node, a DELETE by re-linking
 * the node it removed.  The file is then truncated back to where the
 * transaction started, the lock is released, the sync descriptor is
 * detached and a heap-allocated `tid` is freed.
 *
 * Returns 0, or CYRUSDB_IOERROR when truncating or unlocking fails (in
 * which case `tid` is not freed).
 */
int myabort(struct db *db, struct txn *tid)
{
const char *ptr;
int updateoffsets[SKIPLIST_MAXLEVEL];
bit32 offset;
int i;
int r = 0;
assert(db && tid);
while (tid->logstart != tid->logend) {
/* records can only be walked forwards, so scan from the start of the
 * log to find the record that ends exactly at logend */
for (offset = tid->logstart, ptr = db->map_base + offset;
offset + RECSIZE(ptr) != (bit32) tid->logend;
offset += RECSIZE(ptr), ptr = db->map_base + offset) ;
offset = ptr - db->map_base;
assert(TYPE(ptr) == ADD || TYPE(ptr) == DELETE);
switch (TYPE(ptr)) {
case DUMMY:
case INORDER:
case COMMIT:
abort();
case ADD:
/* unlink the added node: predecessors pointing at it take over its
 * forward pointers */
(void) find_node(db, KEY(ptr), KEYLEN(ptr), updateoffsets);
for (i = 0; i < db->curlevel; i++) {
int newoffset;
if (FORWARD(db->map_base + updateoffsets[i], i) != offset) {
break;
}
newoffset = htonl(FORWARD(ptr, i));
lseek(db->fd,
PTR(db->map_base + updateoffsets[i], i) - db->map_base,
SEEK_SET);
retry_write(db->fd, (char *) &newoffset, 4);
}
break;
case DELETE:
{
unsigned int lvl;
int newoffset;
const char *q;
/* second word of a DELETE record: offset of the removed node, kept in
 * network byte order so it can be written back as a pointer */
newoffset = *((bit32 *)(ptr + 4));
q = db->map_base + ntohl(newoffset);
lvl = LEVEL(q);
/* re-link: point the removed node's predecessors at it again */
(void) find_node(db, KEY(q), KEYLEN(q), updateoffsets);
for (i = 0; i < lvl; i++) {
lseek(db->fd,
PTR(db->map_base + updateoffsets[i], i) - db->map_base,
SEEK_SET);
retry_write(db->fd, (char *) &newoffset, 4);
}
break;
}
}
/* this record is undone; step back to the one before it */
tid->logend -= RECSIZE(ptr);
}
/* drop the transaction's records from the file */
if (ftruncate(db->fd, tid->logstart) < 0) {
syslog(LOG_ERR,
"DBERROR: skiplist abort %s: ftruncate: %m",
db->fname);
r = CYRUSDB_IOERROR;
unlock(db);
return r;
}
db->map_size = tid->logstart;
if ((r = unlock(db)) < 0) {
return r;
}
closesyncfd(db, tid);
if (tid->ismalloc) {
free(tid);
}
return 0;
}
/*
 * Rewrite the database into a compact file with an empty log.
 *
 * Every live node is copied, in key order, as an INORDER record into
 * "<fname>.NEW"; forward pointers are rebuilt as the copy proceeds.  The
 * new file then gets its header, is flushed, locked and renamed over the
 * original, and the handle is switched over to it.
 *
 * locked - non-zero when the caller already holds the write lock; the
 *          lock is then still held (on the new file) on return.
 *          Otherwise the lock is taken here and released before
 *          returning.
 *
 * On failure the temporary file is removed and the handle keeps referring
 * to the original, untouched database (descriptor, header fields and
 * mapping are restored).
 *
 * Returns 0 or a CYRUSDB error code.
 */
static int mycheckpoint(struct db *db, int locked)
{
    char fname[1024];
    int oldfd;
    struct iovec iov[50];
    int num_iov;
    int updateoffsets[SKIPLIST_MAXLEVEL];
    const char *ptr;
    bit32 offset;
    int r = 0;
    int iorectype = htonl(INORDER);
    int i;
    int oldlistsize, oldlogstart;
    time_t start = time(NULL);

    if (!locked) {
        r = write_lock(db, NULL);
        if (r < 0) return r;
    } else {
        map_refresh(db->fd, 0, &db->map_base, &db->map_len, MAP_UNKNOWN_LEN,
                    db->fname, 0);
    }

    if ((r = myconsistent(db, NULL, 1)) < 0) {
        syslog(LOG_ERR, "db %s, inconsistent pre-checkpoint, bailing out",
               db->fname);
        /* do not walk away holding the lock we took above */
        if (!locked) unlock(db);
        return r;
    }

    snprintf(fname, sizeof(fname), "%s.NEW", db->fname);
    oldfd = db->fd;
    db->fd = open(fname, O_RDWR | O_CREAT, 0644);
    if (db->fd < 0) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: open(%s): %m", fname);
        /* restore the real descriptor before unlocking through it */
        db->fd = oldfd;
        if (!locked) unlock(db);
        return CYRUSDB_IOERROR;
    }

    /* remember the header fields we are about to overwrite */
    oldlistsize = db->listsize;
    oldlogstart = db->logstart;

    /* records are appended at end-of-file below, so leftovers from an
     * earlier, interrupted checkpoint must go */
    if (ftruncate(db->fd, 0) < 0) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: ftruncate(%s): %m",
               fname);
        r = CYRUSDB_IOERROR;
    }

    /* write the dummy node; the header is filled in at the end */
    if (!r) {
        int dsize = DUMMY_SIZE(db);
        bit32 *buf = (bit32 *) xzmalloc(dsize);

        buf[0] = htonl(DUMMY);
        buf[(dsize / 4) - 1] = htonl(-1);

        lseek(db->fd, DUMMY_OFFSET(db), SEEK_SET);
        r = retry_write(db->fd, (char *) buf, dsize);
        if (r != dsize) {
            r = CYRUSDB_IOERROR;
        } else {
            r = 0;
        }
        free(buf);

        /* updateoffsets[i]: file offset of the level-i pointer that must
         * be set when the next node of that level is written; initially
         * the dummy node's pointers */
        for (i = 0; i < db->maxlevel; i++) {
            updateoffsets[i] = DUMMY_OFFSET(db) + 12 + 4 * i;
        }
    }

    /* copy the live nodes in list order from the old mapping */
    offset = FORWARD(db->map_base + DUMMY_OFFSET(db), 0);
    db->listsize = 0;
    while (!r && offset != 0) {
        unsigned int lvl;
        bit32 newoffset, newoffsetnet;

        ptr = db->map_base + offset;
        lvl = LEVEL(ptr);
        db->listsize++;

        /* same record, but tagged INORDER instead of ADD */
        num_iov = 0;
        WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &iorectype, 4);
        WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) ptr + 4, RECSIZE(ptr) - 4);

        newoffset = lseek(db->fd, 0, SEEK_END);
        newoffsetnet = htonl(newoffset);
        r = retry_writev(db->fd, iov, num_iov);
        if (r < 0) {
            r = CYRUSDB_IOERROR;
        } else {
            r = 0;
        }

        /* hook the new node into each level it takes part in */
        for (i = 0; !r && i < lvl; i++) {
            r = lseek(db->fd, updateoffsets[i], SEEK_SET);
            if (r < 0) {
                r = CYRUSDB_IOERROR;
                break;
            } else {
                r = 0;
            }
            r = retry_write(db->fd, (char *) &newoffsetnet, 4);
            if (r < 0) {
                r = CYRUSDB_IOERROR;
                break;
            } else {
                r = 0;
            }
            updateoffsets[i] = newoffset + (PTR(ptr, i) - ptr);
        }

        offset = FORWARD(ptr, 0);
    }

    /* terminate every level's chain */
    for (i = 0; !r && i < db->maxlevel; i++) {
        bit32 newoffset = htonl(0);

        r = lseek(db->fd, updateoffsets[i], SEEK_SET);
        if (r < 0) {
            r = CYRUSDB_IOERROR;
            break;
        } else {
            r = 0;
        }
        r = retry_write(db->fd, (char *) &newoffset, 4);
        if (r < 0) {
            r = CYRUSDB_IOERROR;
            break;
        } else {
            r = 0;
        }
    }

    /* only a complete copy gets a header; an earlier error must not be
     * overwritten here or a broken file could be renamed into place */
    if (!r) {
        db->logstart = lseek(db->fd, 0, SEEK_END);
        r = write_header(db);
    }

    if (!r && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: fdatasync(%s): %m", fname);
        r = CYRUSDB_IOERROR;
    }

    /* lock the new file before it becomes visible under the real name */
    if (!r) {
        r = write_lock(db, fname);
    }

    if (!r && (rename(fname, db->fname) < 0)) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: rename(%s, %s): %m",
               fname, db->fname);
        r = CYRUSDB_IOERROR;
    }

    if (!r && DO_FSYNC && (fsync(db->fd) < 0)) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: fsync(%s): %m", fname);
        r = CYRUSDB_IOERROR;
    }

    if (r) {
        /* fall back to the original file, which is still open and locked */
        close(db->fd);
        db->fd = oldfd;
        unlink(fname);
        db->listsize = oldlistsize;
        db->logstart = oldlogstart;
    } else {
        /* closing the old descriptor releases the lock on the old file */
        close(oldfd);
    }

    /* map whichever file the handle now refers to */
    {
        struct stat sbuf;

        map_free(&db->map_base, &db->map_len);
        if (fstat(db->fd, &sbuf) == -1) {
            syslog(LOG_ERR, "IOERROR: fstat %s: %m", db->fname);
            if (!locked) unlock(db);
            return CYRUSDB_IOERROR;
        }
        db->map_size = sbuf.st_size;
        db->map_ino = sbuf.st_ino;
        map_refresh(db->fd, 0, &db->map_base, &db->map_len, sbuf.st_size,
                    db->fname, 0);
    }

    if (r) {
        if (!locked) unlock(db);
        return r;
    }

    if ((r = myconsistent(db, NULL, 1)) < 0) {
        syslog(LOG_ERR, "db %s, inconsistent post-checkpoint, bailing out",
               db->fname);
        if (!locked) unlock(db);
        return r;
    }

    if (!locked) {
        unlock(db);
    }

    {
        int diff = time(NULL) - start;
        syslog(LOG_INFO,
               "skiplist: checkpointed %s (%d record%s, %d bytes) in %d second%s",
               db->fname, db->listsize, db->listsize == 1 ? "" : "s",
               db->logstart, diff, diff == 1 ? "" : "s");
    }
    return r;
}
/*
 * Print every record in the file, starting with the dummy node, to
 * stdout: offset, record type and type-specific details (key/data length,
 * level and forward pointers for nodes; target offset for DELETE).
 *
 * detail - unused
 *
 * Runs under a read lock.  Stops at the first record whose type is not
 * recognised, since its size (and hence the next record) cannot be
 * determined.  Returns 0, or the error from read_lock().
 */
static int dump(struct db *db, int detail __attribute__((unused)))
{
    const char *ptr, *end;
    int i;
    int r;

    if ((r = read_lock(db)) < 0) {
        return r;
    }

    ptr = db->map_base + DUMMY_OFFSET(db);
    end = db->map_base + db->map_size;

    while (ptr < end) {
        int recsize;

        printf("%04X: ", (unsigned int) (ptr - db->map_base));
        switch (TYPE(ptr)) {
        case DUMMY:
            printf("DUMMY ");
            break;
        case INORDER:
            printf("INORDER ");
            break;
        case ADD:
            printf("ADD ");
            break;
        case DELETE:
            printf("DELETE ");
            break;
        case COMMIT:
            printf("COMMIT ");
            break;
        }

        switch (TYPE(ptr)) {
        case DUMMY:
        case INORDER:
        case ADD:
            printf("kl=%d dl=%d lvl=%d\n",
                   (int) KEYLEN(ptr), (int) DATALEN(ptr), LEVEL(ptr));
            printf("\t");
            for (i = 0; i < LEVEL(ptr); i++) {
                printf("%04X ", (unsigned int) FORWARD(ptr, i));
            }
            printf("\n");
            break;
        case DELETE:
            printf("offset=%04X\n",
                   (unsigned int) ntohl(*((bit32 *)(ptr + 4))));
            break;
        case COMMIT:
            printf("\n");
            break;
        }

        recsize = RECSIZE(ptr);
        if (recsize <= 0) {
            /* unknown record type: cannot advance, so do not spin here */
            printf("unknown record type, stopping\n");
            break;
        }
        ptr += recsize;
    }

    unlock(db);
    return 0;
}
/* Backend `consistent` entry point: runs myconsistent() with no
 * transaction and with the lock not yet held. */
static int consistent(struct db *db)
{
return myconsistent(db, NULL, 0);
}
/*
 * Verify the structure of the list.
 *
 * Walks the level-0 chain and, for every node and every one of its
 * forward pointers, checks that the pointer does not lie beyond the end
 * of the file and that the node it points to has a strictly greater key.
 * Problems are reported on stdout.
 *
 * tid    - when the lock is held by a transaction, that transaction (the
 *          mapping is refreshed to cover its log); else NULL
 * locked - non-zero when the caller already holds a lock; otherwise a
 *          read lock is taken for the duration of the check
 *
 * Returns 0 when consistent, CYRUSDB_INTERNAL on the first inconsistency,
 * or the error from read_lock().
 */
static int myconsistent(struct db *db, struct txn *tid, int locked)
{
    const char *ptr;
    bit32 offset;
    int r = 0;

    if (!locked) {
        if ((r = read_lock(db)) < 0) {
            return r;
        }
    } else if (tid) {
        update_lock(db, tid);
    }

    offset = FORWARD(db->map_base + DUMMY_OFFSET(db), 0);
    while (offset != 0) {
        int i;

        ptr = db->map_base + offset;

        for (i = 0; i < LEVEL(ptr); i++) {
            offset = FORWARD(ptr, i);

            if (offset > db->map_size) {
                fprintf(stdout,
                        "skiplist inconsistent: %04X: ptr %d is %04X; "
                        "eof is %04X\n",
                        (unsigned int) (ptr - db->map_base),
                        i, (unsigned int) offset,
                        (unsigned int) db->map_size);
                r = CYRUSDB_INTERNAL;
                goto done;
            }

            if (offset != 0) {
                /* a successor must sort strictly after this node */
                const char *q = db->map_base + offset;
                int cmp;

                cmp = compare(KEY(ptr), KEYLEN(ptr), KEY(q), KEYLEN(q));
                if (cmp >= 0) {
                    fprintf(stdout,
                            "skiplist inconsistent: %04X: ptr %d is %04X; "
                            "compare() = %d\n",
                            (unsigned int) (ptr - db->map_base),
                            i,
                            (unsigned int) offset, cmp);
                    r = CYRUSDB_INTERNAL;
                    goto done;
                }
            }
        }

        offset = FORWARD(ptr, 0);
    }

 done:
    /* release the lock we took, on the failure paths as well */
    if (!locked) unlock(db);
    return r;
}
/*
 * Rebuild the list's pointers from the records in the file.
 *
 * flags - RECOVERY_FORCE: run even if the file's last_recovery stamp is
 *         not older than the global one;
 *         RECOVERY_CALLER_LOCKED: the caller holds the write lock; it is
 *         kept on success and released on failure.
 *
 * Phase 1 relinks the INORDER nodes that lie between the dummy node and
 * logstart.  Phase 2 replays the log: complete transactions (those
 * followed by a COMMIT record) are applied record by record; a trailing
 * incomplete transaction is cut off with ftruncate().  Finally the header
 * is rewritten with a fresh last_recovery time.
 *
 * Returns 0 or a CYRUSDB error code.
 */
static int recovery(struct db *db, int flags)
{
const char *ptr, *keyptr;
int updateoffsets[SKIPLIST_MAXLEVEL];
bit32 offset, offsetnet, myoff = 0;
int r = 0;
time_t start = time(NULL);
int i;
if (!(flags & RECOVERY_CALLER_LOCKED) && (r = write_lock(db, NULL)) < 0) {
return r;
}
if ((r = read_header(db)) < 0) {
unlock(db);
return r;
}
/* someone else already recovered this file since the global stamp */
if (!(flags & RECOVERY_FORCE)
&& global_recovery
&& db->last_recovery >= global_recovery) {
unlock(db);
return 0;
}
db->listsize = 0;
/* validate the dummy node */
ptr = DUMMY_PTR(db);
r = 0;
if (!r && TYPE(ptr) != DUMMY) {
r = CYRUSDB_IOERROR;
syslog(LOG_ERR, "DBERROR: skiplist recovery %s: no dummy node?",
db->fname);
}
if (!r && KEYLEN(ptr) != 0) {
r = CYRUSDB_IOERROR;
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: dummy node KEYLEN != 0",
db->fname);
}
if (!r && DATALEN(ptr) != 0) {
r = CYRUSDB_IOERROR;
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: dummy node DATALEN != 0",
db->fname);
}
if (!r && LEVEL(ptr) != db->maxlevel) {
r = CYRUSDB_IOERROR;
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: dummy node level: %d != %d",
db->fname, LEVEL(ptr), db->maxlevel);
}
/* in phase 1, updateoffsets[i] is the FILE OFFSET of the level-i pointer
 * to fill in next; it starts at the dummy node's pointers */
for (i = 0; i < db->maxlevel; i++) {
updateoffsets[i] = DUMMY_OFFSET(db) + 12 + 4 * i;
}
/* phase 1: relink the INORDER nodes, which are stored in key order */
offset = DUMMY_OFFSET(db) + DUMMY_SIZE(db);
while (!r && (offset < (bit32) db->logstart)) {
ptr = db->map_base + offset;
offsetnet = htonl(offset);
db->listsize++;
if (TYPE(ptr) != INORDER) {
syslog(LOG_ERR, "DBERROR: skiplist recovery: %04X should be INORDER",
offset);
r = CYRUSDB_IOERROR;
/* r is now set, so the loop condition ends the loop */
continue;
}
for (i = 0; !r && i < LEVEL(ptr); i++) {
r = lseek(db->fd, updateoffsets[i], SEEK_SET);
if (r < 0) {
syslog(LOG_ERR, "DBERROR: lseek %s: %m", db->fname);
r = CYRUSDB_IOERROR;
break;
} else {
r = 0;
}
r = retry_write(db->fd, (char *) &offsetnet, 4);
if (r < 0) {
r = CYRUSDB_IOERROR;
break;
} else {
r = 0;
}
/* this node's own level-i pointer is the next one to fill in */
updateoffsets[i] = offset + (PTR(ptr, i) - ptr);
}
if (!r && PADDING(ptr) != (bit32) -1) {
syslog(LOG_ERR, "DBERROR: %s: offset %04X padding not -1",
db->fname, offset);
r = CYRUSDB_IOERROR;
}
if (!r) {
offset += RECSIZE(ptr);
}
}
/* terminate every level's chain with a zero pointer */
if (!r) {
for (i = 0; !r && i < db->maxlevel; i++) {
int zerooffset = 0;
r = lseek(db->fd, updateoffsets[i], SEEK_SET);
if (r < 0) {
syslog(LOG_ERR, "DBERROR: lseek %s: %m", db->fname);
r = CYRUSDB_IOERROR;
break;
} else {
r = 0;
}
r = retry_write(db->fd, (char *) &zerooffset, 4);
if (r < 0) {
r = CYRUSDB_IOERROR;
break;
} else {
r = 0;
}
}
}
/* phase 2: replay the log */
while (!r && offset < db->map_size) {
const char *p, *q;
map_refresh(db->fd, 0, &db->map_base, &db->map_len, db->map_size,
db->fname, 0);
ptr = db->map_base + offset;
offsetnet = htonl(offset);
if (TYPE(ptr) == COMMIT) {
offset += RECSIZE(ptr);
continue;
}
if (TYPE(ptr) != ADD && TYPE(ptr) != DELETE) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: %04X should be ADD or DELETE",
db->fname, offset);
r = CYRUSDB_IOERROR;
break;
}
/* look ahead for the COMMIT that closes this record's transaction */
q = db->map_base + db->map_size;
p = ptr;
for (;;) {
if (RECSIZE(p) <= 0) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: found a RECSIZE of 0, "
"truncating corrupted file instead of looping forever...",
db->fname);
p = q;
break;
}
p += RECSIZE(p);
if (p >= q) break;
if (TYPE(p) == COMMIT) break;
}
if (p >= q) {
/* ran off the end without a COMMIT: discard the partial transaction.
 * NOTE(review): db->map_size is not updated after the truncate */
syslog(LOG_NOTICE,
"skiplist recovery %s: found partial txn, not replaying",
db->fname);
if (ftruncate(db->fd, offset) < 0) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: ftruncate: %m",
db->fname);
r = CYRUSDB_IOERROR;
}
break;
}
/* from here on find_node() fills updateoffsets[] with NODE offsets */
keyptr = NULL;
if (TYPE(ptr) == ADD) {
keyptr = find_node(db, KEY(ptr), KEYLEN(ptr), updateoffsets);
/* keyptr stays non-NULL only if the key is already in the list */
if (keyptr == db->map_base ||
compare(KEY(ptr), KEYLEN(ptr), KEY(keyptr), KEYLEN(keyptr))) {
keyptr = NULL;
}
} else {
const char *p;
/* DELETE: second word is the offset of the node to remove */
myoff = ntohl(*((bit32 *)(ptr + 4)));
p = db->map_base + myoff;
keyptr = find_node(db, KEY(p), KEYLEN(p), updateoffsets);
if (keyptr == db->map_base) {
keyptr = NULL;
}
}
if (TYPE(ptr) == DELETE && keyptr) {
/* unlink the node at myoff from every level that points at it */
db->listsize--;
for (i = 0; i < db->curlevel; i++) {
int newoffset;
if (FORWARD(db->map_base + updateoffsets[i], i) != myoff) {
break;
}
newoffset = htonl(FORWARD(db->map_base + myoff, i));
lseek(db->fd,
PTR(db->map_base + updateoffsets[i], i) - db->map_base,
SEEK_SET);
retry_write(db->fd, (char *) &newoffset, 4);
}
} else if (TYPE(ptr) == DELETE) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: DELETE at %04X doesn't exist",
db->fname, offset);
r = CYRUSDB_IOERROR;
} else if (TYPE(ptr) == ADD && keyptr) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: ADD at %04X exists",
db->fname, offset);
r = CYRUSDB_IOERROR;
} else if (TYPE(ptr) == ADD) {
unsigned int lvl;
bit32 newoffsets[SKIPLIST_MAXLEVEL];
db->listsize++;
offsetnet = htonl(offset);
lvl = LEVEL(ptr);
if(lvl > SKIPLIST_MAXLEVEL) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: node claims level %d (greater than max %d)",
db->fname, lvl, SKIPLIST_MAXLEVEL);
r = CYRUSDB_IOERROR;
} else {
/* splice the node in: it takes over each predecessor's pointer, and
 * each predecessor is pointed at it */
for (i = 0; i < lvl; i++) {
newoffsets[i] =
htonl(FORWARD(db->map_base + updateoffsets[i], i));
lseek(db->fd,
PTR(db->map_base + updateoffsets[i], i) - db->map_base,
SEEK_SET);
retry_write(db->fd, (char *) &offsetnet, 4);
}
/* rewrite the node's own forward pointers in one go */
lseek(db->fd, FIRSTPTR(ptr) - db->map_base, SEEK_SET);
retry_write(db->fd, (char *) newoffsets, 4 * lvl);
}
} else {
abort();
}
offset += RECSIZE(ptr);
}
/* flush the rebuilt pointers before stamping the header */
if (!r && DO_FSYNC && (fdatasync(db->fd) < 0)) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: fdatasync: %m", db->fname);
r = CYRUSDB_IOERROR;
}
if (!r) {
db->last_recovery = time(NULL);
/* NOTE(review): the write_header() result is ignored */
write_header(db);
}
if (!r && DO_FSYNC && (fdatasync(db->fd) < 0)) {
syslog(LOG_ERR,
"DBERROR: skiplist recovery %s: fdatasync: %m", db->fname);
r = CYRUSDB_IOERROR;
}
if (!r) {
int diff = time(NULL) - start;
syslog(LOG_NOTICE,
"skiplist: recovered %s (%d record%s, %ld bytes) in %d second%s",
db->fname, db->listsize, db->listsize == 1 ? "" : "s",
db->map_size, diff, diff == 1 ? "" : "s");
}
/* on error the lock is always dropped, even if the caller took it */
if(r || !(flags & RECOVERY_CALLER_LOCKED)) {
unlock(db);
}
return r;
}
/* Method table exported for this backend.  The first entry is the backend
 * name; the rest are the functions defined above, in the order required
 * by struct cyrusdb_backend (declared in cyrusdb.h, not shown here). */
struct cyrusdb_backend cyrusdb_skiplist =
{
"skiplist",
&myinit,
&mydone,
&mysync,
&myarchive,
&myopen,
&myclose,
&fetch,
&fetchlock,
&myforeach,
&create,
&store,
&mydelete,
&mycommit,
&myabort,
&dump,
&consistent
};