#include <sys_defs.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
#include <utime.h>
#include <errno.h>
#include <ctype.h>
#include <string.h>
#include <msg.h>
#include <events.h>
#include <vstream.h>
#include <vstring.h>
#include <vstring_vstream.h>
#include <myflock.h>
#include <htable.h>
#include <dict.h>
#include <scan_dir.h>
#include <stringops.h>
#include <mail_params.h>
#include <mail_queue.h>
#include <mail_proto.h>
#include <mail_flush.h>
#include <flush_clnt.h>
#include <mail_conf.h>
#include <mail_scan_dir.h>
#include <maps.h>
#include <domain_list.h>
#include <match_parent_style.h>
#include <mail_server.h>
/*
 * Tunable time limits. Both are filled in from the time table that main()
 * passes to single_server_main().
 *
 * var_fflush_refresh is handed to flush_refresh_service() as the "max_age"
 * threshold when a refresh or wakeup request arrives.
 *
 * var_fflush_purge is the age after which an empty, unchanged logfile is
 * removed by flush_refresh_service(). The "/ 86400 ... days" log message
 * there suggests the unit is seconds -- TODO confirm against the
 * parameter definition.
 */
int var_fflush_refresh;
int var_fflush_purge;
/*
 * Domain list consulted by flush_policy_ok(); initialized in
 * pre_jail_init().
 */
static DOMAIN_LIST *flush_domains;
/*
 * Upper bound used by flush_send_path() for its in-memory duplicate filter
 * of queue IDs.
 */
#define FLUSH_DUP_FILTER_SIZE 10000
/*
 * Shorthands: STR() yields the C string inside a VSTRING, STREQ() tests two
 * C strings for equality.
 */
#define STR(x) vstring_str(x)
#define STREQ(x,y) (strcmp(x,y) == 0)
/*
 * Forward declarations for helpers that are used before they are defined.
 */
static int flush_add_path(const char *, const char *);
static int flush_send_path(const char *, int);
/*
 * Values for the "how" argument of flush_send_path(): REFRESH_ONLY selects
 * the queue manager trigger without QMGR_REQ_FLUSH_DEAD,
 * REFRESH_AND_DELIVER selects the one with it.
 */
#define REFRESH_ONLY 0
#define REFRESH_AND_DELIVER 1
/*
 * flush_site_to_path - map a site name onto a logfile name
 *
 * Appends one output character to "path" per input character of "site":
 * characters accepted by ISALNUM() are copied as-is, every other character
 * is replaced by '_'. When "path" is a null pointer a new VSTRING is
 * allocated here. The (possibly new) buffer is returned; the callers in
 * this file release it with vstring_free().
 */
static VSTRING *flush_site_to_path(VSTRING *path, const char *site)
{
    const unsigned char *cp;
    int     ch;
    int     out;

    /* Allocate the result buffer on demand. */
    if (path == 0)
        path = vstring_alloc(10);

    /* Translate the site name one character at a time. */
    cp = (const unsigned char *) site;
    while ((ch = *cp) != 0) {
        out = (ISALNUM(ch) ? ch : '_');
        VSTRING_ADDCH(path, out);
        cp++;
    }
    VSTRING_TERMINATE(path);

    if (msg_verbose)
        msg_info("site %s to path %s", site, STR(path));

    return (path);
}
/*
 * flush_policy_ok - check whether a site may use this service
 *
 * Returns the result of matching "site" against the flush_domains list
 * (non-zero is treated as "allowed" by the callers in this file).
 */
static int flush_policy_ok(const char *site)
{
    int     allowed;

    allowed = domain_list_match(flush_domains, site);
    return (allowed);
}
/*
 * flush_add_service - handle an "add queue ID to site logfile" request
 *
 * Returns FLUSH_STAT_DENY when flush_policy_ok() rejects the site.
 * Otherwise converts the site name into a logfile name and returns
 * whatever flush_add_path() reports for it.
 */
static int flush_add_service(const char *site, const char *queue_id)
{
    const char *myname = "flush_add_service";
    VSTRING *logname;
    int     result = FLUSH_STAT_DENY;

    if (msg_verbose)
        msg_info("%s: site %s queue_id %s", myname, site, queue_id);

    /* Only sites that pass the policy check get a logfile entry. */
    if (flush_policy_ok(site) != 0) {
        logname = flush_site_to_path((VSTRING *) 0, site);
        result = flush_add_path(STR(logname), queue_id);
        vstring_free(logname);
    }
    return (result);
}
/*
 * flush_add_path - append a queue ID to the named fast flush logfile
 *
 * Returns FLUSH_STAT_BAD when "path" fails mail_queue_id_ok(). Otherwise
 * the logfile is opened (created if needed) in append mode, locked
 * exclusively, extended with "queue_id" plus a newline, unlocked and
 * closed, and FLUSH_STAT_OK is returned. Open and lock/unlock failures are
 * fatal; write and close failures are only logged as warnings.
 */
static int flush_add_path(const char *path, const char *queue_id)
{
    const char *myname = "flush_add_path";
    VSTREAM *fp;
    int     fd;

    /* Refuse logfile names that do not pass the sanity check. */
    if (mail_queue_id_ok(path) == 0)
        return (FLUSH_STAT_BAD);

    /* Open the logfile, creating it when it does not exist yet. */
    fp = mail_queue_open(MAIL_QUEUE_FLUSH, path,
                         O_CREAT | O_APPEND | O_WRONLY, 0600);
    if (fp == 0)
        msg_fatal("%s: open fast flush logfile %s: %m", myname, path);
    fd = vstream_fileno(fp);

    /* Hold an exclusive lock while the record is being appended. */
    if (myflock(fd, INTERNAL_LOCK, MYFLOCK_OP_EXCLUSIVE) < 0)
        msg_fatal("%s: lock fast flush logfile %s: %m", myname, path);

    vstream_fprintf(fp, "%s\n", queue_id);
    if (vstream_fflush(fp) != 0)
        msg_warn("write fast flush logfile %s: %m", path);

    /* Release the lock, then the stream. */
    if (myflock(fd, INTERNAL_LOCK, MYFLOCK_OP_NONE) < 0)
        msg_fatal("%s: unlock fast flush logfile %s: %m", myname, path);
    if (vstream_fclose(fp) != 0)
        msg_warn("write fast flush logfile %s: %m", path);

    return (FLUSH_STAT_OK);
}
/*
 * flush_send_service - handle a "flush mail for this site" request
 *
 * Returns FLUSH_STAT_DENY when flush_policy_ok() rejects the site.
 * Otherwise converts the site name into a logfile name and returns the
 * result of flush_send_path() for that logfile, passing "how" through.
 */
static int flush_send_service(const char *site, int how)
{
    const char *myname = "flush_send_service";
    VSTRING *logname;
    int     result = FLUSH_STAT_DENY;

    if (msg_verbose)
        msg_info("%s: site %s", myname, site);

    /* Only sites that pass the policy check are flushed. */
    if (flush_policy_ok(site) != 0) {
        logname = flush_site_to_path((VSTRING *) 0, site);
        result = flush_send_path(STR(logname), how);
        vstring_free(logname);
    }
    return (result);
}
/*
 * flush_send_path - process every queue ID recorded in a fast flush logfile
 *
 * path: logfile name below MAIL_QUEUE_FLUSH; must pass mail_queue_id_ok().
 * how:  REFRESH_ONLY or REFRESH_AND_DELIVER; selects which trigger is sent
 *       to the queue manager after the logfile has been processed.
 *
 * Returns FLUSH_STAT_BAD for an unacceptable logfile name, FLUSH_STAT_OK
 * otherwise (including the case that the logfile does not exist).
 *
 * For each record the queue file time stamps are set to the current event
 * time. A file found in the deferred queue is then renamed into the
 * incoming queue; when the deferred queue file cannot be touched, the
 * incoming queue file is touched instead. After all records are read the
 * logfile is truncated and, if at least one record was read, the queue
 * manager is triggered. Open (other than ENOENT), lock, truncate and
 * unlock failures are fatal.
 */
static int flush_send_path(const char *path, int how)
{
    const char *myname = "flush_send_path";
    VSTRING *queue_id;
    VSTRING *queue_file;
    VSTREAM *log;
    struct utimbuf tbuf;
    static char qmgr_deliver_trigger[] = {
        QMGR_REQ_SCAN_INCOMING,
        QMGR_REQ_FLUSH_DEAD,
    };
    static char qmgr_refresh_trigger[] = {
        QMGR_REQ_SCAN_INCOMING,
    };
    HTABLE *dup_filter;
    int     count;

    /* Sanity check on the logfile name. */
    if (!mail_queue_id_ok(path))
        return (FLUSH_STAT_BAD);

    /* A missing logfile means there is nothing to do; that is not an error. */
    if ((log = mail_queue_open(MAIL_QUEUE_FLUSH, path, O_RDWR, 0600)) == 0) {
        if (errno != ENOENT)
            msg_fatal("%s: open fast flush logfile %s: %m", myname, path);
        return (FLUSH_STAT_OK);
    }

    /* Keep writers out while the logfile is read and truncated. */
    if (myflock(vstream_fileno(log), INTERNAL_LOCK, MYFLOCK_OP_EXCLUSIVE) < 0)
        msg_fatal("%s: lock fast flush logfile %s: %m", myname, path);

    queue_id = vstring_alloc(10);
    queue_file = vstring_alloc(10);
    dup_filter = htable_create(10);
    tbuf.actime = tbuf.modtime = event_time();

    /*
     * Note: "count" counts records read, including malformed ones and
     * duplicates, because the loop increment also runs after "continue".
     */
    for (count = 0; vstring_get_nonl(queue_id, log) != VSTREAM_EOF; count++) {
        if (!mail_queue_id_ok(STR(queue_id))) {
            msg_warn("bad queue id \"%.30s...\" in fast flush logfile %s",
                     STR(queue_id), path);
            continue;
        }

        /*
         * Once the duplicate filter is full it is no longer consulted, and
         * every record is processed even if it was seen before.
         */
        if (dup_filter->used >= FLUSH_DUP_FILTER_SIZE
            || htable_find(dup_filter, STR(queue_id)) == 0) {
            if (msg_verbose)
                msg_info("%s: logfile %s: update queue file %s time stamps",
                         myname, path, STR(queue_id));

            /*
             * Remember this queue ID unless the filter is full. Strict "<"
             * keeps the table at no more than FLUSH_DUP_FILTER_SIZE entries.
             */
            if (dup_filter->used < FLUSH_DUP_FILTER_SIZE)
                htable_enter(dup_filter, STR(queue_id), 0);

            /* Try the deferred queue first, then fall back to incoming. */
            mail_queue_path(queue_file, MAIL_QUEUE_DEFERRED, STR(queue_id));
            if (utime(STR(queue_file), &tbuf) < 0) {
                if (errno != ENOENT)
                    msg_warn("%s: update %s time stamps: %m",
                             myname, STR(queue_file));
                mail_queue_path(queue_file, MAIL_QUEUE_INCOMING, STR(queue_id));
                if (utime(STR(queue_file), &tbuf) < 0)
                    if (errno != ENOENT)
                        msg_warn("%s: update %s time stamps: %m",
                                 myname, STR(queue_file));
            } else if (mail_queue_rename(STR(queue_id), MAIL_QUEUE_DEFERRED,
                                         MAIL_QUEUE_INCOMING) < 0) {
                if (errno != ENOENT)
                    msg_warn("%s: rename from %s to %s: %m",
                             STR(queue_file), MAIL_QUEUE_DEFERRED,
                             MAIL_QUEUE_INCOMING);
            }
        } else {

            /*
             * Report the queue ID that was just read. The queue_file buffer
             * is not updated on this branch and would still hold the path
             * of a previously processed record.
             */
            if (msg_verbose)
                msg_info("%s: logfile %s: skip queue file %s as duplicate",
                         myname, path, STR(queue_id));
        }
    }
    htable_free(dup_filter, (void (*) (char *)) 0);
    vstring_free(queue_file);
    vstring_free(queue_id);

    /* Empty the logfile while still holding the lock. */
    if (count > 0 && ftruncate(vstream_fileno(log), (off_t) 0) < 0)
        msg_fatal("%s: truncate fast flush logfile %s: %m", myname, path);

    if (myflock(vstream_fileno(log), INTERNAL_LOCK, MYFLOCK_OP_NONE) < 0)
        msg_fatal("%s: unlock fast flush logfile %s: %m", myname, path);
    if (vstream_fclose(log) != 0)
        msg_warn("%s: read fast flush logfile %s: %m", myname, path);

    /* Wake up the queue manager only when the logfile had content. */
    if (count > 0) {
        if (msg_verbose)
            msg_info("%s: requesting delivery for logfile %s", myname, path);
        if (how == REFRESH_ONLY)
            mail_trigger(MAIL_CLASS_PUBLIC, var_queue_service,
                         qmgr_refresh_trigger, sizeof(qmgr_refresh_trigger));
        else
            mail_trigger(MAIL_CLASS_PUBLIC, var_queue_service,
                         qmgr_deliver_trigger, sizeof(qmgr_deliver_trigger));
    }
    return (FLUSH_STAT_OK);
}
/*
 * flush_refresh_service - walk all fast flush logfiles and refresh old ones
 *
 * Scans the MAIL_QUEUE_FLUSH directory. For every entry that passes
 * mail_queue_id_ok():
 *   - an empty logfile whose modification time is more than
 *     var_fflush_purge in the past is removed;
 *   - a non-empty logfile whose access time is more than "max_age" in the
 *     past is processed with flush_send_path(..., REFRESH_ONLY);
 *   - anything else is skipped.
 * Always returns FLUSH_STAT_OK.
 */
static int flush_refresh_service(int max_age)
{
    const char *myname = "flush_refresh_service";
    VSTRING *logfile = vstring_alloc(10);
    SCAN_DIR *dir;
    char   *entry;
    struct stat info;

    dir = scan_dir_open(MAIL_QUEUE_FLUSH);
    while ((entry = mail_scan_dir_next(dir)) != 0) {

        /* Ignore directory entries that cannot be logfile names. */
        if (!mail_queue_id_ok(entry))
            continue;

        mail_queue_path(logfile, MAIL_QUEUE_FLUSH, entry);
        if (stat(STR(logfile), &info) < 0) {
            /* A file that vanished in the meantime is not worth a warning. */
            if (errno != ENOENT)
                msg_warn("%s: stat %s: %m", myname, STR(logfile));
            else if (msg_verbose)
                msg_info("%s: %s: %m", myname, STR(logfile));
            continue;
        }

        /* Non-empty logfile: flush it when it was not read recently. */
        if (info.st_size != 0) {
            if (info.st_atime + max_age < event_time()) {
                if (msg_verbose)
                    msg_info("%s: flush logfile %s", myname, entry);
                flush_send_path(entry, REFRESH_ONLY);
            } else if (msg_verbose) {
                msg_info("%s: skip logfile %s, unread for <%d hours(s) ",
                         myname, entry, max_age / 3600);
            }
            continue;
        }

        /* Empty logfile that changed recently: leave it alone. */
        if (info.st_mtime + var_fflush_purge >= event_time()) {
            if (msg_verbose)
                msg_info("%s: skip logfile %s - empty log", myname, entry);
            continue;
        }

        /* Empty logfile that has been idle for too long: remove it. */
        if (unlink(STR(logfile)) < 0)
            msg_warn("remove logfile %s: %m", STR(logfile));
        else if (msg_verbose)
            msg_info("%s: unlink %s, empty and unchanged for %d days",
                     myname, STR(logfile), var_fflush_purge / 86400);
    }
    scan_dir_close(dir);
    vstring_free(logfile);

    return (FLUSH_STAT_OK);
}
/*
 * flush_request_receive - read the request name from a client
 *
 * Waits up to var_ipc_timeout for input, then looks at how many bytes are
 * pending. With at most two bytes pending the request is read as a
 * null-terminated string (presumably a short trigger-style request --
 * verify against the clients); otherwise it is read as the MAIL_ATTR_REQ
 * string attribute, with more attributes allowed to follow.
 *
 * Returns 0 with the request name stored in "request", or -1 on timeout,
 * peek failure, premature end of input, or attribute parse failure.
 */
static int flush_request_receive(VSTREAM *client_stream, VSTRING *request)
{
    int     fd = vstream_fileno(client_stream);
    int     pending;

    /* Do not block forever on a silent client. */
    if (read_wait(fd, var_ipc_timeout) < 0) {
        msg_warn("timeout while waiting for data from %s",
                 VSTREAM_PATH(client_stream));
        return (-1);
    }

    /* Find out how much input is available. */
    pending = peekfd(fd);
    if (pending < 0) {
        msg_warn("cannot examine read buffer of %s: %m",
                 VSTREAM_PATH(client_stream));
        return (-1);
    }

    /* Larger requests arrive in attribute format. */
    if (pending > 2) {
        if (attr_scan(client_stream,
                      ATTR_FLAG_MORE | ATTR_FLAG_STRICT,
                      ATTR_TYPE_STR, MAIL_ATTR_REQ, request,
                      ATTR_TYPE_END) != 1)
            return (-1);
        return (0);
    }

    /* Tiny requests are read as a plain null-terminated string. */
    if (vstring_get_null(request, client_stream) == VSTREAM_EOF) {
        msg_warn("end-of-input while reading request from %s: %m",
                 VSTREAM_PATH(client_stream));
        return (-1);
    }
    return (0);
}
/*
 * flush_service - serve one client request
 *
 * Reads the request name and dispatches on it:
 *   FLUSH_REQ_ADD      read site + queue ID, call flush_add_service(),
 *                      then reply with the resulting status;
 *   FLUSH_REQ_SEND     read site, call flush_send_service() with
 *                      REFRESH_AND_DELIVER, then reply with the status;
 *   FLUSH_REQ_REFRESH  (or the wakeup trigger) reply FLUSH_STAT_OK first,
 *                      then run flush_refresh_service(var_fflush_refresh);
 *   FLUSH_REQ_PURGE    reply FLUSH_STAT_OK first, then run
 *                      flush_refresh_service(0).
 * When the request cannot be read, or the ADD/SEND arguments are not
 * acceptable, the reply carries FLUSH_STAT_BAD. A request name that
 * matches none of the above gets no reply. Command-line arguments are
 * not allowed and cause a fatal error.
 */
static void flush_service(VSTREAM *client_stream, char *unused_service,
                                  char **argv)
{
    static char wakeup[] = {
        TRIGGER_REQ_WAKEUP,
        0,
    };
    VSTRING *request = vstring_alloc(10);
    VSTRING *site = 0;
    VSTRING *queue_id = 0;
    int     status = FLUSH_STAT_BAD;

    /* This service takes no command-line arguments. */
    if (argv[0])
        msg_fatal("unexpected command-line argument: %s", argv[0]);

    if (flush_request_receive(client_stream, request) != 0) {
        /* Unreadable request: report the default (bad) status. */
        attr_print(client_stream, ATTR_FLAG_NONE,
                   ATTR_TYPE_NUM, MAIL_ATTR_STATUS, status,
                   ATTR_TYPE_END);
    } else if (STREQ(STR(request), FLUSH_REQ_ADD)) {
        site = vstring_alloc(10);
        queue_id = vstring_alloc(10);
        if (attr_scan(client_stream, ATTR_FLAG_STRICT,
                      ATTR_TYPE_STR, MAIL_ATTR_SITE, site,
                      ATTR_TYPE_STR, MAIL_ATTR_QUEUEID, queue_id,
                      ATTR_TYPE_END) == 2) {
            if (mail_queue_id_ok(STR(queue_id)))
                status = flush_add_service(lowercase(STR(site)),
                                           STR(queue_id));
        }
        attr_print(client_stream, ATTR_FLAG_NONE,
                   ATTR_TYPE_NUM, MAIL_ATTR_STATUS, status,
                   ATTR_TYPE_END);
    } else if (STREQ(STR(request), FLUSH_REQ_SEND)) {
        site = vstring_alloc(10);
        if (attr_scan(client_stream, ATTR_FLAG_STRICT,
                      ATTR_TYPE_STR, MAIL_ATTR_SITE, site,
                      ATTR_TYPE_END) == 1)
            status = flush_send_service(lowercase(STR(site)),
                                        REFRESH_AND_DELIVER);
        attr_print(client_stream, ATTR_FLAG_NONE,
                   ATTR_TYPE_NUM, MAIL_ATTR_STATUS, status,
                   ATTR_TYPE_END);
    } else if (STREQ(STR(request), FLUSH_REQ_REFRESH)
               || STREQ(STR(request), wakeup)) {
        /* Reply before doing the work, so the client need not wait. */
        attr_print(client_stream, ATTR_FLAG_NONE,
                   ATTR_TYPE_NUM, MAIL_ATTR_STATUS, FLUSH_STAT_OK,
                   ATTR_TYPE_END);
        vstream_fflush(client_stream);
        (void) flush_refresh_service(var_fflush_refresh);
    } else if (STREQ(STR(request), FLUSH_REQ_PURGE)) {
        /* Same as refresh, but with a zero age threshold. */
        attr_print(client_stream, ATTR_FLAG_NONE,
                   ATTR_TYPE_NUM, MAIL_ATTR_STATUS, FLUSH_STAT_OK,
                   ATTR_TYPE_END);
        vstream_fflush(client_stream);
        (void) flush_refresh_service(0);
    }

    /* Release whatever was allocated for this request. */
    vstring_free(request);
    if (site != 0)
        vstring_free(site);
    if (queue_id != 0)
        vstring_free(queue_id);
}
/*
 * pre_jail_init - one-time initialization hook
 *
 * Builds the flush_domains list from var_fflush_domains, using the match
 * style that match_parent_style() reports for VAR_FFLUSH_DOMAINS. Passed
 * to single_server_main() as the MAIL_SERVER_PRE_INIT handler.
 */
static void pre_jail_init(char *unused_name, char **unused_argv)
{
    int     match_flags;

    match_flags = match_parent_style(VAR_FFLUSH_DOMAINS);
    flush_domains = domain_list_init(match_flags, var_fflush_domains);
}
/*
 * main - program entry point
 *
 * Hands control to single_server_main() with flush_service() as the
 * per-request handler, pre_jail_init() as the pre-initialization hook, and
 * a time table that binds the VAR_FFLUSH_REFRESH and VAR_FFLUSH_PURGE
 * parameters to var_fflush_refresh and var_fflush_purge.
 */
int main(int argc, char **argv)
{
/*
 * One row per time parameter; the trailing 0 ends the table. The meaning
 * of the last two values in each row (1, 0) depends on the
 * CONFIG_TIME_TABLE declaration, which is not visible here -- presumably
 * lower/upper bounds; TODO confirm.
 */
static CONFIG_TIME_TABLE time_table[] = {
VAR_FFLUSH_REFRESH, DEF_FFLUSH_REFRESH, &var_fflush_refresh, 1, 0,
VAR_FFLUSH_PURGE, DEF_FFLUSH_PURGE, &var_fflush_purge, 1, 0,
0,
};
/* The argument list is a sequence of key/value options, ended by 0. */
single_server_main(argc, argv, flush_service,
MAIL_SERVER_TIME_TABLE, time_table,
MAIL_SERVER_PRE_INIT, pre_jail_init,
MAIL_SERVER_UNLIMITED,
0);
}