#include "db_config.h"
#include "db_int.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/db_page.h"
#include "dbinc/hash.h"
/*
 * BH_TRACK --
 *	One entry in the array of buffers that __memp_sync_int has selected
 *	for writing.  The buffer itself is not referenced directly; it is
 *	looked up again in its hash bucket by (track_off, track_pgno).
 */
typedef struct {
/* Hash bucket the buffer was found in; set to NULL once the slot is done. */
DB_MPOOL_HASH *track_hp;
/* Copy of the buffer's mf_offset (identifies the owning MPOOLFILE). */
roff_t track_off;
/* Copy of the buffer's page number. */
db_pgno_t track_pgno;
} BH_TRACK;
/* Forward declarations of the file-local helpers defined below. */
static int __bhcmp __P((const void *, const void *));
static int __memp_close_flush_files __P((ENV *, int));
static int __memp_sync_files __P((ENV *));
static int __memp_sync_file __P((ENV *,
MPOOLFILE *, void *, u_int32_t *, u_int32_t));
/*
 * __memp_walk_files --
 *	Invoke a callback on every MPOOLFILE in the file hash table.
 *
 *	Each bucket's mtx_hash is held while the callback runs on that
 *	bucket's entries.  The first non-zero callback return is remembered
 *	and returned.  Unless DB_STAT_MEMP_NOERROR is set in flags, the walk
 *	stops at that first failure; with the flag set the walk visits every
 *	file regardless.
 *
 *	env	environment handle
 *	mp	primary MPOOL region (source of the file table offset)
 *	func	callback, called as func(env, mfp, arg, countp, flags)
 *	arg	opaque argument passed through to func
 *	countp	counter pointer passed through to func
 *	flags	passed through to func; DB_STAT_MEMP_NOERROR is also read here
 */
int
__memp_walk_files(env, mp, func, arg, countp, flags)
	ENV *env;
	MPOOL *mp;
	int (*func)__P((ENV *, MPOOLFILE *, void *, u_int32_t *, u_int32_t));
	void *arg;
	u_int32_t *countp;
	u_int32_t flags;
{
	DB_MPOOL *dbmp;
	DB_MPOOL_HASH *bucket;
	MPOOLFILE *mfp;
	int bucket_no, ret, stop_on_error, t_ret;

	dbmp = env->mp_handle;
	stop_on_error = !LF_ISSET(DB_STAT_MEMP_NOERROR);
	ret = 0;

	bucket = R_ADDR(dbmp->reginfo, mp->ftab);
	for (bucket_no = 0;
	    bucket_no < MPOOL_FILE_BUCKETS; ++bucket_no, ++bucket) {
		MUTEX_LOCK(env, bucket->mtx_hash);
		SH_TAILQ_FOREACH(mfp, &bucket->hash_bucket, q, __mpoolfile) {
			t_ret = func(env, mfp, arg, countp, flags);

			/* Keep only the first error we see. */
			if (ret == 0)
				ret = t_ret;
			if (ret != 0 && stop_on_error)
				break;
		}
		MUTEX_UNLOCK(env, bucket->mtx_hash);

		if (ret != 0 && stop_on_error)
			break;
	}

	return (ret);
}
/*
 * __memp_sync_pp --
 *	Argument-checking wrapper around __memp_sync (reported to the user
 *	as "memp_sync").
 *
 *	Requires the memory pool to be configured; if an LSN is supplied the
 *	logging subsystem must be configured as well.  The actual flush is a
 *	DB_SYNC_CACHE sync, run between ENV_ENTER/ENV_LEAVE and inside
 *	REPLICATION_WRAP.
 *
 *	dbenv	public environment handle
 *	lsnp	optional LSN, passed through to __memp_sync (may be NULL)
 */
int
__memp_sync_pp(dbenv, lsnp)
	DB_ENV *dbenv;
	DB_LSN *lsnp;
{
	DB_THREAD_INFO *thread_info;
	ENV *env;
	int result;

	env = dbenv->env;

	ENV_REQUIRES_CONFIG(env,
	    env->mp_handle, "memp_sync", DB_INIT_MPOOL);

	/* An LSN only makes sense when there is a log to compare against. */
	if (lsnp != NULL) {
		ENV_REQUIRES_CONFIG(env,
		    env->lg_handle, "memp_sync", DB_INIT_LOG);
	}

	ENV_ENTER(env, thread_info);
	REPLICATION_WRAP(env,
	    (__memp_sync(env, DB_SYNC_CACHE, lsnp)), 0, result);
	ENV_LEAVE(env, thread_info);

	return (result);
}
/*
 * __memp_sync --
 *	Flush the cache, optionally up to a caller-supplied LSN.
 *
 *	If lsnp is non-NULL and is not greater than the LSN recorded in the
 *	primary region (mp->lsn), nothing is flushed: *lsnp is overwritten
 *	with mp->lsn and 0 is returned.  Otherwise the cache is synced via
 *	__memp_sync_int and, if that sync was not interrupted, mp->lsn is
 *	advanced to *lsnp when *lsnp is larger.
 *
 *	env	environment handle
 *	flags	DB_SYNC_* flags handed to __memp_sync_int
 *	lsnp	optional LSN (may be NULL)
 *
 *	Returns 0 on success or the error from __memp_sync_int.
 */
int
__memp_sync(env, flags, lsnp)
	ENV *env;
	u_int32_t flags;
	DB_LSN *lsnp;
{
	DB_MPOOL *dbmp;
	MPOOL *mp;
	int already_flushed, interrupted, ret;

	dbmp = env->mp_handle;
	mp = dbmp->reginfo[0].primary;

	/* Short-circuit if the recorded LSN already covers the request. */
	if (lsnp != NULL) {
		MPOOL_SYSTEM_LOCK(env);
		already_flushed = LOG_COMPARE(lsnp, &mp->lsn) <= 0;
		if (already_flushed)
			*lsnp = mp->lsn;
		MPOOL_SYSTEM_UNLOCK(env);

		if (already_flushed)
			return (0);
	}

	ret = __memp_sync_int(env, NULL, 0, flags, NULL, &interrupted);
	if (ret != 0)
		return (ret);

	/* Nothing to record without an LSN or after an interrupted sync. */
	if (interrupted || lsnp == NULL)
		return (0);

	MPOOL_SYSTEM_LOCK(env);
	if (LOG_COMPARE(lsnp, &mp->lsn) > 0)
		mp->lsn = *lsnp;
	MPOOL_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __memp_fsync_pp --
 *	Argument-checking wrapper around __memp_fsync (reported to the user
 *	as "DB_MPOOLFILE->sync").
 *
 *	Rejects the call via MPF_ILLEGAL_BEFORE_OPEN, then runs __memp_fsync
 *	between ENV_ENTER/ENV_LEAVE and inside REPLICATION_WRAP.
 *
 *	dbmfp	per-process file handle to flush
 */
int
__memp_fsync_pp(dbmfp)
	DB_MPOOLFILE *dbmfp;
{
	DB_THREAD_INFO *thread_info;
	ENV *env;
	int result;

	env = dbmfp->env;

	MPF_ILLEGAL_BEFORE_OPEN(dbmfp, "DB_MPOOLFILE->sync");

	ENV_ENTER(env, thread_info);
	REPLICATION_WRAP(env, (__memp_fsync(dbmfp)), 0, result);
	ENV_LEAVE(env, thread_info);

	return (result);
}
/*
 * __memp_fsync --
 *	Flush the dirty buffers of a single file.
 *
 *	Returns 0 without doing any work when the handle is flagged
 *	MP_READONLY, when the underlying file is flagged MP_TEMP or has no
 *	backing file, or when its file_written field is zero.  Otherwise the
 *	result of a DB_SYNC_FILE sync restricted to this handle is returned.
 *
 *	dbmfp	per-process file handle to flush
 */
int
__memp_fsync(dbmfp)
	DB_MPOOLFILE *dbmfp;
{
	MPOOLFILE *mfp;

	mfp = dbmfp->mfp;

	/* The checks are made in the same order as always: handle, then file. */
	if (F_ISSET(dbmfp, MP_READONLY) ||
	    F_ISSET(dbmfp->mfp, MP_TEMP) || dbmfp->mfp->no_backing_file ||
	    mfp->file_written == 0)
		return (0);

	return (__memp_sync_int(
	    dbmfp->env, dbmfp, 0, DB_SYNC_FILE, NULL, NULL));
}
/*
 * __mp_xxx_fh --
 *	Hand back the DB_FH stored in a DB_MPOOLFILE.
 *
 *	*fhp is set to dbmfp->fhp.  If that is NULL, a DB_SYNC_FILE sync of
 *	the file is run and, when it succeeds, dbmfp->fhp is read again
 *	(presumably the sync opens the file as a side effect -- not visible
 *	from here).  On sync failure *fhp stays NULL and the error is
 *	returned.
 *
 *	dbmfp	per-process file handle
 *	fhp	out: the underlying file handle
 */
int
__mp_xxx_fh(dbmfp, fhp)
	DB_MPOOLFILE *dbmfp;
	DB_FH **fhp;
{
	int ret;

	*fhp = dbmfp->fhp;
	if (*fhp != NULL)
		return (0);

	ret = __memp_sync_int(
	    dbmfp->env, dbmfp, 0, DB_SYNC_FILE, NULL, NULL);
	if (ret == 0)
		*fhp = dbmfp->fhp;

	return (ret);
}
/*
 * __memp_sync_int --
 *	Collect the dirty buffers selected by flags, sort them into file/page
 *	order and write them out.
 *
 *	env		environment handle
 *	dbmfp		if non-NULL, only buffers belonging to dbmfp->mfp are
 *			considered, and that handle is fsync'd at the end
 *	trickle_max	with DB_SYNC_TRICKLE, upper bound on buffers processed
 *	flags		DB_SYNC_* flags; see the per-test comments below
 *	wrote_totalp	if non-NULL, receives the number of buffers written
 *	interruptedp	if non-NULL, set to 1 if the sync was abandoned because
 *			DB_SYNC_INTERRUPT_OK was passed and the region has
 *			DB_MEMP_SYNC_INTERRUPT set (the return is still 0)
 *
 *	Returns 0 on success, otherwise the first error encountered.
 */
int
__memp_sync_int(env, dbmfp, trickle_max, flags, wrote_totalp, interruptedp)
	ENV *env;
	DB_MPOOLFILE *dbmfp;
	u_int32_t trickle_max, flags, *wrote_totalp;
	int *interruptedp;
{
	BH *bhp;
	BH_TRACK *bharray;
	DB_MPOOL *dbmp;
	DB_MPOOL_HASH *hp;
	MPOOL *c_mp, *mp;
	MPOOLFILE *mfp;
	db_mutex_t mutex;
	roff_t last_mf_offset;
	u_int32_t ar_cnt, ar_max, dirty, i, n_cache, remaining, wrote_total;
	int filecnt, maxopenfd, pass, required_write, ret, t_ret;
	int wait_cnt, wrote_cnt;

	dbmp = env->mp_handle;
	mp = dbmp->reginfo[0].primary;
	last_mf_offset = INVALID_ROFF;
	filecnt = pass = wrote_total = 0;

	if (wrote_totalp != NULL)
		*wrote_totalp = 0;
	if (interruptedp != NULL)
		*interruptedp = 0;

	/*
	 * For these sync kinds a buffer may not simply be skipped when it is
	 * busy: we keep retrying it, and the files are fsync'd at the end.
	 */
	required_write = LF_ISSET(DB_SYNC_CACHE |
	    DB_SYNC_CHECKPOINT | DB_SYNC_FILE | DB_SYNC_QUEUE_EXTENT);

	/* Read the shared configuration under the region lock. */
	MPOOL_SYSTEM_LOCK(env);
	maxopenfd = mp->mp_maxopenfd;
	MPOOL_SYSTEM_UNLOCK(env);

	/* Initial guess: one tracked buffer per hash bucket; grown on demand. */
	ar_max = mp->nreg * mp->htab_buckets;
	if ((ret =
	    __os_malloc(env, ar_max * sizeof(BH_TRACK), &bharray)) != 0)
		return (ret);

	/*
	 * Pass 1: walk every bucket of every cache region and record the
	 * dirty buffers that match the request.
	 */
	for (ar_cnt = 0, n_cache = 0; n_cache < mp->nreg; ++n_cache) {
		c_mp = dbmp->reginfo[n_cache].primary;

		hp = R_ADDR(&dbmp->reginfo[n_cache], c_mp->htab);
		for (i = 0; i < c_mp->htab_buckets; i++, hp++) {
			/*
			 * Unlocked pre-check: skip buckets with nothing
			 * dirty (in DIAGNOSTIC builds only skip empty
			 * buckets, so the dirty count below gets verified).
			 */
#ifdef DIAGNOSTIC
			if (SH_TAILQ_FIRST(&hp->hash_bucket, __bh) == NULL)
#else
			if (hp->hash_page_dirty == 0)
#endif
				continue;

			dirty = 0;
			MUTEX_LOCK(env, hp->mtx_hash);
			SH_TAILQ_FOREACH(bhp, &hp->hash_bucket, hq, __bh) {
				/* Clean buffers are never of interest. */
				if (!F_ISSET(bhp, BH_DIRTY))
					continue;

				dirty++;
				mfp = R_ADDR(dbmp->reginfo, bhp->mf_offset);

				/* Files without a backing file are skipped. */
				if (mfp->no_backing_file)
					continue;

				/* Temporary files only on a file sync. */
				if (!LF_ISSET(DB_SYNC_FILE) &&
				    F_ISSET(mfp, MP_TEMP))
					continue;

				/* Checkpoints skip files with no LSN offset. */
				if (LF_ISSET(DB_SYNC_CHECKPOINT) &&
				    mfp->lsn_off == DB_LSN_OFF_NOTSET)
					continue;

				/* Extent syncs only take MP_EXTENT files. */
				if (LF_ISSET(DB_SYNC_QUEUE_EXTENT) &&
				    !F_ISSET(mfp, MP_EXTENT))
					continue;

				/* A single-file sync only takes that file. */
				if (dbmfp != NULL && mfp != dbmfp->mfp)
					continue;

				/* Remember how to find this buffer again. */
				bharray[ar_cnt].track_hp = hp;
				bharray[ar_cnt].track_pgno = bhp->pgno;
				bharray[ar_cnt].track_off = bhp->mf_offset;
				ar_cnt++;

				/*
				 * Out of room: double the array.  We do not
				 * stop at trickle_max here; the truncation
				 * happens after sorting.
				 */
				if (ar_cnt >= ar_max) {
					if ((ret = __os_realloc(env,
					    (ar_max * 2) * sizeof(BH_TRACK),
					    &bharray)) != 0)
						break;
					ar_max *= 2;
				}
			}

			/*
			 * Cross-check the bucket's dirty counter, but only if
			 * the whole chain was walked.  After an allocation
			 * failure "dirty" is a partial count, and using it
			 * would trip the assertion and overwrite a correct
			 * hash_page_dirty with a value that is too small.
			 */
			if (ret == 0) {
				DB_ASSERT(env, dirty == hp->hash_page_dirty);
				if (dirty != hp->hash_page_dirty) {
					__db_errx(env,
				    "memp_sync: correcting dirty count %lu %lu",
					    (u_long)hp->hash_page_dirty,
					    (u_long)dirty);
					hp->hash_page_dirty = dirty;
				}
			}
			MUTEX_UNLOCK(env, hp->mtx_hash);

			if (ret != 0)
				goto err;

			/* Give up early if an interrupt was requested. */
			if (LF_ISSET(DB_SYNC_INTERRUPT_OK) && FLD_ISSET(
			    mp->config_flags, DB_MEMP_SYNC_INTERRUPT)) {
				if (interruptedp != NULL)
					*interruptedp = 1;
				goto err;
			}
		}
	}

	/* Nothing to write: go straight to the file-level flush. */
	if (ar_cnt == 0)
		goto done;

	/* Sort into (file, page) order -- see __bhcmp. */
	if (ar_cnt > 1)
		qsort(bharray, ar_cnt, sizeof(BH_TRACK), __bhcmp);

	/* A trickle only processes the first trickle_max entries. */
	if (LF_ISSET(DB_SYNC_TRICKLE) && ar_cnt > trickle_max)
		ar_cnt = trickle_max;

	/* Flush the log before writing any of the pages. */
	if (LOGGING_ON(env) && (ret = __log_flush(env, NULL)) != 0)
		goto err;

	/*
	 * Pass 2: walk the array (repeatedly, if necessary) until every slot
	 * has been dealt with.  A finished slot has its track_hp set to NULL.
	 */
	for (i = pass = wrote_cnt = 0, remaining = ar_cnt; remaining > 0; ++i) {
		/* Wrapped around: start another pass after yielding. */
		if (i >= ar_cnt) {
			i = 0;
			++pass;
			__os_yield(env, 1, 0);
		}
		if ((hp = bharray[i].track_hp) == NULL)
			continue;

		/* Lock the bucket and look the buffer up again. */
		mutex = hp->mtx_hash;
		MUTEX_LOCK(env, mutex);
		SH_TAILQ_FOREACH(bhp, &hp->hash_bucket, hq, __bh)
			if (bhp->pgno == bharray[i].track_pgno &&
			    bhp->mf_offset == bharray[i].track_off)
				break;

		/*
		 * The slot is finished if the buffer is gone, or if it is
		 * both unpinned and clean.  A buffer that is clean but still
		 * pinned must NOT be dropped here: it has to go through the
		 * pinned-buffer retry and ref_sync wait below, so the slot is
		 * only retired once the holder has let go of it.
		 */
		if (bhp == NULL || (bhp->ref == 0 && !F_ISSET(bhp, BH_DIRTY))) {
			MUTEX_UNLOCK(env, mutex);
			--remaining;
			bharray[i].track_hp = NULL;
			continue;
		}

		/*
		 * Buffer is locked by someone else, or pinned and we have
		 * made fewer than two full passes: leave it for later.  If
		 * the write is optional, drop it instead of retrying.
		 */
		if (F_ISSET(bhp, BH_LOCKED) || (bhp->ref != 0 && pass < 2)) {
			MUTEX_UNLOCK(env, mutex);
			if (!required_write) {
				--remaining;
				bharray[i].track_hp = NULL;
			}
			continue;
		}

		/* Pin and lock the buffer. */
		++bhp->ref;
		F_SET(bhp, BH_LOCKED);

		/*
		 * ref_sync is the number of references other than ours.  If
		 * it is non-zero, drop the bucket mutex and yield up to three
		 * times hoping it reaches zero (presumably decremented by the
		 * code that returns buffers to the cache -- not visible here).
		 */
		bhp->ref_sync = bhp->ref - 1;
		if (bhp->ref_sync != 0) {
			MUTEX_UNLOCK(env, mutex);
			for (wait_cnt = 1;
			    bhp->ref_sync != 0 && wait_cnt < 4; ++wait_cnt)
				__os_yield(env, 1, 0);
			MUTEX_LOCK(env, mutex);
		}

		/*
		 * When we move on to a different file and a descriptor limit
		 * is configured, sync and close the files we opened for
		 * flushing once maxopenfd of them have been counted.
		 */
		if (maxopenfd != 0 && bhp->mf_offset != last_mf_offset) {
			if (++filecnt >= maxopenfd) {
				filecnt = 0;
				if ((t_ret = __memp_close_flush_files(
				    env, 1)) != 0 && ret == 0)
					ret = t_ret;
			}
			last_mf_offset = bhp->mf_offset;
		}

		/*
		 * With no other references left the slot is finished whether
		 * or not the write below succeeds; otherwise it stays in the
		 * array and is retried on a later pass.
		 */
		if (bhp->ref_sync == 0) {
			--remaining;
			bharray[i].track_hp = NULL;
		}

		/* Write the buffer if we own it and it is still dirty. */
		if (bhp->ref_sync == 0 && F_ISSET(bhp, BH_DIRTY)) {
			mfp = R_ADDR(dbmp->reginfo, bhp->mf_offset);
			if ((t_ret =
			    __memp_bhwrite(dbmp, hp, mfp, bhp, 1)) == 0) {
				++wrote_cnt;
				++wrote_total;
			} else {
				if (ret == 0)
					ret = t_ret;
				__db_errx
				    (env, "%s: unable to flush page: %lu",
				    __memp_fns(dbmp, mfp), (u_long)bhp->pgno);
			}
		}

		/* Make sure our lock flag is cleared in every case. */
		if (F_ISSET(bhp, BH_LOCKED))
			F_CLR(bhp, BH_LOCKED);

		/* Reset the wait count and drop our reference. */
		bhp->ref_sync = 0;
		--bhp->ref;

		/*
		 * If the bucket is flagged IO_WAITER, clear the flag and
		 * release the bucket's I/O mutex (presumably waking the
		 * thread blocked on it).
		 */
		if (F_ISSET(hp, IO_WAITER)) {
			F_CLR(hp, IO_WAITER);
			MUTEX_UNLOCK(env, hp->mtx_io);
		}

		MUTEX_UNLOCK(env, mutex);

		/* Give up early if an interrupt was requested. */
		if (LF_ISSET(DB_SYNC_INTERRUPT_OK) &&
		    FLD_ISSET(mp->config_flags, DB_MEMP_SYNC_INTERRUPT)) {
			if (interruptedp != NULL)
				*interruptedp = 1;
			goto err;
		}

		/*
		 * Throttle: after mp_maxwrite writes, pause for
		 * mp_maxwrite_sleep.  The limits are re-read from the region
		 * on every iteration rather than cached.
		 */
		if (!LF_ISSET(DB_SYNC_SUPPRESS_WRITE) &&
		    !FLD_ISSET(mp->config_flags, DB_MEMP_SUPPRESS_WRITE) &&
		    mp->mp_maxwrite != 0 && wrote_cnt >= mp->mp_maxwrite) {
			wrote_cnt = 0;
			__os_yield(env, 0, (u_long)mp->mp_maxwrite_sleep);
		}
	}

done:	/*
	 * For required writes, force the data to disk: all files for a
	 * cache-wide sync, or just the one handle for a single-file sync.
	 */
	if (ret == 0 && required_write) {
		if (dbmfp == NULL)
			ret = __memp_sync_files(env);
		else
			ret = __os_fsync(env, dbmfp->fhp);
	}

	/* Close any handles that were opened only to flush pages. */
	if ((t_ret = __memp_close_flush_files(env, 0)) != 0 && ret == 0)
		ret = t_ret;

err:	__os_free(env, bharray);
	if (wrote_totalp != NULL)
		*wrote_totalp = wrote_total;

	return (ret);
}
static int
__memp_sync_file(env, mfp, argp, countp, flags)
ENV *env;
MPOOLFILE *mfp;
void *argp;
u_int32_t *countp;
u_int32_t flags;
{
DB_MPOOL *dbmp;
DB_MPOOLFILE *dbmfp;
int ret, t_ret;
COMPQUIET(countp, NULL);
COMPQUIET(flags, 0);
if (!mfp->file_written || mfp->no_backing_file ||
mfp->deadfile || F_ISSET(mfp, MP_TEMP))
return (0);
MUTEX_LOCK(env, mfp->mutex);
if (!mfp->file_written || mfp->deadfile) {
MUTEX_UNLOCK(env, mfp->mutex);
return (0);
}
++mfp->mpf_cnt;
MUTEX_UNLOCK(env, mfp->mutex);
dbmp = env->mp_handle;
MUTEX_LOCK(env, dbmp->mutex);
TAILQ_FOREACH(dbmfp, &dbmp->dbmfq, q) {
if (dbmfp->mfp != mfp || F_ISSET(dbmfp, MP_READONLY))
continue;
++dbmfp->ref;
break;
}
MUTEX_UNLOCK(env, dbmp->mutex);
if (dbmfp == NULL) {
if ((ret = __memp_mf_sync(dbmp, mfp, 1)) != 0) {
__db_err(env, ret,
"%s: unable to flush", (char *)
R_ADDR(dbmp->reginfo, mfp->path_off));
}
} else
ret = __os_fsync(env, dbmfp->fhp);
MUTEX_LOCK(env, mfp->mutex);
if (mfp->mpf_cnt == 1 || (mfp->mpf_cnt == 2 &&
dbmfp != NULL && F_ISSET(dbmfp, MP_FLUSH))) {
mfp->file_written = 0;
if (mfp->mpf_cnt == 1 && mfp->block_cnt == 0)
*(int *)argp = 1;
}
if (dbmfp != NULL &&
(t_ret = __memp_fclose(dbmfp, DB_MPOOL_NOLOCK)) != 0 && ret == 0)
ret = t_ret;
--mfp->mpf_cnt;
MUTEX_UNLOCK(env, mfp->mutex);
return (ret);
}
/*
 * __memp_sync_files --
 *	fsync every written file in the pool, then discard MPOOLFILEs that
 *	turned out to be unused.
 *
 *	The flush is done by walking all files with __memp_sync_file, with
 *	DB_STAT_MEMP_NOERROR set so one failure does not stop the walk.  If
 *	any callback requested it, a second pass discards every file that is
 *	not dead and has no blocks and no references.
 *
 *	Returns the result of the flush walk; discard errors are ignored.
 */
static int
__memp_sync_files(env)
	ENV *env;
{
	DB_MPOOL *dbmp;
	DB_MPOOL_HASH *hp;
	MPOOL *mp;
	MPOOLFILE *candidate, *cursor;
	int i, need_discard_pass, ret;

	dbmp = env->mp_handle;
	mp = dbmp->reginfo[0].primary;
	need_discard_pass = 0;

	ret = __memp_walk_files(env,
	    mp, __memp_sync_file, &need_discard_pass, 0, DB_STAT_MEMP_NOERROR);

	if (!need_discard_pass)
		return (ret);

	hp = R_ADDR(dbmp->reginfo, mp->ftab);
	for (i = 0; i < MPOOL_FILE_BUCKETS; i++, hp++) {
rescan:		MUTEX_LOCK(env, hp->mtx_hash);
		cursor = SH_TAILQ_FIRST(&hp->hash_bucket, __mpoolfile);
		while (cursor != NULL) {
			/* Advance first so "continue" moves to the next file. */
			candidate = cursor;
			cursor = SH_TAILQ_NEXT(candidate, q, __mpoolfile);

			/* Cheap unlocked test before taking the file mutex. */
			if (candidate->deadfile ||
			    candidate->block_cnt != 0 ||
			    candidate->mpf_cnt != 0)
				continue;

			/* Same test again, this time for real. */
			MUTEX_LOCK(env, candidate->mutex);
			if (candidate->deadfile ||
			    candidate->block_cnt != 0 ||
			    candidate->mpf_cnt != 0) {
				MUTEX_UNLOCK(env, candidate->mutex);
				continue;
			}

			/*
			 * Discard with the file mutex still held and the
			 * bucket mutex released, then restart this bucket
			 * from the top since the list may have changed.
			 */
			MUTEX_UNLOCK(env, hp->mtx_hash);
			(void)__memp_mf_discard(dbmp, candidate);
			goto rescan;
		}
		MUTEX_UNLOCK(env, hp->mtx_hash);
	}

	return (ret);
}
/*
 * __memp_mf_sync --
 *	fsync an MPOOLFILE through a temporarily opened file handle.
 *
 *	The file's path is resolved with __db_appname, the file is opened,
 *	fsync'd and closed again.
 *
 *	dbmp	per-process pool handle
 *	mfp	the file to flush
 *	locked	non-zero if the caller already holds the file's hash bucket
 *		mutex; if zero, that mutex is taken here for the duration
 *
 *	Returns 0 on success, else the first error from resolving the name,
 *	opening, syncing or closing the file.
 */
int
__memp_mf_sync(dbmp, mfp, locked)
	DB_MPOOL *dbmp;
	MPOOLFILE *mfp;
	int locked;
{
	DB_FH *fhp;
	DB_MPOOL_HASH *hp;
	ENV *env;
	MPOOL *mp;
	int ret, t_ret;
	char *rpath;

	COMPQUIET(hp, NULL);
	env = dbmp->env;

	/* Locate and lock the file's hash bucket unless the caller has it. */
	if (!locked) {
		mp = dbmp->reginfo[0].primary;
		hp = R_ADDR(dbmp->reginfo, mp->ftab);
		hp += FNBUCKET(
		    R_ADDR(dbmp->reginfo, mfp->fileid_off), DB_FILE_ID_LEN);
		MUTEX_LOCK(env, hp->mtx_hash);
	}

	ret = __db_appname(env, DB_APP_DATA,
	    R_ADDR(dbmp->reginfo, mfp->path_off), 0, NULL, &rpath);
	if (ret == 0) {
		ret = __os_open(env, rpath, 0, 0, 0, &fhp);
		if (ret == 0) {
			ret = __os_fsync(env, fhp);

			/* Always close; report a close error only if first. */
			t_ret = __os_closehandle(env, fhp);
			if (ret == 0)
				ret = t_ret;
		}
		__os_free(env, rpath);
	}

	if (!locked)
		MUTEX_UNLOCK(env, hp->mtx_hash);

	return (ret);
}
static int
__memp_close_flush_files(env, dosync)
ENV *env;
int dosync;
{
DB_MPOOL *dbmp;
DB_MPOOLFILE *dbmfp;
MPOOLFILE *mfp;
int ret;
dbmp = env->mp_handle;
retry: MUTEX_LOCK(env, dbmp->mutex);
TAILQ_FOREACH(dbmfp, &dbmp->dbmfq, q)
if (F_ISSET(dbmfp, MP_FLUSH)) {
F_CLR(dbmfp, MP_FLUSH);
MUTEX_UNLOCK(env, dbmp->mutex);
if (dosync) {
mfp = dbmfp->mfp;
if (mfp->mpf_cnt == 1) {
MUTEX_LOCK(env, mfp->mutex);
if (mfp->mpf_cnt == 1)
mfp->file_written = 0;
MUTEX_UNLOCK(env, mfp->mutex);
}
if ((ret = __os_fsync(env, dbmfp->fhp)) != 0)
return (ret);
}
if ((ret = __memp_fclose(dbmfp, 0)) != 0)
return (ret);
goto retry;
}
MUTEX_UNLOCK(env, dbmp->mutex);
return (0);
}
/*
 * __bhcmp --
 *	qsort comparison function for BH_TRACK entries: order by track_off
 *	first, then by track_pgno.  Returns -1, 0 or 1.
 */
static int
__bhcmp(p1, p2)
	const void *p1, *p2;
{
	BH_TRACK *lhs, *rhs;

	lhs = (BH_TRACK *)p1;
	rhs = (BH_TRACK *)p2;

	/* Primary key: the file (its offset in the region). */
	if (lhs->track_off != rhs->track_off)
		return (lhs->track_off < rhs->track_off ? -1 : 1);

	/* Secondary key: the page number within the file. */
	if (lhs->track_pgno != rhs->track_pgno)
		return (lhs->track_pgno < rhs->track_pgno ? -1 : 1);

	return (0);
}