#include <sys_defs.h>
#include <unistd.h>
#include <msg.h>
#include <htable.h>
#include <events.h>
#include <mymalloc.h>
#include <vstream.h>
#include <iostuff.h>
#include <mail_proto.h>
#include <recipient_list.h>
#include <mail_conf.h>
#include <mail_params.h>
#include "qmgr.h"
HTABLE *qmgr_transport_byname;
QMGR_TRANSPORT_LIST qmgr_transport_list;
typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;
struct QMGR_TRANSPORT_ALLOC {
QMGR_TRANSPORT *transport;
VSTREAM *stream;
QMGR_TRANSPORT_ALLOC_NOTIFY notify;
};
static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
{
qmgr_transport_unthrottle((QMGR_TRANSPORT *) context);
}
void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport)
{
char *myname = "qmgr_transport_unthrottle";
if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) {
if (msg_verbose)
msg_info("%s: transport %s", myname, transport->name);
transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD;
if (transport->reason == 0)
msg_panic("%s: transport %s: null reason", myname, transport->name);
myfree(transport->reason);
transport->reason = 0;
event_cancel_timer(qmgr_transport_unthrottle_wrapper,
(char *) transport);
}
}
void qmgr_transport_throttle(QMGR_TRANSPORT *transport, const char *reason)
{
char *myname = "qmgr_transport_throttle";
if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) {
if (msg_verbose)
msg_info("%s: transport %s: reason: %s",
myname, transport->name, reason);
transport->flags |= QMGR_TRANSPORT_STAT_DEAD;
if (transport->reason)
msg_panic("%s: transport %s: spurious reason: %s",
myname, transport->name, transport->reason);
transport->reason = mystrdup(reason);
event_request_timer(qmgr_transport_unthrottle_wrapper,
(char *) transport, var_transport_retry_time);
}
}
static void qmgr_transport_abort(int unused_event, char *context)
{
QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
msg_fatal("timeout connecting to transport: %s", alloc->transport->name);
}
static void qmgr_transport_event(int unused_event, char *context)
{
QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
if (msg_verbose)
msg_info("transport_event: %s", alloc->transport->name);
event_cancel_timer(qmgr_transport_abort, context);
event_disable_readwrite(vstream_fileno(alloc->stream));
alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
alloc->notify(alloc->transport, alloc->stream);
myfree((char *) alloc);
}
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
static void qmgr_transport_connect(int unused_event, char *context)
{
QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
event_disable_readwrite(vstream_fileno(alloc->stream));
non_blocking(vstream_fileno(alloc->stream), BLOCKING);
event_enable_read(vstream_fileno(alloc->stream),
qmgr_transport_event, (char *) alloc);
}
#endif
QMGR_TRANSPORT *qmgr_transport_select(void)
{
QMGR_TRANSPORT *xport;
QMGR_QUEUE *queue;
#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
if (xport->flags & STAY_AWAY)
continue;
for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
QMGR_LIST_ROTATE(qmgr_transport_list, xport);
if (msg_verbose)
msg_info("qmgr_transport_select: %s", xport->name);
return (xport);
}
}
}
return (0);
}
void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
{
QMGR_TRANSPORT_ALLOC *alloc;
VSTREAM *stream;
if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
msg_panic("qmgr_transport: dead transport: %s", transport->name);
if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
msg_panic("qmgr_transport: nested allocation: %s", transport->name);
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
#define BLOCK_MODE NON_BLOCKING
#define ENABLE_EVENTS event_enable_write
#define EVENT_HANDLER qmgr_transport_connect
#else
#define BLOCK_MODE BLOCKING
#define ENABLE_EVENTS event_enable_read
#define EVENT_HANDLER qmgr_transport_event
#endif
if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
msg_warn("connect to transport %s: %m", transport->name);
qmgr_transport_throttle(transport, "transport is unavailable");
return;
}
alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
alloc->stream = stream;
alloc->transport = transport;
alloc->notify = notify;
transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
event_request_timer(qmgr_transport_abort, (char *) alloc,
var_daemon_timeout);
}
QMGR_TRANSPORT *qmgr_transport_create(const char *name)
{
QMGR_TRANSPORT *transport;
if (htable_find(qmgr_transport_byname, name) != 0)
msg_panic("qmgr_transport_create: transport exists: %s", name);
transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
transport->flags = 0;
transport->name = mystrdup(name);
transport->dest_concurrency_limit =
get_mail_conf_int2(name, "_destination_concurrency_limit",
var_dest_con_limit, 0, 0);
transport->recipient_limit =
get_mail_conf_int2(name, "_destination_recipient_limit",
var_dest_rcpt_limit, 0, 0);
if (transport->dest_concurrency_limit == 0
|| transport->dest_concurrency_limit >= var_init_dest_concurrency)
transport->init_dest_concurrency = var_init_dest_concurrency;
else
transport->init_dest_concurrency = transport->dest_concurrency_limit;
transport->queue_byname = htable_create(0);
QMGR_LIST_INIT(transport->queue_list);
transport->reason = 0;
if (qmgr_transport_byname == 0)
qmgr_transport_byname = htable_create(10);
htable_enter(qmgr_transport_byname, name, (char *) transport);
QMGR_LIST_APPEND(qmgr_transport_list, transport);
if (msg_verbose)
msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
transport->name, transport->dest_concurrency_limit,
transport->recipient_limit);
return (transport);
}
QMGR_TRANSPORT *qmgr_transport_find(const char *name)
{
return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name));
}